4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2014, Intel Corporation.
26 * lustre/lfsck/lfsck_layout.c
28 * Author: Fan, Yong <fan.yong@intel.com>
32 # define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_LFSCK
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
49 #include "lfsck_internal.h"
/* Magic numbers identifying versions of the lfsck_layout trace-file header. */
51 #define LFSCK_LAYOUT_MAGIC_V1 0xB173AE14
52 #define LFSCK_LAYOUT_MAGIC_V2 0xB1734D76
/* Current magic: set by lfsck_layout_init() and required by
 * lfsck_layout_load(); it is V2. */
54 #define LFSCK_LAYOUT_MAGIC LFSCK_LAYOUT_MAGIC_V2
/* Per-sequence LAST_ID state; entries are linked on
 * lfsck_layout_slave_data::llsd_seq_list through lls_list. */
56 struct lfsck_layout_seq {
57 struct list_head lls_list;
/* Compared with the on-disk LAST_ID in lfsck_layout_lastid_reload(). */
60 __u64 lls_lastid_known;
/* The LAST_ID object, assigned in lfsck_layout_lastid_load(). */
61 struct dt_object *lls_lastid_obj;
/* Tested by lfsck_layout_lastid_store() before an entry is synced. */
62 unsigned int lls_dirty:1;
/* One master target known to a layout slave.  The helpers in this file look
 * entries up by llst_index and reference-count them through llst_ref. */
65 struct lfsck_layout_slave_target {
66 /* link into lfsck_layout_slave_data::llsd_master_list. */
67 struct list_head llst_list;
68 /* The position for next record in the rbtree for iteration. */
69 struct lu_fid llst_fid;
70 /* Dummy hash for iteration against the rbtree. */
/* Slave-side private data of the layout LFSCK component (reached through
 * com->lc_data in the rbtree and LAST_ID helpers of this file). */
77 struct lfsck_layout_slave_data {
78 /* list for lfsck_layout_seq */
79 struct list_head llsd_seq_list;
81 /* list for the masters involve layout verification. */
82 struct list_head llsd_master_list;
/* In-RAM object standing for the layout rbtree; set by lfsck_rbtree_setup()
 * and released by lfsck_rbtree_cleanup(). */
85 struct dt_object *llsd_rb_obj;
/* Root of the tree of lfsck_rbtree_node entries. */
86 struct rb_root llsd_rb_root;
/* Taken around rbtree lookups/insertions and around changes of
 * llsd_rbtree_valid. */
87 rwlock_t llsd_rb_lock;
/* Cleared by lfsck_rbtree_cleanup(); checked before the tree is used. */
88 unsigned int llsd_rbtree_valid:1;
/* Reference-counted state of a parent object, created by
 * lfsck_layout_object_init() and shared by layout requests (llr_parent). */
91 struct lfsck_layout_object {
/* Attributes fetched with dt_attr_get() in lfsck_layout_object_init(). */
92 struct lu_attr llo_attr;
/* One layout verification request handed to the LFSCK assistant. */
98 struct lfsck_layout_req {
/* Embedded generic request; lfsck_layout_assistant_req_fini() recovers the
 * enclosing lfsck_layout_req from it with container_of0(). */
99 struct lfsck_assistant_req llr_lar;
/* Parent state; a reference is taken in lfsck_layout_assistant_req_init()
 * and dropped in lfsck_layout_assistant_req_fini(). */
100 struct lfsck_layout_object *llr_parent;
/* Child object; released with lu_object_put() when the request is freed. */
101 struct dt_object *llr_child;
103 __u32 llr_lov_idx; /* offset in LOV EA */
/* Argument bundle (export, component, slave target) named for slave-side
 * asynchronous requests.
 * NOTE(review): its users are outside this excerpt -- confirm the intended
 * lifetime of the referenced objects there. */
106 struct lfsck_layout_slave_async_args {
107 struct obd_export *llsaa_exp;
108 struct lfsck_component *llsaa_com;
109 struct lfsck_layout_slave_target *llsaa_llst;
/*
 * Build a lfsck_layout_object describing \a obj.
 *
 * The visible code fetches the object's attributes into llo_attr with
 * dt_attr_get(), records \a cookie in llo_cookie and sets llo_ref to 1.
 * One visible path returns ERR_PTR(-ENOMEM).
 * NOTE(review): the allocation, the handling of a dt_attr_get() failure and
 * the use of \a gen are not visible in this excerpt -- confirm against the
 * complete source.
 */
112 static struct lfsck_layout_object *
113 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
114 __u64 cookie, __u16 gen)
116 struct lfsck_layout_object *llo;
121 return ERR_PTR(-ENOMEM);
123 rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
130 llo->llo_cookie = cookie;
131 /* The gen can be used to check whether some others have changed the
132 * file layout after LFSCK pre-fetching but before real verification. */
/* Start with a single reference. */
134 atomic_set(&llo->llo_ref, 1);
/* Drop one reference on \a llst.  When the count reaches zero the entry is
 * asserted to be already unlinked from its list.
 * NOTE(review): what happens after the assertion is not visible here. */
140 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
142 if (atomic_dec_and_test(&llst->llst_ref)) {
143 LASSERT(list_empty(&llst->llst_list));
/*
 * Register the master target \a index on llsd->llsd_master_list.
 *
 * A new entry is initialised with an empty list link, the given index and one
 * reference.  Under llsd_lock the list is scanned for an entry that already
 * has the same index; the new entry is appended at the tail.
 * NOTE(review): the allocation and the handling of an already-present index
 * are not visible in this excerpt.
 */
150 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
152 struct lfsck_layout_slave_target *llst;
153 struct lfsck_layout_slave_target *tmp;
160 INIT_LIST_HEAD(&llst->llst_list);
162 llst->llst_index = index;
163 atomic_set(&llst->llst_ref, 1);
165 spin_lock(&llsd->llsd_lock);
/* Look for an existing entry with the same index. */
166 list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
167 if (tmp->llst_index == index) {
173 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
174 spin_unlock(&llsd->llsd_lock);
/* Unlink \a llst from the master list (under llsd_lock, only if it is still
 * linked) and drop a reference on it with lfsck_layout_llst_put().
 * NOTE(review): whether the final put is conditional is not visible here. */
183 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
184 struct lfsck_layout_slave_target *llst)
188 spin_lock(&llsd->llsd_lock);
/* list_del_init() leaves the link empty, so a repeated call sees it as
 * already unlinked. */
189 if (!list_empty(&llst->llst_list)) {
190 list_del_init(&llst->llst_list);
193 spin_unlock(&llsd->llsd_lock);
196 lfsck_layout_llst_put(llst);
/*
 * Look up the master target with the given \a index under llsd_lock.
 *
 * On a match an extra reference is taken (atomic_inc on llst_ref) before the
 * lock is released.  The entry can also be removed from the list with
 * list_del_init() -- presumably only when \a unlink is true; the guarding
 * condition is not visible in this excerpt.
 * NOTE(review): the return statements are not visible here.
 */
199 static inline struct lfsck_layout_slave_target *
200 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
201 __u32 index, bool unlink)
203 struct lfsck_layout_slave_target *llst;
205 spin_lock(&llsd->llsd_lock);
206 list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
207 if (llst->llst_index == index) {
209 list_del_init(&llst->llst_list);
211 atomic_inc(&llst->llst_ref);
212 spin_unlock(&llsd->llsd_lock);
/* No entry matched. */
217 spin_unlock(&llsd->llsd_lock);
/* Drop one reference on \a llo.
 * NOTE(review): the action taken on the last reference is not visible in
 * this excerpt. */
222 static inline void lfsck_layout_object_put(const struct lu_env *env,
223 struct lfsck_layout_object *llo)
225 if (atomic_dec_and_test(&llo->llo_ref))
/*
 * Build a layout request for the LFSCK assistant.
 *
 * The request records the parent FID \a pfid in the embedded generic request,
 * takes a reference on \a parent, and stores \a child, \a ost_idx and the
 * LOV EA slot index.  One visible path returns ERR_PTR(-ENOMEM).
 * NOTE(review): the allocation and the last parameter's declaration are not
 * visible in this excerpt; \a child is stored without a visible reference
 * being taken here.
 */
229 static struct lfsck_layout_req *
230 lfsck_layout_assistant_req_init(struct lfsck_layout_object *parent,
231 const struct lu_fid *pfid,
232 struct dt_object *child, __u32 ost_idx,
235 struct lfsck_layout_req *llr;
239 return ERR_PTR(-ENOMEM);
241 INIT_LIST_HEAD(&llr->llr_lar.lar_list);
242 llr->llr_lar.lar_fid = *pfid;
/* The request keeps its own reference on the parent; it is dropped in
 * lfsck_layout_assistant_req_fini(). */
244 atomic_inc(&parent->llo_ref);
245 llr->llr_parent = parent;
246 llr->llr_child = child;
247 llr->llr_ost_idx = ost_idx;
248 llr->llr_lov_idx = lov_idx;
/* Release what a layout request holds: the child object (lu_object_put) and
 * the reference on the parent (lfsck_layout_object_put).
 * NOTE(review): freeing of the request itself is not visible here. */
253 static void lfsck_layout_assistant_req_fini(const struct lu_env *env,
254 struct lfsck_assistant_req *lar)
/* \a lar is the llr_lar member embedded in a lfsck_layout_req. */
256 struct lfsck_layout_req *llr =
257 container_of0(lar, struct lfsck_layout_req, llr_lar);
259 lu_object_put(env, &llr->llr_child->do_lu);
260 lfsck_layout_object_put(env, llr->llr_parent);
/* Reply callback for the requests queued by
 * lfsck_layout_assistant_sync_failures(): flags the target as having synced
 * failures and decrements the shared outstanding-request counter.
 * NOTE(review): any condition on the reply status guarding these two
 * statements is not visible in this excerpt. */
265 lfsck_layout_assistant_sync_failures_interpret(const struct lu_env *env,
266 struct ptlrpc_request *req,
270 struct lfsck_async_interpret_args *laia = args;
271 struct lfsck_tgt_desc *ltd = laia->laia_ltd;
273 ltd->ltd_synced_failures = 1;
274 atomic_dec(laia->laia_count);
281 * Notify remote LFSCK instances about former failures.
283 * The local LFSCK instance records which OSTs have ever failed to respond to
284 * some LFSCK verification requests (perhaps because of network issues or
285 * trouble on the OST itself). During such a gap the OST may have missed the
286 * verification of some OST-objects, so it cannot know whether those OST-objects
287 * are referenced by MDT-objects or not. In the second-stage scanning these
288 * OST-objects would then be regarded as orphans, and if such an OST-object has
289 * a bad parent FID for back reference, it would mislead the LFSCK into making
290 * a wrong fix for the fake orphan.
292 * To avoid this, when the layout LFSCK finishes the first-stage scanning, it
293 * scans the bitmap of the OSTs that have ever failed and notifies them that
294 * they have missed some OST-object verification and should skip the handling
295 * of orphan OST-objects on all MDTs that take part in the layout LFSCK.
297 * \param[in] env pointer to the thread context
298 * \param[in] com pointer to the lfsck component
299 * \param[in] lr pointer to the lfsck request
/*
 * Flow of the visible code: return early unless some OST failed before
 * (lad_incomplete) and the LFSCK is not already LF_INCOMPLETE; queue one
 * asynchronous request per bit set in lad_bitmap while holding ltd_rw_sem for
 * read; wait for the request set; finally publish lo->ll_flags (with
 * LF_INCOMPLETE added on the failure path) in lr->lr_flags2.
 *
 * Fix: the semaphore argument of down_read()/up_read() had been corrupted to
 * "<ds->ltd_rw_sem" (an "&lt" swallowed as an HTML entity); it is restored to
 * "&ltds->ltd_rw_sem", ltds being the target-descriptor table declared below.
 */
301 static void lfsck_layout_assistant_sync_failures(const struct lu_env *env,
302 struct lfsck_component *com,
303 struct lfsck_request *lr)
305 struct lfsck_async_interpret_args *laia =
306 &lfsck_env_info(env)->lti_laia2;
307 struct lfsck_assistant_data *lad = com->lc_data;
308 struct lfsck_layout *lo = com->lc_file_ram;
309 struct lfsck_instance *lfsck = com->lc_lfsck;
310 struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs;
311 struct lfsck_tgt_desc *ltd;
312 struct ptlrpc_request_set *set;
/* Nothing to sync if no OST ever failed, or if the whole LFSCK is already
 * marked incomplete. */
318 if (!lad->lad_incomplete || lo->ll_flags & LF_INCOMPLETE)
321 /* If the MDT has ever failed to verify some OST-objects,
322 * then sync failures with them firstly. */
323 lr->lr_flags2 = lo->ll_flags | LF_INCOMPLETE;
/* count is shared with the interpret callback through laia_count; the
 * callback decrements it. */
325 atomic_set(&count, 0);
326 memset(laia, 0, sizeof(*laia));
327 laia->laia_count = &count;
328 set = ptlrpc_prep_set();
330 GOTO(out, rc = -ENOMEM);
/* Hold the target table for read while walking the failed-OST bitmap. */
332 down_read(&ltds->ltd_rw_sem);
333 cfs_foreach_bit(lad->lad_bitmap, idx) {
334 ltd = LTD_TGT(ltds, idx);
335 LASSERT(ltd != NULL);
337 laia->laia_ltd = ltd;
338 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
339 lfsck_layout_assistant_sync_failures_interpret,
342 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
343 "notify target %x for %s phase1 done: "
344 "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
345 ltd->ltd_index, lad->lad_name, rc);
352 up_read(&ltds->ltd_rw_sem);
/* Only wait when at least one request is still counted as outstanding. */
354 if (rc == 0 && atomic_read(&count) > 0)
355 rc = ptlrpc_set_wait(set);
357 ptlrpc_set_destroy(set);
/* A counter still above zero after the wait means that not every callback
 * decremented it. */
359 if (rc == 0 && atomic_read(&count) > 0)
366 /* If failed to sync failures with the OSTs, then have to
367 * mark the whole LFSCK as LF_INCOMPLETE to skip the whole
368 * subsequent orphan OST-object handling. */
369 lo->ll_flags |= LF_INCOMPLETE;
371 lr->lr_flags2 = lo->ll_flags;
/*
 * Read the LOV EA (XATTR_NAME_LOV) of \a obj into \a buf.
 *
 * The visible code first reads into the caller's buffer, can query the
 * attribute again with LU_BUF_NULL (a size probe), grows the buffer with
 * lu_buf_realloc(), and allocates it with lu_buf_alloc() when \a buf has no
 * storage yet.
 * NOTE(review): the conditions selecting each path, the retry logic and the
 * return values are not visible in this excerpt.
 */
374 static int lfsck_layout_get_lovea(const struct lu_env *env,
375 struct dt_object *obj, struct lu_buf *buf)
380 rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
/* Query with an empty buffer; rc is then used as the new buffer size. */
382 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
387 lu_buf_realloc(buf, rc);
388 if (buf->lb_buf == NULL)
/* The caller passed a buffer without storage: allocate rc bytes. */
400 if (unlikely(buf->lb_buf == NULL)) {
401 lu_buf_alloc(buf, rc);
402 if (buf->lb_buf == NULL)
/*
 * Sanity-check the header of a little-endian LOV EA.
 *
 * Two properties are checked: the magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3,
 * and the stripe pattern must be LOV_PATTERN_RAID0.  A failed check is logged
 * together with the object id taken from lmm_oi.  The debug message
 * distinguishes an "Unknown" magic (rc == -EINVAL) from an "Unsupported" one.
 * NOTE(review): the assignments to rc and the return statements are not
 * visible in this excerpt.
 */
411 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
416 magic = le32_to_cpu(lmm->lmm_magic);
417 /* If magic crashed, keep it there. Sometime later, during OST-object
418 * orphan handling, if some OST-object(s) back-point to it, it can be
419 * verified and repaired. */
420 if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) {
424 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
/* The masked magic still matches the LOV magic family. */
425 if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
430 CDEBUG(D_LFSCK, "%s LOV EA magic %u on "DOSTID"\n",
431 rc == -EINVAL ? "Unknown" : "Unsupported",
437 pattern = le32_to_cpu(lmm->lmm_pattern);
438 /* XXX: currently, we only support LOV_PATTERN_RAID0. */
439 if (lov_pattern(pattern) != LOV_PATTERN_RAID0) {
442 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
443 CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u on "DOSTID"\n",
444 pattern, POSTID(&oi));
/* Each rbtree node carries bitmaps of LFSCK_RBTREE_BITMAP_SIZE bytes (one
 * page cache page). */
452 #define LFSCK_RBTREE_BITMAP_SIZE PAGE_CACHE_SIZE
/* Number of bits, i.e. of object ids, covered by one bitmap (bytes * 8). */
453 #define LFSCK_RBTREE_BITMAP_WIDTH (LFSCK_RBTREE_BITMAP_SIZE << 3)
/* Mask extracting an object id's bit position inside its bitmap. */
454 #define LFSCK_RBTREE_BITMAP_MASK (LFSCK_RBTREE_BITMAP_WIDTH - 1)
/* One node of the layout rbtree.  It covers LFSCK_RBTREE_BITMAP_WIDTH
 * consecutive object ids of one sequence, starting at lrn_first_oid (see
 * lfsck_rbtree_new() and lfsck_rbtree_cmp()). */
456 struct lfsck_rbtree_node {
457 struct rb_node lrn_node;
/* Number of bits set in the matching bitmap below; incremented in
 * lfsck_rbtree_update_bitmap() whenever a bit is newly set. */
460 atomic_t lrn_known_count;
461 atomic_t lrn_accessed_count;
/* Bitmaps of LFSCK_RBTREE_BITMAP_SIZE bytes each. */
462 void *lrn_known_bitmap;
463 void *lrn_accessed_bitmap;
/* Compare (\a seq, \a oid) with the id range covered by \a lrn: by sequence
 * first, then by whether \a oid lies below the node's first oid or at least
 * LFSCK_RBTREE_BITMAP_WIDTH past it.
 * NOTE(review): the returned values are not visible in this excerpt; callers
 * use the result to descend left or right. */
466 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
467 __u64 seq, __u32 oid)
469 if (seq < lrn->lrn_seq)
472 if (seq > lrn->lrn_seq)
475 if (oid < lrn->lrn_first_oid)
/* oid is beyond the last id this node's bitmap can represent. */
478 if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH)
/* Find the rbtree node covering \a fid.  The tree is descended using
 * lfsck_rbtree_cmp() on the FID's sequence and oid; without an exact match
 * the code falls back to the next node in tree order (rb_next()).
 * NOTE(review): how \a exact is reported and what is returned for an empty
 * tree or at the end of the tree is not visible in this excerpt. */
484 /* The caller should hold llsd->llsd_rb_lock. */
485 static struct lfsck_rbtree_node *
486 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
487 const struct lu_fid *fid, bool *exact)
489 struct rb_node *node = llsd->llsd_rb_root.rb_node;
490 struct rb_node *prev = NULL;
491 struct lfsck_rbtree_node *lrn = NULL;
497 while (node != NULL) {
499 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
500 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
502 node = node->rb_left;
504 node = node->rb_right;
512 /* If there is no exactly matched one, then to the next valid one. */
515 /* The rbtree is empty. */
522 node = rb_next(prev);
524 /* The end of the rbtree. */
528 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
/*
 * Allocate an unlinked rbtree node for the id range containing \a fid.
 *
 * Both bitmaps are allocated with LFSCK_RBTREE_BITMAP_SIZE bytes; if the
 * second allocation fails the first bitmap is freed again.  The node's first
 * oid is the FID's oid rounded down to a multiple of the bitmap width, and
 * both counters start at zero.  Failure paths return ERR_PTR(-ENOMEM).
 * NOTE(review): allocation/freeing of the node itself is not visible here.
 */
533 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
534 const struct lu_fid *fid)
536 struct lfsck_rbtree_node *lrn;
540 return ERR_PTR(-ENOMEM);
542 OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
543 if (lrn->lrn_known_bitmap == NULL) {
546 return ERR_PTR(-ENOMEM);
549 OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
550 if (lrn->lrn_accessed_bitmap == NULL) {
/* Undo the first bitmap allocation. */
551 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
554 return ERR_PTR(-ENOMEM);
/* Mark the node as not linked into any tree yet. */
557 RB_CLEAR_NODE(&lrn->lrn_node);
558 lrn->lrn_seq = fid_seq(fid);
/* Align the range start to the bitmap width. */
559 lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
560 atomic_set(&lrn->lrn_known_count, 0);
561 atomic_set(&lrn->lrn_accessed_count, 0);
/* Free the two bitmaps of \a lrn.
 * NOTE(review): freeing of the node structure itself is not visible in this
 * excerpt. */
566 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
568 OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
569 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
/* Link \a lrn into the rbtree at the position determined by comparing its
 * (sequence, first oid) with existing nodes, then rebalance the tree.
 * NOTE(review): the handling of an already-present equal node and the return
 * value are not visible here; lfsck_rbtree_update_bitmap() can free its new
 * node after calling this function. */
573 /* The caller should hold lock. */
574 static struct lfsck_rbtree_node *
575 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
576 struct lfsck_rbtree_node *lrn)
578 struct rb_node **pos = &llsd->llsd_rb_root.rb_node;
579 struct rb_node *parent = NULL;
580 struct lfsck_rbtree_node *tmp;
/* Walk down to the empty slot where the new node belongs. */
583 while (*pos != NULL) {
585 tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node);
586 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
588 pos = &(*pos)->rb_left;
590 pos = &(*pos)->rb_right;
595 rb_link_node(&lrn->lrn_node, parent, pos);
596 rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
/* Index operations defined elsewhere; lfsck_rbtree_setup() installs them on
 * the in-RAM object that stands for the layout rbtree. */
601 extern const struct dt_index_operations lfsck_orphan_index_ops;
/*
 * Set up the in-RAM object that stands for the layout rbtree.
 *
 * The object is located by the FID (FID_SEQ_LAYOUT_RBTREE, device index),
 * flagged as existing, and given lfsck_orphan_index_ops as index operations.
 * The rbtree is then marked valid and the bottom device is asked to record
 * accessed FIDs (dd_record_fid_accessed).  Returns the dt_locate() error when
 * the object cannot be obtained.
 * NOTE(review): the remaining FID fields and the success return are not
 * visible in this excerpt.
 */
603 static int lfsck_rbtree_setup(const struct lu_env *env,
604 struct lfsck_component *com)
606 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid;
607 struct lfsck_instance *lfsck = com->lc_lfsck;
608 struct dt_device *dev = lfsck->li_bottom;
609 struct lfsck_layout_slave_data *llsd = com->lc_data;
610 struct dt_object *obj;
612 fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
613 fid->f_oid = lfsck_dev_idx(dev);
615 obj = dt_locate(env, dev, fid);
617 RETURN(PTR_ERR(obj));
619 /* Generate an in-RAM object to stand for the layout rbtree.
620 * Scanning the layout rbtree will be via the iteration over
621 * the object. In the future, the rbtree may be written onto
622 * disk with the object.
624 * Mark the object as existing. */
625 obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
626 obj->do_index_ops = &lfsck_orphan_index_ops;
627 llsd->llsd_rb_obj = obj;
628 llsd->llsd_rbtree_valid = 1;
629 dev->dd_record_fid_accessed = 1;
631 CDEBUG(D_LFSCK, "%s: layout LFSCK init OST-objects accessing bitmap\n",
632 lfsck_lfsck2name(lfsck));
/*
 * Tear down the layout rbtree.
 *
 * Order of the visible steps: stop FID-access recording on the bottom device,
 * clear llsd_rbtree_valid under the write lock, erase and free every node,
 * then release the in-RAM rbtree object if one is set.
 */
637 static void lfsck_rbtree_cleanup(const struct lu_env *env,
638 struct lfsck_component *com)
640 struct lfsck_instance *lfsck = com->lc_lfsck;
641 struct lfsck_layout_slave_data *llsd = com->lc_data;
642 struct rb_node *node = rb_first(&llsd->llsd_rb_root);
643 struct rb_node *next;
644 struct lfsck_rbtree_node *lrn;
646 lfsck->li_bottom->dd_record_fid_accessed = 0;
647 /* Invalidate the rbtree, then no others will use it. */
648 write_lock(&llsd->llsd_rb_lock);
649 llsd->llsd_rbtree_valid = 0;
650 write_unlock(&llsd->llsd_rb_lock);
/* The successor is fetched before the current node is erased and freed. */
652 while (node != NULL) {
653 next = rb_next(node);
654 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
655 rb_erase(node, &llsd->llsd_rb_root);
656 lfsck_rbtree_free(lrn);
660 if (llsd->llsd_rb_obj != NULL) {
661 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
662 llsd->llsd_rb_obj = NULL;
665 CDEBUG(D_LFSCK, "%s: layout LFSCK fini OST-objects accessing bitmap\n",
666 lfsck_lfsck2name(lfsck));
/*
 * Record \a fid as known (and, if \a accessed, as accessed) in the rbtree.
 *
 * FIDs that are not sane, that are LAST_ID FIDs, or that are neither IDIF nor
 * normal FIDs are filtered out first.  The node is looked up under the read
 * lock.  When a node has to be created, the read lock is dropped, the node is
 * allocated without any lock held, and the tree is re-validated and updated
 * under the write lock.  If the update fails for an accessed object, the
 * layout LFSCK is flagged LF_INCOMPLETE and the whole rbtree is torn down.
 * NOTE(review): several controlling lines (declarations, the "node missing"
 * test, labels) are not visible in this excerpt.
 */
669 static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
670 struct lfsck_component *com,
671 const struct lu_fid *fid,
674 struct lfsck_layout_slave_data *llsd = com->lc_data;
675 struct lfsck_rbtree_node *lrn;
681 if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
684 if (!fid_is_idif(fid) && !fid_is_norm(fid))
687 read_lock(&llsd->llsd_rb_lock);
/* The tree may already have been invalidated by lfsck_rbtree_cleanup(). */
688 if (!llsd->llsd_rbtree_valid)
689 GOTO(unlock, rc = 0);
691 lrn = lfsck_rbtree_search(llsd, fid, NULL);
693 struct lfsck_rbtree_node *tmp;
/* Allocate the new node with no lock held. */
697 read_unlock(&llsd->llsd_rb_lock);
698 tmp = lfsck_rbtree_new(env, fid);
700 GOTO(out, rc = PTR_ERR(tmp));
/* Re-check validity: the tree may have been torn down while unlocked. */
703 write_lock(&llsd->llsd_rb_lock);
704 if (!llsd->llsd_rbtree_valid) {
705 lfsck_rbtree_free(tmp);
706 GOTO(unlock, rc = 0);
709 lrn = lfsck_rbtree_insert(llsd, tmp);
711 lfsck_rbtree_free(tmp);
/* Bit position of this oid inside the node's bitmaps. */
714 idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
715 /* Any accessed object must be a known object. */
716 if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
717 atomic_inc(&lrn->lrn_known_count);
718 if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
719 atomic_inc(&lrn->lrn_accessed_count);
721 GOTO(unlock, rc = 0);
725 write_unlock(&llsd->llsd_rb_lock);
727 read_unlock(&llsd->llsd_rb_lock);
729 if (rc != 0 && accessed) {
730 struct lfsck_layout *lo = com->lc_file_ram;
732 CDEBUG(D_LFSCK, "%s: fail to update OST-objects accessing "
733 "bitmap, and will cause incorrect LFSCK OST-object "
734 "handling, so disable it to cancel orphan handling "
735 "for related device. rc = %d\n",
736 lfsck_lfsck2name(com->lc_lfsck), rc);
738 lo->ll_flags |= LF_INCOMPLETE;
739 lfsck_rbtree_cleanup(env, com);
/* Convert a little-endian lfsck_layout record \a src into CPU byte order in
 * \a des, field by field: le32_to_cpu() for the 32-bit fields, le64_to_cpu()
 * for the 64-bit ones, and element-wise over the LLIT_MAX entries of
 * ll_objs_repaired[]. */
743 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
744 const struct lfsck_layout *src)
748 des->ll_magic = le32_to_cpu(src->ll_magic);
749 des->ll_status = le32_to_cpu(src->ll_status);
750 des->ll_flags = le32_to_cpu(src->ll_flags);
751 des->ll_success_count = le32_to_cpu(src->ll_success_count);
752 des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
753 des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
754 des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
755 des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
756 des->ll_time_last_checkpoint =
757 le64_to_cpu(src->ll_time_last_checkpoint);
758 des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
759 des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
760 des->ll_pos_first_inconsistent =
761 le64_to_cpu(src->ll_pos_first_inconsistent);
762 des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
763 des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
764 des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
765 des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
766 for (i = 0; i < LLIT_MAX; i++)
767 des->ll_objs_repaired[i] =
768 le64_to_cpu(src->ll_objs_repaired[i]);
769 des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
770 des->ll_bitmap_size = le32_to_cpu(src->ll_bitmap_size);
/* Inverse of lfsck_layout_le_to_cpu(): convert the CPU-order record \a src
 * into little-endian form in \a des, field by field (cpu_to_le32() for the
 * 32-bit fields, cpu_to_le64() for the 64-bit ones and for each of the
 * LLIT_MAX entries of ll_objs_repaired[]). */
773 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
774 const struct lfsck_layout *src)
778 des->ll_magic = cpu_to_le32(src->ll_magic);
779 des->ll_status = cpu_to_le32(src->ll_status);
780 des->ll_flags = cpu_to_le32(src->ll_flags);
781 des->ll_success_count = cpu_to_le32(src->ll_success_count);
782 des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
783 des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
784 des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
785 des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
786 des->ll_time_last_checkpoint =
787 cpu_to_le64(src->ll_time_last_checkpoint);
788 des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
789 des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
790 des->ll_pos_first_inconsistent =
791 cpu_to_le64(src->ll_pos_first_inconsistent);
792 des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
793 des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
794 des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
795 des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
796 for (i = 0; i < LLIT_MAX; i++)
797 des->ll_objs_repaired[i] =
798 cpu_to_le64(src->ll_objs_repaired[i]);
799 des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
800 des->ll_bitmap_size = cpu_to_le32(src->ll_bitmap_size);
804 * Load the OST bitmap from the lfsck_layout trace file.
806 * \param[in] env pointer to the thread context
807 * \param[in] com pointer to the lfsck component
809 * \retval 0 for success
810 * \retval negative error number on failure or data corruption
812 static int lfsck_layout_load_bitmap(const struct lu_env *env,
813 struct lfsck_component *com)
815 struct dt_object *obj = com->lc_obj;
816 struct lfsck_assistant_data *lad = com->lc_data;
817 struct lfsck_layout *lo = com->lc_file_ram;
818 cfs_bitmap_t *bitmap = lad->lad_bitmap;
/* The bitmap is read starting at offset lc_file_size of the trace file. */
819 loff_t pos = com->lc_file_size;
/* Size the in-RAM bitmap from the current OST target bitmap or from the
 * size recorded in the trace file.
 * NOTE(review): the right-hand side of this comparison is not visible in
 * this excerpt; presumably the larger of the two is chosen. */
825 if (com->lc_lfsck->li_ost_descs.ltd_tgts_bitmap->size >
827 nbits = com->lc_lfsck->li_ost_descs.ltd_tgts_bitmap->size;
829 nbits = lo->ll_bitmap_size;
/* Never use fewer than BITS_PER_LONG bits. */
831 if (unlikely(nbits < BITS_PER_LONG))
832 nbits = BITS_PER_LONG;
/* The current bitmap is too small: allocate a larger one and free the old
 * one.  NOTE(review): the growth step of new_bits is not visible here. */
834 if (nbits > bitmap->size) {
835 __u32 new_bits = bitmap->size;
836 cfs_bitmap_t *new_bitmap;
838 while (new_bits < nbits)
841 new_bitmap = CFS_ALLOCATE_BITMAP(new_bits);
842 if (new_bitmap == NULL)
845 lad->lad_bitmap = new_bitmap;
846 CFS_FREE_BITMAP(bitmap);
/* No bitmap recorded in the file: no OST is marked as failed. */
850 if (lo->ll_bitmap_size == 0) {
851 lad->lad_incomplete = 0;
852 CFS_RESET_BITMAP(bitmap);
/* Stored size in bytes, rounded up from the bit count. */
857 size = (lo->ll_bitmap_size + 7) >> 3;
858 rc = dt_read(env, obj, lfsck_buf_get(env, bitmap->data, size), &pos);
/* A non-negative rc on this path is reported as -EINVAL. */
860 RETURN(rc >= 0 ? -EINVAL : rc);
/* lad_incomplete mirrors whether any bit is set in the loaded bitmap. */
862 if (cfs_bitmap_check_empty(bitmap))
863 lad->lad_incomplete = 0;
865 lad->lad_incomplete = 1;
871 * Load the layout LFSCK trace file from disk.
873 * The layout LFSCK trace file records the layout LFSCK status information
874 * and other statistics, such as how many objects have been scanned and how
875 * many objects have been repaired. It also contains the bitmap of the OSTs
876 * that failed during the layout LFSCK. All this information is loaded from
877 * disk into RAM when the layout LFSCK component is set up.
879 * \param[in] env pointer to the thread context
880 * \param[in] com pointer to the lfsck component
882 * \retval positive number for file data corruption, the caller
883 * should reset the layout LFSCK trace file
884 * \retval 0 for success
885 * \retval negative error number on failure
887 static int lfsck_layout_load(const struct lu_env *env,
888 struct lfsck_component *com)
890 struct lfsck_layout *lo = com->lc_file_ram;
891 ssize_t size = com->lc_file_size;
/* Read the raw (little-endian) record into the lc_file_disk buffer. */
895 rc = dt_read(env, com->lc_obj,
896 lfsck_buf_get(env, com->lc_file_disk, size), &pos);
900 CDEBUG(D_LFSCK, "%s: failed to load lfsck_layout: rc = %d\n",
901 lfsck_lfsck2name(com->lc_lfsck), rc);
/* A short read is logged as a size mismatch that requires a reset. */
903 } else if (rc != size) {
904 CDEBUG(D_LFSCK, "%s: lfsck_layout size %u != %u; reset it\n",
905 lfsck_lfsck2name(com->lc_lfsck), rc, (unsigned int)size);
/* Convert to CPU byte order in lc_file_ram, then validate the magic. */
909 lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
910 if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
911 CDEBUG(D_LFSCK, "%s: invalid lfsck_layout magic %#x != %#x, "
912 "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
913 lo->ll_magic, LFSCK_LAYOUT_MAGIC);
921 * Store the layout LFSCK trace file on disk.
923 * The layout LFSCK trace file records the layout LFSCK status information
924 * and other statistics, such as how many objects have been scanned and how
925 * many objects have been repaired. It also contains the bitmap of the OSTs
926 * that failed during the layout LFSCK. All this information is synced from
927 * RAM to disk periodically.
929 * \param[in] env pointer to the thread context
930 * \param[in] com pointer to the lfsck component
932 * \retval 0 for success
933 * \retval negative error number on failure
935 static int lfsck_layout_store(const struct lu_env *env,
936 struct lfsck_component *com)
938 struct dt_object *obj = com->lc_obj;
939 struct lfsck_instance *lfsck = com->lc_lfsck;
940 struct lfsck_layout *lo_ram = com->lc_file_ram;
941 struct lfsck_layout *lo = com->lc_file_disk;
943 struct dt_device *dev = lfsck->li_bottom;
944 cfs_bitmap_t *bitmap = NULL;
946 ssize_t size = com->lc_file_size;
/* Only a master instance has a failed-OST bitmap to store. */
951 if (lfsck->li_master) {
952 struct lfsck_assistant_data *lad = com->lc_data;
954 bitmap = lad->lad_bitmap;
955 nbits = bitmap->size;
/* The bitmap is written as whole bytes (nbits >> 3 below). */
958 LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits);
/* Record the bitmap size in the header, then build the little-endian
 * on-disk image in lc_file_disk. */
961 lo_ram->ll_bitmap_size = nbits;
962 lfsck_layout_cpu_to_le(lo, lo_ram);
963 th = dt_trans_create(env, dev);
965 GOTO(log, rc = PTR_ERR(th));
/* Declare both writes (header and, if present, bitmap) before starting the
 * transaction. */
967 rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
972 if (bitmap != NULL) {
973 rc = dt_declare_record_write(env, obj,
974 lfsck_buf_get(env, bitmap->data, nbits >> 3),
980 rc = dt_trans_start_local(env, dev, th);
985 rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos, th);
989 if (bitmap != NULL) {
991 rc = dt_record_write(env, obj,
992 lfsck_buf_get(env, bitmap->data, nbits >> 3),
999 dt_trans_stop(env, dev, th);
1003 CDEBUG(D_LFSCK, "%s: fail to store lfsck_layout: rc = %d\n",
1004 lfsck_lfsck2name(lfsck), rc);
/* Reset the in-RAM lfsck_layout record to a pristine state (all zero, current
 * magic, status LS_INIT) and store it with lfsck_layout_store() while holding
 * lc_sem for write. */
1009 static int lfsck_layout_init(const struct lu_env *env,
1010 struct lfsck_component *com)
1012 struct lfsck_layout *lo = com->lc_file_ram;
1015 memset(lo, 0, com->lc_file_size);
1016 lo->ll_magic = LFSCK_LAYOUT_MAGIC;
1017 lo->ll_status = LS_INIT;
1018 down_write(&com->lc_sem);
1019 rc = lfsck_layout_store(env, com);
1020 up_write(&com->lc_sem);
/*
 * Decide whether \a fid / \a obj denotes an OST-object.
 *
 * The visible code consults three sources in turn: the FLD range of the FID's
 * sequence (fld_range_is_ost), the LMAC_FID_ON_OST compat flag in the
 * object's LMA xattr, and finally the XATTR_NAME_FID xattr.
 * NOTE(review): the conditions between these steps and the final return value
 * are not visible in this excerpt.
 */
1025 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
1026 struct dt_object *obj, const struct lu_fid *fid)
1028 struct seq_server_site *ss = lu_site2seq(dt->dd_lu_dev.ld_site);
1029 struct lu_seq_range *range = &lfsck_env_info(env)->lti_range;
1030 struct lustre_mdt_attrs *lma;
1033 fld_range_set_any(range);
1034 rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
1036 if (fld_range_is_ost(range))
1042 lma = &lfsck_env_info(env)->lti_lma;
1043 rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
1044 XATTR_NAME_LMA, BYPASS_CAPA);
/* Only a full-size LMA is trusted and byte-swapped. */
1045 if (rc == sizeof(*lma)) {
1046 lustre_lma_swab(lma);
1048 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
/* Probe for the XATTR_NAME_FID xattr without reading its value. */
1051 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
/* Search llsd_seq_list for the entry of sequence \a seq.  The loop also tests
 * for an entry with a larger sequence, which matches the ascending order
 * maintained by lfsck_layout_seq_insert().
 * NOTE(review): the return statements are not visible in this excerpt. */
1056 static struct lfsck_layout_seq *
1057 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
1059 struct lfsck_layout_seq *lls;
1061 list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1062 if (lls->lls_seq == seq)
1065 if (lls->lls_seq > seq)
/* Insert \a lls into llsd_seq_list in ascending lls_seq order: it is placed
 * before the first entry with a larger sequence, or at the list tail when no
 * such entry exists. */
1073 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
1074 struct lfsck_layout_seq *lls)
1076 struct lfsck_layout_seq *tmp;
/* Defaults to the list head, i.e. insertion at the tail. */
1077 struct list_head *pos = &llsd->llsd_seq_list;
1079 list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
1080 if (lls->lls_seq < tmp->lls_seq) {
1081 pos = &tmp->lls_list;
/* list_add_tail() on pos links the new entry just before pos. */
1085 list_add_tail(&lls->lls_list, pos);
/*
 * Create the LAST_ID object \a obj as a regular file and write an initial
 * LAST_ID value into it, all inside one local transaction.
 *
 * The LPF_DRYRUN bookmark flag is tested before anything is prepared.
 * NOTE(review): the dry-run action, the error checks between the steps and
 * the return statement are not visible in this excerpt.
 */
1089 lfsck_layout_lastid_create(const struct lu_env *env,
1090 struct lfsck_instance *lfsck,
1091 struct dt_object *obj)
1093 struct lfsck_thread_info *info = lfsck_env_info(env);
1094 struct lu_attr *la = &info->lti_la;
1095 struct dt_object_format *dof = &info->lti_dof;
1096 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1097 struct dt_device *dt = lfsck->li_bottom;
1104 if (bk->lb_param & LPF_DRYRUN)
/* Attributes of the new object: regular file, readable by all, writable
 * by the owner. */
1107 memset(la, 0, sizeof(*la));
1108 la->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
1109 la->la_valid = LA_MODE | LA_UID | LA_GID;
1110 dof->dof_type = dt_mode_to_dft(S_IFREG);
1112 th = dt_trans_create(env, dt);
1114 GOTO(log, rc = PTR_ERR(th));
/* Declare the creation and the write before starting the transaction. */
1116 rc = dt_declare_create(env, obj, la, NULL, dof, th);
1120 rc = dt_declare_record_write(env, obj,
1121 lfsck_buf_get(env, &lastid,
1127 rc = dt_trans_start_local(env, dt, th);
/* Re-check existence under the object write lock before creating. */
1131 dt_write_lock(env, obj, 0);
1132 if (likely(dt_object_exists(obj) == 0)) {
1133 rc = dt_create(env, obj, la, NULL, dof, th);
1135 rc = dt_record_write(env, obj,
1136 lfsck_buf_get(env, &lastid, sizeof(lastid)),
1139 dt_write_unlock(env, obj);
1144 dt_trans_stop(env, dt, th);
1147 CDEBUG(D_LFSCK, "%s: layout LFSCK will create LAST_ID for <seq> "
1149 lfsck_lfsck2name(lfsck), fid_seq(lfsck_dto2fid(obj)), rc);
/*
 * Re-read the on-disk LAST_ID of \a lls and reconcile it with the in-RAM
 * values.
 *
 * If the on-disk value is smaller than lls_lastid_known, the LAST_ID file is
 * treated as crashed: lls_lastid takes the known value and, the first time
 * this is seen, LF_CRASHED_LASTID is set after the out-notify callback has
 * been called with LE_LASTID_REBUILDING.  Otherwise an on-disk value not
 * smaller than lls_lastid replaces lls_lastid.
 * NOTE(review): the return statements are not visible in this excerpt.
 */
1155 lfsck_layout_lastid_reload(const struct lu_env *env,
1156 struct lfsck_component *com,
1157 struct lfsck_layout_seq *lls)
1163 dt_read_lock(env, lls->lls_lastid_obj, 0);
1164 rc = dt_record_read(env, lls->lls_lastid_obj,
1165 lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
1166 dt_read_unlock(env, lls->lls_lastid_obj);
1167 if (unlikely(rc != 0))
/* The value is stored little-endian. */
1170 lastid = le64_to_cpu(lastid);
1171 if (lastid < lls->lls_lastid_known) {
1172 struct lfsck_instance *lfsck = com->lc_lfsck;
1173 struct lfsck_layout *lo = com->lc_file_ram;
1175 lls->lls_lastid = lls->lls_lastid_known;
/* Notify and flag only once per LFSCK run. */
1177 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1178 LASSERT(lfsck->li_out_notify != NULL);
1180 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1181 LE_LASTID_REBUILDING);
1182 lo->ll_flags |= LF_CRASHED_LASTID;
1184 CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
1185 "LAST_ID file (1) for the sequence "LPX64
1186 ", old value "LPU64", known value "LPU64"\n",
1187 lfsck_lfsck2name(lfsck), lls->lls_seq,
1188 lastid, lls->lls_lastid);
1190 } else if (lastid >= lls->lls_lastid) {
1191 lls->lls_lastid = lastid;
/*
 * Write the in-RAM LAST_ID of the tracked sequences back to their LAST_ID
 * objects, one local transaction per sequence.
 *
 * The dirty bit of each entry and the LPF_DRYRUN bookmark flag are tested
 * before a transaction is created.
 * NOTE(review): the actions taken on those tests, the error checks between
 * the steps and the return value are not visible in this excerpt.
 */
1199 lfsck_layout_lastid_store(const struct lu_env *env,
1200 struct lfsck_component *com)
1202 struct lfsck_instance *lfsck = com->lc_lfsck;
1203 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1204 struct dt_device *dt = lfsck->li_bottom;
1205 struct lfsck_layout_slave_data *llsd = com->lc_data;
1206 struct lfsck_layout_seq *lls;
1212 list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1215 if (!lls->lls_dirty)
1218 CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
1219 "<seq> "LPX64" as <oid> "LPU64"\n",
1220 lfsck_lfsck2name(lfsck), lls->lls_seq, lls->lls_lastid);
1222 if (bk->lb_param & LPF_DRYRUN) {
1227 th = dt_trans_create(env, dt);
1230 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1231 "the LAST_ID for <seq> "LPX64"(1): rc = %d\n",
1232 lfsck_lfsck2name(com->lc_lfsck),
/* The value is written in little-endian form. */
1237 lastid = cpu_to_le64(lls->lls_lastid);
1238 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1239 lfsck_buf_get(env, &lastid,
1245 rc = dt_trans_start_local(env, dt, th);
/* The write itself is done under the object's write lock. */
1249 dt_write_lock(env, lls->lls_lastid_obj, 0);
1250 rc = dt_record_write(env, lls->lls_lastid_obj,
1251 lfsck_buf_get(env, &lastid,
1252 sizeof(lastid)), &pos, th);
1253 dt_write_unlock(env, lls->lls_lastid_obj);
1258 dt_trans_stop(env, dt, th);
1261 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1262 "the LAST_ID for <seq> "LPX64"(2): rc = %d\n",
1263 lfsck_lfsck2name(com->lc_lfsck),
/*
 * Locate the LAST_ID object of \a lls and load its value.
 *
 * If the object does not exist, the LAST_ID is treated as crashed: the
 * out-notify callback is called with LE_LASTID_REBUILDING, LF_CRASHED_LASTID
 * is set (once), and the object is created with lfsck_layout_lastid_create().
 * If it exists, its content is read; an empty file (rc == 0) is also treated
 * as crashed, and a read of any size other than 0 or 8 bytes is an error.
 * The object is remembered in lls_lastid_obj.
 * NOTE(review): the conditions separating the final lfsck_object_put() from
 * the lls_lastid_obj assignment are not visible in this excerpt.
 */
1272 lfsck_layout_lastid_load(const struct lu_env *env,
1273 struct lfsck_component *com,
1274 struct lfsck_layout_seq *lls)
1276 struct lfsck_instance *lfsck = com->lc_lfsck;
1277 struct lfsck_layout *lo = com->lc_file_ram;
1278 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid;
1279 struct dt_object *obj;
/* Build the LAST_ID FID for this sequence on the local device. */
1284 lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1285 obj = dt_locate(env, lfsck->li_bottom, fid);
1287 RETURN(PTR_ERR(obj));
1289 /* LAST_ID crashed, to be rebuilt */
1290 if (dt_object_exists(obj) == 0) {
1291 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1292 LASSERT(lfsck->li_out_notify != NULL);
1294 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1295 LE_LASTID_REBUILDING);
1296 lo->ll_flags |= LF_CRASHED_LASTID;
1298 CDEBUG(D_LFSCK, "%s: layout LFSCK cannot find the "
1299 "LAST_ID file for sequence "LPX64"\n",
1300 lfsck_lfsck2name(lfsck), lls->lls_seq);
/* Fault-injection hook: optionally delay for cfs_fail_val seconds. */
1302 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1304 struct l_wait_info lwi = LWI_TIMEOUT(
1305 cfs_time_seconds(cfs_fail_val),
1308 /* Others may have changed cfs_fail_val to zero
1309 * after the above check; re-check it to be
1310 * sure to avoid waiting forever. */
1311 if (likely(lwi.lwi_timeout > 0)) {
1312 struct ptlrpc_thread *thread =
/* lc_sem is released while waiting and re-taken afterwards. */
1315 up_write(&com->lc_sem);
1316 l_wait_event(thread->t_ctl_waitq,
1317 !thread_is_running(thread),
1319 down_write(&com->lc_sem);
1324 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1326 dt_read_lock(env, obj, 0);
1327 rc = dt_read(env, obj,
1328 lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1330 dt_read_unlock(env, obj);
/* Only "nothing read" or a complete 64-bit value are acceptable. */
1331 if (rc != 0 && rc != sizeof(__u64))
1332 GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1334 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1335 LASSERT(lfsck->li_out_notify != NULL);
1337 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1338 LE_LASTID_REBUILDING);
1339 lo->ll_flags |= LF_CRASHED_LASTID;
1341 CDEBUG(D_LFSCK, "%s: layout LFSCK finds invalid "
1342 "LAST_ID file for the sequence "LPX64
1344 lfsck_lfsck2name(lfsck), lls->lls_seq, rc);
/* The stored value is little-endian. */
1347 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1355 lfsck_object_put(env, obj);
1357 lls->lls_lastid_obj = obj;
/* Account one phase-1 failure and remember the iteration position at which it
 * happened.  The position comes from the object-table iterator's store()
 * method; ll_pos_first_inconsistent is updated when it is still zero or
 * smaller than that position. */
1362 static void lfsck_layout_record_failure(const struct lu_env *env,
1363 struct lfsck_instance *lfsck,
1364 struct lfsck_layout *lo)
1368 lo->ll_objs_failed_phase1++;
1369 cookie = lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1371 if (lo->ll_pos_first_inconsistent == 0 ||
1372 lo->ll_pos_first_inconsistent < cookie) {
1373 lo->ll_pos_first_inconsistent = cookie;
1375 CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
1376 "inconsistency at the pos ["LPU64"]\n",
1377 lfsck_lfsck2name(lfsck),
1378 lo->ll_pos_first_inconsistent);
/*
 * Fold the outcome of the second-phase scan into the layout LFSCK record
 * (com->lc_file_ram) and store it.
 *
 * Everything between down_write() and up_write() runs with com->lc_sem
 * held for write.  The status written to lo->ll_status is one of
 * LS_PARTIAL, LS_COMPLETED, lfsck->li_status, LS_STOPPED or LS_FAILED,
 * selected from @rc, LF_INCOMPLETE and (on the master) lad_incomplete.
 *
 * NOTE(review): the guard of the first ("success") branch is not visible
 * here; only the "else if (rc == 0)" arm is -- presumably the first arm
 * is taken for rc > 0, confirm against the full file.
 */
1382 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1383 struct lfsck_component *com,
1386 struct lfsck_instance *lfsck = com->lc_lfsck;
1387 struct lfsck_layout *lo = com->lc_file_ram;
1389 down_write(&com->lc_sem);
/* Phase-2 accounting: elapsed time since the last checkpoint (with
 * HALF_SEC added before conversion to seconds) and newly checked
 * objects. */
1390 lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1391 HALF_SEC - lfsck->li_time_last_checkpoint);
1392 lo->ll_time_last_checkpoint = cfs_time_current_sec();
1393 lo->ll_objs_checked_phase2 += com->lc_new_checked;
/* An incomplete scan, flagged either in the on-disk flags or in the
 * master's assistant data, is reported as LS_PARTIAL. */
1396 if (lo->ll_flags & LF_INCOMPLETE) {
1397 lo->ll_status = LS_PARTIAL;
1399 if (lfsck->li_master) {
1400 struct lfsck_assistant_data *lad = com->lc_data;
1402 if (lad->lad_incomplete)
1403 lo->ll_status = LS_PARTIAL;
1405 lo->ll_status = LS_COMPLETED;
1407 lo->ll_status = LS_COMPLETED;
/* Outside dry-run mode the "scanned once" and "inconsistent" flags are
 * cleared. */
1410 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
1411 lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1412 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1413 lo->ll_success_count++;
1414 } else if (rc == 0) {
/* rc == 0: use the instance status if one is set, LS_STOPPED otherwise. */
1415 if (lfsck->li_status != 0)
1416 lo->ll_status = lfsck->li_status;
1418 lo->ll_status = LS_STOPPED;
1420 lo->ll_status = LS_FAILED;
/* Write the updated record out before the semaphore is released. */
1423 rc = lfsck_layout_store(env, com);
1424 up_write(&com->lc_sem);
/*
 * Stop the transaction @handle on @dev, first recording @result in
 * handle->th_result.
 *
 * NOTE(review): how the dt_trans_stop() return code and @result are
 * combined into the function's return value is not visible here.
 */
1429 static int lfsck_layout_trans_stop(const struct lu_env *env,
1430 struct dt_device *dev,
1431 struct thandle *handle, int result)
1435 handle->th_result = result;
1436 rc = dt_trans_stop(env, dev, handle);
1446 * Get the system default stripe size.
1448 * \param[in] env pointer to the thread context
1449 * \param[in] lfsck pointer to the lfsck instance
1450 * \param[out] size pointer to the default stripe size
1452 * \retval 0 for success
1453 * \retval negative error number on failure
1455 static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
1456 struct lfsck_instance *lfsck,
/* The LOV EA is read into the per-thread lti_lum buffer. */
1459 struct lov_user_md *lum = &lfsck_env_info(env)->lti_lum;
1460 struct dt_object *root;
/* Look up the object identified by li_local_root_fid on lfsck->li_next;
 * a failed lookup is returned as PTR_ERR(). */
1463 root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
1465 return PTR_ERR(root);
1467 /* Get the default stripe size via xattr_get on the backend root. */
1468 rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
1469 XATTR_NAME_LOV, BYPASS_CAPA);
1471 /* The lum->lmm_stripe_size is little-endian, and *size is expected
1472 * to be little-endian as well, so no conversion is needed. */
1473 *size = lum->lmm_stripe_size;
1475 } else if (unlikely(rc == 0)) {
/* Drop the reference on the root object obtained from dt_locate(). */
1479 lfsck_object_put(env, root);
/*
 * Fill the layout EA slot @slot with the OST-object @cfid on OST @ost_idx
 * and write the resulting LOV EA to @parent with dt_xattr_set() using the
 * flag @fl inside the transaction @handle.
 *
 * The LOV EA header is taken from buf->lb_buf; the size written is
 * lov_mds_md_size(stripe count, magic) as read from that header.
 */
1485 * \retval +1: repaired
1486 * \retval 0: did nothing
1487 * \retval -ve: on error
1489 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1490 struct thandle *handle,
1491 struct dt_object *parent,
1492 struct lu_fid *cfid,
1494 struct lov_ost_data_v1 *slot,
1495 int fl, __u32 ost_idx)
1497 struct ost_id *oi = &lfsck_env_info(env)->lti_oi;
1498 struct lov_mds_md_v1 *lmm = buf->lb_buf;
1499 struct lu_buf ea_buf;
1504 magic = le32_to_cpu(lmm->lmm_magic);
1505 count = le16_to_cpu(lmm->lmm_stripe_count);
/* Store the child's object id (little-endian), generation 0 and the OST
 * index into the slot. */
1507 fid_to_ostid(cfid, oi);
1508 ostid_cpu_to_le(oi, &slot->l_ost_oi);
1509 slot->l_ost_gen = cpu_to_le32(0);
1510 slot->l_ost_idx = cpu_to_le32(ost_idx);
/* When the layout is marked as having holes, look for any other slot
 * that is still a dummy. */
1512 if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) {
1513 struct lov_ost_data_v1 *objs;
/* The stripe array sits at a different offset in the V1 and V3 headers. */
1516 if (magic == LOV_MAGIC_V1)
1517 objs = &lmm->lmm_objects[0];
1519 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1520 for (i = 0; i < count; i++, objs++) {
1521 if (objs != slot && lovea_slot_is_dummy(objs))
1525 /* If the @slot is the last dummy slot to be refilled,
1526 * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
1528 lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE);
/* Write back exactly the bytes covered by the current stripe count. */
1531 lfsck_buf_init(&ea_buf, lmm, lov_mds_md_size(count, magic));
1532 rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV, fl, handle,
/*
 * Make room in @parent's LOV EA (held in @buf) for stripe @ea_off and put
 * the OST-object @cfid there via lfsck_layout_refill_lovea().
 *
 * Two cases are visible:
 *  - @fl is LU_XATTR_CREATE or @reset is set: @buf is zeroed and a new
 *    LOV_MAGIC_V1 header is built, with the stripe size obtained from
 *    lfsck_layout_get_def_stripesize();
 *  - otherwise: the existing EA is extended past its current stripe count,
 *    the slots between the old end and @ea_off are zeroed and the layout
 *    is flagged with LOV_PATTERN_F_HOLE.
 */
1541 * \retval +1: repaired
1542 * \retval 0: did nothing
1543 * \retval -ve: on error
1545 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1546 struct lfsck_instance *lfsck,
1547 struct thandle *handle,
1548 struct dt_object *parent,
1549 struct lu_fid *cfid,
1550 struct lu_buf *buf, int fl,
1551 __u32 ost_idx, __u32 ea_off, bool reset)
1553 struct lov_mds_md_v1 *lmm = buf->lb_buf;
1554 struct lov_ost_data_v1 *objs;
/* Create/reset case: start from an empty RAID0 V1 layout. */
1560 if (fl == LU_XATTR_CREATE || reset) {
1561 __u32 pattern = LOV_PATTERN_RAID0;
1564 LASSERT(buf->lb_len >= lov_mds_md_size(count, LOV_MAGIC_V1));
/* A non-zero target offset or a reset adds the hole flag to the pattern. */
1566 if (ea_off != 0 || reset) {
1567 pattern |= LOV_PATTERN_F_HOLE;
1571 memset(lmm, 0, buf->lb_len);
1572 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1573 lmm->lmm_pattern = cpu_to_le32(pattern);
/* The object id in the header is derived from the parent's FID and
 * converted to little-endian in place. */
1574 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1575 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1577 rc = lfsck_layout_get_def_stripesize(env, lfsck,
1578 &lmm->lmm_stripe_size);
1582 objs = &lmm->lmm_objects[ea_off];
/* Extend case: work from the magic and stripe count of the existing EA. */
1584 __u32 magic = le32_to_cpu(lmm->lmm_magic);
1587 count = le16_to_cpu(lmm->lmm_stripe_count);
1588 if (magic == LOV_MAGIC_V1)
1589 objs = &lmm->lmm_objects[count];
1591 objs = &((struct lov_mds_md_v3 *)lmm)->
/* Number of slots between the old end of the array and the target. */
1594 gap = ea_off - count;
1597 LASSERT(buf->lb_len >= lov_mds_md_size(count, magic));
1600 memset(objs, 0, gap * sizeof(*objs));
1601 lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
/* Bump the layout generation (stored as little-endian 16 bit). */
1605 lmm->lmm_layout_gen =
1606 cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1610 lmm->lmm_stripe_count = cpu_to_le16(count);
1611 rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1614 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant extend layout EA for "
1615 DFID": parent "DFID", OST-index %u, stripe-index %u, fl %d, "
1616 "reset %s, %s LOV EA hole: rc = %d\n",
1617 lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
1618 ost_idx, ea_off, fl, reset ? "yes" : "no",
1619 hole ? "with" : "without", rc);
/*
 * Rewrite the XATTR_NAME_FID extended attribute of the OST-object @cfid
 * on device @cdev so that it names @parent as its parent and @ea_off as
 * its stripe index.
 *
 * The filter_fid is built in the per-thread lti_new_pfid buffer and set
 * within a transaction created on @cdev.  A successful dt_xattr_set()
 * (rc == 0) is turned into the return value 1.
 */
1625 * \retval +1: repaired
1626 * \retval 0: did nothing
1627 * \retval -ve: on error
1629 static int lfsck_layout_update_pfid(const struct lu_env *env,
1630 struct lfsck_component *com,
1631 struct dt_object *parent,
1632 struct lu_fid *cfid,
1633 struct dt_device *cdev, __u32 ea_off)
1635 struct filter_fid *pfid = &lfsck_env_info(env)->lti_new_pfid;
1636 struct dt_object *child;
1637 struct thandle *handle;
1638 const struct lu_fid *tfid = lu_object_fid(&parent->do_lu);
/* Find the child object on the target device. */
1643 child = lfsck_object_find_by_dev(env, cdev, cfid);
1645 RETURN(PTR_ERR(child));
1647 handle = dt_trans_create(env, cdev);
1649 GOTO(out, rc = PTR_ERR(handle));
/* Parent sequence and object id are stored little-endian. */
1651 pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1652 pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1653 /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
1654 * MDT-object's FID::f_ver, instead it is the OST-object index in its
1655 * parent MDT-object's layout EA. */
1656 pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1657 buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
/* Declare, start, then perform the xattr update. */
1659 rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1663 rc = dt_trans_start(env, cdev, handle);
1667 rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1670 GOTO(stop, rc = (rc == 0 ? 1 : rc));
/* Cleanup: stop the transaction and release the child reference. */
1673 dt_trans_stop(env, cdev, handle);
1676 lu_object_put(env, &child->do_lu);
1682 * This function will create the MDT-object with the given (partial) LOV EA.
1684 * Under some data corruption cases, the MDT-object of the file may be lost,
1685 * but its OST-objects, or some of them are there. The layout LFSCK needs to
1686 * re-create the MDT-object with the orphan OST-object(s) information.
1688 * On the other hand, the LFSCK may has created some OST-object for repairing
1689 * dangling LOV EA reference, but as the LFSCK processing, it may find that
1690 * the old OST-object is there and should replace the former new created OST
1691 * object. Unfortunately, some others have modified such newly created object.
1692 * To keep the data (both new and old), the LFSCK will create MDT-object with
1693 * new FID to reference the original OST-object.
1695 * \param[in] env pointer to the thread context
1696 * \param[in] com pointer to the lfsck component
1697 * \param[in] ltd pointer to target device descriptor
1698 * \param[in] rec pointer to the record for the orphan OST-object
1699 * \param[in] cfid pointer to FID for the orphan OST-object
1700 * \param[in] infix additional information, such as the FID for original
1701 * MDT-object and the stripe offset in the LOV EA
1702 * \param[in] type the type for describing why the orphan MDT-object is
1703 * created. The rules are as following:
1705 * type "C": Multiple OST-objects claim the same MDT-object and the
1706 * same slot in the layout EA. Then the LFSCK will create
1707 * new MDT-object(s) to hold the conflict OST-object(s).
1709 * type "N": The orphan OST-object does not know which one was the
1710 * real parent MDT-object, so the LFSCK uses new FID for
1711 * its parent MDT-object.
1713 * type "R": The orphan OST-object knows its parent MDT-object FID,
1714 * but does not know the position (the file name) in the
1717 * type "D": The MDT-object is a directory, it may knows its parent
1718 * but because there is no valid linkEA, the LFSCK cannot
1719 * know where to put it back to the namespace.
1720 * type "O": The MDT-object has no linkEA, and there is no name
1721 * entry that references the MDT-object.
1723 * type "P": The orphan object to be created was a parent directory
1724 * of some MDT-object which linkEA shows that the @orphan
1725 * object is missing.
1727 * The orphan name will be like:
1728 * ${FID}-${infix}-${type}-${conflict_version}
1730 * \param[in] ea_off the stripe offset in the LOV EA
1732 * \retval positive on repaired something
1733 * \retval 0 if needs to repair nothing
1734 * \retval negative error number on failure
/*
 * Summary of the visible flow: pick a name under lfsck->li_lpf_obj that is
 * looked up with dt_lookup(), prepare a linkEA, attributes and a LOV EA
 * buffer, take an MDS_INODELOCK_UPDATE/LCK_EX ibits lock on li_lpf_obj,
 * and then run steps 1-5 below (each declared as "Na" and executed as
 * "Nb") inside one transaction on lfsck->li_next.
 */
1736 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1737 struct lfsck_component *com,
1738 struct lfsck_tgt_desc *ltd,
1739 struct lu_orphan_rec *rec,
1740 struct lu_fid *cfid,
1745 struct lfsck_thread_info *info = lfsck_env_info(env);
1746 struct dt_insert_rec *dtrec = &info->lti_dt_rec;
1747 char *name = info->lti_key;
1748 struct lu_attr *la = &info->lti_la;
1749 struct dt_object_format *dof = &info->lti_dof;
1750 struct lfsck_instance *lfsck = com->lc_lfsck;
1751 struct lu_fid *pfid = &rec->lor_fid;
1752 struct lu_fid *tfid = &info->lti_fid3;
1753 struct dt_device *next = lfsck->li_next;
1754 struct dt_object *pobj = NULL;
1755 struct dt_object *cobj = NULL;
1756 struct thandle *th = NULL;
1757 struct lu_buf pbuf = { NULL };
1758 struct lu_buf *ea_buf = &info->lti_big_buf;
1759 struct lu_buf lov_buf;
1760 struct lustre_handle lh = { 0 };
1761 struct linkea_data ldata = { NULL };
1762 struct lu_buf linkea_buf;
1763 const struct lu_name *pname;
/* Without the li_lpf_obj directory object there is nowhere to insert the
 * re-created object: fail with -ENXIO. */
1769 if (unlikely(lfsck->li_lpf_obj == NULL))
1770 GOTO(log, rc = -ENXIO);
/* The record carries no parent FID: allocate one and prepare the
 * filter_fid (in lti_new_pfid) that will be set on the OST-object. */
1772 if (fid_is_zero(pfid)) {
1773 struct filter_fid *ff = &info->lti_new_pfid;
1775 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
1779 ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
1780 ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
1781 /* Currently, the filter_fid::ff_parent::f_ver is not the
1782 * real parent MDT-object's FID::f_ver, instead it is the
1783 * OST-object index in its parent MDT-object's layout EA. */
1784 ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1785 lfsck_buf_init(&pbuf, ff, sizeof(struct filter_fid));
1786 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
1788 GOTO(log, rc = PTR_ERR(cobj));
/* The MDT-object to be created lives on lfsck->li_bottom. */
1791 pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid);
1793 GOTO(put, rc = PTR_ERR(pobj));
1795 LASSERT(infix != NULL);
1796 LASSERT(type != NULL);
/* Build the candidate name and look it up in li_lpf_obj; any lookup
 * result other than 0 or -ENOENT is handled by the branch below.
 * NOTE(review): the snprintf() result is not checked for truncation in
 * the visible lines -- confirm NAME_MAX is always sufficient. */
1799 snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
1801 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
1802 (const struct dt_key *)name, BYPASS_CAPA);
1803 if (rc != 0 && rc != -ENOENT)
/* Prepare a linkEA naming li_lpf_obj as the parent under @name. */
1807 rc = linkea_data_new(&ldata,
1808 &lfsck_env_info(env)->lti_linkea_buf);
1812 pname = lfsck_name_get_const(env, name, strlen(name));
1813 rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
/* Attributes of the new object: regular file, owner-readable only,
 * owned by the uid/gid stored in the orphan record. */
1817 memset(la, 0, sizeof(*la));
1818 la->la_uid = rec->lor_uid;
1819 la->la_gid = rec->lor_gid;
1820 la->la_mode = S_IFREG | S_IRUSR;
1821 la->la_valid = LA_MODE | LA_UID | LA_GID;
1823 memset(dof, 0, sizeof(*dof));
1824 dof->dof_type = dt_mode_to_dft(S_IFREG);
/* The LOV EA must hold ea_off + 1 V1 stripes; grow the shared buffer
 * when it is smaller than that. */
1826 size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
1827 if (ea_buf->lb_len < size) {
1828 lu_buf_realloc(ea_buf, size);
1829 if (ea_buf->lb_buf == NULL)
1830 GOTO(put, rc = -ENOMEM);
1833 /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
1835 * XXX: Currently, we do not grab the PDO lock as normal create cases,
1836 * because creating MDT-object for orphan OST-object is rare, we
1837 * do not much care about the performance. It can be improved in
1838 * the future when needed. */
1839 rc = lfsck_ibits_lock(env, lfsck, lfsck->li_lpf_obj, &lh,
1840 MDS_INODELOCK_UPDATE, LCK_EX);
1844 th = dt_trans_create(env, next);
1846 GOTO(unlock, rc = PTR_ERR(th));
1848 /* 1a. Update OST-object's parent information remotely.
1850 * If other subsequent modifications failed, then next LFSCK scanning
1851 * will process the OST-object as orphan again with known parent FID. */
1853 rc = dt_declare_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID,
1859 /* 2a. Create the MDT-object locally. */
1860 rc = dt_declare_create(env, pobj, la, NULL, dof, th);
1864 /* 3a. Add layout EA for the MDT-object. */
1865 lfsck_buf_init(&lov_buf, ea_buf->lb_buf, size);
1866 rc = dt_declare_xattr_set(env, pobj, &lov_buf, XATTR_NAME_LOV,
1867 LU_XATTR_CREATE, th);
1871 /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
1872 dtrec->rec_fid = pfid;
1873 dtrec->rec_type = S_IFREG;
1874 rc = dt_declare_insert(env, lfsck->li_lpf_obj,
1875 (const struct dt_rec *)dtrec,
1876 (const struct dt_key *)name, th);
1880 /* 5a. insert linkEA for parent. */
1881 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1882 ldata.ld_leh->leh_len);
1883 rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
1884 XATTR_NAME_LINK, 0, th);
1888 rc = dt_trans_start(env, next, th);
1892 /* 1b. Update OST-object's parent information remotely. */
1894 rc = dt_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID, 0, th,
/* Steps 2b and 3b run with the new object write-locked. */
1900 dt_write_lock(env, pobj, 0);
1901 /* 2b. Create the MDT-object locally. */
1902 rc = dt_create(env, pobj, la, NULL, dof, th);
1904 /* 3b. Add layout EA for the MDT-object. */
1905 rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid,
1906 &lov_buf, LU_XATTR_CREATE,
1907 ltd->ltd_index, ea_off, false);
1908 dt_write_unlock(env, pobj);
1912 /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
1913 rc = dt_insert(env, lfsck->li_lpf_obj, (const struct dt_rec *)dtrec,
1914 (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1918 /* 5b. insert linkEA for parent. */
1919 rc = dt_xattr_set(env, pobj, &linkea_buf,
1920 XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
/* Cleanup: stop the transaction, drop the directory lock, and release
 * whichever object references were actually obtained. */
1925 dt_trans_stop(env, next, th);
1928 lfsck_ibits_unlock(&lh, LCK_EX);
1931 if (cobj != NULL && !IS_ERR(cobj))
1932 lu_object_put(env, &cobj->do_lu);
1933 if (pobj != NULL && !IS_ERR(pobj))
1934 lu_object_put(env, &pobj->do_lu);
1938 CDEBUG(D_LFSCK, "%s layout LFSCK assistant failed to "
1939 "recreate the lost MDT-object: parent "DFID
1940 ", child "DFID", OST-index %u, stripe-index %u, "
1941 "infix %s, type %s: rc = %d\n",
1942 lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
1943 ltd->ltd_index, ea_off, infix, type, rc);
/* Every non-negative rc is reported to the caller as 1. */
1945 return rc >= 0 ? 1 : rc;
/*
 * Master side of the "conditional destroy": send an LFSCK_NOTIFY RPC with
 * the event LE_CONDITIONAL_DESTROY for the layout LFSCK to the OST whose
 * descriptor is found under @index in lfsck->li_ost_descs, and wait for
 * the reply with ptlrpc_queue_wait().
 *
 * Visible failure codes: -EOPNOTSUPP when the export's connect flags lack
 * OBD_CONNECT_LFSCK, and -ENOMEM on the path following the request
 * allocation.
 */
1948 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
1949 struct lfsck_component *com,
1950 const struct lu_fid *fid,
1953 struct lfsck_thread_info *info = lfsck_env_info(env);
1954 struct lfsck_request *lr = &info->lti_lr;
1955 struct lfsck_instance *lfsck = com->lc_lfsck;
1956 struct lfsck_tgt_desc *ltd;
1957 struct ptlrpc_request *req;
1958 struct lfsck_request *tmp;
1959 struct obd_export *exp;
/* Look up the target descriptor for the given OST index. */
1963 ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
1964 if (unlikely(ltd == NULL))
/* The peer must have negotiated LFSCK support. */
1968 if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
1969 GOTO(put, rc = -EOPNOTSUPP);
1971 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
1973 GOTO(put, rc = -ENOMEM);
1975 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
1977 ptlrpc_request_free(req);
/* Build the request in the per-thread lti_lr buffer. */
1982 memset(lr, 0, sizeof(*lr));
1983 lr->lr_event = LE_CONDITIONAL_DESTROY;
1984 lr->lr_active = LFSCK_TYPE_LAYOUT;
1987 tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1989 ptlrpc_request_set_replen(req);
/* Send synchronously, then drop the request. */
1991 rc = ptlrpc_queue_wait(req);
1992 ptlrpc_req_finished(req);
/*
 * Slave side of the "conditional destroy": destroy the local object named
 * by lr->lr_fid on lfsck->li_bottom, provided its attributes still show
 * la_ctime == 0.
 *
 * Visible result codes:
 *   -ENOENT   the object does not exist or lfsck_is_dead_obj() is true;
 *   -ETXTBSY  la_ctime is non-zero (or, in the first check only, S_ISUID
 *             is set in la_mode);
 *   -EIO      on the path following ldlm_cli_enqueue_local().
 *
 * The attributes are checked twice: once under the object's read lock,
 * and again under its write lock after an LCK_EX extent lock covering
 * offsets up to OBD_OBJECT_EOF has been enqueued.
 *
 * NOTE(review): the second check tests la_ctime only, not S_ISUID --
 * confirm that the difference from the first check is intentional.
 */
2002 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2003 struct lfsck_component *com,
2004 struct lfsck_request *lr)
2006 struct lfsck_thread_info *info = lfsck_env_info(env);
2007 struct lu_attr *la = &info->lti_la;
2008 ldlm_policy_data_t *policy = &info->lti_policy;
2009 struct ldlm_res_id *resid = &info->lti_resid;
2010 struct lfsck_instance *lfsck = com->lc_lfsck;
2011 struct dt_device *dev = lfsck->li_bottom;
2012 struct lu_fid *fid = &lr->lr_fid;
2013 struct dt_object *obj;
2014 struct thandle *th = NULL;
2015 struct lustre_handle lh = { 0 };
2020 obj = lfsck_object_find_by_dev(env, dev, fid);
2022 RETURN(PTR_ERR(obj));
2024 dt_read_lock(env, obj, 0);
2025 if (dt_object_exists(obj) == 0 ||
2026 lfsck_is_dead_obj(obj)) {
2027 dt_read_unlock(env, obj);
2029 GOTO(put, rc = -ENOENT);
2032 /* Get obj's attr without lock firstly. */
2033 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2034 dt_read_unlock(env, obj);
/* Expected common case: the object carries a ctime or S_ISUID, so it
 * must be left alone. */
2038 if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2039 GOTO(put, rc = -ETXTBSY);
2041 /* Acquire extent lock on [0, EOF] to sync with all possible writes. */
2042 LASSERT(lfsck->li_namespace != NULL);
2044 memset(policy, 0, sizeof(*policy));
2045 policy->l_extent.end = OBD_OBJECT_EOF;
2046 ost_fid_build_resid(fid, resid);
2047 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
2048 policy, LCK_EX, &flags, ldlm_blocking_ast,
2049 ldlm_completion_ast, NULL, NULL, 0,
2050 LVB_T_NONE, NULL, &lh);
2052 GOTO(put, rc = -EIO);
2054 dt_write_lock(env, obj, 0);
2055 /* Get obj's attr within lock again. */
2056 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2060 if (la->la_ctime != 0)
2061 GOTO(unlock, rc = -ETXTBSY);
/* Destroy the object in a local transaction: declare both operations,
 * start, then drop the reference and destroy. */
2063 th = dt_trans_create(env, dev);
2065 GOTO(unlock, rc = PTR_ERR(th));
2067 rc = dt_declare_ref_del(env, obj, th);
2071 rc = dt_declare_destroy(env, obj, th);
2075 rc = dt_trans_start_local(env, dev, th);
2079 rc = dt_ref_del(env, obj, th);
2083 rc = dt_destroy(env, obj, th);
2085 CDEBUG(D_LFSCK, "%s: layout LFSCK destroyed the empty "
2086 "OST-object "DFID" that was created for reparing "
2087 "dangling referenced case. But the original missing "
2088 "OST-object is found now.\n",
2089 lfsck_lfsck2name(lfsck), PFID(fid));
/* Cleanup: stop the transaction, release the object lock, the extent
 * lock reference and the object reference. */
2094 dt_trans_stop(env, dev, th);
2097 dt_write_unlock(env, obj);
2098 ldlm_lock_decref(&lh, LCK_EX);
2101 lu_object_put(env, &obj->do_lu);
2107 * Some OST-object has occupied the specified layout EA slot.
2108 * Such OST-object may be generated by the LFSCK when repair
2109 * dangling referenced MDT-object, which can be indicated by
2110 * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2111 * is true and such OST-object has not been modified yet, we
2112 * will replace it with the orphan OST-object; otherwise the
2113 * LFSCK will create new MDT-object to reference the orphan.
2115 * \retval +1: repaired
2116 * \retval 0: did nothing
2117 * \retval -ve: on error
/*
 * Summary of the visible flow: decode the FID currently stored in @slot,
 * lock @parent's layout, ask the owning OST to conditionally destroy that
 * object, and then either
 *  - on -ETXTBSY: zero rec->lor_fid and call
 *    lfsck_layout_recreate_parent() with type "C", or
 *  - otherwise (rc == 0 or -ENOENT): overwrite @slot with @cfid through
 *    lfsck_layout_refill_lovea() in a local transaction.
 */
2119 static int lfsck_layout_conflict_create(const struct lu_env *env,
2120 struct lfsck_component *com,
2121 struct lfsck_tgt_desc *ltd,
2122 struct lu_orphan_rec *rec,
2123 struct dt_object *parent,
2124 struct lu_fid *cfid,
2125 struct lu_buf *ea_buf,
2126 struct lov_ost_data_v1 *slot,
2129 struct lfsck_thread_info *info = lfsck_env_info(env);
2130 struct lu_fid *cfid2 = &info->lti_fid2;
2131 struct ost_id *oi = &info->lti_oi;
2132 struct lov_mds_md_v1 *lmm = ea_buf->lb_buf;
2133 struct dt_device *dev = com->lc_lfsck->li_bottom;
2134 struct thandle *th = NULL;
2135 struct lustre_handle lh = { 0 };
2136 __u32 ost_idx2 = le32_to_cpu(slot->l_ost_idx);
/* cfid2/ost_idx2 identify the object that currently occupies the slot. */
2140 ostid_le_to_cpu(&slot->l_ost_oi, oi);
2141 rc = ostid_to_fid(cfid2, oi, ost_idx2);
2145 /* Hold layout lock on the parent to prevent others to access. */
2146 rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2147 MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2152 rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2154 /* If the conflict OST-object is not created for fixing dangling
2155 * referenced MDT-object in former LFSCK check/repair, or it has
2156 * been modified by others, then we cannot destroy it. Re-create
2157 * a new MDT-object for the orphan OST-object. */
2158 if (rc == -ETXTBSY) {
2159 /* No need the layout lock on the original parent. */
2160 lfsck_ibits_unlock(&lh, LCK_EX);
/* Clearing lor_fid makes lfsck_layout_recreate_parent() allocate a new
 * parent FID; the infix records the original parent FID. */
2162 fid_zero(&rec->lor_fid);
2163 snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2164 "-"DFID"-%x", PFID(lu_object_fid(&parent->do_lu)),
2166 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2167 info->lti_tmpbuf, "C", ea_off);
/* Any destroy result other than success or "already gone" is handled by
 * the branch below. */
2172 if (rc != 0 && rc != -ENOENT)
2175 th = dt_trans_create(env, dev);
2177 GOTO(unlock, rc = PTR_ERR(th));
2179 rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2180 LU_XATTR_REPLACE, th);
2184 rc = dt_trans_start_local(env, dev, th);
/* Bump the layout generation and replace the slot under the parent's
 * write lock. */
2188 dt_write_lock(env, parent, 0);
2189 lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2190 rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot,
2191 LU_XATTR_REPLACE, ltd->ltd_index);
2192 dt_write_unlock(env, parent);
2197 dt_trans_stop(env, dev, th);
2200 lfsck_ibits_unlock(&lh, LCK_EX);
2203 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
2204 "OST-object "DFID" on the OST %x with the orphan "DFID" on "
2205 "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
2206 lfsck_lfsck2name(com->lc_lfsck), PFID(cfid2), ost_idx2,
2207 PFID(cfid), ltd->ltd_index, PFID(lfsck_dto2fid(parent)),
/* Every non-negative rc is reported to the caller as 1. */
2210 return rc >= 0 ? 1 : rc;
/*
 * Put the orphan OST-object @cfid into the LOV EA of the existing
 * MDT-object @parent at stripe offset @ea_off.
 *
 * Depending on what the current LOV EA looks like, the visible code
 *  - creates a new EA (no EA present) or rebuilds it (header verification
 *    returned -EINVAL) through lfsck_layout_extend_lovea();
 *  - extends the EA when @ea_off is at or beyond the current stripe count;
 *  - refills a dummy slot through lfsck_layout_refill_lovea();
 *  - only fixes the OST-object's parent FID through
 *    lfsck_layout_update_pfid() when a slot already references @cfid;
 *  - or hands over to lfsck_layout_conflict_create() when the slot at
 *    @ea_off is occupied.
 * In LPF_DRYRUN mode the repair branches return 1 without modifying
 * anything.
 */
2214 * \retval +1: repaired
2215 * \retval 0: did nothing
2216 * \retval -ve: on error
2218 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2219 struct lfsck_component *com,
2220 struct lfsck_tgt_desc *ltd,
2221 struct lu_orphan_rec *rec,
2222 struct dt_object *parent,
2223 struct lu_fid *cfid,
2224 __u32 ost_idx, __u32 ea_off)
2226 struct lfsck_thread_info *info = lfsck_env_info(env);
2227 struct lu_buf *buf = &info->lti_big_buf;
2228 struct lu_fid *fid = &info->lti_fid2;
2229 struct ost_id *oi = &info->lti_oi;
2230 struct lfsck_instance *lfsck = com->lc_lfsck;
2231 struct dt_device *dt = lfsck->li_bottom;
2232 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2233 struct thandle *handle = NULL;
2235 struct lov_mds_md_v1 *lmm;
2236 struct lov_ost_data_v1 *objs;
2237 struct lustre_handle lh = { 0 };
2244 bool locked = false;
/* Serialize against other layout/xattr users of the parent. */
2247 rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2248 MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2251 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
2252 "LOV EA for "DFID": parent "DFID", OST-index %u, "
2253 "stripe-index %u: rc = %d\n",
2254 lfsck_lfsck2name(lfsck), PFID(cfid),
2255 PFID(lfsck_dto2fid(parent)), ost_idx, ea_off, rc);
/* NOTE(review): the following unlock / trans_stop / realloc sequence
 * appears to be a retry entry point used when the buffer turned out
 * too small -- the label and guards are not visible, confirm. */
2262 dt_write_unlock(env, parent);
2266 if (handle != NULL) {
2267 dt_trans_stop(env, dt, handle);
2272 GOTO(unlock_layout, rc);
/* Grow the shared buffer when the EA needs more room than it offers. */
2275 if (buf->lb_len < lovea_size) {
2276 lu_buf_realloc(buf, lovea_size);
2277 if (buf->lb_buf == NULL)
2278 GOTO(unlock_layout, rc = -ENOMEM);
/* A transaction is only opened when this is not a dry run. */
2281 if (!(bk->lb_param & LPF_DRYRUN)) {
2282 handle = dt_trans_create(env, dt);
2284 GOTO(unlock_layout, rc = PTR_ERR(handle));
2286 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2291 rc = dt_trans_start_local(env, dt, handle);
/* Read the current LOV EA under the parent's write lock and classify the
 * result: -ERANGE (buffer too small), -ENODATA/0 (no EA: create), other
 * negative values (error), or an existing EA (replace). */
2296 dt_write_lock(env, parent, 0);
2298 rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2299 if (rc == -ERANGE) {
2300 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
2304 } else if (rc == -ENODATA || rc == 0) {
2305 lovea_size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2306 /* If the declared is not big enough, re-try. */
2307 if (buf->lb_len < lovea_size) {
2311 fl = LU_XATTR_CREATE;
2312 } else if (rc < 0) {
2313 GOTO(unlock_parent, rc);
2314 } else if (unlikely(buf->lb_len == 0)) {
2317 fl = LU_XATTR_REPLACE;
/* No EA yet: build one from scratch. */
2321 if (fl == LU_XATTR_CREATE) {
2322 if (bk->lb_param & LPF_DRYRUN)
2323 GOTO(unlock_parent, rc = 1);
2325 LASSERT(buf->lb_len >= lovea_size);
2327 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2328 buf, fl, ost_idx, ea_off, false);
2330 GOTO(unlock_parent, rc);
2334 rc1 = lfsck_layout_verify_header(lmm);
2336 /* If the LOV EA crashed, then rebuild it. */
2337 if (rc1 == -EINVAL) {
2338 if (bk->lb_param & LPF_DRYRUN)
2339 GOTO(unlock_parent, rc = 1);
2341 LASSERT(buf->lb_len >= lovea_size);
2343 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2344 buf, fl, ost_idx, ea_off, true);
2346 GOTO(unlock_parent, rc);
2349 /* For other unknown magic/pattern, keep the current LOV EA. */
2351 GOTO(unlock_parent, rc = rc1);
2353 /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2354 * been verified in lfsck_layout_verify_header() already. If some
2355 * new magic introduced in the future, then layout LFSCK needs to
2356 * be updated also. */
2357 magic = le32_to_cpu(lmm->lmm_magic);
2358 if (magic == LOV_MAGIC_V1) {
2359 objs = &lmm->lmm_objects[0];
2361 LASSERT(magic == LOV_MAGIC_V3);
2362 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2365 count = le16_to_cpu(lmm->lmm_stripe_count);
2367 GOTO(unlock_parent, rc = -EINVAL);
2370 /* Exceed the current end of MDT-object layout EA. Then extend it. */
2371 if (count <= ea_off) {
2372 if (bk->lb_param & LPF_DRYRUN)
2373 GOTO(unlock_parent, rc = 1);
2375 lovea_size = lov_mds_md_size(ea_off + 1, magic);
2376 /* If the declared is not big enough, re-try. */
2377 if (buf->lb_len < lovea_size) {
2382 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2383 buf, fl, ost_idx, ea_off, false);
2385 GOTO(unlock_parent, rc);
2388 LASSERTF(rc > 0, "invalid rc = %d\n", rc);
/* Walk the existing stripes looking for a dummy slot or for a slot that
 * already references @cfid. */
2390 for (i = 0; i < count; i++, objs++) {
2391 /* The MDT-object was created via lfsck_layout_recover_create()
2392 * by others before, and we fill the dummy layout EA. */
2393 if (lovea_slot_is_dummy(objs)) {
2397 if (bk->lb_param & LPF_DRYRUN)
2398 GOTO(unlock_parent, rc = 1);
2400 lmm->lmm_layout_gen =
2401 cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2402 rc = lfsck_layout_refill_lovea(env, handle, parent,
2403 cfid, buf, objs, fl,
2406 CDEBUG(D_LFSCK, "%s layout LFSCK assistant fill "
2407 "dummy layout slot for "DFID": parent "DFID
2408 ", OST-index %u, stripe-index %u: rc = %d\n",
2409 lfsck_lfsck2name(lfsck), PFID(cfid),
2410 PFID(lfsck_dto2fid(parent)), ost_idx, i, rc);
2412 GOTO(unlock_parent, rc);
/* Decode the FID stored in this slot for comparison with @cfid. */
2415 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2416 rc = ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2418 CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
2419 "invalid layout EA at the slot %d, index %u\n",
2420 lfsck_lfsck2name(lfsck),
2421 PFID(lfsck_dto2fid(parent)), i,
2422 le32_to_cpu(objs->l_ost_idx));
2424 GOTO(unlock_parent, rc);
2427 /* It should be rare case, the slot is there, but the LFSCK
2428 * does not handle it during the first-phase cycle scanning. */
2429 if (unlikely(lu_fid_eq(fid, cfid))) {
2431 GOTO(unlock_parent, rc = 0);
2433 /* Rare case that the OST-object index
2434 * does not match the parent MDT-object
2435 * layout EA. We trust the later one. */
2436 if (bk->lb_param & LPF_DRYRUN)
2437 GOTO(unlock_parent, rc = 1);
2439 dt_write_unlock(env, parent);
2441 dt_trans_stop(env, dt, handle);
2442 lfsck_ibits_unlock(&lh, LCK_EX);
2443 rc = lfsck_layout_update_pfid(env, com, parent,
2444 cfid, ltd->ltd_tgt, i);
2446 CDEBUG(D_LFSCK, "%s layout LFSCK assistant "
2447 "updated OST-object's pfid for "DFID
2448 ": parent "DFID", OST-index %u, "
2449 "stripe-index %u: rc = %d\n",
2450 lfsck_lfsck2name(lfsck), PFID(cfid),
2451 PFID(lfsck_dto2fid(parent)),
2452 ltd->ltd_index, i, rc);
2459 /* The MDT-object exists, but related layout EA slot is occupied
2461 if (bk->lb_param & LPF_DRYRUN)
2462 GOTO(unlock_parent, rc = 1);
2464 dt_write_unlock(env, parent);
2466 dt_trans_stop(env, dt, handle);
2467 lfsck_ibits_unlock(&lh, LCK_EX);
2468 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2469 objs = &lmm->lmm_objects[ea_off];
2471 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2472 rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2479 dt_write_unlock(env, parent);
2483 dt_trans_stop(env, dt, handle);
2486 lfsck_ibits_unlock(&lh, LCK_EX);
/*
 * Handle one orphan OST-object record during the orphan scan.
 *
 * @rec->lor_fid is the claimed parent FID and its f_stripe_idx field is
 * used as the stripe offset.  Visible decisions:
 *  - @cfid or a non-zero parent FID that is not sane: -EINVAL;
 *  - zero parent FID, or parent object does not exist: the parent is
 *    re-created via lfsck_layout_recreate_parent();
 *  - parent is a remote object: -EXDEV;
 *  - parent is not a regular file: -EISDIR;
 *  - otherwise the parent's LOV EA is repaired via
 *    lfsck_layout_recreate_lovea().
 * Afterwards the scan counters are updated under com->lc_sem.
 */
2491 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2492 struct lfsck_component *com,
2493 struct lfsck_tgt_desc *ltd,
2494 struct lu_orphan_rec *rec,
2495 struct lu_fid *cfid)
2497 struct lfsck_layout *lo = com->lc_file_ram;
2498 struct lu_fid *pfid = &rec->lor_fid;
2499 struct dt_object *parent = NULL;
2500 __u32 ea_off = pfid->f_stripe_idx;
2504 if (!fid_is_sane(cfid))
2505 GOTO(out, rc = -EINVAL);
/* No parent FID recorded at all: create a new parent. */
2507 if (fid_is_zero(pfid)) {
2508 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2514 if (!fid_is_sane(pfid))
2515 GOTO(out, rc = -EINVAL);
2517 parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2519 GOTO(out, rc = PTR_ERR(parent));
2521 if (unlikely(dt_object_remote(parent) != 0))
2522 GOTO(put, rc = -EXDEV);
/* The parent FID is known but the object is gone: drop the reference
 * and re-create it. */
2524 if (dt_object_exists(parent) == 0) {
2525 lu_object_put(env, &parent->do_lu);
2526 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2531 if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2532 GOTO(put, rc = -EISDIR);
2534 rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2535 ltd->ltd_index, ea_off);
2541 lu_object_put(env, &parent->do_lu);
2543 /* The layout EA is changed, need to be reloaded next time. */
2544 lu_object_put_nocache(env, &parent->do_lu);
/* Statistics: every record counts as scanned and checked; a repair bumps
 * the LLIT_ORPHAN counter and a negative rc bumps the phase-2 failure
 * counter. */
2547 down_write(&com->lc_sem);
2548 com->lc_new_scanned++;
2549 com->lc_new_checked++;
2551 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2553 } else if (rc < 0) {
2554 lo->ll_objs_failed_phase2++;
2556 up_write(&com->lc_sem);
/*
 * Scan the orphan OST-objects reported by the OST described by @ltd and
 * process each of them with lfsck_layout_scan_orphan_one().
 *
 * The OST is skipped when its index is already set in lad->lad_bitmap.
 * Otherwise an index object is located on ltd->ltd_tgt using a FID built
 * from FID_SEQ_IDIF with object id 0, switched to the
 * dt_lfsck_orphan_features index type, and walked with its dio_it
 * iterator.  Positive iterator results are mapped to 0 on return.
 */
2561 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2562 struct lfsck_component *com,
2563 struct lfsck_tgt_desc *ltd)
2565 struct lfsck_assistant_data *lad = com->lc_data;
2566 struct lfsck_instance *lfsck = com->lc_lfsck;
2567 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2568 struct lfsck_thread_info *info = lfsck_env_info(env);
2569 struct ost_id *oi = &info->lti_oi;
2570 struct lu_fid *fid = &info->lti_fid;
2571 struct dt_object *obj;
2572 const struct dt_it_ops *iops;
2577 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant starts the orphan "
2578 "scanning for OST%04x\n",
2579 lfsck_lfsck2name(lfsck), ltd->ltd_index);
/* An OST whose bit is set in the assistant bitmap is not scanned. */
2581 if (cfs_bitmap_check(lad->lad_bitmap, ltd->ltd_index)) {
2582 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant skip the orphan "
2583 "scanning for OST%04x\n",
2584 lfsck_lfsck2name(lfsck), ltd->ltd_index);
/* Build the FID of the object to iterate: IDIF sequence, id 0, for this
 * OST index. */
2589 ostid_set_seq(oi, FID_SEQ_IDIF);
2590 ostid_set_id(oi, 0);
2591 rc = ostid_to_fid(fid, oi, ltd->ltd_index);
2595 obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2596 if (unlikely(IS_ERR(obj)))
2597 GOTO(log, rc = PTR_ERR(obj));
2599 rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
2603 iops = &obj->do_index_ops->dio_it;
2604 di = iops->init(env, obj, 0, BYPASS_CAPA);
2606 GOTO(put, rc = PTR_ERR(di));
2608 rc = iops->load(env, di, 0);
2610 /* -ESRCH means that the orphan OST-objects rbtree has been
2611 * cleanup because of the OSS server restart or other errors. */
2612 lfsck_lad_set_bitmap(env, com, ltd->ltd_index);
2617 rc = iops->next(env, di);
2629 struct lu_orphan_rec *rec = &info->lti_rec;
/* Fault-injection hook (OBD_FAIL_LFSCK_DELAY3) combined with a check
 * that the LFSCK thread is still running. */
2631 if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val) &&
2632 unlikely(!thread_is_running(&lfsck->li_thread)))
/* The iterator key is the orphan's FID; it is also remembered as the
 * latest FID scanned in phase 2. */
2635 key = iops->key(env, di);
2636 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2637 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2639 rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2640 &com->lc_fid_latest_scanned_phase2);
/* With LPF_FAILOUT set, any non-zero result is acted on by the branch
 * below. */
2641 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2644 lfsck_control_speed_by_self(com);
/* Advance the iterator; while LPF_FAILOUT is not set, a negative result
 * from next() is retried. */
2646 rc = iops->next(env, di);
2647 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
/* Cleanup: finish the iterator and release the index object. */
2654 iops->fini(env, di);
2656 lu_object_put(env, &obj->do_lu);
2659 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant finished the orphan "
2660 "scanning for OST%04x: rc = %d\n",
2661 lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2663 return rc > 0 ? 0 : rc;
/*
 * \param[in] parent	the MDT-object that holds the dangling reference
 * \param[in] llr	the request; llr->llr_child is the OST-object handled
 *			here and llr->llr_lov_idx its stripe index
 * \param[in] pla	attributes of @parent; its uid/gid become the owner
 *			of the re-created OST-object
 */
2666 /* For the MDT-object with dangling reference, we need to repair the
2667 * inconsistency according to the LFSCK sponsor's requirement:
2669 * 1) Keep the inconsistency there and report the inconsistency case,
2670 * then give the chance to the application to find related issues,
2671 * and the users can make the decision about how to handle it with
2672 * more human knowledge. (by default)
2674 * 2) Re-create the missing OST-object with the FID/owner information. */
2675 static int lfsck_layout_repair_dangling(const struct lu_env *env,
2676 struct lfsck_component *com,
2677 struct dt_object *parent,
2678 struct lfsck_layout_req *llr,
2679 const struct lu_attr *pla)
2681 struct lfsck_thread_info *info = lfsck_env_info(env);
2682 struct filter_fid *pfid = &info->lti_new_pfid;
2683 struct dt_allocation_hint *hint = &info->lti_hint;
2684 struct lu_attr *cla = &info->lti_la2;
2685 struct dt_object *child = llr->llr_child;
2686 struct dt_device *dev = lfsck_obj2dt_dev(child);
2687 const struct lu_fid *tfid = lu_object_fid(&parent->do_lu);
2688 struct thandle *handle;
2690 struct lustre_handle lh = { 0 };
2695 if (com->lc_lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ)
2703 memset(cla, 0, sizeof(*cla));
2704 cla->la_uid = pla->la_uid;
2705 cla->la_gid = pla->la_gid;
/* The re-created OST-object is a regular file with mode 0666. */
2706 cla->la_mode = S_IFREG | 0666;
2707 cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
2708 LA_ATIME | LA_MTIME | LA_CTIME;
2710 rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2711 MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2716 handle = dt_trans_create(env, dev);
2718 GOTO(unlock1, rc = PTR_ERR(handle));
2720 hint->dah_parent = NULL;
/* Prepare the filter_fid that makes the re-created object point back
 * to @parent. */
2722 pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2723 pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2724 /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2725 * MDT-object's FID::f_ver, instead it is the OST-object index in its
2726 * parent MDT-object's layout EA. */
2727 pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2728 buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2730 rc = dt_declare_create(env, child, cla, hint, NULL, handle);
2734 rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2735 LU_XATTR_CREATE, handle);
2739 rc = dt_trans_start(env, dev, handle);
/* Hold the parent's read lock across the creation; if the parent has
 * been destroyed in the meantime, give up with rc = 1. */
2743 dt_read_lock(env, parent, 0);
2744 if (unlikely(lfsck_is_dead_obj(parent)))
2745 GOTO(unlock2, rc = 1);
2747 rc = dt_create(env, child, cla, hint, NULL, handle);
2751 rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2752 handle, BYPASS_CAPA);
2757 dt_read_unlock(env, parent);
2760 rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2763 lfsck_ibits_unlock(&lh, LCK_EX);
/* The message tells, via the local "create" flag, which of the two
 * policies described above was applied. */
2766 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found dangling "
2767 "reference for: parent "DFID", child "DFID", OST-index %u, "
2768 "stripe-index %u, owner %u/%u. %s: rc = %d\n",
2769 lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2770 PFID(lfsck_dto2fid(child)), llr->llr_ost_idx,
2771 llr->llr_lov_idx, pla->la_uid, pla->la_gid,
2772 create ? "Create the lost OST-object as required" :
2773 "Keep the MDT-object there by default", rc);
/*
 * \param[in] parent	the MDT-object trusted as the parent
 * \param[in] llr	the request; llr->llr_child is the OST-object whose
 *			XATTR_NAME_FID and owner are rewritten
 * \param[in] pla	attributes of @parent used when declaring the update
 */
2778 /* If the OST-object does not recognize the MDT-object as its parent, and
2779 * there is no other MDT-object claims as its parent, then just trust the
2780 * given MDT-object as its parent. So update the OST-object filter_fid. */
2781 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
2782 struct lfsck_component *com,
2783 struct dt_object *parent,
2784 struct lfsck_layout_req *llr,
2785 const struct lu_attr *pla)
2787 struct lfsck_thread_info *info = lfsck_env_info(env);
2788 struct filter_fid *pfid = &info->lti_new_pfid;
2789 struct lu_attr *tla = &info->lti_la3;
2790 struct dt_object *child = llr->llr_child;
2791 struct dt_device *dev = lfsck_obj2dt_dev(child);
2792 const struct lu_fid *tfid = lu_object_fid(&parent->do_lu);
2793 struct thandle *handle;
2795 struct lustre_handle lh = { 0 };
2799 rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2800 MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2805 handle = dt_trans_create(env, dev);
2807 GOTO(unlock1, rc = PTR_ERR(handle));
/* Build the new filter_fid from the parent's FID. */
2809 pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2810 pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2811 /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2812 * MDT-object's FID::f_ver, instead it is the OST-object index in its
2813 * parent MDT-object's layout EA. */
2814 pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2815 buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2817 rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
/* The owner of the child is updated in the same transaction. */
2821 tla->la_valid = LA_UID | LA_GID;
2822 tla->la_uid = pla->la_uid;
2823 tla->la_gid = pla->la_gid;
2824 rc = dt_declare_attr_set(env, child, tla, handle);
2828 rc = dt_trans_start(env, dev, handle);
/* With the parent locked, give up with rc = 1 if it is already dead. */
2832 dt_write_lock(env, parent, 0);
2833 if (unlikely(lfsck_is_dead_obj(parent)))
2834 GOTO(unlock2, rc = 1);
2836 rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
2841 /* Get the latest parent's owner. */
2842 rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
2846 tla->la_valid = LA_UID | LA_GID;
2847 rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
2852 dt_write_unlock(env, parent);
2855 rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2858 lfsck_ibits_unlock(&lh, LCK_EX);
2861 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired unmatched "
2862 "MDT-OST pair for: parent "DFID", child "DFID", OST-index %u, "
2863 "stripe-index %u, owner %u/%u: rc = %d\n",
2864 lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2865 PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
2866 pla->la_uid, pla->la_gid, rc);
/*
 * The repair creates a new OST-object on the device of llr->llr_child and
 * rewrites slot llr->llr_lov_idx of @parent's XATTR_NAME_LOV to reference
 * it, bumping the layout generation. Nothing is repaired (rc = 0) when
 * @parent is dead, when its layout EA cannot be used, or when its layout
 * generation no longer matches llr->llr_parent->llo_gen. A successful EA
 * replacement is reported as rc = 1.
 */
2871 /* If there are more than one MDT-objects claim as the OST-object's parent,
2872 * and the OST-object only recognizes one of them, then we need to generate
2873 * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
2874 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
2875 struct lfsck_component *com,
2876 struct dt_object *parent,
2877 struct lfsck_layout_req *llr,
2881 struct lfsck_thread_info *info = lfsck_env_info(env);
2882 struct dt_allocation_hint *hint = &info->lti_hint;
2883 struct dt_object_format *dof = &info->lti_dof;
2884 struct dt_device *pdev = com->lc_lfsck->li_next;
2885 struct ost_id *oi = &info->lti_oi;
2886 struct dt_device *cdev = lfsck_obj2dt_dev(llr->llr_child);
2887 struct dt_object *child = NULL;
2888 struct lu_device *d = &cdev->dd_lu_dev;
2889 struct lu_object *o = NULL;
2890 struct thandle *handle;
2891 struct lov_mds_md_v1 *lmm;
2892 struct lov_ost_data_v1 *objs;
2893 struct lustre_handle lh = { 0 };
2894 struct lu_buf ea_buf;
2899 rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2900 MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2905 handle = dt_trans_create(env, pdev);
2907 GOTO(unlock1, rc = PTR_ERR(handle));
/* Allocate an anonymous object on the child's device; it becomes the
 * new OST-object created below. */
2909 o = lu_object_anon(env, d, NULL);
2911 GOTO(stop, rc = PTR_ERR(o));
2913 child = container_of(o, struct dt_object, do_lu);
2914 o = lu_object_locate(o->lo_header, d->ld_type);
2915 if (unlikely(o == NULL))
2916 GOTO(stop, rc = -EINVAL);
2918 child = container_of(o, struct dt_object, do_lu);
2919 la->la_valid = LA_UID | LA_GID;
2920 hint->dah_parent = NULL;
2922 dof->dof_type = DFT_REGULAR;
2923 rc = dt_declare_create(env, child, la, NULL, NULL, handle);
2927 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2928 LU_XATTR_REPLACE, handle);
2932 rc = dt_trans_start(env, pdev, handle);
2936 dt_write_lock(env, parent, 0);
2937 if (unlikely(lfsck_is_dead_obj(parent)))
2938 GOTO(unlock2, rc = 0);
/* Re-read the layout EA under the lock; a result of 0, -ENODATA or
 * -ERANGE ends the repair with rc = 0. */
2940 rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2941 if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
2942 GOTO(unlock2, rc = 0);
2945 /* Someone changed the layout during the LFSCK, no need to repair then. */
2946 if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
2947 GOTO(unlock2, rc = 0);
2949 rc = dt_create(env, child, la, hint, dof, handle);
2953 /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2954 * been verified in lfsck_layout_verify_header() already. If some
2955 * new magic introduced in the future, then layout LFSCK needs to
2956 * be updated also. */
2957 magic = le32_to_cpu(lmm->lmm_magic);
2958 if (magic == LOV_MAGIC_V1) {
2959 objs = &lmm->lmm_objects[0];
2961 LASSERT(magic == LOV_MAGIC_V3);
2962 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
/* Bump the layout generation and store the new child's ostid and the
 * OST index into stripe slot llr->llr_lov_idx. */
2965 lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
2966 fid_to_ostid(lu_object_fid(&child->do_lu), oi);
2967 ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
2968 objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
2969 objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
2970 lfsck_buf_init(&ea_buf, lmm,
2971 lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count),
2973 rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV,
2974 LU_XATTR_REPLACE, handle, BYPASS_CAPA);
2976 GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2979 dt_write_unlock(env, parent);
2983 lu_object_put(env, &child->do_lu);
2985 dt_trans_stop(env, pdev, handle);
2988 lfsck_ibits_unlock(&lh, LCK_EX);
2991 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired multiple "
2992 "references for: parent "DFID", OST-index %u, stripe-index %u, "
2993 "owner %u/%u: rc = %d\n",
2994 lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2995 llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid, rc);
/*
 * \param[in] parent	the MDT-object whose owner is trusted
 * \param[in] llr	the request; llr->llr_child is the OST-object to fix
 * \param[in] pla	attributes of @parent as seen by the caller
 *
 * The child is left untouched, with rc = 1, when @parent is dead or when
 * the parent's current uid/gid no longer match @pla.
 */
3000 /* If the MDT-object and the OST-object have different owner information,
3001 * then trust the MDT-object, because the normal chown/chgrp handle order
3002 * is from MDT to OST, and it is possible that some chown/chgrp operation
3003 * is partly done. */
3004 static int lfsck_layout_repair_owner(const struct lu_env *env,
3005 struct lfsck_component *com,
3006 struct dt_object *parent,
3007 struct lfsck_layout_req *llr,
3008 struct lu_attr *pla)
3010 struct lfsck_thread_info *info = lfsck_env_info(env);
3011 struct lu_attr *tla = &info->lti_la3;
3012 struct dt_object *child = llr->llr_child;
3013 struct dt_device *dev = lfsck_obj2dt_dev(child);
3014 struct thandle *handle;
3018 handle = dt_trans_create(env, dev);
3020 GOTO(log, rc = PTR_ERR(handle));
/* Declare the owner update with the uid/gid known to the caller. */
3022 tla->la_uid = pla->la_uid;
3023 tla->la_gid = pla->la_gid;
3024 tla->la_valid = LA_UID | LA_GID;
3025 rc = dt_declare_attr_set(env, child, tla, handle);
3029 rc = dt_trans_start(env, dev, handle);
3033 /* Use the dt_object lock to serialize with destroy and attr_set. */
3034 dt_read_lock(env, parent, 0);
3035 if (unlikely(lfsck_is_dead_obj(parent)))
3036 GOTO(unlock, rc = 1);
3038 /* Get the latest parent's owner. */
3039 rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3043 /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
3044 if (unlikely(tla->la_uid != pla->la_uid ||
3045 tla->la_gid != pla->la_gid))
3046 GOTO(unlock, rc = 1);
/* Only the owner fields are pushed to the child. */
3048 tla->la_valid = LA_UID | LA_GID;
3049 rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3054 dt_read_unlock(env, parent);
3057 rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3060 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired inconsistent "
3061 "file owner for: parent "DFID", child "DFID", OST-index %u, "
3062 "stripe-index %u, owner %u/%u: rc = %d\n",
3063 lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3064 PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3065 pla->la_uid, pla->la_gid, rc);
/*
 * Result codes visible in this function: LLIT_INCONSISTENT_OWNER,
 * LLIT_UNMATCHED_PAIR and LLIT_MULTIPLE_REFERENCED for the detected
 * inconsistency type, or a negative errno such as PTR_ERR() of the lookup
 * of the object named by @pfid.
 *
 * \param[in] parent	the MDT-object expected to be the parent
 * \param[in] pfid	parent FID recorded on the OST-object
 * \param[in] cfid	FID of the OST-object itself
 * \param[in] pla	attributes of @parent
 * \param[in] cla	attributes of the OST-object
 * \param[in] llr	the request; llr->llr_lov_idx is compared with @idx
 * \param[in] idx	stripe index to compare against llr->llr_lov_idx
 */
3070 /* Check whether the OST-object correctly back points to the
3071 * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
3072 static int lfsck_layout_check_parent(const struct lu_env *env,
3073 struct lfsck_component *com,
3074 struct dt_object *parent,
3075 const struct lu_fid *pfid,
3076 const struct lu_fid *cfid,
3077 const struct lu_attr *pla,
3078 const struct lu_attr *cla,
3079 struct lfsck_layout_req *llr,
3080 struct lu_buf *lov_ea, __u32 idx)
3082 struct lfsck_thread_info *info = lfsck_env_info(env);
3083 struct lu_buf *buf = &info->lti_big_buf;
3084 struct dt_object *tobj;
3085 struct lov_mds_md_v1 *lmm;
3086 struct lov_ost_data_v1 *objs;
3087 struct lustre_handle lh = { 0 };
3094 if (fid_is_zero(pfid)) {
3095 /* client never wrote. */
3096 if (cla->la_size == 0 && cla->la_blocks == 0) {
3097 if (unlikely(cla->la_uid != pla->la_uid ||
3098 cla->la_gid != pla->la_gid))
3099 RETURN (LLIT_INCONSISTENT_OWNER);
3104 RETURN(LLIT_UNMATCHED_PAIR);
3107 if (unlikely(!fid_is_sane(pfid)))
3108 RETURN(LLIT_UNMATCHED_PAIR);
3110 if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
3111 if (llr->llr_lov_idx == idx)
3114 RETURN(LLIT_UNMATCHED_PAIR);
/* @pfid names another object: look it up and inspect its layout. */
3117 tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
3119 RETURN(PTR_ERR(tobj));
3121 if (dt_object_exists(tobj) == 0 ||
3122 lfsck_is_dead_obj(tobj))
3123 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3125 if (!S_ISREG(lfsck_object_type(tobj)))
3126 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3128 /* Load the tobj's layout EA, in spite of it is a local MDT-object or
3129 * remote one on another MDT. Then check whether the given OST-object
3130 * is in such layout. If yes, it is multiple referenced, otherwise it
3131 * is unmatched referenced case. */
3132 rc = lfsck_layout_get_lovea(env, tobj, buf);
3133 if (rc == 0 || rc == -ENOENT)
3134 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3140 magic = le32_to_cpu(lmm->lmm_magic);
3141 if (magic == LOV_MAGIC_V1) {
3142 objs = &lmm->lmm_objects[0];
3144 LASSERT(magic == LOV_MAGIC_V3);
3145 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
/* Walk every stripe slot of tobj's layout looking for @cfid. */
3148 count = le16_to_cpu(lmm->lmm_stripe_count);
3149 for (i = 0; i < count; i++, objs++) {
3150 struct lu_fid *tfid = &info->lti_fid2;
3151 struct ost_id *oi = &info->lti_oi;
3154 if (lovea_slot_is_dummy(objs))
3157 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3158 idx2 = le32_to_cpu(objs->l_ost_idx);
3159 rc = ostid_to_fid(tfid, oi, idx2);
3161 CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
3162 "invalid layout EA at the slot %d, index %u\n",
3163 lfsck_lfsck2name(com->lc_lfsck),
3164 PFID(pfid), i, idx2);
3166 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3169 if (lu_fid_eq(cfid, tfid)) {
3170 rc = lfsck_ibits_lock(env, com->lc_lfsck, tobj, &lh,
3171 MDS_INODELOCK_UPDATE |
3172 MDS_INODELOCK_LAYOUT |
3173 MDS_INODELOCK_XATTR,
3178 dt_read_lock(env, tobj, 0);
3180 /* For local MDT-object, re-check existence
3181 * after taking the lock. */
3182 if (!dt_object_remote(tobj)) {
3183 if (dt_object_exists(tobj) == 0 ||
3184 lfsck_is_dead_obj(tobj)) {
3185 rc = LLIT_UNMATCHED_PAIR;
3188 rc = LLIT_MULTIPLE_REFERENCED;
3194 /* For migration case, the new MDT-object and old
3195 * MDT-object may reference the same OST-object at
3196 * some migration internal time.
3198 * For remote MDT-object, the local MDT may not know
3199 * whether it has been removed or not. Try checking
3200 * for a non-existent xattr to check if this object
3201 * has been removed or not. */
3202 rc = dt_xattr_get(env, tobj, &LU_BUF_NULL,
3203 XATTR_NAME_DUMMY, BYPASS_CAPA);
3204 if (unlikely(rc == -ENOENT || rc >= 0)) {
3205 rc = LLIT_UNMATCHED_PAIR;
3206 } else if (rc == -ENODATA) {
3208 rc = LLIT_MULTIPLE_REFERENCED;
3215 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
/* The dt read lock is only held when the ibits lock was obtained. */
3218 if (lustre_handle_is_used(&lh)) {
3219 dt_read_unlock(env, tobj);
3220 lfsck_ibits_unlock(&lh, LCK_EX);
3224 lfsck_object_put(env, tobj);
/*
 * Phase-1 handler of the layout LFSCK assistant for one request.
 *
 * The MDT-object named by lar->lar_fid and the OST-object llr->llr_child
 * are compared (attributes, XATTR_NAME_FID back pointer, owner) to classify
 * the pair as one of the LLIT_* inconsistency types. The detected type is
 * dispatched to the matching lfsck_layout_repair_*() function, and the
 * outcome is accounted in the lfsck_layout statistics under com->lc_sem.
 *
 * NOTE(review): the LPF_DRYRUN handling is only partly visible in this
 * function; confirm its exact effect before relying on this summary.
 */
3229 static int lfsck_layout_assistant_handler_p1(const struct lu_env *env,
3230 struct lfsck_component *com,
3231 struct lfsck_assistant_req *lar)
3233 struct lfsck_layout_req *llr =
3234 container_of0(lar, struct lfsck_layout_req, llr_lar);
3235 struct lfsck_layout *lo = com->lc_file_ram;
3236 struct lfsck_thread_info *info = lfsck_env_info(env);
3237 struct filter_fid_old *pea = &info->lti_old_pfid;
3238 struct lu_fid *pfid = &info->lti_fid;
3239 struct lu_buf buf = { NULL };
3240 struct dt_object *parent;
3241 struct dt_object *child = llr->llr_child;
3242 struct lu_attr *pla = &info->lti_la;
3243 struct lu_attr *cla = &info->lti_la2;
3244 struct lfsck_instance *lfsck = com->lc_lfsck;
3245 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
3246 enum lfsck_layout_inconsistency_type type = LLIT_NONE;
3251 parent = lfsck_object_find(env, lfsck, &lar->lar_fid);
3253 RETURN(PTR_ERR(parent));
3255 if (unlikely(lfsck_is_dead_obj(parent)))
3256 GOTO(put_parent, rc = 0);
3258 rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
/* A child that cannot be found (-ENOENT) is a dangling reference,
 * unless the parent itself has been destroyed in the meantime. */
3262 rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
3263 if (rc == -ENOENT) {
3264 if (unlikely(lfsck_is_dead_obj(parent)))
3265 GOTO(put_parent, rc = 0);
3267 type = LLIT_DANGLING;
/* Read the child's XATTR_NAME_FID; a size other than that of
 * filter_fid_old or filter_fid is treated as an unmatched pair. */
3274 lfsck_buf_init(&buf, pea, sizeof(struct filter_fid_old));
3275 rc = dt_xattr_get(env, child, &buf, XATTR_NAME_FID, BYPASS_CAPA);
3276 if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
3277 rc != sizeof(struct filter_fid))) {
3278 type = LLIT_UNMATCHED_PAIR;
3282 if (rc < 0 && rc != -ENODATA)
3285 if (rc == -ENODATA) {
3288 fid_le_to_cpu(pfid, &pea->ff_parent);
3289 /* Currently, the filter_fid::ff_parent::f_ver is not the
3290 * real parent MDT-object's FID::f_ver, instead it is the
3291 * OST-object index in its parent MDT-object's layout EA. */
3292 idx = pfid->f_stripe_idx;
3296 rc = lfsck_layout_check_parent(env, com, parent, pfid,
3297 lu_object_fid(&child->do_lu),
3298 pla, cla, llr, &buf, idx);
3307 if (unlikely(cla->la_uid != pla->la_uid ||
3308 cla->la_gid != pla->la_gid)) {
3309 type = LLIT_INCONSISTENT_OWNER;
3314 if (bk->lb_param & LPF_DRYRUN) {
3315 if (type != LLIT_NONE)
3323 rc = lfsck_layout_repair_dangling(env, com, parent, llr, pla);
3325 case LLIT_UNMATCHED_PAIR:
3326 rc = lfsck_layout_repair_unmatched_pair(env, com, parent,
3329 case LLIT_MULTIPLE_REFERENCED:
3330 rc = lfsck_layout_repair_multiple_references(env, com, parent,
3333 case LLIT_INCONSISTENT_OWNER:
3334 rc = lfsck_layout_repair_owner(env, com, parent, llr, pla);
/* Account the outcome in the statistics while holding lc_sem. */
3344 down_write(&com->lc_sem);
3346 struct lfsck_assistant_data *lad = com->lc_data;
3348 if (unlikely(lad->lad_exit)) {
3350 } else if (rc == -ENOTCONN || rc == -ESHUTDOWN ||
3351 rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
3352 rc == -EHOSTUNREACH) {
3353 /* If cannot touch the target server,
3354 * mark the LFSCK as INCOMPLETE. */
3355 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant fail to "
3356 "talk with OST %x: rc = %d\n",
3357 lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
3358 lfsck_lad_set_bitmap(env, com, llr->llr_ost_idx);
3359 lo->ll_objs_skipped++;
3362 lfsck_layout_record_failure(env, lfsck, lo);
3364 } else if (rc > 0) {
3365 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
3366 "unknown type = %d\n", type);
/* ll_objs_repaired[] is indexed by inconsistency type minus one. */
3368 lo->ll_objs_repaired[type - 1]++;
3369 if (bk->lb_param & LPF_DRYRUN &&
3370 unlikely(lo->ll_pos_first_inconsistent == 0))
3371 lo->ll_pos_first_inconsistent =
3372 lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
3375 up_write(&com->lc_sem);
3378 lu_object_put(env, &parent->do_lu);
/*
 * Phase-2 handler of the layout LFSCK assistant.
 *
 * Drains lad->lad_ost_phase2_list while holding ltds->ltd_lock: each target
 * is unlinked from the list and, when LPF_ALL_TGT is set in the bookmark,
 * the spinlock is dropped so that lfsck_layout_scan_orphan() can scan the
 * orphans of that target; the lock is taken again before the next entry.
 * A scan failure ends the handling only when LPF_FAILOUT is set, and the
 * loop is also left when the assistant is told to exit or the LFSCK thread
 * is no longer running.
 */
3383 static int lfsck_layout_assistant_handler_p2(const struct lu_env *env,
3384 struct lfsck_component *com)
3386 struct lfsck_assistant_data *lad = com->lc_data;
3387 struct lfsck_instance *lfsck = com->lc_lfsck;
3388 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
3389 struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs;
3390 struct lfsck_tgt_desc *ltd;
3394 CDEBUG(D_LFSCK, "%s: layout LFSCK phase2 scan start\n",
3395 lfsck_lfsck2name(lfsck));
/* The phase-2 list is protected by the target descriptors' spinlock. */
3397 spin_lock(&ltds->ltd_lock);
3398 while (!list_empty(&lad->lad_ost_phase2_list)) {
3399 ltd = list_entry(lad->lad_ost_phase2_list.next,
3400 struct lfsck_tgt_desc,
3401 ltd_layout_phase_list);
3402 list_del_init(&ltd->ltd_layout_phase_list);
3403 if (bk->lb_param & LPF_ALL_TGT) {
/* Scanning may block, so it must not run under the spinlock. */
3404 spin_unlock(&ltds->ltd_lock);
3405 rc = lfsck_layout_scan_orphan(env, com, ltd);
3406 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
3409 if (unlikely(lad->lad_exit ||
3410 !thread_is_running(&lfsck->li_thread)))
3412 spin_lock(&ltds->ltd_lock);
3416 if (list_empty(&lad->lad_ost_phase1_list))
3420 spin_unlock(&ltds->ltd_lock);
3422 CDEBUG(D_LFSCK, "%s: layout LFSCK phase2 scan stop: rc = %d\n",
3423 lfsck_lfsck2name(lfsck), rc);
/*
 * Reply callback installed by lfsck_layout_async_query() through
 * req->rq_interpret_reply.
 *
 * @args carries the export, the component and the slave target that the
 * query was issued for. Depending on the request result and on the status
 * found in the reply, the target may be removed from the slave's list; the
 * llst, component and export references are released before returning.
 */
3429 lfsck_layout_slave_async_interpret(const struct lu_env *env,
3430 struct ptlrpc_request *req,
3433 struct lfsck_layout_slave_async_args *llsaa = args;
3434 struct obd_export *exp = llsaa->llsaa_exp;
3435 struct lfsck_component *com = llsaa->llsaa_com;
3436 struct lfsck_layout_slave_target *llst = llsaa->llsaa_llst;
3437 struct lfsck_layout_slave_data *llsd = com->lc_data;
3438 struct lfsck_reply *lr = NULL;
3442 /* It is quite probably caused by target crash,
3443 * to make the LFSCK can go ahead, assume that
3444 * the target finished the LFSCK processing. */
3447 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
3448 if (lr->lr_status != LS_SCANNING_PHASE1 &&
3449 lr->lr_status != LS_SCANNING_PHASE2)
/* Without a reply (lr == NULL) the request result is logged instead
 * of the remote status. */
3454 CDEBUG(D_LFSCK, "%s: layout LFSCK slave gets the MDT %x "
3455 "status %d\n", lfsck_lfsck2name(com->lc_lfsck),
3456 llst->llst_index, lr != NULL ? lr->lr_status : rc);
3458 lfsck_layout_llst_del(llsd, llst);
/* Release what the async args were holding for this request. */
3461 lfsck_layout_llst_put(llst);
3462 lfsck_component_put(env, com);
3463 class_export_put(exp);
/*
 * Queue an asynchronous LFSCK_QUERY request on @set.
 *
 * The request is allocated on the import of @exp and packed as
 * LFSCK_QUERY. Its async args record @exp, a new reference on @com and
 * @llst, and lfsck_layout_slave_async_interpret() is installed as the reply
 * callback before the request is added to @set.
 *
 * \param[in] exp	export the request is sent through
 * \param[in] llst	slave target the query is about
 * \param[in] lr	the LFSCK request to send
 * \param[in] set	request set that collects the queued request
 */
3468 static int lfsck_layout_async_query(const struct lu_env *env,
3469 struct lfsck_component *com,
3470 struct obd_export *exp,
3471 struct lfsck_layout_slave_target *llst,
3472 struct lfsck_request *lr,
3473 struct ptlrpc_request_set *set)
3475 struct lfsck_layout_slave_async_args *llsaa;
3476 struct ptlrpc_request *req;
3477 struct lfsck_request *tmp;
3481 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
3485 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
3487 ptlrpc_request_free(req);
3491 tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3493 ptlrpc_request_set_replen(req);
/* Stash what the reply callback needs; the component reference taken
 * here is dropped by the callback. */
3495 llsaa = ptlrpc_req_async_args(req);
3496 llsaa->llsaa_exp = exp;
3497 llsaa->llsaa_com = lfsck_component_get(com);
3498 llsaa->llsaa_llst = llst;
3499 req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
3500 ptlrpc_set_add_req(set, req);
/*
 * Queue an asynchronous LFSCK_NOTIFY request on @set.
 *
 * The request is allocated on the import of @exp, packed as LFSCK_NOTIFY
 * and added to @set; no reply callback is installed here.
 *
 * \param[in] exp	export the request is sent through
 * \param[in] lr	the LFSCK request to send
 * \param[in] set	request set that collects the queued request
 */
3505 static int lfsck_layout_async_notify(const struct lu_env *env,
3506 struct obd_export *exp,
3507 struct lfsck_request *lr,
3508 struct ptlrpc_request_set *set)
3510 struct ptlrpc_request *req;
3511 struct lfsck_request *tmp;
3515 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3519 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3521 ptlrpc_request_free(req);
3525 tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3527 ptlrpc_request_set_replen(req);
3528 ptlrpc_set_add_req(set, req);
/*
 * Send an LE_QUERY request for the layout LFSCK to the targets linked on
 * llsd->llsd_master_list and wait for the whole set.
 *
 * The list is walked under llsd->llsd_lock. Each entry is stamped with the
 * freshly bumped llsd_touch_gen, moved to the list tail and referenced; the
 * lock is then dropped to look up the export and queue the request with
 * lfsck_layout_async_query(). An entry already carrying the current
 * generation has been visited in this pass.
 */
3534 lfsck_layout_slave_query_master(const struct lu_env *env,
3535 struct lfsck_component *com)
3537 struct lfsck_request *lr = &lfsck_env_info(env)->lti_lr;
3538 struct lfsck_instance *lfsck = com->lc_lfsck;
3539 struct lfsck_layout_slave_data *llsd = com->lc_data;
3540 struct lfsck_layout_slave_target *llst;
3541 struct obd_export *exp;
3542 struct ptlrpc_request_set *set;
3547 set = ptlrpc_prep_set();
3549 GOTO(log, rc = -ENOMEM);
/* The same request body is used for every target. */
3551 memset(lr, 0, sizeof(*lr));
3552 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3553 lr->lr_event = LE_QUERY;
3554 lr->lr_active = LFSCK_TYPE_LAYOUT;
/* A new generation marks the entries visited during this pass. */
3556 llsd->llsd_touch_gen++;
3557 spin_lock(&llsd->llsd_lock);
3558 while (!list_empty(&llsd->llsd_master_list)) {
3559 llst = list_entry(llsd->llsd_master_list.next,
3560 struct lfsck_layout_slave_target,
3562 if (llst->llst_gen == llsd->llsd_touch_gen)
3565 llst->llst_gen = llsd->llsd_touch_gen;
3566 list_move_tail(&llst->llst_list,
3567 &llsd->llsd_master_list);
3568 atomic_inc(&llst->llst_ref);
3569 spin_unlock(&llsd->llsd_lock);
3571 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3574 lfsck_layout_llst_del(llsd, llst);
3575 lfsck_layout_llst_put(llst);
3576 spin_lock(&llsd->llsd_lock);
3580 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
3582 CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
3583 "query %s for layout: rc = %d\n",
3584 lfsck_lfsck2name(lfsck),
3585 exp->exp_obd->obd_name, rc);
3588 lfsck_layout_llst_put(llst);
3589 class_export_put(exp);
3591 spin_lock(&llsd->llsd_lock);
3593 spin_unlock(&llsd->llsd_lock);
/* Wait for every queued request, then release the set. */
3595 rc = ptlrpc_set_wait(set);
3596 ptlrpc_set_destroy(set);
/* An error recorded in rc1 takes precedence over the wait result. */
3598 GOTO(log, rc = (rc1 != 0 ? rc1 : rc));
3601 CDEBUG(D_LFSCK, "%s: layout LFSCK slave queries master: rc = %d\n",
3602 lfsck_lfsck2name(com->lc_lfsck), rc);
/*
 * Notify the targets linked on llsd->llsd_master_list about @event.
 *
 * The request carries @event, @result as the status, the LEF_FROM_OST flag,
 * the local device index and lo->ll_flags. The list is walked in the same
 * way as in lfsck_layout_slave_query_master(): entries are stamped with the
 * bumped llsd_touch_gen, moved to the tail and referenced under
 * llsd->llsd_lock, and each request is queued by
 * lfsck_layout_async_notify() with the lock dropped. The result of waiting
 * for the set is not propagated.
 *
 * \param[in] event	the LFSCK event to report
 * \param[in] result	stored into lr_status of the request
 */
3608 lfsck_layout_slave_notify_master(const struct lu_env *env,
3609 struct lfsck_component *com,
3610 enum lfsck_events event, int result)
3612 struct lfsck_layout *lo = com->lc_file_ram;
3613 struct lfsck_instance *lfsck = com->lc_lfsck;
3614 struct lfsck_layout_slave_data *llsd = com->lc_data;
3615 struct lfsck_request *lr = &lfsck_env_info(env)->lti_lr;
3616 struct lfsck_layout_slave_target *llst;
3617 struct obd_export *exp;
3618 struct ptlrpc_request_set *set;
3622 CDEBUG(D_LFSCK, "%s: layout LFSCK slave notifies master\n",
3623 lfsck_lfsck2name(com->lc_lfsck));
3625 set = ptlrpc_prep_set();
3629 memset(lr, 0, sizeof(*lr));
3630 lr->lr_event = event;
3631 lr->lr_flags = LEF_FROM_OST;
3632 lr->lr_status = result;
3633 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3634 lr->lr_active = LFSCK_TYPE_LAYOUT;
3635 lr->lr_flags2 = lo->ll_flags;
/* A new generation marks the entries visited during this pass. */
3636 llsd->llsd_touch_gen++;
3637 spin_lock(&llsd->llsd_lock);
3638 while (!list_empty(&llsd->llsd_master_list)) {
3639 llst = list_entry(llsd->llsd_master_list.next,
3640 struct lfsck_layout_slave_target,
3642 if (llst->llst_gen == llsd->llsd_touch_gen)
3645 llst->llst_gen = llsd->llsd_touch_gen;
3646 list_move_tail(&llst->llst_list,
3647 &llsd->llsd_master_list);
3648 atomic_inc(&llst->llst_ref);
3649 spin_unlock(&llsd->llsd_lock);
3651 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3654 lfsck_layout_llst_del(llsd, llst);
3655 lfsck_layout_llst_put(llst);
3656 spin_lock(&llsd->llsd_lock);
3660 rc = lfsck_layout_async_notify(env, exp, lr, set);
3662 CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
3663 "notify %s for layout: rc = %d\n",
3664 lfsck_lfsck2name(lfsck),
3665 exp->exp_obd->obd_name, rc);
3667 lfsck_layout_llst_put(llst);
3668 class_export_put(exp);
3669 spin_lock(&llsd->llsd_lock);
3671 spin_unlock(&llsd->llsd_lock);
/* Wait for every queued request, then release the set. */
3673 ptlrpc_set_wait(set);
3674 ptlrpc_set_destroy(set);
3680 * \ret -ENODATA: unrecognized stripe
3681 * \ret = 0 : recognized stripe
3682 * \ret < 0 : other failures
/*
 * Check on the MDT whether the OST-object @cfid is referenced by the layout
 * EA of the MDT-object @pfid at stripe index pfid->f_stripe_idx.
 *
 * The object is looked up on li_bottom and examined under its read lock:
 * -ENOENT when it does not exist or is dead, -ENODATA when it is not a
 * regular file, when @cfid is found at another stripe index, or when it is
 * not found at all, and 0 when @cfid sits at the expected index.
 */
3684 static int lfsck_layout_master_check_pairs(const struct lu_env *env,
3685 struct lfsck_component *com,
3686 struct lu_fid *cfid,
3687 struct lu_fid *pfid)
3689 struct lfsck_thread_info *info = lfsck_env_info(env);
3690 struct lu_buf *buf = &info->lti_big_buf;
3691 struct ost_id *oi = &info->lti_oi;
3692 struct dt_object *obj;
3693 struct lov_mds_md_v1 *lmm;
3694 struct lov_ost_data_v1 *objs;
3695 __u32 idx = pfid->f_stripe_idx;
3703 obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
3705 RETURN(PTR_ERR(obj));
3707 dt_read_lock(env, obj, 0);
3708 if (unlikely(dt_object_exists(obj) == 0 ||
3709 lfsck_is_dead_obj(obj)))
3710 GOTO(unlock, rc = -ENOENT);
3712 if (!S_ISREG(lfsck_object_type(obj)))
3713 GOTO(unlock, rc = -ENODATA);
3715 rc = lfsck_layout_get_lovea(env, obj, buf);
3720 GOTO(unlock, rc = -ENODATA);
3723 rc = lfsck_layout_verify_header(lmm);
3727 /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3728 * been verified in lfsck_layout_verify_header() already. If some
3729 * new magic introduced in the future, then layout LFSCK needs to
3730 * be updated also. */
3731 magic = le32_to_cpu(lmm->lmm_magic);
3732 if (magic == LOV_MAGIC_V1) {
3733 objs = &lmm->lmm_objects[0];
3735 LASSERT(magic == LOV_MAGIC_V3);
3736 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
/* Compare the child's ostid against every stripe slot. */
3739 fid_to_ostid(cfid, oi);
3740 count = le16_to_cpu(lmm->lmm_stripe_count);
3741 for (i = 0; i < count; i++, objs++) {
3744 ostid_le_to_cpu(&objs->l_ost_oi, &oi2);
3745 if (memcmp(oi, &oi2, sizeof(*oi)) == 0)
3746 GOTO(unlock, rc = (i != idx ? -ENODATA : 0));
3749 GOTO(unlock, rc = -ENODATA);
3752 dt_read_unlock(env, obj);
3753 lu_object_put(env, &obj->do_lu);
3759 * The LFSCK-on-OST will ask the LFSCK-on-MDT to check whether the given
3760 * MDT-object/OST-object pairs match or not, to avoid transferring the
3761 * MDT-object layout EA from MDT to OST. On one hand, the OST does not need
3762 * to understand the layout EA structure; on the other hand, it may cause
3763 * trouble to transfer a large layout EA from MDT to OST via normal OUT RPC.
3765 * \ret > 0: unrecognized stripe
3766 * \ret = 0: recognized stripe
3767 * \ret < 0: other failures
/*
 * OST-side half of the pair verification.
 *
 * The FLD is consulted for fid_seq(@pfid) to find which server holds the
 * claimed parent, the LWP export for range->lsr_index is looked up, and a
 * synchronous LFSCK_NOTIFY request with event LE_PAIRS_VERIFY, carrying
 * both FIDs, is sent through it. The export must advertise
 * OBD_CONNECT_LFSCK, otherwise rc is -EOPNOTSUPP. A -ENOENT from the FLD
 * lookup is returned as 1.
 *
 * \param[in] cfid	FID of the OST-object itself
 * \param[in] pfid	parent FID claimed by the OST-object
 */
3769 static int lfsck_layout_slave_check_pairs(const struct lu_env *env,
3770 struct lfsck_component *com,
3771 struct lu_fid *cfid,
3772 struct lu_fid *pfid)
3774 struct lfsck_instance *lfsck = com->lc_lfsck;
3775 struct obd_device *obd = lfsck->li_obd;
3776 struct seq_server_site *ss =
3777 lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
3778 struct obd_export *exp = NULL;
3779 struct ptlrpc_request *req = NULL;
3780 struct lfsck_request *lr;
3781 struct lu_seq_range *range = &lfsck_env_info(env)->lti_range;
3785 if (unlikely(fid_is_idif(pfid)))
3788 fld_range_set_any(range);
3789 rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), range);
3791 RETURN(rc == -ENOENT ? 1 : rc);
3793 if (unlikely(!fld_range_is_mdt(range)))
3796 exp = lustre_find_lwp_by_index(obd->obd_name, range->lsr_index);
3797 if (unlikely(exp == NULL))
3800 if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
3801 GOTO(out, rc = -EOPNOTSUPP);
/* Build the LFSCK_NOTIFY request that carries the verification. */
3803 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3805 GOTO(out, rc = -ENOMEM);
3807 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3809 ptlrpc_request_free(req);
3814 lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3815 memset(lr, 0, sizeof(*lr));
3816 lr->lr_event = LE_PAIRS_VERIFY;
3817 lr->lr_active = LFSCK_TYPE_LAYOUT;
3818 lr->lr_fid = *cfid; /* OST-object itself FID. */
3819 lr->lr_fid2 = *pfid; /* The claimed parent FID. */
/* The request is sent synchronously and released right after. */
3821 ptlrpc_request_set_replen(req);
3822 rc = ptlrpc_queue_wait(req);
3823 ptlrpc_req_finished(req);
3825 if (rc == -ENOENT || rc == -ENODATA)
3832 class_export_put(exp);
/*
 * Rewrite the XATTR_NAME_FID of the local object lr->lr_fid so that its
 * parent FID becomes lr->lr_fid2.
 *
 * The object is looked up on li_bottom and write-locked; if it does not
 * exist or is dead, nothing is done and rc is 0. Otherwise the xattr is set
 * inside a local transaction on the same device.
 */
3837 static int lfsck_layout_slave_repair_pfid(const struct lu_env *env,
3838 struct lfsck_component *com,
3839 struct lfsck_request *lr)
3841 struct lfsck_thread_info *info = lfsck_env_info(env);
3842 struct filter_fid *ff = &info->lti_new_pfid;
3844 struct dt_device *dev = com->lc_lfsck->li_bottom;
3845 struct dt_object *obj;
3846 struct thandle *th = NULL;
3850 obj = lfsck_object_find_by_dev(env, dev, &lr->lr_fid);
3852 GOTO(log, rc = PTR_ERR(obj));
/* The parent FID is stored in little-endian form. */
3854 fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2);
3855 buf = lfsck_buf_get(env, ff, sizeof(*ff));
3856 dt_write_lock(env, obj, 0);
3857 if (unlikely(dt_object_exists(obj) == 0 ||
3858 lfsck_is_dead_obj(obj)))
3859 GOTO(unlock, rc = 0);
3861 th = dt_trans_create(env, dev);
3863 GOTO(unlock, rc = PTR_ERR(th));
3865 rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th);
3869 rc = dt_trans_start_local(env, dev, th);
3873 rc = dt_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA);
3878 dt_trans_stop(env, dev, th);
3881 dt_write_unlock(env, obj);
3882 lu_object_put(env, &obj->do_lu);
3885 CDEBUG(D_LFSCK, "%s: layout LFSCK slave repaired pfid for "DFID
3886 ", parent "DFID": rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
3887 PFID(&lr->lr_fid), PFID(&lr->lr_fid2), rc);
/* Forward declaration: the definition appears later in this file. */
3894 static void lfsck_layout_slave_quit(const struct lu_env *env,
3895 struct lfsck_component *com);
/*
 * Reset the in-RAM layout LFSCK trace file (com->lc_file_ram) and store it.
 *
 * Under com->lc_sem (write) the structure is zeroed over com->lc_file_size
 * bytes, then ll_magic is set to LFSCK_LAYOUT_MAGIC and ll_status to LS_INIT.
 * Two clearing variants are visible: a plain memset, and one that saves
 * ll_success_count and ll_time_last_complete before the memset and restores
 * them afterwards. On the master (li_master) the assistant's lad_incomplete
 * flag and lad_bitmap are cleared as well. The result of
 * lfsck_layout_store() is logged.
 *
 * NOTE(review): the lines selecting between the two memset variants
 * (presumably on @init) and the return are not in this excerpt — confirm.
 */
3897 static int lfsck_layout_reset(const struct lu_env *env,
3898 struct lfsck_component *com, bool init)
3900 struct lfsck_layout *lo = com->lc_file_ram;
3903 down_write(&com->lc_sem);
3905 memset(lo, 0, com->lc_file_size);
/* Keep the history counters across the wipe. */
3907 __u32 count = lo->ll_success_count;
3908 __u64 last_time = lo->ll_time_last_complete;
3910 memset(lo, 0, com->lc_file_size);
3911 lo->ll_success_count = count;
3912 lo->ll_time_last_complete = last_time;
3915 lo->ll_magic = LFSCK_LAYOUT_MAGIC;
3916 lo->ll_status = LS_INIT;
3918 if (com->lc_lfsck->li_master) {
3919 struct lfsck_assistant_data *lad = com->lc_data;
3921 lad->lad_incomplete = 0;
3922 CFS_RESET_BITMAP(lad->lad_bitmap);
3925 rc = lfsck_layout_store(env, com);
3926 up_write(&com->lc_sem);
3928 CDEBUG(D_LFSCK, "%s: layout LFSCK reset: rc = %d\n",
3929 lfsck_lfsck2name(com->lc_lfsck), rc);
/*
 * Account one failure for the layout LFSCK component.
 *
 * With com->lc_sem held for write, com->lc_new_checked is incremented and
 * lfsck_layout_record_failure() is called on the in-RAM trace file.
 *
 * NOTE(review): @new_checked is not referenced by any visible line; the
 * increment is presumably conditional on it, but that line is missing from
 * this excerpt — confirm.
 */
3934 static void lfsck_layout_fail(const struct lu_env *env,
3935 struct lfsck_component *com, bool new_checked)
3937 struct lfsck_layout *lo = com->lc_file_ram;
3939 down_write(&com->lc_sem);
3941 com->lc_new_checked++;
3942 lfsck_layout_record_failure(env, com->lc_lfsck, lo);
3943 up_write(&com->lc_sem);
/*
 * Checkpoint handler for the layout LFSCK master.
 *
 * lfsck_checkpoint_generic() is called first; on one (not visible) condition
 * the function returns its result with positive values mapped to 0.
 * Otherwise, under com->lc_sem (write), the trace file is updated from
 * lfsck->li_pos_checkpoint.lp_oit_cookie: either ll_pos_latest_start, or
 * ll_pos_last_checkpoint together with the phase1 run time, the checkpoint
 * timestamp and the phase1 checked counter (lc_new_checked is folded in and
 * reset to 0). The file is then stored and the result logged.
 *
 * NOTE(review): the branch conditions (presumably on @init and on the
 * generic checkpoint result) and the final return are missing from this
 * excerpt — confirm.
 */
3946 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
3947 struct lfsck_component *com, bool init)
3949 struct lfsck_instance *lfsck = com->lc_lfsck;
3950 struct lfsck_layout *lo = com->lc_file_ram;
3954 rc = lfsck_checkpoint_generic(env, com);
3956 return rc > 0 ? 0 : rc;
3959 down_write(&com->lc_sem);
3961 lo->ll_pos_latest_start =
3962 lfsck->li_pos_checkpoint.lp_oit_cookie;
3964 lo->ll_pos_last_checkpoint =
3965 lfsck->li_pos_checkpoint.lp_oit_cookie;
/* HALF_SEC is added before converting the elapsed time to seconds. */
3966 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3967 HALF_SEC - lfsck->li_time_last_checkpoint);
3968 lo->ll_time_last_checkpoint = cfs_time_current_sec();
3969 lo->ll_objs_checked_phase1 += com->lc_new_checked;
3970 com->lc_new_checked = 0;
3973 rc = lfsck_layout_store(env, com);
3974 up_write(&com->lc_sem);
3976 CDEBUG(D_LFSCK, "%s: layout LFSCK master checkpoint at the pos ["
3977 LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
3978 lfsck->li_pos_current.lp_oit_cookie, rc);
/*
 * Checkpoint handler for the layout LFSCK slave.
 *
 * A check for "nothing newly checked and not @init" comes first (its action
 * is not visible here). Under com->lc_sem (write) the trace file is updated
 * from lfsck->li_pos_checkpoint.lp_oit_cookie: either ll_pos_latest_start,
 * or ll_pos_last_checkpoint together with the phase1 run time, the
 * checkpoint timestamp and the phase1 checked counter (lc_new_checked is
 * folded in and reset to 0). The file is then stored and the result logged.
 *
 * NOTE(review): the early-exit statement, the branch on @init and the
 * return are missing from this excerpt — confirm.
 */
3983 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
3984 struct lfsck_component *com, bool init)
3986 struct lfsck_instance *lfsck = com->lc_lfsck;
3987 struct lfsck_layout *lo = com->lc_file_ram;
3990 if (com->lc_new_checked == 0 && !init)
3993 down_write(&com->lc_sem);
3995 lo->ll_pos_latest_start =
3996 lfsck->li_pos_checkpoint.lp_oit_cookie;
3998 lo->ll_pos_last_checkpoint =
3999 lfsck->li_pos_checkpoint.lp_oit_cookie;
4000 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4001 HALF_SEC - lfsck->li_time_last_checkpoint);
4002 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4003 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4004 com->lc_new_checked = 0;
4007 rc = lfsck_layout_store(env, com);
4008 up_write(&com->lc_sem);
4010 CDEBUG(D_LFSCK, "%s: layout LFSCK slave checkpoint at the pos ["
4011 LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
4012 lfsck->li_pos_current.lp_oit_cookie, rc);
/*
 * Common preparation for a layout LFSCK run: choose the start position
 * (com->lc_pos_start) and the initial status.
 *
 * The directory part of the position is always zeroed. If the previous run
 * ended as LS_COMPLETED or LS_PARTIAL, or the caller asked for orphan
 * handling (LPF_OST_ORPHAN), the trace file is reset via
 * lfsck_layout_reset() and lfsck_set_param() is called. Then, under
 * com->lc_sem (write) and lfsck->li_lock:
 *  - with LF_SCANNED_ONCE set, the component either goes straight to
 *    LS_SCANNING_PHASE2 (moved to li_list_double_scan, cookie 0) or restarts
 *    LS_SCANNING_PHASE1 with all counters cleared, starting from
 *    ll_pos_first_inconsistent;
 *  - otherwise it enters LS_SCANNING_PHASE1 starting from either
 *    ll_pos_last_checkpoint + 1 or ll_pos_first_inconsistent.
 *
 * NOTE(review): the "else" lines, error checks and the return are missing
 * from this excerpt, so the exact pairing of the branches above is inferred
 * from the visible conditions — confirm.
 */
4017 static int lfsck_layout_prep(const struct lu_env *env,
4018 struct lfsck_component *com,
4019 struct lfsck_start *start)
4021 struct lfsck_instance *lfsck = com->lc_lfsck;
4022 struct lfsck_layout *lo = com->lc_file_ram;
4023 struct lfsck_position *pos = &com->lc_pos_start;
4025 fid_zero(&pos->lp_dir_parent);
4026 pos->lp_dir_cookie = 0;
4027 if (lo->ll_status == LS_COMPLETED ||
4028 lo->ll_status == LS_PARTIAL ||
4029 /* To handle orphan, must scan from the beginning. */
4030 (start != NULL && start->ls_flags & LPF_OST_ORPHAN)) {
4033 rc = lfsck_layout_reset(env, com, false);
4035 rc = lfsck_set_param(env, lfsck, start, true);
4038 CDEBUG(D_LFSCK, "%s: layout LFSCK prep failed: "
4039 "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
4045 down_write(&com->lc_sem);
4046 lo->ll_time_latest_start = cfs_time_current_sec();
4047 spin_lock(&lfsck->li_lock);
4048 if (lo->ll_flags & LF_SCANNED_ONCE) {
4049 if (!lfsck->li_drop_dryrun ||
4050 lo->ll_pos_first_inconsistent == 0) {
4051 lo->ll_status = LS_SCANNING_PHASE2;
4052 list_move_tail(&com->lc_link,
4053 &lfsck->li_list_double_scan);
4054 pos->lp_oit_cookie = 0;
/* Restart phase1 from scratch: clear every statistic. */
4058 lo->ll_status = LS_SCANNING_PHASE1;
4059 lo->ll_run_time_phase1 = 0;
4060 lo->ll_run_time_phase2 = 0;
4061 lo->ll_objs_checked_phase1 = 0;
4062 lo->ll_objs_checked_phase2 = 0;
4063 lo->ll_objs_failed_phase1 = 0;
4064 lo->ll_objs_failed_phase2 = 0;
4065 for (i = 0; i < LLIT_MAX; i++)
4066 lo->ll_objs_repaired[i] = 0;
4068 pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4069 fid_zero(&com->lc_fid_latest_scanned_phase2);
4072 lo->ll_status = LS_SCANNING_PHASE1;
4073 if (!lfsck->li_drop_dryrun ||
4074 lo->ll_pos_first_inconsistent == 0)
4075 pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
4077 pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4079 spin_unlock(&lfsck->li_lock);
4080 up_write(&com->lc_sem);
/*
 * Slave-side preparation for a layout LFSCK run.
 *
 * Runs the common lfsck_layout_prep(). If the trace file carries
 * LF_CRASHED_LASTID and no master is registered yet (llsd_master_list is
 * empty), the LE_LASTID_REBUILDING event is raised through li_out_notify.
 * There is a check on lsp->lsp_index_valid whose action is not visible
 * here; then the master index is registered with lfsck_layout_llst_add()
 * and, for orphan handling (LPF_OST_ORPHAN), the rbtree is set up under
 * llsd_rb_lock (write). The start position is logged.
 *
 * NOTE(review): error checks, early returns and the final return are
 * missing from this excerpt — confirm.
 */
4085 static int lfsck_layout_slave_prep(const struct lu_env *env,
4086 struct lfsck_component *com,
4087 struct lfsck_start_param *lsp)
4089 struct lfsck_layout_slave_data *llsd = com->lc_data;
4090 struct lfsck_instance *lfsck = com->lc_lfsck;
4091 struct lfsck_layout *lo = com->lc_file_ram;
4092 struct lfsck_start *start = lsp->lsp_start;
4095 rc = lfsck_layout_prep(env, com, start);
4099 if (lo->ll_flags & LF_CRASHED_LASTID &&
4100 list_empty(&llsd->llsd_master_list)) {
4101 LASSERT(lfsck->li_out_notify != NULL);
4103 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4104 LE_LASTID_REBUILDING);
4107 if (!lsp->lsp_index_valid)
4110 rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
4111 if (rc == 0 && start != NULL && start->ls_flags & LPF_OST_ORPHAN) {
/* The rbtree must not already be valid when orphan handling starts. */
4112 LASSERT(!llsd->llsd_rbtree_valid);
4114 write_lock(&llsd->llsd_rb_lock);
4115 rc = lfsck_rbtree_setup(env, com);
4116 write_unlock(&llsd->llsd_rb_lock);
4119 CDEBUG(D_LFSCK, "%s: layout LFSCK slave prep done, start pos ["
4120 LPU64"]\n", lfsck_lfsck2name(lfsck),
4121 com->lc_pos_start.lp_oit_cookie);
/*
 * Master-side preparation for a layout LFSCK run.
 *
 * Visible steps, in order: lfsck_layout_load_bitmap(), lfsck_layout_reset()
 * followed by lfsck_set_param() with lsp->lsp_start, the common
 * lfsck_layout_prep(), and lfsck_start_assistant(). The start position is
 * logged at the end.
 *
 * NOTE(review): the conditions guarding the reset/set_param calls
 * (presumably a failed bitmap load), the error checks between steps and the
 * return are missing from this excerpt — confirm.
 */
4126 static int lfsck_layout_master_prep(const struct lu_env *env,
4127 struct lfsck_component *com,
4128 struct lfsck_start_param *lsp)
4133 rc = lfsck_layout_load_bitmap(env, com);
4135 rc = lfsck_layout_reset(env, com, false);
4137 rc = lfsck_set_param(env, com->lc_lfsck,
4138 lsp->lsp_start, true);
4144 rc = lfsck_layout_prep(env, com, lsp->lsp_start);
4148 rc = lfsck_start_assistant(env, com, lsp);
4153 CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos ["
4154 LPU64"]\n", lfsck_lfsck2name(com->lc_lfsck),
4155 com->lc_pos_start.lp_oit_cookie);
/*
 * Walk the stripes recorded in the layout EA @lmm of @parent and queue one
 * lfsck_layout_req per stripe on lad->lad_req_list for the assistant thread.
 *
 * For each non-dummy slot the loop:
 *  - waits on mthread->t_ctl_waitq until the async window allows another
 *    prefetch (lb_async_windows == 0 or lad_prefetched below it), or until
 *    the main thread stops running / the assistant thread is stopped;
 *  - converts the slot's ost_id and OST index into a FID (ostid_to_fid());
 *  - looks up the OST target; an unknown index is logged and recorded with
 *    lfsck_lad_set_bitmap();
 *  - finds the child object without blocking (see the deadlock note below)
 *    and declares the attr/xattr reads on it;
 *  - builds the request and appends it to lad_req_list under lad_lock,
 *    bumping lad_prefetched and waking the assistant.
 * On the accounting path visible near the end, lc_new_checked is
 * incremented and a failure is recorded under com->lc_sem.
 *
 * NOTE(review): many lines (local declarations, error checks, goto labels,
 * "next"/"out" paths, the return) are missing from this excerpt; the flow
 * between the visible statements should be confirmed against the full file.
 */
4160 /* Pre-fetch the attribute for each stripe in the given layout EA. */
4161 static int lfsck_layout_scan_stripes(const struct lu_env *env,
4162 struct lfsck_component *com,
4163 struct dt_object *parent,
4164 struct lov_mds_md_v1 *lmm)
4166 struct lfsck_thread_info *info = lfsck_env_info(env);
4167 struct lfsck_instance *lfsck = com->lc_lfsck;
4168 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
4169 struct lfsck_layout *lo = com->lc_file_ram;
4170 struct lfsck_assistant_data *lad = com->lc_data;
4171 struct lfsck_layout_object *llo = NULL;
4172 struct lov_ost_data_v1 *objs;
4173 struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs;
4174 struct ptlrpc_thread *mthread = &lfsck->li_thread;
4175 struct ptlrpc_thread *athread = &lad->lad_thread;
4176 struct l_wait_info lwi = { 0 };
4185 lfsck_buf_init(&buf, &info->lti_old_pfid,
4186 sizeof(struct filter_fid_old));
/* The layout EA fields are stored little-endian. */
4187 count = le16_to_cpu(lmm->lmm_stripe_count);
4188 gen = le16_to_cpu(lmm->lmm_layout_gen);
4189 /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4190 * been verified in lfsck_layout_verify_header() already. If some
4191 * new magic introduced in the future, then layout LFSCK needs to
4192 * be updated also. */
4193 magic = le32_to_cpu(lmm->lmm_magic);
4194 if (magic == LOV_MAGIC_V1) {
4195 objs = &lmm->lmm_objects[0];
4197 LASSERT(magic == LOV_MAGIC_V3);
4198 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4201 for (i = 0; i < count; i++, objs++) {
4202 struct lu_fid *fid = &info->lti_fid;
4203 struct ost_id *oi = &info->lti_oi;
4204 struct lfsck_layout_req *llr;
4205 struct lfsck_tgt_desc *tgt = NULL;
4206 struct dt_object *cobj = NULL;
4208 bool wakeup = false;
4210 if (unlikely(lovea_slot_is_dummy(objs)))
/* Throttle: do not run ahead of the assistant by more than the window. */
4213 l_wait_event(mthread->t_ctl_waitq,
4214 bk->lb_async_windows == 0 ||
4215 lad->lad_prefetched < bk->lb_async_windows ||
4216 !thread_is_running(mthread) ||
4217 thread_is_stopped(athread),
4220 if (unlikely(!thread_is_running(mthread)) ||
4221 thread_is_stopped(athread))
4224 if (unlikely(lfsck_is_dead_obj(parent)))
4227 ostid_le_to_cpu(&objs->l_ost_oi, oi);
4228 index = le32_to_cpu(objs->l_ost_idx);
4229 rc = ostid_to_fid(fid, oi, index);
4231 CDEBUG(D_LFSCK, "%s: get invalid layout EA for "DFID
4232 ": "DOSTID", idx:%u\n", lfsck_lfsck2name(lfsck),
4233 PFID(lfsck_dto2fid(parent)), POSTID(oi), index);
4237 tgt = lfsck_tgt_get(ltds, index);
4238 if (unlikely(tgt == NULL)) {
4239 CDEBUG(D_LFSCK, "%s: cannot talk with OST %x which "
4240 "did not join the layout LFSCK\n",
4241 lfsck_lfsck2name(lfsck), index);
4242 lfsck_lad_set_bitmap(env, com, index);
4246 /* There is potential deadlock race condition between object
4247 * destroy and layout LFSCK. Consider the following scenario:
4249 * 1) The LFSCK thread obtained the parent object firstly, at
4250 * that time, the parent object has not been destroyed yet.
4252 * 2) One RPC service thread destroyed the parent and all its
4253 * children objects. Because the LFSCK is referencing the
4254 * parent object, then the parent object will be marked as
4255 * dying in RAM. On the other hand, the parent object is
4256 * referencing all its children objects, then all children
4257 * objects will be marked as dying in RAM also.
4259 * 3) The LFSCK thread tries to find some child object with
4260 * the parent object referenced. Then it will find that the
4261 * child object is dying. According to the object visibility
4262 * rules: the object with dying flag cannot be returned to
4263 * others. So the LFSCK thread has to wait until the dying
4264 * object has been purged from RAM, then it can allocate a
4265 * new object (with the same FID) in RAM. Unfortunately, the
4266 * LFSCK thread itself is referencing the parent object, and
4267 * cause the parent object cannot be purged, then cause the
4268 * child object cannot be purged also. So the LFSCK thread
4269 * will fall into deadlock.
4271 * We introduce non-blocked version lu_object_find() to allow
4272 * the LFSCK thread to return failure immediately (instead of
4273 * wait) when it finds dying (child) object, then the LFSCK
4274 * thread can check whether the parent object is dying or not.
4275 * So avoid above deadlock. LU-5395 */
4276 cobj = lfsck_object_find_by_dev_nowait(env, tgt->ltd_tgt, fid);
4278 if (lfsck_is_dead_obj(parent)) {
4288 rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
4292 rc = dt_declare_xattr_get(env, cobj, &buf, XATTR_NAME_FID,
4298 llo = lfsck_layout_object_init(env, parent,
4299 lfsck->li_pos_current.lp_oit_cookie, gen);
4306 llr = lfsck_layout_assistant_req_init(llo,
4307 lfsck_dto2fid(parent),
4315 spin_lock(&lad->lad_lock);
/* The assistant already failed: drop the request and propagate its status. */
4316 if (lad->lad_assistant_status < 0) {
4317 spin_unlock(&lad->lad_lock);
4318 lfsck_layout_assistant_req_fini(env, &llr->llr_lar);
4320 RETURN(lad->lad_assistant_status);
4323 list_add_tail(&llr->llr_lar.lar_list, &lad->lad_req_list);
4324 if (lad->lad_prefetched == 0)
4327 lad->lad_prefetched++;
4328 spin_unlock(&lad->lad_lock);
4330 wake_up_all(&athread->t_ctl_waitq);
4333 down_write(&com->lc_sem);
4334 com->lc_new_checked++;
4336 lfsck_layout_record_failure(env, lfsck, lo);
4337 up_write(&com->lc_sem);
4339 if (cobj != NULL && !IS_ERR(cobj))
4340 lu_object_put(env, &cobj->do_lu);
4342 if (likely(tgt != NULL))
4345 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
4352 if (llo != NULL && !IS_ERR(llo))
4353 lfsck_layout_object_put(env, llo);
4358 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
4359 * the OST-object's attribute and generate a structure lfsck_layout_req on the
4360 * list ::lad_req_list.
4362 * For each request on above list, the lfsck_layout_assistant thread compares
4363 * the OST side attribute with local attribute, if inconsistent, then repair it.
4365 * All above processing is async mode with pipeline. */
/*
 * Visible flow: only regular files are considered (S_ISREG check) and a
 * negative lad_assistant_status ends the call with -ESRCH. The expected
 * lmm_oi is derived from the object's FID and converted to little-endian,
 * then compared (memcmp) with the lmm_oi stored in the LOV EA read under
 * the object read lock. A mismatch is counted in
 * ll_objs_repaired[LLIT_OTHERS - 1]; unless LPF_DRYRUN is set, the EA is
 * rewritten with LU_XATTR_REPLACE inside a local transaction after taking
 * an EX ibits lock (LAYOUT | XATTR) and the object write lock. When the
 * layout is usable, lfsck_layout_scan_stripes() is called; otherwise
 * lc_new_checked is bumped and a failure may be recorded under lc_sem.
 *
 * NOTE(review): several lines (declarations, "again"/"out" labels, error
 * checks, the code that fixes lmm_oi in the buffer, the return) are missing
 * from this excerpt — confirm the control flow against the full file.
 */
4366 static int lfsck_layout_master_exec_oit(const struct lu_env *env,
4367 struct lfsck_component *com,
4368 struct dt_object *obj)
4370 struct lfsck_thread_info *info = lfsck_env_info(env);
4371 struct ost_id *oi = &info->lti_oi;
4372 struct lfsck_layout *lo = com->lc_file_ram;
4373 struct lfsck_assistant_data *lad = com->lc_data;
4374 struct lfsck_instance *lfsck = com->lc_lfsck;
4375 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
4376 struct thandle *handle = NULL;
4377 struct lu_buf *buf = &info->lti_big_buf;
4378 struct lov_mds_md_v1 *lmm = NULL;
4379 struct dt_device *dev = lfsck->li_bottom;
4380 struct lustre_handle lh = { 0 };
4381 struct lu_buf ea_buf = { NULL };
4384 bool locked = false;
4385 bool stripe = false;
4386 bool bad_oi = false;
4389 if (!S_ISREG(lfsck_object_type(obj)))
4392 if (lad->lad_assistant_status < 0)
4393 GOTO(out, rc = -ESRCH);
/* Build the lmm_oi the EA is expected to carry, in on-disk byte order. */
4395 fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
4396 lmm_oi_cpu_to_le(oi, oi);
4397 dt_read_lock(env, obj, 0);
4401 if (dt_object_exists(obj) == 0 ||
4402 lfsck_is_dead_obj(obj))
4405 rc = lfsck_layout_get_lovea(env, obj, buf);
4411 rc = lfsck_layout_verify_header(lmm);
4412 /* If the LOV EA crashed, then it is possible to be rebuilt later
4413 * when handle orphan OST-objects. */
4417 if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
4418 GOTO(out, stripe = true);
4420 /* Inconsistent lmm_oi, should be repaired. */
4424 if (bk->lb_param & LPF_DRYRUN) {
4425 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4427 GOTO(out, stripe = true);
4430 if (!lustre_handle_is_used(&lh)) {
4431 dt_read_unlock(env, obj);
4433 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
4434 MDS_INODELOCK_LAYOUT |
4435 MDS_INODELOCK_XATTR, LCK_EX);
4439 handle = dt_trans_create(env, dev);
4441 GOTO(out, rc = PTR_ERR(handle));
4443 lfsck_buf_init(&ea_buf, lmm, size);
4444 rc = dt_declare_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4445 LU_XATTR_REPLACE, handle);
4449 rc = dt_trans_start_local(env, dev, handle);
4453 dt_write_lock(env, obj, 0);
4459 rc = dt_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4460 LU_XATTR_REPLACE, handle, BYPASS_CAPA);
4464 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4466 GOTO(out, stripe = true);
/* Which object lock is released depends on whether the ibits lock is held. */
4470 if (lustre_handle_is_used(&lh))
4471 dt_write_unlock(env, obj);
4473 dt_read_unlock(env, obj);
4476 if (handle != NULL && !IS_ERR(handle))
4477 dt_trans_stop(env, dev, handle);
4479 lfsck_ibits_unlock(&lh, LCK_EX);
4482 CDEBUG(D_LFSCK, "%s: layout LFSCK master %s bad lmm_oi for "
4483 DFID": rc = %d\n", lfsck_lfsck2name(lfsck),
4484 bk->lb_param & LPF_DRYRUN ? "found" : "repaired",
4485 PFID(lfsck_dto2fid(obj)), rc);
4488 rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
4490 down_write(&com->lc_sem);
4491 com->lc_new_checked++;
4493 lfsck_layout_record_failure(env, lfsck, lo);
4494 up_write(&com->lc_sem);
/*
 * Per-object handler of the object-table iteration on the slave (OST) side.
 *
 * Visible flow: an optional fault-injection delay (OBD_FAIL_LFSCK_DELAY5),
 * then lfsck_rbtree_update_bitmap() for the object's FID. Under com->lc_sem
 * (write) the FID is classified (IDIF, or normal FID that belongs to an
 * OST-object; anything else is skipped with rc = 0), lc_new_checked is
 * incremented and the per-sequence record is looked up. A newly set up
 * record has LAST_ID loaded via lfsck_layout_lastid_load() and is inserted
 * into the sequence list. For non-LAST_ID objects the object id is tracked
 * in lls_lastid_known; an id beyond lls_lastid triggers a LAST_ID reload
 * and, if still beyond it, the LF_CRASHED_LASTID flag is set, the
 * LE_LASTID_REBUILDING event is raised and lls_lastid is advanced to the id.
 *
 * NOTE(review): several lines (declarations of seq/oid/rc, the allocation
 * of a new lls, else-branches, labels, the return) are missing from this
 * excerpt — confirm the control flow against the full file.
 */
4500 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
4501 struct lfsck_component *com,
4502 struct dt_object *obj)
4504 struct lfsck_instance *lfsck = com->lc_lfsck;
4505 struct lfsck_layout *lo = com->lc_file_ram;
4506 const struct lu_fid *fid = lfsck_dto2fid(obj);
4507 struct lfsck_layout_slave_data *llsd = com->lc_data;
4508 struct lfsck_layout_seq *lls;
4514 LASSERT(llsd != NULL);
/* Test hook: delay on the device whose index matches cfs_fail_val. */
4516 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY5) &&
4517 cfs_fail_val == lfsck_dev_idx(lfsck->li_bottom)) {
4518 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(1),
4520 struct ptlrpc_thread *thread = &lfsck->li_thread;
4522 l_wait_event(thread->t_ctl_waitq,
4523 !thread_is_running(thread),
4527 lfsck_rbtree_update_bitmap(env, com, fid, false);
4529 down_write(&com->lc_sem);
4530 if (fid_is_idif(fid))
4532 else if (!fid_is_norm(fid) ||
4533 !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
4534 GOTO(unlock, rc = 0);
4537 com->lc_new_checked++;
4539 lls = lfsck_layout_seq_lookup(llsd, seq);
4542 if (unlikely(lls == NULL))
4543 GOTO(unlock, rc = -ENOMEM);
4545 INIT_LIST_HEAD(&lls->lls_list);
4547 rc = lfsck_layout_lastid_load(env, com, lls);
4549 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4550 "load LAST_ID for "LPX64": rc = %d\n",
4551 lfsck_lfsck2name(com->lc_lfsck), seq, rc);
4552 lo->ll_objs_failed_phase1++;
4557 lfsck_layout_seq_insert(llsd, lls);
/* The LAST_ID object itself carries no object id to track. */
4560 if (unlikely(fid_is_last_id(fid)))
4561 GOTO(unlock, rc = 0);
4563 if (fid_is_idif(fid))
4564 oid = fid_idif_id(fid_seq(fid), fid_oid(fid), fid_ver(fid));
4568 if (oid > lls->lls_lastid_known)
4569 lls->lls_lastid_known = oid;
4571 if (oid > lls->lls_lastid) {
4572 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
4573 /* OFD may create new objects during LFSCK scanning. */
4574 rc = lfsck_layout_lastid_reload(env, com, lls);
4575 if (unlikely(rc != 0)) {
4576 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4577 "reload LAST_ID for "LPX64": rc = %d\n",
4578 lfsck_lfsck2name(com->lc_lfsck),
4584 if (oid <= lls->lls_lastid ||
4585 lo->ll_flags & LF_CRASHED_LASTID)
4586 GOTO(unlock, rc = 0);
4588 LASSERT(lfsck->li_out_notify != NULL);
4590 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4591 LE_LASTID_REBUILDING);
4592 lo->ll_flags |= LF_CRASHED_LASTID;
4594 CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
4595 "LAST_ID file (2) for the sequence "LPX64
4596 ", old value "LPU64", known value "LPU64"\n",
4597 lfsck_lfsck2name(lfsck), lls->lls_seq,
4598 lls->lls_lastid, oid);
4601 lls->lls_lastid = oid;
4605 GOTO(unlock, rc = 0);
4608 up_write(&com->lc_sem);
/*
 * Directory-entry handler of the layout LFSCK component.
 *
 * NOTE(review): the body of this function is not present in this excerpt;
 * presumably it is a no-op since layout LFSCK works on the object table —
 * confirm against the complete file.
 */
4613 static int lfsck_layout_exec_dir(const struct lu_env *env,
4614 struct lfsck_component *com,
4615 struct lu_dirent *ent, __u16 type)
/*
 * Post-processing on the master after the first-phase scan ends.
 *
 * Calls lfsck_post_generic(), then under com->lc_sem (write) and
 * lfsck->li_lock records the checkpoint position and picks the next status:
 *  - successful scan: LS_PARTIAL if LF_INCOMPLETE is set, else
 *    LS_SCANNING_PHASE2; LF_SCANNED_ONCE is set, LF_UPGRADE cleared and the
 *    component is moved to li_list_double_scan;
 *  - result == 0: lfsck->li_status if non-zero, else LS_STOPPED; unless
 *    paused the component goes to li_list_idle;
 *  - otherwise: LS_FAILED and moved to li_list_idle.
 * The phase1 run time and checked counters are then updated, the trace
 * file stored and the result logged.
 *
 * NOTE(review): the first branch condition (presumably result > 0), the
 * guard on the statistics update (presumably !init), "else" lines and the
 * return are missing from this excerpt — confirm.
 */
4620 static int lfsck_layout_master_post(const struct lu_env *env,
4621 struct lfsck_component *com,
4622 int result, bool init)
4624 struct lfsck_instance *lfsck = com->lc_lfsck;
4625 struct lfsck_layout *lo = com->lc_file_ram;
4629 lfsck_post_generic(env, com, &result);
4631 down_write(&com->lc_sem);
4632 spin_lock(&lfsck->li_lock);
4634 lo->ll_pos_last_checkpoint =
4635 lfsck->li_pos_checkpoint.lp_oit_cookie;
4638 if (lo->ll_flags & LF_INCOMPLETE)
4639 lo->ll_status = LS_PARTIAL;
4641 lo->ll_status = LS_SCANNING_PHASE2;
4642 lo->ll_flags |= LF_SCANNED_ONCE;
4643 lo->ll_flags &= ~LF_UPGRADE;
4644 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
4645 } else if (result == 0) {
4646 if (lfsck->li_status != 0)
4647 lo->ll_status = lfsck->li_status;
4649 lo->ll_status = LS_STOPPED;
/* A paused component stays on its current list. */
4650 if (lo->ll_status != LS_PAUSED)
4651 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4653 lo->ll_status = LS_FAILED;
4654 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4656 spin_unlock(&lfsck->li_lock);
4659 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4660 HALF_SEC - lfsck->li_time_last_checkpoint);
4661 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4662 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4663 com->lc_new_checked = 0;
4666 rc = lfsck_layout_store(env, com);
4667 up_write(&com->lc_sem);
4669 CDEBUG(D_LFSCK, "%s: layout LFSCK master post done: rc = %d\n",
4670 lfsck_lfsck2name(lfsck), rc);
/*
 * Post-processing on the slave after the first-phase scan ends.
 *
 * Stores LAST_ID via lfsck_layout_lastid_store(), then under com->lc_sem
 * (write) and lfsck->li_lock records the checkpoint position and picks the
 * next status:
 *  - successful scan: LS_SCANNING_PHASE2 with LF_SCANNED_ONCE set; a
 *    pending LF_CRASHED_LASTID is cleared (and logged as rebuilt),
 *    LF_UPGRADE is cleared and the component moves to li_list_double_scan;
 *  - result == 0: lfsck->li_status if non-zero, else LS_STOPPED; unless
 *    paused the component goes to li_list_idle;
 *  - otherwise: LS_FAILED and moved to li_list_idle.
 * After dropping the spinlock an li_out_notify() call is made, the phase1
 * statistics are updated, the trace file is stored and the master is
 * notified with LE_PHASE1_DONE.
 *
 * NOTE(review): the first branch condition, the guard and event argument
 * of the li_out_notify() call, the handling of a lastid-store failure,
 * "else" lines and the return are missing from this excerpt — confirm.
 */
4675 static int lfsck_layout_slave_post(const struct lu_env *env,
4676 struct lfsck_component *com,
4677 int result, bool init)
4679 struct lfsck_instance *lfsck = com->lc_lfsck;
4680 struct lfsck_layout *lo = com->lc_file_ram;
4684 rc = lfsck_layout_lastid_store(env, com);
4688 LASSERT(lfsck->li_out_notify != NULL);
4690 down_write(&com->lc_sem);
4691 spin_lock(&lfsck->li_lock);
4693 lo->ll_pos_last_checkpoint =
4694 lfsck->li_pos_checkpoint.lp_oit_cookie;
4697 lo->ll_status = LS_SCANNING_PHASE2;
4698 lo->ll_flags |= LF_SCANNED_ONCE;
4699 if (lo->ll_flags & LF_CRASHED_LASTID) {
4701 lo->ll_flags &= ~LF_CRASHED_LASTID;
4703 CDEBUG(D_LFSCK, "%s: layout LFSCK has rebuilt "
4704 "crashed LAST_ID files successfully\n",
4705 lfsck_lfsck2name(lfsck));
4707 lo->ll_flags &= ~LF_UPGRADE;
4708 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
4709 } else if (result == 0) {
4710 if (lfsck->li_status != 0)
4711 lo->ll_status = lfsck->li_status;
4713 lo->ll_status = LS_STOPPED;
4714 if (lo->ll_status != LS_PAUSED)
4715 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4717 lo->ll_status = LS_FAILED;
4718 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4720 spin_unlock(&lfsck->li_lock);
4723 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4727 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4728 HALF_SEC - lfsck->li_time_last_checkpoint);
4729 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4730 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4731 com->lc_new_checked = 0;
4734 rc = lfsck_layout_store(env, com);
4735 up_write(&com->lc_sem);
/* The master is told the raw scan result, not the store result. */
4737 lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
4739 CDEBUG(D_LFSCK, "%s: layout LFSCK slave post done: rc = %d\n",
4740 lfsck_lfsck2name(lfsck), rc);
/*
 * Print the layout LFSCK state into the seq_file @m.
 *
 * With com->lc_sem held for read, the function prints the status name, the
 * flag and parameter bit names, three "time since" values, the start /
 * checkpoint / first-failure positions and the repair, skip and failure
 * counters. The last section depends on ll_status:
 *  - LS_SCANNING_PHASE1: phase1 counters include the not yet checkpointed
 *    lc_new_checked, average and real-time speeds are computed with
 *    do_div(), and the current otable iterator position is printed;
 *  - LS_SCANNING_PHASE2: the same for phase2, with the latest scanned FID
 *    as current position;
 *  - any other status: only the stored averages, positions are "N/A".
 *
 * NOTE(review): the guards in front of do_div(new_checked, duration),
 * do_div(speed, rtime) and do_div(speed2, rtime) are not visible in this
 * excerpt; they are needed to avoid dividing by zero — confirm. Several
 * printf arguments, error checks and the return are missing here too.
 * NOTE(review): the idle branch prints "objs/sec" for average_speed_phase2
 * while every other speed line prints "items/sec".
 */
4745 static int lfsck_layout_dump(const struct lu_env *env,
4746 struct lfsck_component *com, struct seq_file *m)
4748 struct lfsck_instance *lfsck = com->lc_lfsck;
4749 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
4750 struct lfsck_layout *lo = com->lc_file_ram;
4753 down_read(&com->lc_sem);
4754 seq_printf(m, "name: lfsck_layout\n"
4760 lfsck_status2names(lo->ll_status));
4762 rc = lfsck_bits_dump(m, lo->ll_flags, lfsck_flags_names, "flags");
4766 rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
4770 rc = lfsck_time_dump(m, lo->ll_time_last_complete,
4771 "time_since_last_completed");
4775 rc = lfsck_time_dump(m, lo->ll_time_latest_start,
4776 "time_since_latest_start");
4780 rc = lfsck_time_dump(m, lo->ll_time_last_checkpoint,
4781 "time_since_last_checkpoint");
4785 seq_printf(m, "latest_start_position: "LPU64"\n"
4786 "last_checkpoint_position: "LPU64"\n"
4787 "first_failure_position: "LPU64"\n",
4788 lo->ll_pos_latest_start,
4789 lo->ll_pos_last_checkpoint,
4790 lo->ll_pos_first_inconsistent);
/* The ll_objs_repaired[] array is indexed by inconsistency type minus one. */
4792 seq_printf(m, "success_count: %u\n"
4793 "repaired_dangling: "LPU64"\n"
4794 "repaired_unmatched_pair: "LPU64"\n"
4795 "repaired_multiple_referenced: "LPU64"\n"
4796 "repaired_orphan: "LPU64"\n"
4797 "repaired_inconsistent_owner: "LPU64"\n"
4798 "repaired_others: "LPU64"\n"
4799 "skipped: "LPU64"\n"
4800 "failed_phase1: "LPU64"\n"
4801 "failed_phase2: "LPU64"\n",
4802 lo->ll_success_count,
4803 lo->ll_objs_repaired[LLIT_DANGLING - 1],
4804 lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
4805 lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
4806 lo->ll_objs_repaired[LLIT_ORPHAN - 1],
4807 lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
4808 lo->ll_objs_repaired[LLIT_OTHERS - 1],
4809 lo->ll_objs_skipped,
4810 lo->ll_objs_failed_phase1,
4811 lo->ll_objs_failed_phase2);
4813 if (lo->ll_status == LS_SCANNING_PHASE1) {
4815 const struct dt_it_ops *iops;
4816 cfs_duration_t duration = cfs_time_current() -
4817 lfsck->li_time_last_checkpoint;
4818 __u64 checked = lo->ll_objs_checked_phase1 +
4819 com->lc_new_checked;
4820 __u64 speed = checked;
/* Scale by jiffies-per-second so dividing by a jiffies duration yields items/sec. */
4821 __u64 new_checked = com->lc_new_checked *
4822 msecs_to_jiffies(MSEC_PER_SEC);
4823 __u32 rtime = lo->ll_run_time_phase1 +
4824 cfs_duration_sec(duration + HALF_SEC);
4827 do_div(new_checked, duration);
4829 do_div(speed, rtime);
4830 seq_printf(m, "checked_phase1: "LPU64"\n"
4831 "checked_phase2: "LPU64"\n"
4832 "run_time_phase1: %u seconds\n"
4833 "run_time_phase2: %u seconds\n"
4834 "average_speed_phase1: "LPU64" items/sec\n"
4835 "average_speed_phase2: N/A\n"
4836 "real-time_speed_phase1: "LPU64" items/sec\n"
4837 "real-time_speed_phase2: N/A\n",
4839 lo->ll_objs_checked_phase2,
4841 lo->ll_run_time_phase2,
4845 LASSERT(lfsck->li_di_oit != NULL);
4847 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
4849 /* The low layer otable-based iteration position may NOT
4850 * exactly match the layout-based directory traversal
4851 * cookie. Generally, it is not a serious issue. But the
4852 * caller should NOT make assumption on that. */
4853 pos = iops->store(env, lfsck->li_di_oit);
4854 if (!lfsck->li_current_oit_processed)
4856 seq_printf(m, "current_position: "LPU64"\n", pos);
4858 } else if (lo->ll_status == LS_SCANNING_PHASE2) {
4859 cfs_duration_t duration = cfs_time_current() -
4860 lfsck->li_time_last_checkpoint;
4861 __u64 checked = lo->ll_objs_checked_phase2 +
4862 com->lc_new_checked;
4863 __u64 speed1 = lo->ll_objs_checked_phase1;
4864 __u64 speed2 = checked;
4865 __u64 new_checked = com->lc_new_checked *
4866 msecs_to_jiffies(MSEC_PER_SEC);
4867 __u32 rtime = lo->ll_run_time_phase2 +
4868 cfs_duration_sec(duration + HALF_SEC);
4871 do_div(new_checked, duration);
4872 if (lo->ll_run_time_phase1 != 0)
4873 do_div(speed1, lo->ll_run_time_phase1);
4875 do_div(speed2, rtime);
4876 rc = seq_printf(m, "checked_phase1: "LPU64"\n"
4877 "checked_phase2: "LPU64"\n"
4878 "run_time_phase1: %u seconds\n"
4879 "run_time_phase2: %u seconds\n"
4880 "average_speed_phase1: "LPU64" items/sec\n"
4881 "average_speed_phase2: "LPU64" items/sec\n"
4882 "real-time_speed_phase1: N/A\n"
4883 "real-time_speed_phase2: "LPU64" items/sec\n"
4884 "current_position: "DFID"\n",
4885 lo->ll_objs_checked_phase1,
4887 lo->ll_run_time_phase1,
4892 PFID(&com->lc_fid_latest_scanned_phase2));
4897 __u64 speed1 = lo->ll_objs_checked_phase1;
4898 __u64 speed2 = lo->ll_objs_checked_phase2;
4900 if (lo->ll_run_time_phase1 != 0)
4901 do_div(speed1, lo->ll_run_time_phase1);
4902 if (lo->ll_run_time_phase2 != 0)
4903 do_div(speed2, lo->ll_run_time_phase2);
4904 seq_printf(m, "checked_phase1: "LPU64"\n"
4905 "checked_phase2: "LPU64"\n"
4906 "run_time_phase1: %u seconds\n"
4907 "run_time_phase2: %u seconds\n"
4908 "average_speed_phase1: "LPU64" items/sec\n"
4909 "average_speed_phase2: "LPU64" objs/sec\n"
4910 "real-time_speed_phase1: N/A\n"
4911 "real-time_speed_phase2: N/A\n"
4912 "current_position: N/A\n",
4913 lo->ll_objs_checked_phase1,
4914 lo->ll_objs_checked_phase2,
4915 lo->ll_run_time_phase1,
4916 lo->ll_run_time_phase2,
4921 up_read(&com->lc_sem);
4926 static int lfsck_layout_master_double_scan(const struct lu_env *env,
4927 struct lfsck_component *com)
4929 struct lfsck_layout *lo = com->lc_file_ram;
4930 struct lfsck_assistant_data *lad = com->lc_data;
4931 struct lfsck_instance *lfsck = com->lc_lfsck;
4932 struct lfsck_tgt_descs *ltds;
4933 struct lfsck_tgt_desc *ltd;
4934 struct lfsck_tgt_desc *next;
4937 rc = lfsck_double_scan_generic(env, com, lo->ll_status);
4939 if (thread_is_stopped(&lad->lad_thread)) {
4940 LASSERT(list_empty(&lad->lad_req_list));
4941 LASSERT(list_empty(&lad->lad_ost_phase1_list));
4942 LASSERT(list_empty(&lad->lad_mdt_phase1_list));
4944 ltds = &lfsck->li_ost_descs;
4945 spin_lock(<ds->ltd_lock);
4946 list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
4947 ltd_layout_phase_list) {
4948 list_del_init(<d->ltd_layout_phase_list);
4950 spin_unlock(<ds->ltd_lock);
4952 ltds = &lfsck->li_mdt_descs;
4953 spin_lock(<ds->ltd_lock);
4954 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
4955 ltd_layout_phase_list) {
4956 list_del_init(<d->ltd_layout_phase_list);
4958 spin_unlock(<ds->ltd_lock);
/*
 * Second-phase (double scan) entry point for the layout LFSCK slave.
 *
 * The slave does not scan on its own here: after bumping
 * li_double_scan_count and resetting the per-component counters and
 * checkpoint times, it repeatedly queries the master(s) with
 * lfsck_layout_slave_query_master() and sleeps (30 second timeout) on the
 * LFSCK thread's wait queue until the thread stops running, LF_INCOMPLETE
 * is set, or llsd_master_list becomes empty. On the way out the result is
 * recorded through lfsck_layout_double_scan_result(), the master is
 * notified with LE_PHASE2_DONE (a positive rc is reported as 0 when
 * LF_INCOMPLETE is set), lfsck_layout_slave_quit() is called and waiters
 * are woken once li_double_scan_count drops to zero.
 *
 * NOTE(review): the loop statement, the actions of several "if" lines,
 * the "done" label and the return are missing from this excerpt — confirm.
 */
4964 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
4965 struct lfsck_component *com)
4967 struct lfsck_instance *lfsck = com->lc_lfsck;
4968 struct lfsck_layout_slave_data *llsd = com->lc_data;
4969 struct lfsck_layout *lo = com->lc_file_ram;
4970 struct ptlrpc_thread *thread = &lfsck->li_thread;
4974 CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan start\n",
4975 lfsck_lfsck2name(lfsck));
4977 atomic_inc(&lfsck->li_double_scan_count);
4979 if (lo->ll_flags & LF_INCOMPLETE)
4982 com->lc_new_checked = 0;
4983 com->lc_new_scanned = 0;
4984 com->lc_time_last_checkpoint = cfs_time_current();
4985 com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
4986 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
4989 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
4992 rc = lfsck_layout_slave_query_master(env, com);
4993 if (list_empty(&llsd->llsd_master_list)) {
4994 if (unlikely(!thread_is_running(thread)))
5005 rc = l_wait_event(thread->t_ctl_waitq,
5006 !thread_is_running(thread) ||
5007 lo->ll_flags & LF_INCOMPLETE ||
5008 list_empty(&llsd->llsd_master_list),
5010 if (unlikely(!thread_is_running(thread)))
5013 if (lo->ll_flags & LF_INCOMPLETE)
5016 if (rc == -ETIMEDOUT)
5019 GOTO(done, rc = (rc < 0 ? rc : 1));
5023 rc = lfsck_layout_double_scan_result(env, com, rc);
5024 lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE,
5025 (rc > 0 && lo->ll_flags & LF_INCOMPLETE) ? 0 : rc);
5026 lfsck_layout_slave_quit(env, com);
/* The last double-scan participant wakes whoever waits on the LFSCK thread. */
5027 if (atomic_dec_and_test(&lfsck->li_double_scan_count))
5028 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5030 CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan finished, "
5031 "status %d: rc = %d\n",
5032 lfsck_lfsck2name(lfsck), lo->ll_status, rc);
5037 static void lfsck_layout_master_data_release(const struct lu_env *env,
5038 struct lfsck_component *com)
5040 struct lfsck_assistant_data *lad = com->lc_data;
5041 struct lfsck_instance *lfsck = com->lc_lfsck;
5042 struct lfsck_tgt_descs *ltds;
5043 struct lfsck_tgt_desc *ltd;
5044 struct lfsck_tgt_desc *next;
5046 LASSERT(lad != NULL);
5047 LASSERT(thread_is_init(&lad->lad_thread) ||
5048 thread_is_stopped(&lad->lad_thread));
5049 LASSERT(list_empty(&lad->lad_req_list));
5051 com->lc_data = NULL;
5053 ltds = &lfsck->li_ost_descs;
5054 spin_lock(<ds->ltd_lock);
5055 list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase1_list,
5056 ltd_layout_phase_list) {
5057 list_del_init(<d->ltd_layout_phase_list);
5059 list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
5060 ltd_layout_phase_list) {
5061 list_del_init(<d->ltd_layout_phase_list);
5063 list_for_each_entry_safe(ltd, next, &lad->lad_ost_list,
5065 list_del_init(<d->ltd_layout_list);
5067 spin_unlock(<ds->ltd_lock);
5069 ltds = &lfsck->li_mdt_descs;
5070 spin_lock(<ds->ltd_lock);
5071 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
5072 ltd_layout_phase_list) {
5073 list_del_init(<d->ltd_layout_phase_list);
5075 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
5076 ltd_layout_phase_list) {
5077 list_del_init(<d->ltd_layout_phase_list);
5079 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
5081 list_del_init(<d->ltd_layout_list);
5083 spin_unlock(<ds->ltd_lock);
5085 if (likely(lad->lad_bitmap != NULL))
5086 CFS_FREE_BITMAP(lad->lad_bitmap);
/*
 * Release the slave-side private data of the layout LFSCK component.
 *
 * Calls lfsck_layout_slave_quit() to drop the per-sequence and per-master
 * state, then clears com->lc_data.
 *
 * NOTE(review): llsd is not used by any visible line; the statement that
 * frees it is presumably among the lines missing from this excerpt —
 * confirm.
 */
5091 static void lfsck_layout_slave_data_release(const struct lu_env *env,
5092 struct lfsck_component *com)
5094 struct lfsck_layout_slave_data *llsd = com->lc_data;
5096 lfsck_layout_slave_quit(env, com);
5097 com->lc_data = NULL;
/*
 * Quit hook for the master side of the layout LFSCK component.
 *
 * Calls lfsck_quit_generic(), asserts that the assistant thread is in the
 * init or stopped state and that no request is still queued, and then
 * unlinks every target from the OST and MDT phase1/phase2 lists.  Each
 * pair of lists is emptied under the ltd_lock of the matching descriptor
 * table (li_ost_descs, then li_mdt_descs).
 *
 * NOTE(review): the tokens <d and <ds in this block look like &ltd and
 * &ltds with the leading &lt lost as an HTML entity -- confirm against
 * the upstream file.
 */
5101 static void lfsck_layout_master_quit(const struct lu_env *env,
5102 struct lfsck_component *com)
5104 struct lfsck_assistant_data *lad = com->lc_data;
5105 struct lfsck_instance *lfsck = com->lc_lfsck;
5106 struct lfsck_tgt_descs *ltds;
5107 struct lfsck_tgt_desc *ltd;
5108 struct lfsck_tgt_desc *next;
5110 LASSERT(lad != NULL);
5112 lfsck_quit_generic(env, com);
5114 LASSERT(thread_is_init(&lad->lad_thread) ||
5115 thread_is_stopped(&lad->lad_thread));
5116 LASSERT(list_empty(&lad->lad_req_list));
/* Empty the OST phase lists under the OST descriptor-table lock. */
5118 ltds = &lfsck->li_ost_descs;
5119 spin_lock(<ds->ltd_lock);
5120 list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase1_list,
5121 ltd_layout_phase_list) {
5122 list_del_init(<d->ltd_layout_phase_list);
5124 list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
5125 ltd_layout_phase_list) {
5126 list_del_init(<d->ltd_layout_phase_list);
5128 spin_unlock(<ds->ltd_lock);
/* Same for the MDT phase lists, under the MDT descriptor-table lock. */
5130 ltds = &lfsck->li_mdt_descs;
5131 spin_lock(<ds->ltd_lock);
5132 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
5133 ltd_layout_phase_list) {
5134 list_del_init(<d->ltd_layout_phase_list);
5136 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
5137 ltd_layout_phase_list) {
5138 list_del_init(<d->ltd_layout_phase_list);
5140 spin_unlock(<ds->ltd_lock);
/*
 * Quit hook for the slave side of the layout LFSCK component.
 *
 * Tears down, in order: the per-sequence records on llsd_seq_list (each
 * one is unlinked and its LAST_ID object reference is put), the master
 * targets on llsd_master_list, and finally the accessed-object rbtree via
 * lfsck_rbtree_cleanup().
 */
5143 static void lfsck_layout_slave_quit(const struct lu_env *env,
5144 struct lfsck_component *com)
5146 struct lfsck_layout_slave_data *llsd = com->lc_data;
5147 struct lfsck_layout_seq *lls;
5148 struct lfsck_layout_seq *next;
5149 struct lfsck_layout_slave_target *llst;
5151 LASSERT(llsd != NULL);
/* Drop every known sequence record together with its LAST_ID object. */
5153 list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
5155 list_del_init(&lls->lls_list);
5156 lfsck_object_put(env, lls->lls_lastid_obj);
/*
 * Drain the master list one entry at a time.  The entry is unlinked under
 * llsd_lock, but the lock is released before lfsck_layout_llst_put().
 * NOTE(review): the statement that re-takes llsd_lock before the next
 * loop iteration is not visible in this listing.
 */
5160 spin_lock(&llsd->llsd_lock);
5161 while (!list_empty(&llsd->llsd_master_list)) {
5162 llst = list_entry(llsd->llsd_master_list.next,
5163 struct lfsck_layout_slave_target, llst_list);
5164 list_del_init(&llst->llst_list);
5165 spin_unlock(&llsd->llsd_lock);
5166 lfsck_layout_llst_put(llst);
5168 spin_unlock(&llsd->llsd_lock);
5170 lfsck_rbtree_cleanup(env, com);
/*
 * Handle an event notification that arrives at the layout LFSCK master.
 *
 * \param[in] env	execution environment
 * \param[in] com	the layout LFSCK component
 * \param[in] lr	the incoming request; lr_event selects the action,
 *			lr_flags & LEF_FROM_OST tells whether the sender is
 *			an OST (otherwise an MDT), lr_index is the sender
 *
 * LE_PAIRS_VERIFY is forwarded to lfsck_layout_master_check_pairs().
 * For LE_PHASE1_DONE, LE_PHASE2_DONE and LE_PEER_EXIT the sending target
 * is looked up in the matching descriptor table and moved between the
 * phase lists of the assistant data:
 *  - LE_PHASE1_DONE with a non-positive status or LF_INCOMPLETE marks the
 *    target done and unlinks it; otherwise the target is queued on the
 *    OST or MDT phase2 list (and on the per-type target list if it is not
 *    already linked there);
 *  - LE_PHASE2_DONE marks the target done and unlinks it;
 *  - the remaining visible branch (presumably LE_PEER_EXIT -- its case
 *    label is not visible here) marks the target done, unlinks it and,
 *    without LPF_FAILOUT and for an MDT sender, flags the scan
 *    LF_INCOMPLETE.
 * Afterwards either the whole LFSCK is stopped (failure plus LPF_FAILOUT)
 * or the assistant thread is woken when lfsck_phase2_next_ready().
 *
 * NOTE(review): return statements, the handling of unknown events and the
 * place where the local fail flag is set are not visible in this listing.
 * NOTE(review): the tokens <d and <ds look like &ltd and &ltds with the
 * leading &lt lost as an HTML entity -- confirm against the upstream file.
 */
5173 static int lfsck_layout_master_in_notify(const struct lu_env *env,
5174 struct lfsck_component *com,
5175 struct lfsck_request *lr,
5178 struct lfsck_instance *lfsck = com->lc_lfsck;
5179 struct lfsck_layout *lo = com->lc_file_ram;
5180 struct lfsck_assistant_data *lad = com->lc_data;
5181 struct lfsck_tgt_descs *ltds;
5182 struct lfsck_tgt_desc *ltd;
/* Pair verification is answered by lfsck_layout_master_check_pairs(). */
5186 if (lr->lr_event == LE_PAIRS_VERIFY) {
5189 rc = lfsck_layout_master_check_pairs(env, com, &lr->lr_fid,
5195 CDEBUG(D_LFSCK, "%s: layout LFSCK master handles notify %u "
5196 "from %s %x, status %d, flags %x, flags2 %x\n",
5197 lfsck_lfsck2name(lfsck), lr->lr_event,
5198 (lr->lr_flags & LEF_FROM_OST) ? "OST" : "MDT",
5199 lr->lr_index, lr->lr_status, lr->lr_flags, lr->lr_flags2);
/* Filter out every event other than the three handled further down. */
5201 if (lr->lr_event != LE_PHASE1_DONE &&
5202 lr->lr_event != LE_PHASE2_DONE &&
5203 lr->lr_event != LE_PEER_EXIT)
5206 if (lr->lr_flags & LEF_FROM_OST)
5207 ltds = &lfsck->li_ost_descs;
5209 ltds = &lfsck->li_mdt_descs;
/* Look the sender up while holding the descriptor-table lock. */
5210 spin_lock(<ds->ltd_lock);
5211 ltd = LTD_TGT(ltds, lr->lr_index);
5213 spin_unlock(<ds->ltd_lock);
/* Take the target off its current phase list before handling the event. */
5218 list_del_init(<d->ltd_layout_phase_list);
5219 switch (lr->lr_event) {
5220 case LE_PHASE1_DONE:
5221 if (lr->lr_status <= 0 || lr->lr_flags2 & LF_INCOMPLETE) {
5222 if (lr->lr_flags2 & LF_INCOMPLETE) {
5223 if (lr->lr_flags & LEF_FROM_OST)
5224 lfsck_lad_set_bitmap(env, com,
5227 lo->ll_flags |= LF_INCOMPLETE;
5229 ltd->ltd_layout_done = 1;
5230 list_del_init(<d->ltd_layout_list);
5235 if (lr->lr_flags & LEF_FROM_OST) {
5236 if (list_empty(<d->ltd_layout_list))
5237 list_add_tail(<d->ltd_layout_list,
5238 &lad->lad_ost_list);
5239 list_add_tail(<d->ltd_layout_phase_list,
5240 &lad->lad_ost_phase2_list);
5242 if (list_empty(<d->ltd_layout_list))
5243 list_add_tail(<d->ltd_layout_list,
5244 &lad->lad_mdt_list);
5245 list_add_tail(<d->ltd_layout_phase_list,
5246 &lad->lad_mdt_phase2_list);
5249 case LE_PHASE2_DONE:
5250 ltd->ltd_layout_done = 1;
5251 if (!list_empty(<d->ltd_layout_list)) {
5252 list_del_init(<d->ltd_layout_list);
5253 if (lr->lr_flags2 & LF_INCOMPLETE) {
5254 lfsck_lad_set_bitmap(env, com, ltd->ltd_index);
5262 ltd->ltd_layout_done = 1;
5263 list_del_init(<d->ltd_layout_list);
5264 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) &&
5265 !(lr->lr_flags & LEF_FROM_OST))
5266 lo->ll_flags |= LF_INCOMPLETE;
5271 spin_unlock(<ds->ltd_lock);
/*
 * With LPF_FAILOUT a recorded failure stops the local LFSCK, passing on
 * the status of the sender and its parameters minus LPF_BROADCAST.
 */
5273 if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5274 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5276 memset(stop, 0, sizeof(*stop));
5277 stop->ls_status = lr->lr_status;
5278 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5279 lfsck_stop(env, lfsck->li_bottom, stop);
5280 } else if (lfsck_phase2_next_ready(lad)) {
5281 wake_up_all(&lad->lad_thread.t_ctl_waitq);
/*
 * Handle an event notification that arrives at a layout LFSCK slave.
 *
 * \param[in] env	execution environment
 * \param[in] com	the layout LFSCK component
 * \param[in] lr	the incoming request; lr_event selects the action
 *
 * Visible event handling:
 *  - LE_FID_ACCESSED: record lr_fid as accessed in the rbtree bitmap;
 *  - LE_CONDITIONAL_DESTROY: lfsck_layout_slave_conditional_destroy();
 *  - LE_PAIRS_VERIFY: verify the OST-object against its parent in up to
 *    two lfsck_layout_slave_check_pairs() rounds, moving lr_status from
 *    LPVS_INIT to LPVS_INCONSISTENT to LPVS_INCONSISTENT_TOFIX, and then
 *    repair the PFID xattr with lfsck_layout_slave_repair_pfid();
 *  - LE_PHASE1_DONE with LF_INCOMPLETE: flag the local scan incomplete,
 *    drop the master target and wake the main LFSCK thread;
 *  - LE_PHASE2_DONE (and, judging by the later LE_PEER_EXIT test, also
 *    LE_PEER_EXIT): log, drop the master target, wake the main thread once
 *    no master is left, and for LE_PEER_EXIT possibly stop the LFSCK.
 *
 * NOTE(review): several branch conditions, break and return statements
 * are not visible in this listing; the summary above covers only what the
 * remaining lines show.
 */
5287 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
5288 struct lfsck_component *com,
5289 struct lfsck_request *lr,
5292 struct lfsck_instance *lfsck = com->lc_lfsck;
5293 struct lfsck_layout_slave_data *llsd = com->lc_data;
5294 struct lfsck_layout_slave_target *llst;
5298 switch (lr->lr_event) {
5299 case LE_FID_ACCESSED:
5300 lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
5302 case LE_CONDITIONAL_DESTROY:
5303 rc = lfsck_layout_slave_conditional_destroy(env, com, lr);
5305 case LE_PAIRS_VERIFY: {
5306 lr->lr_status = LPVS_INIT;
5307 /* Firstly, if the MDT-object which is claimed via OST-object
5308 * local stored PFID xattr recognizes the OST-object, then it
5309 * must be that the client given PFID is wrong. */
5310 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5315 lr->lr_status = LPVS_INCONSISTENT;
5316 /* The OST-object local stored PFID xattr is stale. We need to
5317 * check whether the MDT-object that is claimed via the client
5318 * given PFID information recognizes the OST-object or not. If
5319 * matches, then need to update the OST-object's PFID xattr. */
5320 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5323 * We are not sure whether the client given PFID information
5324 * is correct or not, do nothing to avoid improper fixing.
5327 * The client given PFID information is also invalid, we can
5328 * NOT fix the OST-object inconsistency.
5333 lr->lr_status = LPVS_INCONSISTENT_TOFIX;
5334 rc = lfsck_layout_slave_repair_pfid(env, com, lr);
5338 case LE_PHASE1_DONE: {
5339 if (lr->lr_flags2 & LF_INCOMPLETE) {
5340 struct lfsck_layout *lo = com->lc_file_ram;
5342 lo->ll_flags |= LF_INCOMPLETE;
5343 llst = lfsck_layout_llst_find_and_del(llsd,
5347 lfsck_layout_llst_put(llst);
5348 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5354 case LE_PHASE2_DONE:
5356 CDEBUG(D_LFSCK, "%s: layout LFSCK slave handle notify %u "
5357 "from MDT %x, status %d\n", lfsck_lfsck2name(lfsck),
5358 lr->lr_event, lr->lr_index, lr->lr_status);
/* Unlink the sending master; the final true argument asks for removal. */
5364 llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
5368 lfsck_layout_llst_put(llst);
5369 if (list_empty(&llsd->llsd_master_list))
5370 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
/*
 * A peer exit stops the local LFSCK when LPF_FAILOUT is set, or when no
 * master is left and the peer reported LS_STOPPED or LS_CO_STOPPED.
 */
5372 if (lr->lr_event == LE_PEER_EXIT &&
5373 (lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT ||
5374 (list_empty(&llsd->llsd_master_list) &&
5375 (lr->lr_status == LS_STOPPED ||
5376 lr->lr_status == LS_CO_STOPPED)))) {
5377 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5379 memset(stop, 0, sizeof(*stop));
5380 stop->ls_status = lr->lr_status;
5381 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5382 lfsck_stop(env, lfsck->li_bottom, stop);
/*
 * Query hook shared by the master and slave operation tables.
 *
 * \retval	the ll_status field of the in-RAM struct lfsck_layout
 *		(com->lc_file_ram)
 */
5388 static int lfsck_layout_query(const struct lu_env *env,
5389 struct lfsck_component *com)
5391 struct lfsck_layout *lo = com->lc_file_ram;
5393 return lo->ll_status;
/*
 * Join hook of the slave: let another master attach to a layout LFSCK
 * that is already running on this target, for orphan handling.
 *
 * The request is only considered when the start parameters carry
 * LPF_OST_ORPHAN, when lsp_index_valid is set and when the slave rbtree is
 * valid.  The new master index is registered with lfsck_layout_llst_add();
 * li_lock is dropped around that call and re-taken afterwards.  If the
 * registration succeeded but the main LFSCK thread is no longer running,
 * the entry is removed again and its reference is put, again with li_lock
 * dropped around the calls.
 *
 * NOTE(review): the values returned when a precondition fails are not
 * visible in this listing.
 */
5396 /* with lfsck::li_lock held */
5397 static int lfsck_layout_slave_join(const struct lu_env *env,
5398 struct lfsck_component *com,
5399 struct lfsck_start_param *lsp)
5401 struct lfsck_instance *lfsck = com->lc_lfsck;
5402 struct lfsck_layout_slave_data *llsd = com->lc_data;
5403 struct lfsck_layout_slave_target *llst;
5404 struct lfsck_start *start = lsp->lsp_start;
5408 if (start == NULL || !(start->ls_flags & LPF_OST_ORPHAN))
5411 if (!lsp->lsp_index_valid)
5414 /* If someone is running the LFSCK without orphan handling,
5415 * it will not maintain the object accessing rbtree. So we
5416 * cannot join it for orphan handling. */
5417 if (!llsd->llsd_rbtree_valid)
5420 spin_unlock(&lfsck->li_lock);
5421 rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
5422 spin_lock(&lfsck->li_lock);
/* The scan may have stopped while li_lock was dropped: undo the add. */
5423 if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
5424 spin_unlock(&lfsck->li_lock);
5425 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index,
5428 lfsck_layout_llst_put(llst);
5429 spin_lock(&lfsck->li_lock);
/*
 * Operation table for the master side of the layout LFSCK component;
 * lfsck_layout_setup() installs it as com->lc_ops when li_master is set.
 * No .lfsck_join handler is set in this table.
 */
5436 static struct lfsck_operations lfsck_layout_master_ops = {
5437 .lfsck_reset = lfsck_layout_reset,
5438 .lfsck_fail = lfsck_layout_fail,
5439 .lfsck_checkpoint = lfsck_layout_master_checkpoint,
5440 .lfsck_prep = lfsck_layout_master_prep,
5441 .lfsck_exec_oit = lfsck_layout_master_exec_oit,
5442 .lfsck_exec_dir = lfsck_layout_exec_dir,
5443 .lfsck_post = lfsck_layout_master_post,
5444 .lfsck_dump = lfsck_layout_dump,
5445 .lfsck_double_scan = lfsck_layout_master_double_scan,
5446 .lfsck_data_release = lfsck_layout_master_data_release,
5447 .lfsck_quit = lfsck_layout_master_quit,
5448 .lfsck_in_notify = lfsck_layout_master_in_notify,
5449 .lfsck_query = lfsck_layout_query,
/*
 * Operation table for the slave side of the layout LFSCK component;
 * lfsck_layout_setup() installs it as com->lc_ops when li_master is not
 * set.  It shares the reset, fail, exec_dir, dump and query handlers with
 * the master table and additionally provides .lfsck_join.
 */
5452 static struct lfsck_operations lfsck_layout_slave_ops = {
5453 .lfsck_reset = lfsck_layout_reset,
5454 .lfsck_fail = lfsck_layout_fail,
5455 .lfsck_checkpoint = lfsck_layout_slave_checkpoint,
5456 .lfsck_prep = lfsck_layout_slave_prep,
5457 .lfsck_exec_oit = lfsck_layout_slave_exec_oit,
5458 .lfsck_exec_dir = lfsck_layout_exec_dir,
5459 .lfsck_post = lfsck_layout_slave_post,
5460 .lfsck_dump = lfsck_layout_dump,
5461 .lfsck_double_scan = lfsck_layout_slave_double_scan,
5462 .lfsck_data_release = lfsck_layout_slave_data_release,
5463 .lfsck_quit = lfsck_layout_slave_quit,
5464 .lfsck_in_notify = lfsck_layout_slave_in_notify,
5465 .lfsck_query = lfsck_layout_query,
5466 .lfsck_join = lfsck_layout_slave_join,
/*
 * Fill the object-table-iteration cookie of \a pos from the oldest request
 * still queued for the assistant thread.
 *
 * When lad_req_list is not empty, the first request is taken and
 * pos->lp_oit_cookie is set to the llo_cookie of its parent minus one.
 *
 * NOTE(review): the statement executed for an empty list is not visible
 * here (presumably an early return), and the minus one presumably makes a
 * restart re-visit the object whose request is still pending -- confirm.
 */
5469 static void lfsck_layout_assistant_fill_pos(const struct lu_env *env,
5470 struct lfsck_component *com,
5471 struct lfsck_position *pos)
5473 struct lfsck_assistant_data *lad = com->lc_data;
5474 struct lfsck_layout_req *llr;
5476 if (list_empty(&lad->lad_req_list))
5479 llr = list_entry(lad->lad_req_list.next,
5480 struct lfsck_layout_req,
5482 pos->lp_oit_cookie = llr->llr_parent->llo_cookie - 1;
/*
 * Callbacks of the layout LFSCK assistant thread.  lfsck_layout_setup()
 * hands this table to lfsck_assistant_data_init() for the master side.
 * The table has external linkage (it is not declared static).
 */
5485 struct lfsck_assistant_operations lfsck_layout_assistant_ops = {
5486 .la_handler_p1 = lfsck_layout_assistant_handler_p1,
5487 .la_handler_p2 = lfsck_layout_assistant_handler_p2,
5488 .la_fill_pos = lfsck_layout_assistant_fill_pos,
5489 .la_double_scan_result = lfsck_layout_double_scan_result,
5490 .la_req_fini = lfsck_layout_assistant_req_fini,
5491 .la_sync_failures = lfsck_layout_assistant_sync_failures,
/*
 * Create and register the layout LFSCK component of \a lfsck.
 *
 * \param[in] env	execution environment
 * \param[in] lfsck	the LFSCK instance the component belongs to
 *
 * Steps visible in this listing:
 *  1. initialise the component (lists, rwsem, one reference, type
 *     LFSCK_TYPE_LAYOUT);
 *  2. attach the private data: assistant data on a master, a freshly
 *     allocated struct lfsck_layout_slave_data on a slave;
 *  3. allocate the RAM and disk copies of struct lfsck_layout;
 *  4. locate the local root directory and find or create the backing file
 *     as a regular file with mode S_IRUGO | S_IWUSR;
 *  5. load the file with lfsck_layout_load(); depending on the result the
 *     component is reset (lfsck_layout_reset()) or, for -ENOENT,
 *     initialised (lfsck_layout_init());
 *  6. link the component on li_list_idle or li_list_scan according to the
 *     loaded ll_status; a scan found in phase1 or phase2 is turned into
 *     LS_CRASHED, and on a slave additionally flagged LF_INCOMPLETE;
 *  7. if LF_CRASHED_LASTID is set, notify LE_LASTID_REBUILDING through
 *     li_out_notify.
 * Failures jump to the out label with rc set; the visible cleanup puts the
 * root object, calls lfsck_component_cleanup() and logs the error.
 *
 * NOTE(review): the allocation of com, several conditions, case labels
 * and the return statement are not visible in this listing.
 */
5494 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
5496 struct lfsck_component *com;
5497 struct lfsck_layout *lo;
5498 struct dt_object *root = NULL;
5499 struct dt_object *obj;
5507 INIT_LIST_HEAD(&com->lc_link);
5508 INIT_LIST_HEAD(&com->lc_link_dir);
5509 init_rwsem(&com->lc_sem);
5510 atomic_set(&com->lc_ref, 1);
5511 com->lc_lfsck = lfsck;
5512 com->lc_type = LFSCK_TYPE_LAYOUT;
/* Master and slave differ in operation table and private data. */
5513 if (lfsck->li_master) {
5514 com->lc_ops = &lfsck_layout_master_ops;
5515 com->lc_data = lfsck_assistant_data_init(
5516 &lfsck_layout_assistant_ops,
5518 if (com->lc_data == NULL)
5519 GOTO(out, rc = -ENOMEM);
5521 struct lfsck_layout_slave_data *llsd;
5523 com->lc_ops = &lfsck_layout_slave_ops;
5524 OBD_ALLOC_PTR(llsd);
5526 GOTO(out, rc = -ENOMEM);
5528 INIT_LIST_HEAD(&llsd->llsd_seq_list);
5529 INIT_LIST_HEAD(&llsd->llsd_master_list);
5530 spin_lock_init(&llsd->llsd_lock);
5531 llsd->llsd_rb_root = RB_ROOT;
5532 rwlock_init(&llsd->llsd_rb_lock);
5533 com->lc_data = llsd;
/* Two equally sized buffers: the working copy and the on-disk image. */
5535 com->lc_file_size = sizeof(*lo);
5536 OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
5537 if (com->lc_file_ram == NULL)
5538 GOTO(out, rc = -ENOMEM);
5540 OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
5541 if (com->lc_file_disk == NULL)
5542 GOTO(out, rc = -ENOMEM);
5544 root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
5546 GOTO(out, rc = PTR_ERR(root));
5548 if (unlikely(!dt_try_as_dir(env, root)))
5549 GOTO(out, rc = -ENOTDIR);
5551 obj = local_file_find_or_create(env, lfsck->li_los, root,
5553 S_IFREG | S_IRUGO | S_IWUSR);
5555 GOTO(out, rc = PTR_ERR(obj));
5558 rc = lfsck_layout_load(env, com);
5560 rc = lfsck_layout_reset(env, com, true);
5561 else if (rc == -ENOENT)
5562 rc = lfsck_layout_init(env, com);
/* File the component on the idle or scan list by its loaded status. */
5567 lo = com->lc_file_ram;
5568 switch (lo->ll_status) {
5574 spin_lock(&lfsck->li_lock);
5575 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5576 spin_unlock(&lfsck->li_lock);
5579 CERROR("%s: unknown lfsck_layout status %d\n",
5580 lfsck_lfsck2name(lfsck), lo->ll_status);
5582 case LS_SCANNING_PHASE1:
5583 case LS_SCANNING_PHASE2:
5584 /* No need to store the status to disk right now.
5585 * If the system crashed before the status stored,
5586 * it will be loaded back when next time. */
5587 lo->ll_status = LS_CRASHED;
5588 if (!lfsck->li_master)
5589 lo->ll_flags |= LF_INCOMPLETE;
5596 spin_lock(&lfsck->li_lock);
5597 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
5598 spin_unlock(&lfsck->li_lock);
5602 if (lo->ll_flags & LF_CRASHED_LASTID) {
5603 LASSERT(lfsck->li_out_notify != NULL);
5605 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5606 LE_LASTID_REBUILDING);
/* root may be NULL or an error pointer when an early step failed. */
5612 if (root != NULL && !IS_ERR(root))
5613 lu_object_put(env, &root->do_lu);
5616 lfsck_component_cleanup(env, com);
5617 CERROR("%s: fail to init layout LFSCK component: rc = %d\n",
5618 lfsck_lfsck2name(lfsck), rc);
/*
 * State of one orphan OST-object iteration on a layout LFSCK slave.
 *
 * loi_com is the layout component, loi_llst the master target the
 * iteration is run for, loi_key the FID the iterator currently points at
 * and loi_rec the orphan record handed out for that key.
 *
 * NOTE(review): the exact roles of loi_lrn and loi_over are not
 * established by the code visible in this listing, and the loi_hash
 * member used by the iterator methods is not visible here -- confirm
 * against the full file.
 */
5624 struct lfsck_orphan_it {
5625 struct lfsck_component *loi_com;
5626 struct lfsck_rbtree_node *loi_lrn;
5627 struct lfsck_layout_slave_target *loi_llst;
5628 struct lu_fid loi_key;
5629 struct lu_orphan_rec loi_rec;
5631 unsigned int loi_over:1;
/*
 * Check whether \a fid belongs to the MDT with index \a idx.
 *
 * \param[in] env	execution environment
 * \param[in] lfsck	the LFSCK instance
 * \param[in] fid	the FID to be checked
 * \param[in] idx	the MDT index to compare against
 *
 * FIDs that are not normal (fid_is_norm()) are special-cased as the
 * existing comment describes.  Otherwise the sequence of the FID is looked
 * up in the server FLD, and the range found there must be an MDT range
 * whose lsr_index equals \a idx.
 *
 * NOTE(review): none of the return statements are visible in this
 * listing, so the returned values are not documented here.
 */
5634 static int lfsck_fid_match_idx(const struct lu_env *env,
5635 struct lfsck_instance *lfsck,
5636 const struct lu_fid *fid, int idx)
5638 struct seq_server_site *ss;
5639 struct lu_server_fld *sf;
5640 struct lu_seq_range *range = &lfsck_env_info(env)->lti_range;
5643 /* All abnormal cases will be returned to MDT0. */
5644 if (!fid_is_norm(fid)) {
5651 ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
5652 if (unlikely(ss == NULL))
5655 sf = ss->ss_server_fld;
5656 LASSERT(sf != NULL);
/* Accept any range type in the lookup; the type is checked afterwards. */
5658 fld_range_set_any(range);
5659 rc = fld_server_lookup(env, sf, fid_seq(fid), range);
5663 if (!fld_range_is_mdt(range))
5666 if (range->lsr_index == idx)
/*
 * Destroy the orphan OST-object \a obj inside a local transaction.
 *
 * \param[in] env	execution environment
 * \param[in] dev	the device the transaction is created on
 * \param[in] obj	the object to be destroyed
 *
 * The transaction declares a reference drop and a destroy, is started
 * locally, and then both operations are executed with the object
 * write-locked.  The outcome is only logged; the function returns void.
 *
 * NOTE(review): the error checks between the steps are not visible in
 * this listing.
 */
5672 static void lfsck_layout_destroy_orphan(const struct lu_env *env,
5673 struct dt_device *dev,
5674 struct dt_object *obj)
5676 struct thandle *handle;
5680 handle = dt_trans_create(env, dev);
5684 rc = dt_declare_ref_del(env, obj, handle);
5688 rc = dt_declare_destroy(env, obj, handle);
5692 rc = dt_trans_start_local(env, dev, handle);
5696 dt_write_lock(env, obj, 0);
5697 rc = dt_ref_del(env, obj, handle);
5699 rc = dt_destroy(env, obj, handle);
5700 dt_write_unlock(env, obj);
5705 dt_trans_stop(env, dev, handle);
5707 CDEBUG(D_LFSCK, "destroy orphan OST-object "DFID": rc = %d\n",
5708 PFID(lfsck_dto2fid(obj)), rc);
/*
 * dio_lookup method of the orphan index operations.
 * NOTE(review): the body is not visible in this listing; presumably the
 * operation is unsupported on this index -- confirm.
 */
5713 static int lfsck_orphan_index_lookup(const struct lu_env *env,
5714 struct dt_object *dt,
5716 const struct dt_key *key,
5717 struct lustre_capa *capa)
/*
 * dio_declare_insert method of the orphan index operations.
 * NOTE(review): the body is not visible in this listing; presumably the
 * operation is unsupported on this index -- confirm.
 */
5722 static int lfsck_orphan_index_declare_insert(const struct lu_env *env,
5723 struct dt_object *dt,
5724 const struct dt_rec *rec,
5725 const struct dt_key *key,
5726 struct thandle *handle)
/*
 * dio_insert method of the orphan index operations.
 * NOTE(review): the last parameter and the body are not visible in this
 * listing; presumably the operation is unsupported on this index --
 * confirm.
 */
5731 static int lfsck_orphan_index_insert(const struct lu_env *env,
5732 struct dt_object *dt,
5733 const struct dt_rec *rec,
5734 const struct dt_key *key,
5735 struct thandle *handle,
5736 struct lustre_capa *capa,
/*
 * dio_declare_delete method of the orphan index operations.
 * NOTE(review): the body is not visible in this listing; presumably the
 * operation is unsupported on this index -- confirm.
 */
5742 static int lfsck_orphan_index_declare_delete(const struct lu_env *env,
5743 struct dt_object *dt,
5744 const struct dt_key *key,
5745 struct thandle *handle)
/*
 * dio_delete method of the orphan index operations.
 * NOTE(review): the body is not visible in this listing; presumably the
 * operation is unsupported on this index -- confirm.
 */
5750 static int lfsck_orphan_index_delete(const struct lu_env *env,
5751 struct dt_object *dt,
5752 const struct dt_key *key,
5753 struct thandle *handle,
5754 struct lustre_capa *capa)
/*
 * Create an iterator over the orphan OST-objects known to the layout
 * LFSCK slave running on the device of \a dt.
 *
 * \param[in] env	execution environment
 * \param[in] dt	object whose device identifies the LFSCK instance
 * \param[in] capa	capability, not used by the visible code
 *
 * \retval	the new iterator cast to struct dt_it on success
 * \retval	ERR_PTR(-ENXIO) if no LFSCK instance or no matching
 *		master target is found
 * \retval	ERR_PTR(-ENOENT) if there is no layout component
 * \retval	ERR_PTR(-ESRCH) if the scan is LF_INCOMPLETE or the rbtree
 *		is not valid
 * \retval	ERR_PTR(-ENOMEM) on allocation failure
 *
 * On the first iteration (dd_record_fid_accessed still set) the rbtree is
 * pruned under the write lock: access recording is switched off and each
 * node whose known count does not exceed its accessed count is erased.
 * The rbtree read lock taken at the end stays held until the iterator is
 * finalised.
 *
 * NOTE(review): the third parameter (used as attr below), the allocation
 * of the iterator and the success/failure split of the exit path are not
 * visible in this listing.
 */
5759 static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
5760 struct dt_object *dt,
5762 struct lustre_capa *capa)
5764 struct dt_device *dev = lu2dt_dev(dt->do_lu.lo_dev);
5765 struct lfsck_instance *lfsck;
5766 struct lfsck_component *com = NULL;
5767 struct lfsck_layout_slave_data *llsd;
5768 struct lfsck_orphan_it *it = NULL;
5769 struct lfsck_layout *lo;
5773 lfsck = lfsck_instance_find(dev, true, false);
5774 if (unlikely(lfsck == NULL))
5775 RETURN(ERR_PTR(-ENXIO));
5777 com = lfsck_component_find(lfsck, LFSCK_TYPE_LAYOUT);
5778 if (unlikely(com == NULL))
5779 GOTO(out, rc = -ENOENT);
/* Refuse to iterate when the scan result is flagged as incomplete. */
5781 lo = com->lc_file_ram;
5782 if (lo->ll_flags & LF_INCOMPLETE)
5783 GOTO(out, rc = -ESRCH);
5785 llsd = com->lc_data;
5786 if (!llsd->llsd_rbtree_valid)
5787 GOTO(out, rc = -ESRCH);
5791 GOTO(out, rc = -ENOMEM);
/* Look the master target up by index; false means it is not removed. */
5793 it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false);
5794 if (it->loi_llst == NULL)
5795 GOTO(out, rc = -ENXIO);
/* The flag is tested again below once the write lock is held. */
5797 if (dev->dd_record_fid_accessed) {
5798 /* The first iteration against the rbtree, scan the whole rbtree
5799 * to remove the nodes which do NOT need to be handled. */
5800 write_lock(&llsd->llsd_rb_lock);
5801 if (dev->dd_record_fid_accessed) {
5802 struct rb_node *node;
5803 struct rb_node *next;
5804 struct lfsck_rbtree_node *lrn;
5806 /* No need to record the fid accessing anymore. */
5807 dev->dd_record_fid_accessed = 0;
5809 node = rb_first(&llsd->llsd_rb_root);
5810 while (node != NULL) {
5811 next = rb_next(node);
5812 lrn = rb_entry(node, struct lfsck_rbtree_node,
5814 if (atomic_read(&lrn->lrn_known_count) <=
5815 atomic_read(&lrn->lrn_accessed_count)) {
5816 rb_erase(node, &llsd->llsd_rb_root);
5817 lfsck_rbtree_free(lrn);
5822 write_unlock(&llsd->llsd_rb_lock);
5825 /* read lock the rbtree when init, and unlock when fini */
5826 read_lock(&llsd->llsd_rb_lock);
5834 lfsck_component_put(env, com);
5836 CDEBUG(D_LFSCK, "%s: init the orphan iteration: rc = %d\n",
5837 lfsck_lfsck2name(lfsck), rc);
5839 lfsck_instance_put(env, lfsck);
5844 it = (struct lfsck_orphan_it *)ERR_PTR(rc);
5847 return (struct dt_it *)it;
/*
 * Finalise an orphan iterator.
 *
 * Releases the rbtree read lock, stores the current key and hash into the
 * master target so that a later iteration can continue from there, and
 * drops the references on the target and on the component.
 *
 * NOTE(review): the second parameter (used as di below) and the release
 * of the iterator structure itself are not visible in this listing.
 */
5850 static void lfsck_orphan_it_fini(const struct lu_env *env,
5853 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
5854 struct lfsck_component *com = it->loi_com;
5855 struct lfsck_layout_slave_data *llsd;
5856 struct lfsck_layout_slave_target *llst;
5859 CDEBUG(D_LFSCK, "%s: fini the orphan iteration\n",
5860 lfsck_lfsck2name(com->lc_lfsck));
/* Releases the llsd_rb_lock read lock that this iterator still holds. */
5862 llsd = com->lc_data;
5863 read_unlock(&llsd->llsd_rb_lock);
5864 llst = it->loi_llst;
5865 LASSERT(llst != NULL);
5867 /* Save the key and hash for iterate next. */
5868 llst->llst_fid = it->loi_key;
5869 llst->llst_hash = it->loi_hash;
5870 lfsck_layout_llst_put(llst);
5871 lfsck_component_put(env, com);
/*
 * Advance the orphan iterator to the next orphan OST-object that should be
 * reported to the master target the iteration runs for.
 *
 * Outline of the visible logic:
 *  - find the rbtree node for the current key, moving on to later nodes
 *    while a node has no known-but-not-accessed object left;
 *  - within a node, use find_next_bit() on the known bitmap and consult
 *    the accessed bitmap for the candidate bit;
 *  - load the candidate object and read its attributes and its
 *    XATTR_NAME_FID xattr under the object read lock;
 *  - without a parent xattr (-ENODATA): an object with zero ctime is
 *    marked accessed, an object without S_ISUID in its mode is destroyed
 *    via lfsck_layout_destroy_orphan(), and for target index 0 a record
 *    with a zero parent FID is filled in;
 *  - with a parent xattr: its size must match struct filter_fid or struct
 *    filter_fid_old, the parent FID is converted to CPU byte order and
 *    checked with lfsck_fid_match_idx(), and the record receives the
 *    parent FID (stripe index restored) plus the owner uid and gid.
 *
 * NOTE(review): labels, gotos, several conditions and all return
 * statements are not visible in this listing; the outline covers only what
 * the remaining lines show.  The return values are described by the
 * retval lines that follow.
 */
5877 * \retval +1: the iteration finished
5878 * \retval 0: on success, not finished
5879 * \retval -ve: on error
5881 static int lfsck_orphan_it_next(const struct lu_env *env,
5884 struct lfsck_thread_info *info = lfsck_env_info(env);
5885 struct filter_fid_old *pfid = &info->lti_old_pfid;
5886 struct lu_attr *la = &info->lti_la;
5887 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
5888 struct lu_fid *key = &it->loi_key;
5889 struct lu_orphan_rec *rec = &it->loi_rec;
5890 struct lfsck_component *com = it->loi_com;
5891 struct lfsck_instance *lfsck = com->lc_lfsck;
5892 struct lfsck_layout_slave_data *llsd = com->lc_data;
5893 struct dt_object *obj;
5894 struct lfsck_rbtree_node *lrn;
5898 __u32 idx = it->loi_llst->llst_index;
5908 lrn = lfsck_rbtree_search(llsd, key, &exact);
5916 key->f_seq = lrn->lrn_seq;
5917 key->f_oid = lrn->lrn_first_oid;
5922 if (unlikely(key->f_oid == 0)) {
5929 lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) {
5935 if (unlikely(atomic_read(&lrn->lrn_known_count) <=
5936 atomic_read(&lrn->lrn_accessed_count))) {
5937 struct rb_node *next = rb_next(&lrn->lrn_node);
5939 while (next != NULL) {
5940 lrn = rb_entry(next, struct lfsck_rbtree_node,
5942 if (atomic_read(&lrn->lrn_known_count) >
5943 atomic_read(&lrn->lrn_accessed_count))
5945 next = rb_next(next);
5954 key->f_seq = lrn->lrn_seq;
5955 key->f_oid = lrn->lrn_first_oid;
5959 pos = key->f_oid - lrn->lrn_first_oid;
5962 pos = find_next_bit(lrn->lrn_known_bitmap,
5963 LFSCK_RBTREE_BITMAP_WIDTH, pos);
5964 if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) {
5965 key->f_oid = lrn->lrn_first_oid + pos;
5966 if (unlikely(key->f_oid < lrn->lrn_first_oid)) {
5974 if (test_bit(pos, lrn->lrn_accessed_bitmap)) {
5979 key->f_oid = lrn->lrn_first_oid + pos;
5980 obj = lfsck_object_find(env, lfsck, key);
5983 if (rc == -ENOENT) {
5990 dt_read_lock(env, obj, 0);
5991 if (dt_object_exists(obj) == 0 ||
5992 lfsck_is_dead_obj(obj)) {
5993 dt_read_unlock(env, obj);
5994 lfsck_object_put(env, obj);
5999 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
/* Read the parent FID xattr into the per-thread lti_old_pfid buffer. */
6003 rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)),
6004 XATTR_NAME_FID, BYPASS_CAPA);
6005 if (rc == -ENODATA) {
6006 /* For the pre-created OST-object, update the bitmap to avoid
6007 * others LFSCK (second phase) iteration to touch it again. */
6008 if (la->la_ctime == 0) {
6009 if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap))
6010 atomic_inc(&lrn->lrn_accessed_count);
6012 /* For the race between repairing dangling referenced
6013 * MDT-object and unlink the file, it may left orphan
6014 * OST-object there. Destroy it now! */
6015 if (unlikely(!(la->la_mode & S_ISUID))) {
6016 dt_read_unlock(env, obj);
6017 lfsck_layout_destroy_orphan(env,
6020 lfsck_object_put(env, obj);
6024 } else if (idx == 0) {
6025 /* If the orphan OST-object has no parent information,
6026 * regard it as referenced by the MDT-object on MDT0. */
6027 fid_zero(&rec->lor_fid);
6028 rec->lor_uid = la->la_uid;
6029 rec->lor_gid = la->la_gid;
6033 dt_read_unlock(env, obj);
6034 lfsck_object_put(env, obj);
6042 if (rc != sizeof(struct filter_fid) &&
6043 rc != sizeof(struct filter_fid_old))
6044 GOTO(out, rc = -EINVAL);
/* The xattr is stored little-endian; convert the parent FID first. */
6046 fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
6047 /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
6048 * MDT-object's FID::f_ver, instead it is the OST-object index in its
6049 * parent MDT-object's layout EA. */
6050 save = rec->lor_fid.f_stripe_idx;
6051 rec->lor_fid.f_ver = 0;
6052 rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
6053 /* If the orphan OST-object does not claim the MDT, then next.
6055 * If we do not know whether it matches or not, then return it
6056 * to the MDT for further check. */
6058 dt_read_unlock(env, obj);
6059 lfsck_object_put(env, obj);
6064 rec->lor_fid.f_stripe_idx = save;
6065 rec->lor_uid = la->la_uid;
6066 rec->lor_gid = la->la_gid;
6068 CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n",
6069 lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid),
6070 rec->lor_uid, rec->lor_gid);
6075 dt_read_unlock(env, obj);
6076 lfsck_object_put(env, obj);
/*
 * Position the orphan iterator at \a key: the key is copied into the
 * iterator as a struct lu_fid and lfsck_orphan_it_next() is called to
 * move to a valid record.
 *
 * NOTE(review): the translation of the result of lfsck_orphan_it_next()
 * into the return values listed below is not visible in this listing.
 */
6084 * \retval +1: locate to the exactly position
6085 * \retval 0: cannot locate to the exactly position,
6086 * call next() to move to a valid position.
6087 * \retval -ve: on error
6089 static int lfsck_orphan_it_get(const struct lu_env *env,
6091 const struct dt_key *key)
6093 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6096 it->loi_key = *(struct lu_fid *)key;
6097 rc = lfsck_orphan_it_next(env, di);
/*
 * put method of the orphan iterator.
 * NOTE(review): the remaining parameters and the body are not visible in
 * this listing; presumably there is nothing to release -- confirm.
 */
6107 static void lfsck_orphan_it_put(const struct lu_env *env,
/*
 * Return the key the iterator currently points at.
 *
 * \retval	pointer to the loi_key member (a struct lu_fid) inside the
 *		iterator, cast to struct dt_key; no copy is made
 */
6112 static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env,
6113 const struct dt_it *di)
6115 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6117 return (struct dt_key *)&it->loi_key;
/*
 * Return the size of an iterator key.
 *
 * \retval	sizeof(struct lu_fid), independent of the iterator state
 */
6120 static int lfsck_orphan_it_key_size(const struct lu_env *env,
6121 const struct dt_it *di)
6123 return sizeof(struct lu_fid);
/*
 * Copy the current orphan record of the iterator into the caller-supplied
 * buffer, which is written as a struct lu_orphan_rec.
 *
 * NOTE(review): the remaining parameters (including rec) and the return
 * statement are not visible in this listing.
 */
6126 static int lfsck_orphan_it_rec(const struct lu_env *env,
6127 const struct dt_it *di,
6131 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6133 *(struct lu_orphan_rec *)rec = it->loi_rec;
/*
 * Return the position cookie of the iterator.
 *
 * \retval	the loi_hash value currently stored in the iterator
 */
6138 static __u64 lfsck_orphan_it_store(const struct lu_env *env,
6139 const struct dt_it *di)
6141 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6143 return it->loi_hash;
/*
 * Restore the iterator position from the cookie \a hash.
 *
 * The cookie is compared with the hash saved in the master target.  On a
 * mismatch the saved FID and hash are reset to zero, so the iteration
 * restarts from the beginning.  The iterator then takes over the saved
 * FID and hash and lfsck_orphan_it_next() moves it to a valid record.
 *
 * NOTE(review): the hash parameter declaration and the translation of the
 * result of lfsck_orphan_it_next() into the return values listed below
 * are not visible in this listing.
 */
6147 * \retval +1: locate to the exactly position
6148 * \retval 0: cannot locate to the exactly position,
6149 * call next() to move to a valid position.
6150 * \retval -ve: on error
6152 static int lfsck_orphan_it_load(const struct lu_env *env,
6153 const struct dt_it *di,
6156 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6157 struct lfsck_layout_slave_target *llst = it->loi_llst;
6160 LASSERT(llst != NULL);
6162 if (hash != llst->llst_hash) {
6163 CDEBUG(D_LFSCK, "%s: the given hash "LPU64" for orphan "
6164 "iteration does not match the one when fini "
6165 LPU64", to be reset.\n",
6166 lfsck_lfsck2name(it->loi_com->lc_lfsck), hash,
6168 fid_zero(&llst->llst_fid);
6169 llst->llst_hash = 0;
/* Resume from the position saved in the target (zeroed after a reset). */
6172 it->loi_key = llst->llst_fid;
6173 it->loi_hash = llst->llst_hash;
6174 rc = lfsck_orphan_it_next(env, (struct dt_it *)di);
/*
 * key_rec method of the orphan iterator.
 * NOTE(review): the remaining parameters and the body are not visible in
 * this listing -- confirm the behaviour against the full file.
 */
6184 static int lfsck_orphan_it_key_rec(const struct lu_env *env,
6185 const struct dt_it *di,
/*
 * Index operation table for the orphan OST-object pseudo-index.
 * It wires the lookup/insert/delete entry points and the iterator methods
 * (init, fini, get, put, next, key, key_size, rec, store, load, key_rec)
 * to the lfsck_orphan_* functions of this file.  The table has external
 * linkage.
 *
 * NOTE(review): the line that opens the nested iterator-method
 * initializer and the closing lines of the table are not visible in this
 * listing.
 */
6191 const struct dt_index_operations lfsck_orphan_index_ops = {
6192 .dio_lookup = lfsck_orphan_index_lookup,
6193 .dio_declare_insert = lfsck_orphan_index_declare_insert,
6194 .dio_insert = lfsck_orphan_index_insert,
6195 .dio_declare_delete = lfsck_orphan_index_declare_delete,
6196 .dio_delete = lfsck_orphan_index_delete,
6198 .init = lfsck_orphan_it_init,
6199 .fini = lfsck_orphan_it_fini,
6200 .get = lfsck_orphan_it_get,
6201 .put = lfsck_orphan_it_put,
6202 .next = lfsck_orphan_it_next,
6203 .key = lfsck_orphan_it_key,
6204 .key_size = lfsck_orphan_it_key_size,
6205 .rec = lfsck_orphan_it_rec,
6206 .store = lfsck_orphan_it_store,
6207 .load = lfsck_orphan_it_load,
6208 .key_rec = lfsck_orphan_it_key_rec,