4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2014, Intel Corporation.
26 * lustre/lfsck/lfsck_layout.c
28 * Author: Fan, Yong <fan.yong@intel.com>
32 # define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_LFSCK
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
49 #include "lfsck_internal.h"
51 #define LFSCK_LAYOUT_MAGIC_V1 0xB173AE14
52 #define LFSCK_LAYOUT_MAGIC_V2 0xB1734D76
54 #define LFSCK_LAYOUT_MAGIC LFSCK_LAYOUT_MAGIC_V2
/* Per-sequence LAST_ID bookkeeping record; entries are linked on
 * lfsck_layout_slave_data::llsd_seq_list. */
56 struct lfsck_layout_seq {
/* Linkage into llsd_seq_list. */
57 struct list_head lls_list;
/* lfsck_layout_lastid_reload() treats an on-disk LAST_ID smaller than
 * this value as a crashed LAST_ID file. */
60 __u64 lls_lastid_known;
/* Object that the LAST_ID value is read from and written to. */
61 struct dt_object *lls_lastid_obj;
/* Checked by lfsck_layout_lastid_store() before it syncs an entry. */
62 unsigned int lls_dirty:1;
/* Record for one target on the master list, looked up by llst_index and
 * reference-counted through llst_ref (see lfsck_layout_llst_add() and
 * lfsck_layout_llst_put()). */
65 struct lfsck_layout_slave_target {
66 /* link into lfsck_layout_slave_data::llsd_master_list. */
67 struct list_head llst_list;
68 /* The position for next record in the rbtree for iteration. */
69 struct lu_fid llst_fid;
70 /* Dummy hash for iteration against the rbtree. */
/* Component private data (reached through lfsck_component::lc_data in the
 * rbtree helpers): the LAST_ID sequence list, the master list and the rbtree
 * of OST-object bitmaps. */
77 struct lfsck_layout_slave_data {
78 /* list for lfsck_layout_seq */
79 struct list_head llsd_seq_list;
81 /* list for the masters involve layout verification. */
82 struct list_head llsd_master_list;
/* In-RAM object standing for the layout rbtree; assigned in
 * lfsck_rbtree_setup() and released in lfsck_rbtree_cleanup(). */
85 struct dt_object *llsd_rb_obj;
/* Root of the tree of lfsck_rbtree_node entries. */
86 struct rb_root llsd_rb_root;
/* Taken for read or write around rbtree lookups/inserts and around
 * tests and updates of llsd_rbtree_valid. */
87 rwlock_t llsd_rb_lock;
/* Set by lfsck_rbtree_setup(); cleared under the write lock by
 * lfsck_rbtree_cleanup(). */
88 unsigned int llsd_rbtree_valid:1;
/* Reference-counted (llo_ref) wrapper pairing a dt_object with the
 * attributes that lfsck_layout_object_init() fetched for it. */
91 struct lfsck_layout_object {
/* Underlying object; released with lfsck_object_put() on the final put. */
92 struct dt_object *llo_obj;
/* Filled by dt_attr_get() in lfsck_layout_object_init(). */
93 struct lu_attr llo_attr;
/* Layout-specific assistant request: embeds the generic lfsck_assistant_req
 * and names the parent/child pair the request refers to. */
99 struct lfsck_layout_req {
/* Embedded generic request; lfsck_layout_assistant_req_fini() uses
 * container_of0() on it to get back to this structure. */
100 struct lfsck_assistant_req llr_lar;
/* Parent layout object; lfsck_layout_assistant_req_init() takes a
 * reference on it. */
101 struct lfsck_layout_object *llr_parent;
/* Child object; put with lu_object_put() when the request is finished. */
102 struct dt_object *llr_child;
104 __u32 llr_lov_idx; /* offset in LOV EA */
/* Argument bundle holding an export, the LFSCK component and a slave-target
 * record.
 * NOTE(review): presumably handed to an async RPC interpret callback; the
 * user of this structure should be confirmed. */
107 struct lfsck_layout_slave_async_args {
108 struct obd_export *llsaa_exp;
109 struct lfsck_component *llsaa_com;
110 struct lfsck_layout_slave_target *llsaa_llst;
/* Build a lfsck_layout_object for obj: fetch its attributes with
 * dt_attr_get(), take a reference on the underlying lu_object, record the
 * cookie and start the wrapper's own reference count at 1.
 * Returns ERR_PTR(-ENOMEM) when allocation fails.
 * NOTE(review): gen is presumably stored for the layout-changed check that
 * the comment below describes; confirm. */
113 static struct lfsck_layout_object *
114 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
115 __u64 cookie, __u16 gen)
117 struct lfsck_layout_object *llo;
122 return ERR_PTR(-ENOMEM);
124 rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
/* Hold a reference on the object on behalf of the wrapper. */
131 lu_object_get(&obj->do_lu);
133 llo->llo_cookie = cookie;
134 /* The gen can be used to check whether some others have changed the
135 * file layout after LFSCK pre-fetching but before real verification. */
137 atomic_set(&llo->llo_ref, 1);
/* Drop one reference on llst. When the last reference goes away the entry
 * must already be unlinked from the master list (asserted). */
143 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
145 if (atomic_dec_and_test(&llst->llst_ref)) {
146 LASSERT(list_empty(&llst->llst_list));
/* Register a slave-target record for index on llsd_master_list. The new
 * entry starts with a single reference. The list is searched under
 * llsd_lock for an entry with the same index before the append.
 * NOTE(review): confirm how a duplicate index is handled and what is
 * returned in that case. */
153 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
155 struct lfsck_layout_slave_target *llst;
156 struct lfsck_layout_slave_target *tmp;
163 INIT_LIST_HEAD(&llst->llst_list);
165 llst->llst_index = index;
166 atomic_set(&llst->llst_ref, 1);
168 spin_lock(&llsd->llsd_lock);
169 list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
170 if (tmp->llst_index == index) {
176 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
177 spin_unlock(&llsd->llsd_lock);
/* Unlink llst from the master list under llsd_lock if it is still linked,
 * then release a reference through lfsck_layout_llst_put().
 * NOTE(review): confirm whether the put only happens when the entry was
 * actually unlinked here. */
186 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
187 struct lfsck_layout_slave_target *llst)
191 spin_lock(&llsd->llsd_lock);
192 if (!list_empty(&llst->llst_list)) {
193 list_del_init(&llst->llst_list);
196 spin_unlock(&llsd->llsd_lock);
199 lfsck_layout_llst_put(llst);
/* Look up the slave-target record with the given index under llsd_lock.
 * A matching entry gets an extra reference before the lock is dropped.
 * NOTE(review): the list_del_init() is presumably performed only when
 * unlink is true, and the no-match result is presumably NULL; confirm. */
202 static inline struct lfsck_layout_slave_target *
203 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
204 __u32 index, bool unlink)
206 struct lfsck_layout_slave_target *llst;
208 spin_lock(&llsd->llsd_lock);
209 list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
210 if (llst->llst_index == index) {
212 list_del_init(&llst->llst_list);
/* Reference for the caller. */
214 atomic_inc(&llst->llst_ref);
215 spin_unlock(&llsd->llsd_lock);
220 spin_unlock(&llsd->llsd_lock);
/* Drop one reference on llo; on the final put the wrapped dt_object is
 * released with lfsck_object_put(). */
225 static inline void lfsck_layout_object_put(const struct lu_env *env,
226 struct lfsck_layout_object *llo)
228 if (atomic_dec_and_test(&llo->llo_ref)) {
229 lfsck_object_put(env, llo->llo_obj);
/* Build an assistant request for one parent/child pair. A reference is
 * taken on parent; child, ost_idx and lov_idx are stored as given.
 * Returns ERR_PTR(-ENOMEM) when allocation fails. */
234 static struct lfsck_layout_req *
235 lfsck_layout_assistant_req_init(struct lfsck_layout_object *parent,
236 struct dt_object *child, __u32 ost_idx,
239 struct lfsck_layout_req *llr;
243 return ERR_PTR(-ENOMEM);
245 INIT_LIST_HEAD(&llr->llr_lar.lar_list);
/* The request keeps the parent alive until
 * lfsck_layout_assistant_req_fini() puts it. */
246 atomic_inc(&parent->llo_ref);
247 llr->llr_parent = parent;
248 llr->llr_child = child;
249 llr->llr_ost_idx = ost_idx;
250 llr->llr_lov_idx = lov_idx;
/* Tear down a layout assistant request: recover the lfsck_layout_req from
 * the embedded generic request, put the child object and drop the reference
 * held on the parent layout object. */
255 static void lfsck_layout_assistant_req_fini(const struct lu_env *env,
256 struct lfsck_assistant_req *lar)
258 struct lfsck_layout_req *llr =
259 container_of0(lar, struct lfsck_layout_req, llr_lar);
261 lu_object_put(env, &llr->llr_child->do_lu);
262 lfsck_layout_object_put(env, llr->llr_parent);
/* RPC interpret callback used by lfsck_layout_assistant_sync_failures():
 * decrements the counter that laia->laia_count points to.
 * NOTE(review): confirm for which reply status the decrement happens. */
267 lfsck_layout_assistant_sync_failures_interpret(const struct lu_env *env,
268 struct ptlrpc_request *req,
271 struct lfsck_async_interpret_args *laia = args;
274 atomic_dec(laia->laia_count);
280 * Notify remote LFSCK instances about former failures.
282 * The local LFSCK instance records which OSTs have ever failed to respond to
283 * LFSCK verification requests (perhaps because of network issues or trouble on
284 * the OST itself). During that gap the OST may have missed some OST-object
285 * verification, so it cannot know whether the related OST-objects are
286 * referenced by MDT-objects. In the second-stage scanning those OST-objects
287 * would then be regarded as orphans, and if such an OST-object carries a bad
288 * parent FID as its back reference, it would misguide the LFSCK into making a
289 * wrong fix for the fake orphan.
291 * To avoid this, when the layout LFSCK finishes the first-stage scanning it
292 * scans the bitmap of OSTs that have ever failed and notifies them that they
293 * have missed some OST-object verification and should skip the handling of
294 * orphan OST-objects on all MDTs that take part in the layout LFSCK.
296 * \param[in] env pointer to the thread context
297 * \param[in] com pointer to the lfsck component
298 * \param[in] lr pointer to the lfsck request
/* Send the request lr, flagged LF_INCOMPLETE, to every OST whose bit is set
 * in lad->lad_bitmap. The requests are queued on one ptlrpc request set and
 * use lfsck_layout_assistant_sync_failures_interpret() as the callback. */
300 static void lfsck_layout_assistant_sync_failures(const struct lu_env *env,
301 struct lfsck_component *com,
302 struct lfsck_request *lr)
304 struct lfsck_async_interpret_args *laia =
305 &lfsck_env_info(env)->lti_laia2;
306 struct lfsck_assistant_data *lad = com->lc_data;
307 struct lfsck_layout *lo = com->lc_file_ram;
308 struct lfsck_instance *lfsck = com->lc_lfsck;
309 struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs;
310 struct lfsck_tgt_desc *ltd;
311 struct ptlrpc_request_set *set;
/* Special-cased: no failure was recorded on the assistant data, or the
 * whole LFSCK is already marked LF_INCOMPLETE. */
317 if (!lad->lad_incomplete || lo->ll_flags & LF_INCOMPLETE)
320 /* If the MDT has ever failed to verify some OST-objects,
321 * then sync failures with them firstly. */
322 lr->lr_flags2 = lo->ll_flags | LF_INCOMPLETE;
/* count is shared with the interpret callback through laia. */
324 atomic_set(&count, 0);
325 memset(laia, 0, sizeof(*laia));
326 laia->laia_count = &count;
327 set = ptlrpc_prep_set();
329 GOTO(out, rc = -ENOMEM);
/* The target table is walked under the read side of ltd_rw_sem. */
331 down_read(&ltds->ltd_rw_sem);
332 cfs_foreach_bit(lad->lad_bitmap, idx) {
333 ltd = LTD_TGT(ltds, idx);
334 LASSERT(ltd != NULL);
/* Take the failed target off both layout lists under ltd_lock. */
336 spin_lock(&ltds->ltd_lock);
337 list_del_init(&ltd->ltd_layout_phase_list);
338 list_del_init(&ltd->ltd_layout_list);
339 spin_unlock(&ltds->ltd_lock);
341 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
342 lfsck_layout_assistant_sync_failures_interpret,
345 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
346 "notify target %x for %s phase1 done: "
347 "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
348 ltd->ltd_index, lad->lad_name, rc);
355 up_read(&ltds->ltd_rw_sem);
/* Only wait on the set if there is something outstanding. */
357 if (rc == 0 && atomic_read(&count) > 0)
358 rc = ptlrpc_set_wait(set);
360 ptlrpc_set_destroy(set);
/* A counter that is still positive after a successful wait is
 * examined here. */
362 if (rc == 0 && atomic_read(&count) > 0)
369 /* If failed to sync failures with the OSTs, then have to
370 * mark the whole LFSCK as LF_INCOMPLETE to skip the whole
371 * subsequent orphan OST-object handling. */
372 lo->ll_flags |= LF_INCOMPLETE;
/* Restore the request flags to the component's current flags. */
374 lr->lr_flags2 = lo->ll_flags;
/* Read the XATTR_NAME_LOV extended attribute of obj into buf, enlarging or
 * allocating buf->lb_buf when required.
 * NOTE(review): the return convention (EA size vs. error) and the retry
 * flow between the two dt_xattr_get() calls should be confirmed. */
377 static int lfsck_layout_get_lovea(const struct lu_env *env,
378 struct dt_object *obj, struct lu_buf *buf)
/* First attempt with the buffer supplied by the caller. */
383 rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
/* Query with an empty buffer; the result is used below as the new
 * buffer size. */
385 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
390 lu_buf_realloc(buf, rc);
391 if (buf->lb_buf == NULL)
/* The caller passed a buffer without storage: allocate rc bytes. */
403 if (unlikely(buf->lb_buf == NULL)) {
404 lu_buf_alloc(buf, rc);
405 if (buf->lb_buf == NULL)
/* Sanity-check the header of a little-endian LOV EA. The magic has to be
 * LOV_MAGIC_V1 or LOV_MAGIC_V3 and the pattern has to be LOV_PATTERN_RAID0;
 * anything else is logged through CDEBUG together with the object id.
 * The magic log text says "Unknown" when rc is -EINVAL and "Unsupported"
 * otherwise. */
414 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
419 magic = le32_to_cpu(lmm->lmm_magic);
420 /* If magic crashed, keep it there. Sometime later, during OST-object
421 * orphan handling, if some OST-object(s) back-point to it, it can be
422 * verified and repaired. */
423 if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) {
427 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
/* Distinguish a value that still carries the LOV magic signature
 * from one that does not. */
428 if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
433 CDEBUG(D_LFSCK, "%s LOV EA magic %u on "DOSTID"\n",
434 rc == -EINVAL ? "Unknown" : "Unsupported",
440 pattern = le32_to_cpu(lmm->lmm_pattern);
441 /* XXX: currently, we only support LOV_PATTERN_RAID0. */
442 if (lov_pattern(pattern) != LOV_PATTERN_RAID0) {
445 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
446 CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u on "DOSTID"\n",
447 pattern, POSTID(&oi));
/* Geometry of the per-node bitmaps in the layout rbtree.
 * SIZE is the allocation size passed to OBD_ALLOC() for each bitmap,
 * WIDTH is SIZE << 3 (eight bits per unit of SIZE), and MASK extracts an
 * OID's bit index inside one node (see lfsck_rbtree_update_bitmap()). */
455 #define LFSCK_RBTREE_BITMAP_SIZE PAGE_CACHE_SIZE
456 #define LFSCK_RBTREE_BITMAP_WIDTH (LFSCK_RBTREE_BITMAP_SIZE << 3)
457 #define LFSCK_RBTREE_BITMAP_MASK (LFSCK_RBTREE_BITMAP_WIDTH - 1)
/* One node of the layout rbtree. It covers the OIDs of a single sequence
 * starting at lrn_first_oid (LFSCK_RBTREE_BITMAP_WIDTH of them, see
 * lfsck_rbtree_cmp()) and keeps two bitmaps over that window. */
459 struct lfsck_rbtree_node {
/* Linkage into lfsck_layout_slave_data::llsd_rb_root. */
460 struct rb_node lrn_node;
/* Number of bits set in lrn_known_bitmap. */
463 atomic_t lrn_known_count;
/* Number of bits set in lrn_accessed_bitmap. */
464 atomic_t lrn_accessed_count;
/* Both bitmaps are LFSCK_RBTREE_BITMAP_SIZE in size and are indexed
 * by (oid & LFSCK_RBTREE_BITMAP_MASK). */
465 void *lrn_known_bitmap;
466 void *lrn_accessed_bitmap;
/* Compare (seq, oid) with the OID window covered by lrn: first by sequence,
 * then by whether oid lies below lrn_first_oid or at/after
 * lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH.
 * NOTE(review): presumably returns negative / zero / positive for
 * before / inside / after the window; confirm the exact values. */
469 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
470 __u64 seq, __u32 oid)
472 if (seq < lrn->lrn_seq)
475 if (seq > lrn->lrn_seq)
478 if (oid < lrn->lrn_first_oid)
481 if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH)
/* Find the rbtree node whose window contains fid, descending from the root
 * with lfsck_rbtree_cmp(). When there is no exact match the search falls
 * back to a neighbouring node obtained through rb_next().
 * NOTE(review): exact may be NULL (lfsck_rbtree_update_bitmap() passes
 * NULL); how *exact is set and what is returned for an empty tree or at
 * the end of the tree should be confirmed. */
487 /* The caller should hold llsd->llsd_rb_lock. */
488 static struct lfsck_rbtree_node *
489 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
490 const struct lu_fid *fid, bool *exact)
492 struct rb_node *node = llsd->llsd_rb_root.rb_node;
493 struct rb_node *prev = NULL;
494 struct lfsck_rbtree_node *lrn = NULL;
500 while (node != NULL) {
502 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
503 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
505 node = node->rb_left;
507 node = node->rb_right;
515 /* If there is no exactly matched one, then to the next valid one. */
518 /* The rbtree is empty. */
525 node = rb_next(prev);
527 /* The end of the rbtree. */
531 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
/* Allocate an unlinked rbtree node for the window that contains fid.
 * Both bitmaps are allocated, and the counters start at zero.
 * Returns ERR_PTR(-ENOMEM) when any allocation fails; a bitmap that was
 * already allocated is freed on that path. */
536 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
537 const struct lu_fid *fid)
539 struct lfsck_rbtree_node *lrn;
543 return ERR_PTR(-ENOMEM);
545 OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
546 if (lrn->lrn_known_bitmap == NULL) {
549 return ERR_PTR(-ENOMEM);
552 OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
553 if (lrn->lrn_accessed_bitmap == NULL) {
554 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
557 return ERR_PTR(-ENOMEM);
560 RB_CLEAR_NODE(&lrn->lrn_node);
561 lrn->lrn_seq = fid_seq(fid);
/* Align the window start down to a multiple of the bitmap width. */
562 lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
563 atomic_set(&lrn->lrn_known_count, 0);
564 atomic_set(&lrn->lrn_accessed_count, 0);
/* Release the two bitmaps owned by lrn.
 * NOTE(review): presumably the node itself is freed here as well;
 * confirm. */
569 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
571 OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
572 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
/* Link lrn into llsd's rbtree at the position selected by comparing its
 * (lrn_seq, lrn_first_oid) against existing nodes with lfsck_rbtree_cmp().
 * NOTE(review): lfsck_rbtree_update_bitmap() frees its new node after this
 * call on one path, presumably when a node for the same window already
 * exists and is returned instead; confirm. */
576 /* The caller should hold lock. */
577 static struct lfsck_rbtree_node *
578 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
579 struct lfsck_rbtree_node *lrn)
581 struct rb_node **pos = &llsd->llsd_rb_root.rb_node;
582 struct rb_node *parent = NULL;
583 struct lfsck_rbtree_node *tmp;
586 while (*pos != NULL) {
588 tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node);
589 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
591 pos = &(*pos)->rb_left;
593 pos = &(*pos)->rb_right;
/* Attach at the leaf slot found above and rebalance. */
598 rb_link_node(&lrn->lrn_node, parent, pos);
599 rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
604 extern const struct dt_index_operations lfsck_orphan_index_ops;
/* Prepare the layout rbtree for use: locate the object with FID
 * (FID_SEQ_LAYOUT_RBTREE, device index), mark it as existing, hook the
 * orphan index operations onto it, then flag the rbtree as valid and turn
 * on dd_record_fid_accessed on the bottom device.
 * Returns the dt_locate() error when the object cannot be obtained. */
606 static int lfsck_rbtree_setup(const struct lu_env *env,
607 struct lfsck_component *com)
609 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid;
610 struct lfsck_instance *lfsck = com->lc_lfsck;
611 struct dt_device *dev = lfsck->li_bottom;
612 struct lfsck_layout_slave_data *llsd = com->lc_data;
613 struct dt_object *obj;
615 fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
616 fid->f_oid = lfsck_dev_idx(dev);
618 obj = dt_locate(env, dev, fid);
620 RETURN(PTR_ERR(obj));
622 /* Generate an in-RAM object to stand for the layout rbtree.
623 * Scanning the layout rbtree will be via the iteration over
624 * the object. In the future, the rbtree may be written onto
625 * disk with the object.
627 * Mark the object to be as exist. */
628 obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
629 obj->do_index_ops = &lfsck_orphan_index_ops;
/* The reference from dt_locate() is kept in llsd_rb_obj and put in
 * lfsck_rbtree_cleanup(). */
630 llsd->llsd_rb_obj = obj;
631 llsd->llsd_rbtree_valid = 1;
632 dev->dd_record_fid_accessed = 1;
634 CDEBUG(D_LFSCK, "%s: layout LFSCK init OST-objects accessing bitmap\n",
635 lfsck_lfsck2name(lfsck));
/* Undo lfsck_rbtree_setup(): stop FID access recording on the bottom device,
 * mark the rbtree invalid under the write lock, erase and free every node,
 * and put the in-RAM rbtree object if one is held. */
640 static void lfsck_rbtree_cleanup(const struct lu_env *env,
641 struct lfsck_component *com)
643 struct lfsck_instance *lfsck = com->lc_lfsck;
644 struct lfsck_layout_slave_data *llsd = com->lc_data;
645 struct rb_node *node = rb_first(&llsd->llsd_rb_root);
646 struct rb_node *next;
647 struct lfsck_rbtree_node *lrn;
649 lfsck->li_bottom->dd_record_fid_accessed = 0;
650 /* Invalidate the rbtree, then no others will use it. */
651 write_lock(&llsd->llsd_rb_lock);
652 llsd->llsd_rbtree_valid = 0;
653 write_unlock(&llsd->llsd_rb_lock);
/* Walk the tree in order; the successor is fetched before the current
 * node is erased. */
655 while (node != NULL) {
656 next = rb_next(node);
657 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
658 rb_erase(node, &llsd->llsd_rb_root);
659 lfsck_rbtree_free(lrn);
663 if (llsd->llsd_rb_obj != NULL) {
664 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
665 llsd->llsd_rb_obj = NULL;
668 CDEBUG(D_LFSCK, "%s: layout LFSCK fini OST-objects accessing bitmap\n",
669 lfsck_lfsck2name(lfsck));
/* Record fid in the layout rbtree: set its bit in the "known" bitmap of the
 * covering node and, when accessed is true, in the "accessed" bitmap too.
 * A node is created on demand; because allocation happens with the read
 * lock dropped, validity is re-checked after the write lock is taken.
 * If the update fails for an accessed object, the component is flagged
 * LF_INCOMPLETE and the whole rbtree is torn down. */
672 static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
673 struct lfsck_component *com,
674 const struct lu_fid *fid,
677 struct lfsck_layout_slave_data *llsd = com->lc_data;
678 struct lfsck_rbtree_node *lrn;
/* FIDs that are not sane, or that name a LAST_ID object, are
 * filtered here. */
684 if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
/* FIDs that are neither IDIF nor normal are filtered here. */
687 if (!fid_is_idif(fid) && !fid_is_norm(fid))
690 read_lock(&llsd->llsd_rb_lock);
691 if (!llsd->llsd_rbtree_valid)
692 GOTO(unlock, rc = 0);
694 lrn = lfsck_rbtree_search(llsd, fid, NULL);
696 struct lfsck_rbtree_node *tmp;
/* Drop the read lock before allocating a new node. */
700 read_unlock(&llsd->llsd_rb_lock);
701 tmp = lfsck_rbtree_new(env, fid);
703 GOTO(out, rc = PTR_ERR(tmp));
706 write_lock(&llsd->llsd_rb_lock);
/* The tree may have been invalidated while no lock was held. */
707 if (!llsd->llsd_rbtree_valid) {
708 lfsck_rbtree_free(tmp);
709 GOTO(unlock, rc = 0);
712 lrn = lfsck_rbtree_insert(llsd, tmp);
714 lfsck_rbtree_free(tmp);
717 idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
718 /* Any accessed object must be a known object. */
719 if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
720 atomic_inc(&lrn->lrn_known_count);
721 if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
722 atomic_inc(&lrn->lrn_accessed_count);
724 GOTO(unlock, rc = 0);
/* Release whichever side of the lock is currently held. */
728 write_unlock(&llsd->llsd_rb_lock);
730 read_unlock(&llsd->llsd_rb_lock);
732 if (rc != 0 && accessed) {
733 struct lfsck_layout *lo = com->lc_file_ram;
735 CDEBUG(D_LFSCK, "%s: fail to update OST-objects accessing "
736 "bitmap, and will cause incorrect LFSCK OST-object "
737 "handling, so disable it to cancel orphan handling "
738 "for related device. rc = %d\n",
739 lfsck_lfsck2name(com->lc_lfsck), rc);
741 lo->ll_flags |= LF_INCOMPLETE;
742 lfsck_rbtree_cleanup(env, com);
/* Convert a lfsck_layout record from its little-endian form (src) to CPU
 * byte order (des), field by field. Mirror of lfsck_layout_cpu_to_le(). */
746 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
747 const struct lfsck_layout *src)
751 des->ll_magic = le32_to_cpu(src->ll_magic);
752 des->ll_status = le32_to_cpu(src->ll_status);
753 des->ll_flags = le32_to_cpu(src->ll_flags);
754 des->ll_success_count = le32_to_cpu(src->ll_success_count);
755 des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
756 des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
757 des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
758 des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
759 des->ll_time_last_checkpoint =
760 le64_to_cpu(src->ll_time_last_checkpoint);
761 des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
762 des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
763 des->ll_pos_first_inconsistent =
764 le64_to_cpu(src->ll_pos_first_inconsistent);
765 des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
766 des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
767 des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
768 des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
/* One repaired-objects counter per inconsistency type. */
769 for (i = 0; i < LLIT_MAX; i++)
770 des->ll_objs_repaired[i] =
771 le64_to_cpu(src->ll_objs_repaired[i]);
772 des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
773 des->ll_bitmap_size = le32_to_cpu(src->ll_bitmap_size);
/* Convert a lfsck_layout record from CPU byte order (src) to its
 * little-endian form (des), field by field. Mirror of
 * lfsck_layout_le_to_cpu(). */
776 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
777 const struct lfsck_layout *src)
781 des->ll_magic = cpu_to_le32(src->ll_magic);
782 des->ll_status = cpu_to_le32(src->ll_status);
783 des->ll_flags = cpu_to_le32(src->ll_flags);
784 des->ll_success_count = cpu_to_le32(src->ll_success_count);
785 des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
786 des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
787 des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
788 des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
789 des->ll_time_last_checkpoint =
790 cpu_to_le64(src->ll_time_last_checkpoint);
791 des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
792 des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
793 des->ll_pos_first_inconsistent =
794 cpu_to_le64(src->ll_pos_first_inconsistent);
795 des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
796 des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
797 des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
798 des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
/* One repaired-objects counter per inconsistency type. */
799 for (i = 0; i < LLIT_MAX; i++)
800 des->ll_objs_repaired[i] =
801 cpu_to_le64(src->ll_objs_repaired[i]);
802 des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
803 des->ll_bitmap_size = cpu_to_le32(src->ll_bitmap_size);
807 * Load the OST bitmap from the lfsck_layout trace file.
809 * \param[in] env pointer to the thread context
810 * \param[in] com pointer to the lfsck component
812 * \retval 0 for success
813 * \retval negative error number on failure or data corruption
815 static int lfsck_layout_load_bitmap(const struct lu_env *env,
816 struct lfsck_component *com)
818 struct dt_object *obj = com->lc_obj;
819 struct lfsck_assistant_data *lad = com->lc_data;
820 struct lfsck_layout *lo = com->lc_file_ram;
821 cfs_bitmap_t *bitmap = lad->lad_bitmap;
/* The bitmap is read from offset lc_file_size of the trace file. */
822 loff_t pos = com->lc_file_size;
/* nbits is chosen between the OST target bitmap size and the size
 * recorded in the trace file. */
828 if (com->lc_lfsck->li_ost_descs.ltd_tgts_bitmap->size >
830 nbits = com->lc_lfsck->li_ost_descs.ltd_tgts_bitmap->size;
832 nbits = lo->ll_bitmap_size;
/* Never go below one machine word of bits. */
834 if (unlikely(nbits < BITS_PER_LONG))
835 nbits = BITS_PER_LONG;
/* The in-RAM bitmap is too small: replace it with a larger one. */
837 if (nbits > bitmap->size) {
838 __u32 new_bits = bitmap->size;
839 cfs_bitmap_t *new_bitmap;
841 while (new_bits < nbits)
844 new_bitmap = CFS_ALLOCATE_BITMAP(new_bits);
845 if (new_bitmap == NULL)
848 lad->lad_bitmap = new_bitmap;
849 CFS_FREE_BITMAP(bitmap);
/* Nothing was stored on disk: start from an empty bitmap. */
853 if (lo->ll_bitmap_size == 0) {
854 lad->lad_incomplete = 0;
855 CFS_RESET_BITMAP(bitmap);
/* Round the stored bit count up to whole bytes. */
860 size = (lo->ll_bitmap_size + 7) >> 3;
861 rc = dt_read(env, obj, lfsck_buf_get(env, bitmap->data, size), &pos);
/* A non-negative result that reaches this return is reported as
 * -EINVAL; negative errors are passed through. */
863 RETURN(rc >= 0 ? -EINVAL : rc);
/* lad_incomplete reflects whether any bit is set. */
865 if (cfs_bitmap_check_empty(bitmap))
866 lad->lad_incomplete = 0;
868 lad->lad_incomplete = 1;
874 * Load the layout LFSCK trace file from disk.
876 * The layout LFSCK trace file records the layout LFSCK status information
877 * and other statistics, such as how many objects have been scanned, and how
878 * many objects have been repaired, and etc. It also contains the bitmap for
879 * failed OSTs during the layout LFSCK. All these information will be loaded
880 * from disk to RAM when the layout LFSCK component setup.
882 * \param[in] env pointer to the thread context
883 * \param[in] com pointer to the lfsck component
885 * \retval positive number for file data corruption, the caller
886 * should reset the layout LFSCK trace file
887 * \retval 0 for success
888 * \retval negative error number on failure
890 static int lfsck_layout_load(const struct lu_env *env,
891 struct lfsck_component *com)
893 struct lfsck_layout *lo = com->lc_file_ram;
894 ssize_t size = com->lc_file_size;
/* Read the raw little-endian record into lc_file_disk. */
898 rc = dt_read(env, com->lc_obj,
899 lfsck_buf_get(env, com->lc_file_disk, size), &pos);
903 CDEBUG(D_LFSCK, "%s: failed to load lfsck_layout: rc = %d\n",
904 lfsck_lfsck2name(com->lc_lfsck), rc);
/* A read of unexpected length means the file has to be reset. */
906 } else if (rc != size) {
907 CDEBUG(D_LFSCK, "%s: lfsck_layout size %u != %u; reset it\n",
908 lfsck_lfsck2name(com->lc_lfsck), rc, (unsigned int)size);
/* Convert into the in-RAM copy, then validate the magic. */
912 lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
913 if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
914 CDEBUG(D_LFSCK, "%s: invalid lfsck_layout magic %#x != %#x, "
915 "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
916 lo->ll_magic, LFSCK_LAYOUT_MAGIC);
924 * Store the layout LFSCK trace file on disk.
926 * The layout LFSCK trace file records the layout LFSCK status information
927 * and other statistics, such as how many objects have been scanned, and how
928 * many objects have been repaired, and etc. It also contains the bitmap for
929 * failed OSTs during the layout LFSCK. All these information will be synced
930 * from RAM to disk periodically.
932 * \param[in] env pointer to the thread context
933 * \param[in] com pointer to the lfsck component
935 * \retval 0 for success
936 * \retval negative error number on failure
938 static int lfsck_layout_store(const struct lu_env *env,
939 struct lfsck_component *com)
941 struct dt_object *obj = com->lc_obj;
942 struct lfsck_instance *lfsck = com->lc_lfsck;
943 struct lfsck_layout *lo_ram = com->lc_file_ram;
944 struct lfsck_layout *lo = com->lc_file_disk;
946 struct dt_device *dev = lfsck->li_bottom;
947 cfs_bitmap_t *bitmap = NULL;
949 ssize_t size = com->lc_file_size;
/* Only a master instance has a failed-OST bitmap to store. */
954 if (lfsck->li_master) {
955 struct lfsck_assistant_data *lad = com->lc_data;
957 bitmap = lad->lad_bitmap;
958 nbits = bitmap->size;
/* The bitmap is written as nbits >> 3 bytes, so nbits must be a
 * multiple of eight. */
961 LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits);
/* Refresh the on-disk image from the in-RAM copy. */
964 lo_ram->ll_bitmap_size = nbits;
965 lfsck_layout_cpu_to_le(lo, lo_ram);
966 th = dt_trans_create(env, dev);
968 GOTO(log, rc = PTR_ERR(th));
/* Declare both writes before the transaction is started. */
970 rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
975 if (bitmap != NULL) {
976 rc = dt_declare_record_write(env, obj,
977 lfsck_buf_get(env, bitmap->data, nbits >> 3),
983 rc = dt_trans_start_local(env, dev, th);
/* Write the fixed-size record, then the bitmap if there is one. */
988 rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos, th);
992 if (bitmap != NULL) {
994 rc = dt_record_write(env, obj,
995 lfsck_buf_get(env, bitmap->data, nbits >> 3),
1002 dt_trans_stop(env, dev, th);
1006 CDEBUG(D_LFSCK, "%s: fail to store lfsck_layout: rc = %d\n",
1007 lfsck_lfsck2name(lfsck), rc);
/* Reset the in-RAM layout trace record to a pristine state (all zero except
 * the magic and status LS_INIT) and store it to disk while holding lc_sem
 * for write. */
1012 static int lfsck_layout_init(const struct lu_env *env,
1013 struct lfsck_component *com)
1015 struct lfsck_layout *lo = com->lc_file_ram;
1018 memset(lo, 0, com->lc_file_size);
1019 lo->ll_magic = LFSCK_LAYOUT_MAGIC;
1020 lo->ll_status = LS_INIT;
1021 down_write(&com->lc_sem);
1022 rc = lfsck_layout_store(env, com);
1023 up_write(&com->lc_sem);
/* Decide whether fid/obj refers to an OST-object. Three sources are
 * consulted in turn: the FLD range of the FID's sequence, the
 * LMAC_FID_ON_OST flag in the object's LMA xattr, and the presence of the
 * XATTR_NAME_FID xattr.
 * NOTE(review): the LMA branch returns 1 or 0; the return values of the
 * other branches should be confirmed. */
1028 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
1029 struct dt_object *obj, const struct lu_fid *fid)
1031 struct seq_server_site *ss = lu_site2seq(dt->dd_lu_dev.ld_site);
1032 struct lu_seq_range *range = &lfsck_env_info(env)->lti_range;
1033 struct lustre_mdt_attrs *lma;
/* Ask the FLD which kind of target owns this sequence. */
1036 fld_range_set_any(range);
1037 rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
1039 if (fld_range_is_ost(range))
/* Next source: the LMA xattr stored on the object. */
1045 lma = &lfsck_env_info(env)->lti_lma;
1046 rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
1047 XATTR_NAME_LMA, BYPASS_CAPA);
1048 if (rc == sizeof(*lma)) {
1049 lustre_lma_swab(lma);
1051 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
/* Last source: probe for the XATTR_NAME_FID xattr with an empty buffer. */
1054 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
/* Search llsd_seq_list for the entry whose lls_seq equals seq. The scan
 * also tests for an entry with a larger sequence.
 * NOTE(review): presumably the list is sorted ascending and the scan stops
 * early on a larger sequence, returning NULL when nothing matches;
 * confirm. */
1059 static struct lfsck_layout_seq *
1060 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
1062 struct lfsck_layout_seq *lls;
1064 list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1065 if (lls->lls_seq == seq)
1068 if (lls->lls_seq > seq)
/* Add lls to llsd_seq_list in front of an entry with a larger lls_seq
 * (list_add_tail() on that entry's link places lls just before it). With
 * pos left at the list head, lls is appended at the tail.
 * NOTE(review): presumably the scan stops at the first larger entry so the
 * list stays sorted ascending; confirm. */
1076 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
1077 struct lfsck_layout_seq *lls)
1079 struct lfsck_layout_seq *tmp;
1080 struct list_head *pos = &llsd->llsd_seq_list;
1082 list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
1083 if (lls->lls_seq < tmp->lls_seq) {
1084 pos = &tmp->lls_list;
1088 list_add_tail(&lls->lls_list, pos);
/* Create the LAST_ID object obj as a regular file and write an initial
 * lastid value into it, all within one local transaction. The existence of
 * the object is re-checked under the object write lock before creating.
 * The LPF_DRYRUN bookmark parameter is tested first.
 * NOTE(review): presumably nothing is created in dry-run mode; confirm. */
1092 lfsck_layout_lastid_create(const struct lu_env *env,
1093 struct lfsck_instance *lfsck,
1094 struct dt_object *obj)
1096 struct lfsck_thread_info *info = lfsck_env_info(env);
1097 struct lu_attr *la = &info->lti_la;
1098 struct dt_object_format *dof = &info->lti_dof;
1099 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1100 struct dt_device *dt = lfsck->li_bottom;
1107 if (bk->lb_param & LPF_DRYRUN)
/* Attributes of the new file: regular, world readable, owner writable. */
1110 memset(la, 0, sizeof(*la));
1111 la->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
1112 la->la_valid = LA_MODE | LA_UID | LA_GID;
1113 dof->dof_type = dt_mode_to_dft(S_IFREG);
1115 th = dt_trans_create(env, dt);
1117 GOTO(log, rc = PTR_ERR(th));
/* Declare the create and the record write before starting. */
1119 rc = dt_declare_create(env, obj, la, NULL, dof, th);
1123 rc = dt_declare_record_write(env, obj,
1124 lfsck_buf_get(env, &lastid,
1130 rc = dt_trans_start_local(env, dt, th);
1134 dt_write_lock(env, obj, 0);
/* Create only if the object is still missing now that the lock is held. */
1135 if (likely(dt_object_exists(obj) == 0)) {
1136 rc = dt_create(env, obj, la, NULL, dof, th);
1138 rc = dt_record_write(env, obj,
1139 lfsck_buf_get(env, &lastid, sizeof(lastid)),
1142 dt_write_unlock(env, obj);
1147 dt_trans_stop(env, dt, th);
1150 CDEBUG(D_LFSCK, "%s: layout LFSCK will create LAST_ID for <seq> "
1152 lfsck_lfsck2name(lfsck), fid_seq(lfsck_dto2fid(obj)), rc);
/* Re-read the LAST_ID value of lls from disk under the object read lock and
 * reconcile it with the in-RAM state:
 * - below lls_lastid_known: the file is treated as crashed, lls_lastid is
 *   set to the known value and LF_CRASHED_LASTID is raised (with a one-time
 *   LE_LASTID_REBUILDING notification);
 * - otherwise, if not smaller than lls_lastid: lls_lastid is advanced. */
1158 lfsck_layout_lastid_reload(const struct lu_env *env,
1159 struct lfsck_component *com,
1160 struct lfsck_layout_seq *lls)
1166 dt_read_lock(env, lls->lls_lastid_obj, 0);
1167 rc = dt_record_read(env, lls->lls_lastid_obj,
1168 lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
1169 dt_read_unlock(env, lls->lls_lastid_obj);
1170 if (unlikely(rc != 0))
/* The value is stored little-endian on disk. */
1173 lastid = le64_to_cpu(lastid);
1174 if (lastid < lls->lls_lastid_known) {
1175 struct lfsck_instance *lfsck = com->lc_lfsck;
1176 struct lfsck_layout *lo = com->lc_file_ram;
1178 lls->lls_lastid = lls->lls_lastid_known;
/* Notify only on the transition into the crashed state. */
1180 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1181 LASSERT(lfsck->li_out_notify != NULL);
1183 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1184 LE_LASTID_REBUILDING);
1185 lo->ll_flags |= LF_CRASHED_LASTID;
1187 CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
1188 "LAST_ID file (1) for the sequence "LPX64
1189 ", old value "LPU64", known value "LPU64"\n",
1190 lfsck_lfsck2name(lfsck), lls->lls_seq,
1191 lastid, lls->lls_lastid);
1193 } else if (lastid >= lls->lls_lastid) {
1194 lls->lls_lastid = lastid;
/* Walk llsd_seq_list and write the in-RAM LAST_ID value of each entry back
 * to its LAST_ID object, one local transaction per entry. The dirty bit and
 * the LPF_DRYRUN parameter are tested for every entry before any write.
 * NOTE(review): presumably clean entries are skipped and dry-run mode
 * writes nothing; how per-entry errors are accumulated into the return
 * value should be confirmed. */
1202 lfsck_layout_lastid_store(const struct lu_env *env,
1203 struct lfsck_component *com)
1205 struct lfsck_instance *lfsck = com->lc_lfsck;
1206 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1207 struct dt_device *dt = lfsck->li_bottom;
1208 struct lfsck_layout_slave_data *llsd = com->lc_data;
1209 struct lfsck_layout_seq *lls;
1215 list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1218 if (!lls->lls_dirty)
1221 CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
1222 "<seq> "LPX64" as <oid> "LPU64"\n",
1223 lfsck_lfsck2name(lfsck), lls->lls_seq, lls->lls_lastid);
1225 if (bk->lb_param & LPF_DRYRUN) {
1230 th = dt_trans_create(env, dt);
1233 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1234 "the LAST_ID for <seq> "LPX64"(1): rc = %d\n",
1235 lfsck_lfsck2name(com->lc_lfsck),
/* The value is stored little-endian on disk. */
1240 lastid = cpu_to_le64(lls->lls_lastid);
1241 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1242 lfsck_buf_get(env, &lastid,
1248 rc = dt_trans_start_local(env, dt, th);
/* The write itself is done under the object write lock. */
1252 dt_write_lock(env, lls->lls_lastid_obj, 0);
1253 rc = dt_record_write(env, lls->lls_lastid_obj,
1254 lfsck_buf_get(env, &lastid,
1255 sizeof(lastid)), &pos, th);
1256 dt_write_unlock(env, lls->lls_lastid_obj);
1261 dt_trans_stop(env, dt, th);
1264 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1265 "the LAST_ID for <seq> "LPX64"(2): rc = %d\n",
1266 lfsck_lfsck2name(com->lc_lfsck),
/* Locate the LAST_ID object for the sequence of lls and load its value.
 * - Object missing: LF_CRASHED_LASTID is raised (with a one-time
 *   LE_LASTID_REBUILDING notification) and the object is created through
 *   lfsck_layout_lastid_create().
 * - Object present: the value is read under the read lock. An empty read
 *   also raises LF_CRASHED_LASTID; a partial read is mapped to -EFAULT.
 * On success the object is remembered in lls->lls_lastid_obj.
 * NOTE(review): lfsck_object_put() and the lls_lastid_obj assignment both
 * appear at the end; presumably the put is the error path only; confirm. */
1275 lfsck_layout_lastid_load(const struct lu_env *env,
1276 struct lfsck_component *com,
1277 struct lfsck_layout_seq *lls)
1279 struct lfsck_instance *lfsck = com->lc_lfsck;
1280 struct lfsck_layout *lo = com->lc_file_ram;
1281 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid;
1282 struct dt_object *obj;
/* Build the LAST_ID FID for this sequence on the local device. */
1287 lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1288 obj = dt_locate(env, lfsck->li_bottom, fid);
1290 RETURN(PTR_ERR(obj));
1292 /* LAST_ID crashed, to be rebuilt */
1293 if (dt_object_exists(obj) == 0) {
1294 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1295 LASSERT(lfsck->li_out_notify != NULL);
1297 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1298 LE_LASTID_REBUILDING);
1299 lo->ll_flags |= LF_CRASHED_LASTID;
1301 CDEBUG(D_LFSCK, "%s: layout LFSCK cannot find the "
1302 "LAST_ID file for sequence "LPX64"\n",
1303 lfsck_lfsck2name(lfsck), lls->lls_seq);
/* Fault-injection hook: optionally delay here for cfs_fail_val
 * seconds. */
1305 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1307 struct l_wait_info lwi = LWI_TIMEOUT(
1308 cfs_time_seconds(cfs_fail_val),
1311 /* Some others may changed the cfs_fail_val
1312 * as zero after above check, re-check it for
1313 * sure to avoid falling into wait for ever. */
1314 if (likely(lwi.lwi_timeout > 0)) {
1315 struct ptlrpc_thread *thread =
/* lc_sem is released for the duration of the wait. */
1318 up_write(&com->lc_sem);
1319 l_wait_event(thread->t_ctl_waitq,
1320 !thread_is_running(thread),
1322 down_write(&com->lc_sem);
1327 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1329 dt_read_lock(env, obj, 0);
1330 rc = dt_read(env, obj,
1331 lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1333 dt_read_unlock(env, obj);
/* Only an empty read (0) or a full 8-byte read is acceptable. */
1334 if (rc != 0 && rc != sizeof(__u64))
1335 GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1337 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1338 LASSERT(lfsck->li_out_notify != NULL);
1340 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1341 LE_LASTID_REBUILDING);
1342 lo->ll_flags |= LF_CRASHED_LASTID;
1344 CDEBUG(D_LFSCK, "%s: layout LFSCK finds invalid "
1345 "LAST_ID file for the sequence "LPX64
1347 lfsck_lfsck2name(lfsck), lls->lls_seq, rc);
/* The value is stored little-endian on disk. */
1350 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1358 lfsck_object_put(env, obj);
1360 lls->lls_lastid_obj = obj;
/* Account one phase-1 failure and remember the iterator position (cookie)
 * at which it happened in lo->ll_pos_first_inconsistent.
 * NOTE(review): the stored position is replaced whenever it is zero or
 * smaller than the current cookie, i.e. it moves forward on later failures
 * even though the field and log message say "first"; confirm that this
 * comparison direction is intended. */
1365 static void lfsck_layout_record_failure(const struct lu_env *env,
1366 struct lfsck_instance *lfsck,
1367 struct lfsck_layout *lo)
1371 lo->ll_objs_failed_phase1++;
/* Current position of the object-table iterator. */
1372 cookie = lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1374 if (lo->ll_pos_first_inconsistent == 0 ||
1375 lo->ll_pos_first_inconsistent < cookie) {
1376 lo->ll_pos_first_inconsistent = cookie;
1378 CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
1379 "inconsistency at the pos ["LPU64"]\n",
1380 lfsck_lfsck2name(lfsck),
1381 lo->ll_pos_first_inconsistent);
/*
 * Fold the outcome of the phase-2 scan into the in-RAM layout LFSCK record
 * (com->lc_file_ram) and persist it with lfsck_layout_store().
 *
 * All updates are made while holding com->lc_sem for write.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 *
 * NOTE(review): the remaining parameter (the scan result tested as "rc"
 * below) and the return statement are not visible in this excerpt.
 */
1385 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1386 struct lfsck_component *com,
1389 struct lfsck_instance *lfsck = com->lc_lfsck;
1390 struct lfsck_layout *lo = com->lc_file_ram;
1392 down_write(&com->lc_sem);
/* Add the time elapsed since the last checkpoint (rounded by HALF_SEC) and
 * the objects checked since then to the phase-2 statistics. */
1393 lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1394 HALF_SEC - lfsck->li_time_last_checkpoint);
1395 lo->ll_time_last_checkpoint = cfs_time_current_sec();
1396 lo->ll_objs_checked_phase2 += com->lc_new_checked;
/* Status selection: LS_PARTIAL when the LF_INCOMPLETE flag is set, or on
 * the master when the assistant data reports lad_incomplete; LS_COMPLETED
 * otherwise. The guarding condition of this section is presumably a
 * successful scan result -- TODO confirm against the full source. */
1399 if (lo->ll_flags & LF_INCOMPLETE) {
1400 lo->ll_status = LS_PARTIAL;
1402 if (lfsck->li_master) {
1403 struct lfsck_assistant_data *lad = com->lc_data;
1405 if (lad->lad_incomplete)
1406 lo->ll_status = LS_PARTIAL;
1408 lo->ll_status = LS_COMPLETED;
1410 lo->ll_status = LS_COMPLETED;
/* Outside of dry-run mode the "scanned once" and "inconsistent" flags are
 * cleared; the completion time and success counter are updated. */
1413 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
1414 lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1415 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1416 lo->ll_success_count++;
1417 } else if (rc == 0) {
1418 if (lfsck->li_status != 0)
1419 lo->ll_status = lfsck->li_status;
1421 lo->ll_status = LS_STOPPED;
1423 lo->ll_status = LS_FAILED;
/* Write the updated record back while still holding the semaphore. */
1426 rc = lfsck_layout_store(env, com);
1427 up_write(&com->lc_sem);
/*
 * Stop a transaction after recording the caller's result in it.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] dev	device the transaction was created on
 * \param[in] handle	the transaction handle to stop
 * \param[in] result	value stored into handle->th_result before stopping
 *
 * NOTE(review): how the dt_trans_stop() return code is combined with
 * \a result for the return value is not visible in this excerpt.
 */
1432 static int lfsck_layout_trans_stop(const struct lu_env *env,
1433 struct dt_device *dev,
1434 struct thandle *handle, int result)
1438 handle->th_result = result;
1439 rc = dt_trans_stop(env, dev, handle);
1449 * Get the system default stripe size.
1451 * \param[in] env pointer to the thread context
1452 * \param[in] lfsck pointer to the lfsck instance
1453 * \param[out] size pointer to the default stripe size
1455 * \retval 0 for success
1456 * \retval negative error number on failure
/*
 * Implementation note: the value is read from the XATTR_NAME_LOV xattr of
 * the local root object (lfsck->li_local_root_fid, located on
 * lfsck->li_next) into the per-thread lti_lum scratch buffer. The root
 * object reference is dropped before returning.
 */
1458 static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
1459 struct lfsck_instance *lfsck,
1462 struct lov_user_md *lum = &lfsck_env_info(env)->lti_lum;
1463 struct dt_object *root;
1466 root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
1468 return PTR_ERR(root);
1470 /* Get the default stripe size via xattr_get on the backend root. */
1471 rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
1472 XATTR_NAME_LOV, BYPASS_CAPA);
1474 /* The lum->lmm_stripe_size is LE mode. The *size also
1475 * should be LE mode. So it is unnecessary to convert. */
1476 *size = lum->lmm_stripe_size;
1478 } else if (unlikely(rc == 0)) {
1482 lfsck_object_put(env, root);
1488 * \retval +1: repaired
1489 * \retval 0: did nothing
1490 * \retval -ve: on error
/*
 * Store the OST-object \a cfid / \a ost_idx into \a slot of the LOV EA held
 * in the caller's buffer and write the EA to \a parent as XATTR_NAME_LOV
 * using flag \a fl inside transaction \a handle.
 *
 * The LOV EA header is read from buf->lb_buf; \a slot is expected to point
 * into that same buffer.
 */
1492 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1493 struct thandle *handle,
1494 struct dt_object *parent,
1495 struct lu_fid *cfid,
1497 struct lov_ost_data_v1 *slot,
1498 int fl, __u32 ost_idx)
1500 struct ost_id *oi = &lfsck_env_info(env)->lti_oi;
1501 struct lov_mds_md_v1 *lmm = buf->lb_buf;
1502 struct lu_buf ea_buf;
1507 magic = le32_to_cpu(lmm->lmm_magic);
1508 count = le16_to_cpu(lmm->lmm_stripe_count);
/* Encode the child FID as a little-endian ost_id in the slot; the slot's
 * generation is set to 0 and its OST index to \a ost_idx. */
1510 fid_to_ostid(cfid, oi);
1511 ostid_cpu_to_le(oi, &slot->l_ost_oi);
1512 slot->l_ost_gen = cpu_to_le32(0);
1513 slot->l_ost_idx = cpu_to_le32(ost_idx);
/* Only when the pattern is flagged with LOV_PATTERN_F_HOLE: walk all
 * slots (V1 or V3 object array) looking for another dummy slot. */
1515 if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) {
1516 struct lov_ost_data_v1 *objs;
1519 if (magic == LOV_MAGIC_V1)
1520 objs = &lmm->lmm_objects[0];
1522 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1523 for (i = 0; i < count; i++, objs++) {
1524 if (objs != slot && lovea_slot_is_dummy(objs))
1528 /* If the @slot is the last dummy slot to be refilled,
1529 * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
1531 lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE);
/* Write back exactly the bytes covering "count" stripes for this magic. */
1534 lfsck_buf_init(&ea_buf, lmm, lov_mds_md_size(count, magic));
1535 rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV, fl, handle,
1544 * \retval +1: repaired
1545 * \retval 0: did nothing
1546 * \retval -ve: on error
/*
 * Make stripe \a ea_off available in the LOV EA stored in \a buf and fill
 * it with \a cfid via lfsck_layout_refill_lovea().
 *
 * Two paths are visible:
 * - \a fl == LU_XATTR_CREATE or \a reset: the buffer is zeroed and a new
 *   LOV_MAGIC_V1 / LOV_PATTERN_RAID0 header is built for \a parent.
 * - otherwise: the existing EA is extended past its current stripe count.
 *
 * NOTE(review): several lines (local declarations, the computation of
 * "count" and "hole", and the return) are not visible in this excerpt.
 */
1548 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1549 struct lfsck_instance *lfsck,
1550 struct thandle *handle,
1551 struct dt_object *parent,
1552 struct lu_fid *cfid,
1553 struct lu_buf *buf, int fl,
1554 __u32 ost_idx, __u32 ea_off, bool reset)
1556 struct lov_mds_md_v1 *lmm = buf->lb_buf;
1557 struct lov_ost_data_v1 *objs;
/* Create/reset path: start from an all-zero V1 RAID0 layout. The pattern
 * additionally gets LOV_PATTERN_F_HOLE when the new stripe is not at
 * offset 0 or when an existing EA is being reset. */
1563 if (fl == LU_XATTR_CREATE || reset) {
1564 __u32 pattern = LOV_PATTERN_RAID0;
1567 LASSERT(buf->lb_len >= lov_mds_md_size(count, LOV_MAGIC_V1));
1569 if (ea_off != 0 || reset) {
1570 pattern |= LOV_PATTERN_F_HOLE;
1574 memset(lmm, 0, buf->lb_len);
1575 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1576 lmm->lmm_pattern = cpu_to_le32(pattern);
1577 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1578 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
/* The default stripe size is written straight into the header field. */
1580 rc = lfsck_layout_get_def_stripesize(env, lfsck,
1581 &lmm->lmm_stripe_size);
1585 objs = &lmm->lmm_objects[ea_off];
/* Extend path: "objs" starts just past the current last slot of the V1 or
 * V3 object array, and "gap" is the distance from the old stripe count to
 * \a ea_off. */
1587 __u32 magic = le32_to_cpu(lmm->lmm_magic);
1590 count = le16_to_cpu(lmm->lmm_stripe_count);
1591 if (magic == LOV_MAGIC_V1)
1592 objs = &lmm->lmm_objects[count];
1594 objs = &((struct lov_mds_md_v3 *)lmm)->
1597 gap = ea_off - count;
1600 LASSERT(buf->lb_len >= lov_mds_md_size(count, magic));
/* The skipped slots are zeroed and the layout is flagged as having a
 * hole; presumably only when gap is non-zero -- the guarding condition is
 * not visible here. */
1603 memset(objs, 0, gap * sizeof(*objs));
1604 lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
/* Bump the layout generation of the existing EA. */
1608 lmm->lmm_layout_gen =
1609 cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
/* Record the new stripe count, then fill the target slot and write the
 * EA back. */
1613 lmm->lmm_stripe_count = cpu_to_le16(count);
1614 rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1617 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant extend layout EA for "
1618 DFID": parent "DFID", OST-index %u, stripe-index %u, fl %d, "
1619 "reset %s, %s LOV EA hole: rc = %d\n",
1620 lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
1621 ost_idx, ea_off, fl, reset ? "yes" : "no",
1622 hole ? "with" : "without", rc);
1628 * \retval +1: repaired
1629 * \retval 0: did nothing
1630 * \retval -ve: on error
/*
 * Rewrite the XATTR_NAME_FID xattr of the OST-object \a cfid on device
 * \a cdev so that it names \a parent's FID and the stripe index \a ea_off.
 *
 * The xattr is set in its own transaction on \a cdev. A successful
 * dt_xattr_set() is converted to rc = 1; failures keep their error code.
 */
1632 static int lfsck_layout_update_pfid(const struct lu_env *env,
1633 struct lfsck_component *com,
1634 struct dt_object *parent,
1635 struct lu_fid *cfid,
1636 struct dt_device *cdev, __u32 ea_off)
1638 struct filter_fid *pfid = &lfsck_env_info(env)->lti_new_pfid;
1639 struct dt_object *child;
1640 struct thandle *handle;
1641 const struct lu_fid *tfid = lu_object_fid(&parent->do_lu);
1646 child = lfsck_object_find_by_dev(env, cdev, cfid);
1648 RETURN(PTR_ERR(child));
1650 handle = dt_trans_create(env, cdev);
1652 GOTO(out, rc = PTR_ERR(handle));
/* Build the little-endian filter_fid that will be stored on the child. */
1654 pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1655 pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1656 /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
1657 * MDT-object's FID::f_ver, instead it is the OST-object index in its
1658 * parent MDT-object's layout EA. */
1659 pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1660 buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
/* Declare, start the transaction, then perform the xattr update. */
1662 rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1666 rc = dt_trans_start(env, cdev, handle);
1670 rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
/* Success (0) is reported as 1. */
1673 GOTO(stop, rc = (rc == 0 ? 1 : rc));
1676 dt_trans_stop(env, cdev, handle);
1679 lu_object_put(env, &child->do_lu);
1685 * This function will create the MDT-object with the given (partial) LOV EA.
1687 * Under some data corruption cases, the MDT-object of the file may be lost,
1688 * but its OST-objects, or some of them are there. The layout LFSCK needs to
1689 * re-create the MDT-object with the orphan OST-object(s) information.
1691 * On the other hand, the LFSCK may have created some OST-object for repairing
1692 * a dangling LOV EA reference, but as the LFSCK proceeds, it may find that
1693 * the old OST-object is there and should replace the newly created OST-object.
1694 * Unfortunately, some others may have modified such newly created object.
1695 * To keep the data (both new and old), the LFSCK will create MDT-object with
1696 * new FID to reference the original OST-object.
1698 * \param[in] env pointer to the thread context
1699 * \param[in] com pointer to the lfsck component
1700 * \param[in] ltd pointer to target device descriptor
1701 * \param[in] rec pointer to the record for the orphan OST-object
1702 * \param[in] cfid pointer to FID for the orphan OST-object
1703 * \param[in] infix additional information, such as the FID for original
1704 * MDT-object and the stripe offset in the LOV EA
1705 * \param[in] type the type for describing why the orphan MDT-object is
1706 * created. The rules are as following:
1708 * type "C": Multiple OST-objects claim the same MDT-object and the
1709 * same slot in the layout EA. Then the LFSCK will create
1710 * new MDT-object(s) to hold the conflict OST-object(s).
1712 * type "N": The orphan OST-object does not know which one was the
1713 * real parent MDT-object, so the LFSCK uses new FID for
1714 * its parent MDT-object.
1716 * type "R": The orphan OST-object knows its parent MDT-object FID,
1717 * but does not know the position (the file name) in the
1720 * type "D": The MDT-object is a directory, it may know its parent
1721 * but because there is no valid linkEA, the LFSCK cannot
1722 * know where to put it back to the namespace.
1723 * type "O": The MDT-object has no linkEA, and there is no name
1724 * entry that references the MDT-object.
1726 * type "P": The orphan object to be created was a parent directory
1727 * of some MDT-object whose linkEA shows that the @orphan
1728 * object is missing.
1730 * The orphan name will be like:
1731 * ${FID}-${infix}-${type}-${conflict_version}
1733 * \param[in] ea_off the stripe offset in the LOV EA
1735 * \retval positive on repaired something
1736 * \retval 0 if needs to repair nothing
1737 * \retval negative error number on failure
/*
 * Implementation outline: every modification is first declared against the
 * transaction (steps 1a-5a) and then executed after dt_trans_start() (steps
 * 1b-5b). The new MDT-object is created as a regular file owned by
 * rec->lor_uid / rec->lor_gid with mode S_IFREG | S_IRUSR and is inserted
 * into lfsck->li_lpf_obj under a generated name.
 *
 * NOTE(review): many guarding conditions and goto labels are not visible in
 * this excerpt; the comments below describe only the visible statements.
 */
1739 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1740 struct lfsck_component *com,
1741 struct lfsck_tgt_desc *ltd,
1742 struct lu_orphan_rec *rec,
1743 struct lu_fid *cfid,
1748 struct lfsck_thread_info *info = lfsck_env_info(env);
1749 struct dt_insert_rec *dtrec = &info->lti_dt_rec;
1750 char *name = info->lti_key;
1751 struct lu_attr *la = &info->lti_la;
1752 struct dt_object_format *dof = &info->lti_dof;
1753 struct lfsck_instance *lfsck = com->lc_lfsck;
1754 struct lu_fid *pfid = &rec->lor_fid;
1755 struct lu_fid *tfid = &info->lti_fid3;
1756 struct dt_device *next = lfsck->li_next;
1757 struct dt_object *pobj = NULL;
1758 struct dt_object *cobj = NULL;
1759 struct thandle *th = NULL;
1760 struct lu_buf pbuf = { NULL };
1761 struct lu_buf *ea_buf = &info->lti_big_buf;
1762 struct lu_buf lov_buf;
1763 struct lustre_handle lh = { 0 };
1764 struct linkea_data ldata = { NULL };
1765 struct lu_buf linkea_buf;
1766 const struct lu_name *pname;
/* The new MDT-object is inserted into li_lpf_obj (see step 4a/4b), so the
 * function gives up with -ENXIO when that object is not available. */
1772 if (unlikely(lfsck->li_lpf_obj == NULL))
1773 GOTO(log, rc = -ENXIO);
/* Parent FID unknown: allocate a new FID and prepare the filter_fid that
 * is later written to the OST-object as its parent pointer. */
1775 if (fid_is_zero(pfid)) {
1776 struct filter_fid *ff = &info->lti_new_pfid;
1778 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
1782 ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
1783 ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
1784 /* Currently, the filter_fid::ff_parent::f_ver is not the
1785 * real parent MDT-object's FID::f_ver, instead it is the
1786 * OST-object index in its parent MDT-object's layout EA. */
1787 ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1788 lfsck_buf_init(&pbuf, ff, sizeof(struct filter_fid));
1789 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
1791 GOTO(log, rc = PTR_ERR(cobj));
/* Locate the object for the (possibly just allocated) parent FID on the
 * local bottom device. */
1794 pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid);
1796 GOTO(put, rc = PTR_ERR(pobj));
1798 LASSERT(infix != NULL);
1799 LASSERT(type != NULL);
/* Generate the entry name and look it up in li_lpf_obj; any lookup error
 * other than -ENOENT is treated specially. NOTE(review): the surrounding
 * retry logic (the trailing %d looks like a conflict counter) is not
 * visible here -- confirm against the full source. */
1802 snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
1804 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
1805 (const struct dt_key *)name, BYPASS_CAPA);
1806 if (rc != 0 && rc != -ENOENT)
/* Prepare a linkEA that names li_lpf_obj as the parent, using the
 * generated entry name. */
1810 rc = linkea_data_new(&ldata,
1811 &lfsck_env_info(env)->lti_linkea_buf);
1815 pname = lfsck_name_get_const(env, name, strlen(name));
1816 rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
/* Attributes and format of the object to create: a regular file, readable
 * by its owner only, owned by the uid/gid recorded for the orphan. */
1820 memset(la, 0, sizeof(*la));
1821 la->la_uid = rec->lor_uid;
1822 la->la_gid = rec->lor_gid;
1823 la->la_mode = S_IFREG | S_IRUSR;
1824 la->la_valid = LA_MODE | LA_UID | LA_GID;
1826 memset(dof, 0, sizeof(*dof));
1827 dof->dof_type = dt_mode_to_dft(S_IFREG);
/* Make sure the shared big buffer can hold a V1 LOV EA with ea_off + 1
 * stripes. */
1829 size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
1830 if (ea_buf->lb_len < size) {
1831 lu_buf_realloc(ea_buf, size);
1832 if (ea_buf->lb_buf == NULL)
1833 GOTO(put, rc = -ENOMEM);
1836 /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
1838 * XXX: Currently, we do not grab the PDO lock as normal create cases,
1839 * because creating MDT-object for orphan OST-object is rare, we
1840 * do not much care about the performance. It can be improved in
1841 * the future when needed. */
1842 rc = lfsck_ibits_lock(env, lfsck, lfsck->li_lpf_obj, &lh,
1843 MDS_INODELOCK_UPDATE, LCK_EX);
1847 th = dt_trans_create(env, next);
1849 GOTO(unlock, rc = PTR_ERR(th));
1851 /* 1a. Update OST-object's parent information remotely.
1853 * If other subsequent modifications failed, then next LFSCK scanning
1854 * will process the OST-object as orphan again with known parent FID. */
1856 rc = dt_declare_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID,
1862 /* 2a. Create the MDT-object locally. */
1863 rc = dt_declare_create(env, pobj, la, NULL, dof, th);
1867 /* 3a. Add layout EA for the MDT-object. */
1868 lfsck_buf_init(&lov_buf, ea_buf->lb_buf, size);
1869 rc = dt_declare_xattr_set(env, pobj, &lov_buf, XATTR_NAME_LOV,
1870 LU_XATTR_CREATE, th);
1874 /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
1875 dtrec->rec_fid = pfid;
1876 dtrec->rec_type = S_IFREG;
1877 rc = dt_declare_insert(env, lfsck->li_lpf_obj,
1878 (const struct dt_rec *)dtrec,
1879 (const struct dt_key *)name, th);
1883 /* 5a. insert linkEA for parent. */
1884 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1885 ldata.ld_leh->leh_len);
1886 rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
1887 XATTR_NAME_LINK, 0, th);
1891 rc = dt_trans_start(env, next, th);
1895 /* 1b. Update OST-object's parent information remotely. */
1897 rc = dt_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID, 0, th,
/* Creation and the initial layout EA are done under the write lock of the
 * new object. */
1903 dt_write_lock(env, pobj, 0);
1904 /* 2b. Create the MDT-object locally. */
1905 rc = dt_create(env, pobj, la, NULL, dof, th);
1907 /* 3b. Add layout EA for the MDT-object. */
1908 rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid,
1909 &lov_buf, LU_XATTR_CREATE,
1910 ltd->ltd_index, ea_off, false);
1911 dt_write_unlock(env, pobj);
1915 /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
1916 rc = dt_insert(env, lfsck->li_lpf_obj, (const struct dt_rec *)dtrec,
1917 (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1921 /* 5b. insert linkEA for parent. */
1922 rc = dt_xattr_set(env, pobj, &linkea_buf,
1923 XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
/* Cleanup: stop the transaction, drop the lock and the object references
 * that were successfully obtained. */
1928 dt_trans_stop(env, next, th);
1931 lfsck_ibits_unlock(&lh, LCK_EX);
1934 if (cobj != NULL && !IS_ERR(cobj))
1935 lu_object_put(env, &cobj->do_lu);
1936 if (pobj != NULL && !IS_ERR(pobj))
1937 lu_object_put(env, &pobj->do_lu);
1941 CDEBUG(D_LFSCK, "%s layout LFSCK assistant failed to "
1942 "recreate the lost MDT-object: parent "DFID
1943 ", child "DFID", OST-index %u, stripe-index %u, "
1944 "infix %s, type %s: rc = %d\n",
1945 lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
1946 ltd->ltd_index, ea_off, infix, type, rc);
/* Any non-negative rc is reported to the caller as 1. */
1948 return rc >= 0 ? 1 : rc;
/*
 * Master side of the conditional destroy: send an LFSCK_NOTIFY RPC carrying
 * the LE_CONDITIONAL_DESTROY event for the layout LFSCK to the OST selected
 * by "index" and wait synchronously for the reply.
 *
 * Returns -EOPNOTSUPP when the export does not advertise OBD_CONNECT_LFSCK
 * and -ENOMEM when the request cannot be allocated; otherwise rc comes from
 * ptlrpc_queue_wait().
 *
 * NOTE(review): the lines that store \a fid into the request and that copy
 * the request into the RPC buffer are not visible in this excerpt.
 */
1951 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
1952 struct lfsck_component *com,
1953 const struct lu_fid *fid,
1956 struct lfsck_thread_info *info = lfsck_env_info(env);
1957 struct lfsck_request *lr = &info->lti_lr;
1958 struct lfsck_instance *lfsck = com->lc_lfsck;
1959 struct lfsck_tgt_desc *ltd;
1960 struct ptlrpc_request *req;
1961 struct lfsck_request *tmp;
1962 struct obd_export *exp;
/* Look up the target descriptor among the known OSTs. */
1966 ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
1967 if (unlikely(ltd == NULL))
/* The peer must support LFSCK RPCs. */
1971 if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
1972 GOTO(put, rc = -EOPNOTSUPP);
1974 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
1976 GOTO(put, rc = -ENOMEM);
1978 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
1980 ptlrpc_request_free(req);
/* Build the request in the per-thread scratch area first. */
1985 memset(lr, 0, sizeof(*lr));
1986 lr->lr_event = LE_CONDITIONAL_DESTROY;
1987 lr->lr_active = LFSCK_TYPE_LAYOUT;
1990 tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1992 ptlrpc_request_set_replen(req);
/* Synchronous send; the request is released right after the wait. */
1994 rc = ptlrpc_queue_wait(req);
1995 ptlrpc_req_finished(req);
/*
 * Slave side of the conditional destroy: destroy the local object
 * lr->lr_fid only if its attributes still show la_ctime == 0 and no S_ISUID
 * bit in la_mode.
 *
 * Visible result codes: -ENOENT when the object does not exist or is dead,
 * -ETXTBSY when the attribute check fails (before or after taking the
 * locks), -EIO on the error path following the extent lock enqueue.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] lr	the request; lr_fid names the object to destroy
 */
2005 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2006 struct lfsck_component *com,
2007 struct lfsck_request *lr)
2009 struct lfsck_thread_info *info = lfsck_env_info(env);
2010 struct lu_attr *la = &info->lti_la;
2011 ldlm_policy_data_t *policy = &info->lti_policy;
2012 struct ldlm_res_id *resid = &info->lti_resid;
2013 struct lfsck_instance *lfsck = com->lc_lfsck;
2014 struct dt_device *dev = lfsck->li_bottom;
2015 struct lu_fid *fid = &lr->lr_fid;
2016 struct dt_object *obj;
2017 struct thandle *th = NULL;
2018 struct lustre_handle lh = { 0 };
2023 obj = lfsck_object_find_by_dev(env, dev, fid);
2025 RETURN(PTR_ERR(obj));
/* First, cheap check under the object read lock only. */
2027 dt_read_lock(env, obj, 0);
2028 if (dt_object_exists(obj) == 0 ||
2029 lfsck_is_dead_obj(obj)) {
2030 dt_read_unlock(env, obj);
2032 GOTO(put, rc = -ENOENT);
2035 /* Get obj's attr without lock firstly. */
2036 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2037 dt_read_unlock(env, obj);
/* A non-zero ctime or a set S_ISUID bit means the object must be kept. */
2041 if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2042 GOTO(put, rc = -ETXTBSY);
2044 /* Acquire extent lock on [0, EOF] to sync with all possible written. */
2045 LASSERT(lfsck->li_namespace != NULL);
2047 memset(policy, 0, sizeof(*policy));
2048 policy->l_extent.end = OBD_OBJECT_EOF;
2049 ost_fid_build_resid(fid, resid);
2050 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
2051 policy, LCK_EX, &flags, ldlm_blocking_ast,
2052 ldlm_completion_ast, NULL, NULL, 0,
2053 LVB_T_NONE, NULL, &lh);
2055 GOTO(put, rc = -EIO);
/* Re-check ctime now that both the extent lock and the object write lock
 * are held. */
2057 dt_write_lock(env, obj, 0);
2058 /* Get obj's attr within lock again. */
2059 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2063 if (la->la_ctime != 0)
2064 GOTO(unlock, rc = -ETXTBSY);
/* Declare and perform the reference drop and the destroy in one local
 * transaction. */
2066 th = dt_trans_create(env, dev);
2068 GOTO(unlock, rc = PTR_ERR(th));
2070 rc = dt_declare_ref_del(env, obj, th);
2074 rc = dt_declare_destroy(env, obj, th);
2078 rc = dt_trans_start_local(env, dev, th);
2082 rc = dt_ref_del(env, obj, th);
2086 rc = dt_destroy(env, obj, th);
2088 CDEBUG(D_LFSCK, "%s: layout LFSCK destroyed the empty "
2089 "OST-object "DFID" that was created for reparing "
2090 "dangling referenced case. But the original missing "
2091 "OST-object is found now.\n",
2092 lfsck_lfsck2name(lfsck), PFID(fid));
/* Cleanup in reverse order of acquisition. */
2097 dt_trans_stop(env, dev, th);
2100 dt_write_unlock(env, obj);
2101 ldlm_lock_decref(&lh, LCK_EX);
2104 lu_object_put(env, &obj->do_lu);
2110 * Some OST-object has occupied the specified layout EA slot.
2111 * Such OST-object may be generated by the LFSCK when repairing
2112 * a dangling referenced MDT-object, which can be indicated by
2113 * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2114 * is true and such OST-object has not been modified yet, we
2115 * will replace it with the orphan OST-object; otherwise the
2116 * LFSCK will create new MDT-object to reference the orphan.
2118 * \retval +1: repaired
2119 * \retval 0: did nothing
2120 * \retval -ve: on error
/*
 * Implementation outline: the FID currently stored in \a slot is decoded
 * (cfid2 / ost_idx2) and a conditional destroy is requested for it. If that
 * request answers -ETXTBSY, a new parent MDT-object of type "C" is created
 * for the orphan instead. Otherwise (visible check: rc is 0 or -ENOENT) the
 * slot in \a parent's LOV EA is refilled with \a cfid in a local
 * transaction with LU_XATTR_REPLACE.
 *
 * NOTE(review): the last parameter (used as ea_off) and several goto labels
 * are not visible in this excerpt.
 */
2122 static int lfsck_layout_conflict_create(const struct lu_env *env,
2123 struct lfsck_component *com,
2124 struct lfsck_tgt_desc *ltd,
2125 struct lu_orphan_rec *rec,
2126 struct dt_object *parent,
2127 struct lu_fid *cfid,
2128 struct lu_buf *ea_buf,
2129 struct lov_ost_data_v1 *slot,
2132 struct lfsck_thread_info *info = lfsck_env_info(env);
2133 struct lu_fid *cfid2 = &info->lti_fid2;
2134 struct ost_id *oi = &info->lti_oi;
2135 struct lov_mds_md_v1 *lmm = ea_buf->lb_buf;
2136 struct dt_device *dev = com->lc_lfsck->li_bottom;
2137 struct thandle *th = NULL;
2138 struct lustre_handle lh = { 0 };
2139 __u32 ost_idx2 = le32_to_cpu(slot->l_ost_idx);
/* Decode the FID of the OST-object that currently occupies the slot. */
2143 ostid_le_to_cpu(&slot->l_ost_oi, oi);
2144 rc = ostid_to_fid(cfid2, oi, ost_idx2);
2148 /* Hold layout lock on the parent to prevent others to access. */
2149 rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2150 MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
/* Ask the OST that holds the occupying object to destroy it if it is
 * still untouched. */
2155 rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2157 /* If the conflict OST-object is not created for fixing dangling
2158 * referenced MDT-object in former LFSCK check/repair, or it has
2159 * been modified by others, then we cannot destroy it. Re-create
2160 * a new MDT-object for the orphan OST-object. */
2161 if (rc == -ETXTBSY) {
2162 /* No need the layout lock on the original parent. */
2163 lfsck_ibits_unlock(&lh, LCK_EX);
/* Clearing lor_fid makes lfsck_layout_recreate_parent() allocate a new
 * parent FID; the infix records the original parent FID. */
2165 fid_zero(&rec->lor_fid);
2166 snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2167 "-"DFID"-%x", PFID(lu_object_fid(&parent->do_lu)),
2169 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2170 info->lti_tmpbuf, "C", ea_off);
2175 if (rc != 0 && rc != -ENOENT)
/* Replace the slot content in the parent's LOV EA within a local
 * transaction; the layout generation is bumped under the write lock. */
2178 th = dt_trans_create(env, dev);
2180 GOTO(unlock, rc = PTR_ERR(th));
2182 rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2183 LU_XATTR_REPLACE, th);
2187 rc = dt_trans_start_local(env, dev, th);
2191 dt_write_lock(env, parent, 0);
2192 lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2193 rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot,
2194 LU_XATTR_REPLACE, ltd->ltd_index);
2195 dt_write_unlock(env, parent);
2200 dt_trans_stop(env, dev, th);
2203 lfsck_ibits_unlock(&lh, LCK_EX);
2206 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
2207 "OST-object "DFID" on the OST %x with the orphan "DFID" on "
2208 "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
2209 lfsck_lfsck2name(com->lc_lfsck), PFID(cfid2), ost_idx2,
2210 PFID(cfid), ltd->ltd_index, PFID(lfsck_dto2fid(parent)),
/* Any non-negative rc is reported to the caller as 1. */
2213 return rc >= 0 ? 1 : rc;
2217 * \retval +1: repaired
2218 * \retval 0: did nothing
2219 * \retval -ve: on error
/*
 * Put the OST-object \a cfid into stripe \a ea_off of \a parent's LOV EA.
 *
 * Cases visible in the body, in order:
 * - no usable LOV EA (xattr get returns -ENODATA or 0): create one;
 * - lfsck_layout_verify_header() returns -EINVAL: rebuild the EA (reset);
 * - stripe count <= \a ea_off: extend the EA;
 * - a dummy slot is found: refill it;
 * - a slot already references \a cfid: either nothing to do, or the
 *   OST-object's parent FID xattr is updated with the slot index found;
 * - otherwise the slot is occupied: lfsck_layout_conflict_create().
 *
 * In dry-run mode (LPF_DRYRUN) no transaction is created and the repair
 * branches return 1 without modifying anything.
 *
 * NOTE(review): the retry label, several guarding conditions and the
 * return statement are not visible in this excerpt.
 */
2221 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2222 struct lfsck_component *com,
2223 struct lfsck_tgt_desc *ltd,
2224 struct lu_orphan_rec *rec,
2225 struct dt_object *parent,
2226 struct lu_fid *cfid,
2227 __u32 ost_idx, __u32 ea_off)
2229 struct lfsck_thread_info *info = lfsck_env_info(env);
2230 struct lu_buf *buf = &info->lti_big_buf;
2231 struct lu_fid *fid = &info->lti_fid2;
2232 struct ost_id *oi = &info->lti_oi;
2233 struct lfsck_instance *lfsck = com->lc_lfsck;
2234 struct dt_device *dt = lfsck->li_bottom;
2235 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2236 struct thandle *handle = NULL;
2238 struct lov_mds_md_v1 *lmm;
2239 struct lov_ost_data_v1 *objs;
2240 struct lustre_handle lh = { 0 };
2247 bool locked = false;
/* Layout and xattr ibits lock on the parent for the whole operation. */
2250 rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2251 MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2254 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
2255 "LOV EA for "DFID": parent "DFID", OST-index %u, "
2256 "stripe-index %u: rc = %d\n",
2257 lfsck_lfsck2name(lfsck), PFID(cfid),
2258 PFID(lfsck_dto2fid(parent)), ost_idx, ea_off, rc);
/* Presumably the start of the retry path: release the object lock and any
 * open transaction from a previous attempt before trying again with a
 * larger buffer -- TODO confirm against the full source. */
2265 dt_write_unlock(env, parent);
2269 if (handle != NULL) {
2270 dt_trans_stop(env, dt, handle);
2275 GOTO(unlock_layout, rc);
/* Grow the shared big buffer to the currently required EA size. */
2278 if (buf->lb_len < lovea_size) {
2279 lu_buf_realloc(buf, lovea_size);
2280 if (buf->lb_buf == NULL)
2281 GOTO(unlock_layout, rc = -ENOMEM);
/* A transaction is only needed when something may actually be written. */
2284 if (!(bk->lb_param & LPF_DRYRUN)) {
2285 handle = dt_trans_create(env, dt);
2287 GOTO(unlock_layout, rc = PTR_ERR(handle));
2289 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2294 rc = dt_trans_start_local(env, dt, handle);
2299 dt_write_lock(env, parent, 0);
/* Read the current LOV EA; -ERANGE triggers a size query with a NULL
 * buffer, -ENODATA / 0 selects the create path. */
2301 rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2302 if (rc == -ERANGE) {
2303 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
2307 } else if (rc == -ENODATA || rc == 0) {
2308 lovea_size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2309 /* If the declared is not big enough, re-try. */
2310 if (buf->lb_len < lovea_size) {
2314 fl = LU_XATTR_CREATE;
2315 } else if (rc < 0) {
2316 GOTO(unlock_parent, rc);
2317 } else if (unlikely(buf->lb_len == 0)) {
2320 fl = LU_XATTR_REPLACE;
/* Case: no LOV EA yet -- create it with the orphan at \a ea_off. */
2324 if (fl == LU_XATTR_CREATE) {
2325 if (bk->lb_param & LPF_DRYRUN)
2326 GOTO(unlock_parent, rc = 1);
2328 LASSERT(buf->lb_len >= lovea_size);
2330 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2331 buf, fl, ost_idx, ea_off, false);
2333 GOTO(unlock_parent, rc);
2337 rc1 = lfsck_layout_verify_header(lmm);
2339 /* If the LOV EA crashed, then rebuild it. */
2340 if (rc1 == -EINVAL) {
2341 if (bk->lb_param & LPF_DRYRUN)
2342 GOTO(unlock_parent, rc = 1);
2344 LASSERT(buf->lb_len >= lovea_size);
2346 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2347 buf, fl, ost_idx, ea_off, true);
2349 GOTO(unlock_parent, rc);
2352 /* For other unknown magic/pattern, keep the current LOV EA. */
2354 GOTO(unlock_parent, rc = rc1);
2356 /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2357 * been verified in lfsck_layout_verify_header() already. If some
2358 * new magic introduced in the future, then layout LFSCK needs to
2359 * be updated also. */
2360 magic = le32_to_cpu(lmm->lmm_magic);
2361 if (magic == LOV_MAGIC_V1) {
2362 objs = &lmm->lmm_objects[0];
2364 LASSERT(magic == LOV_MAGIC_V3);
2365 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2368 count = le16_to_cpu(lmm->lmm_stripe_count);
2370 GOTO(unlock_parent, rc = -EINVAL);
2373 /* Exceed the current end of MDT-object layout EA. Then extend it. */
2374 if (count <= ea_off) {
2375 if (bk->lb_param & LPF_DRYRUN)
2376 GOTO(unlock_parent, rc = 1);
2378 lovea_size = lov_mds_md_size(ea_off + 1, magic);
2379 /* If the declared is not big enough, re-try. */
2380 if (buf->lb_len < lovea_size) {
2385 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2386 buf, fl, ost_idx, ea_off, false);
2388 GOTO(unlock_parent, rc);
2391 LASSERTF(rc > 0, "invalid rc = %d\n", rc);
/* \a ea_off lies inside the existing EA: walk the slots. */
2393 for (i = 0; i < count; i++, objs++) {
2394 /* The MDT-object was created via lfsck_layout_recover_create()
2395 * by others before, and we fill the dummy layout EA. */
2396 if (lovea_slot_is_dummy(objs)) {
2400 if (bk->lb_param & LPF_DRYRUN)
2401 GOTO(unlock_parent, rc = 1);
2403 lmm->lmm_layout_gen =
2404 cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2405 rc = lfsck_layout_refill_lovea(env, handle, parent,
2406 cfid, buf, objs, fl,
2409 CDEBUG(D_LFSCK, "%s layout LFSCK assistant fill "
2410 "dummy layout slot for "DFID": parent "DFID
2411 ", OST-index %u, stripe-index %u: rc = %d\n",
2412 lfsck_lfsck2name(lfsck), PFID(cfid),
2413 PFID(lfsck_dto2fid(parent)), ost_idx, i, rc);
2415 GOTO(unlock_parent, rc);
/* Decode the FID stored in this slot for comparison with \a cfid. */
2418 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2419 rc = ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2421 CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
2422 "invalid layout EA at the slot %d, index %u\n",
2423 lfsck_lfsck2name(lfsck),
2424 PFID(lfsck_dto2fid(parent)), i,
2425 le32_to_cpu(objs->l_ost_idx));
2427 GOTO(unlock_parent, rc);
2430 /* It should be rare case, the slot is there, but the LFSCK
2431 * does not handle it during the first-phase cycle scanning. */
2432 if (unlikely(lu_fid_eq(fid, cfid))) {
2434 GOTO(unlock_parent, rc = 0);
2436 /* Rare case that the OST-object index
2437 * does not match the parent MDT-object
2438 * layout EA. We trust the later one. */
2439 if (bk->lb_param & LPF_DRYRUN)
2440 GOTO(unlock_parent, rc = 1);
2442 dt_write_unlock(env, parent);
2444 dt_trans_stop(env, dt, handle);
2445 lfsck_ibits_unlock(&lh, LCK_EX);
2446 rc = lfsck_layout_update_pfid(env, com, parent,
2447 cfid, ltd->ltd_tgt, i);
2449 CDEBUG(D_LFSCK, "%s layout LFSCK assistant "
2450 "updated OST-object's pfid for "DFID
2451 ": parent "DFID", OST-index %u, "
2452 "stripe-index %u: rc = %d\n",
2453 lfsck_lfsck2name(lfsck), PFID(cfid),
2454 PFID(lfsck_dto2fid(parent)),
2455 ltd->ltd_index, i, rc);
2462 /* The MDT-object exists, but related layout EA slot is occupied
2464 if (bk->lb_param & LPF_DRYRUN)
2465 GOTO(unlock_parent, rc = 1);
2467 dt_write_unlock(env, parent);
2469 dt_trans_stop(env, dt, handle);
2470 lfsck_ibits_unlock(&lh, LCK_EX);
2471 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2472 objs = &lmm->lmm_objects[ea_off];
2474 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2475 rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2482 dt_write_unlock(env, parent);
2486 dt_trans_stop(env, dt, handle);
2489 lfsck_ibits_unlock(&lh, LCK_EX);
/*
 * Handle one orphan OST-object record during the orphan scan.
 *
 * The parent FID claimed by the orphan (rec->lor_fid) decides the action:
 * a zero FID or a non-existent parent leads to
 * lfsck_layout_recreate_parent(); an existing regular-file parent leads to
 * lfsck_layout_recreate_lovea(). Afterwards the scan counters of \a com and
 * the repaired / failed statistics are updated under com->lc_sem.
 *
 * Visible error codes: -EINVAL for an insane child or parent FID, -EXDEV
 * for a remote parent, -EISDIR when the parent is not a regular file.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] ltd	descriptor of the OST that reported the orphan
 * \param[in] rec	the orphan record (claimed parent FID, stripe index)
 * \param[in] cfid	FID of the orphan OST-object
 */
2494 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2495 struct lfsck_component *com,
2496 struct lfsck_tgt_desc *ltd,
2497 struct lu_orphan_rec *rec,
2498 struct lu_fid *cfid)
2500 struct lfsck_layout *lo = com->lc_file_ram;
2501 struct lu_fid *pfid = &rec->lor_fid;
2502 struct dt_object *parent = NULL;
/* The stripe index is carried in the f_stripe_idx field of the parent
 * FID. */
2503 __u32 ea_off = pfid->f_stripe_idx;
2507 if (!fid_is_sane(cfid))
2508 GOTO(out, rc = -EINVAL);
/* Unknown parent: create a new MDT-object for the orphan. */
2510 if (fid_is_zero(pfid)) {
2511 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2517 if (!fid_is_sane(pfid))
2518 GOTO(out, rc = -EINVAL);
2520 parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2522 GOTO(out, rc = PTR_ERR(parent));
2524 if (unlikely(dt_object_remote(parent) != 0))
2525 GOTO(put, rc = -EXDEV);
/* Known parent FID but the object is gone: drop the reference and
 * re-create the parent. */
2527 if (dt_object_exists(parent) == 0) {
2528 lu_object_put(env, &parent->do_lu);
2529 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2534 if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2535 GOTO(put, rc = -EISDIR);
/* Parent exists: repair its LOV EA so that it references the orphan. */
2537 rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2538 ltd->ltd_index, ea_off);
2544 lu_object_put(env, &parent->do_lu);
2546 /* The layout EA is changed, need to be reloaded next time. */
2547 lu_object_put_nocache(env, &parent->do_lu);
/* Statistics are updated under the component semaphore. */
2550 down_write(&com->lc_sem);
2551 com->lc_new_scanned++;
2552 com->lc_new_checked++;
2554 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2556 } else if (rc < 0) {
2557 lo->ll_objs_failed_phase2++;
2559 up_write(&com->lc_sem);
/*
 * Iterate over the orphan OST-objects reported by one OST and process each
 * of them with lfsck_layout_scan_orphan_one().
 *
 * The orphans are exposed by the target as an index object (located via a
 * FID built from FID_SEQ_IDIF, id 0 and the OST index) that supports
 * dt_lfsck_orphan_features. OSTs already marked in lad->lad_bitmap are
 * skipped.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	pointer to the lfsck component
 * \param[in] ltd	descriptor of the OST to scan
 *
 * \retval		0 when the final rc is positive or zero
 * \retval		negative error number on failure
 */
2564 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2565 struct lfsck_component *com,
2566 struct lfsck_tgt_desc *ltd)
2568 struct lfsck_assistant_data *lad = com->lc_data;
2569 struct lfsck_instance *lfsck = com->lc_lfsck;
2570 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2571 struct lfsck_thread_info *info = lfsck_env_info(env);
2572 struct ost_id *oi = &info->lti_oi;
2573 struct lu_fid *fid = &info->lti_fid;
2574 struct dt_object *obj;
2575 const struct dt_it_ops *iops;
2580 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant starts the orphan "
2581 "scanning for OST%04x\n",
2582 lfsck_lfsck2name(lfsck), ltd->ltd_index);
2584 if (cfs_bitmap_check(lad->lad_bitmap, ltd->ltd_index)) {
2585 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant skip the orphan "
2586 "scanning for OST%04x\n",
2587 lfsck_lfsck2name(lfsck), ltd->ltd_index);
/* Build the FID of the orphan index object on this OST. */
2592 ostid_set_seq(oi, FID_SEQ_IDIF);
2593 ostid_set_id(oi, 0);
2594 rc = ostid_to_fid(fid, oi, ltd->ltd_index);
2598 obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2599 if (unlikely(IS_ERR(obj)))
2600 GOTO(log, rc = PTR_ERR(obj));
2602 rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
/* Set up and position the iterator. */
2606 iops = &obj->do_index_ops->dio_it;
2607 di = iops->init(env, obj, 0, BYPASS_CAPA);
2609 GOTO(put, rc = PTR_ERR(di));
2611 rc = iops->load(env, di, 0);
2613 /* -ESRCH means that the orphan OST-objects rbtree has been
2614 * cleanup because of the OSS server restart or other errors. */
2615 lfsck_lad_set_bitmap(env, com, ltd->ltd_index);
2620 rc = iops->next(env, di);
2632 struct lu_orphan_rec *rec = &info->lti_rec;
2634 if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val) &&
2635 unlikely(!thread_is_running(&lfsck->li_thread)))
/* The iterator key is the orphan's FID; it is also remembered as the
 * latest position scanned in phase 2. */
2638 key = iops->key(env, di);
2639 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2640 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2642 rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2643 &com->lc_fid_latest_scanned_phase2);
2644 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2647 lfsck_control_speed_by_self(com);
/* Advance; a failing next() is retried unless LPF_FAILOUT is set. */
2649 rc = iops->next(env, di);
2650 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
2657 iops->fini(env, di);
2659 lu_object_put(env, &obj->do_lu);
2662 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant finished the orphan "
2663 "scanning for OST%04x: rc = %d\n",
2664 lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
/* A positive rc is not an error for the caller. */
2666 return rc > 0 ? 0 : rc;
2669 /* For the MDT-object with dangling reference, we need to repair the
2670 * inconsistency according to the LFSCK sponsor's requirement:
2672 * 1) Keep the inconsistency there and report the inconsistency case,
2673 * then give the chance to the application to find related issues,
2674 * and the users can make the decision about how to handle it with
2675 * more human knowledge. (by default)
2677 * 2) Re-create the missing OST-object with the FID/owner information. */
/* \param[in] env  pointer to the thread context
 * \param[in] com  pointer to the layout LFSCK component
 * \param[in] llr  the request holding the parent MDT-object
 *                 (llr_parent->llo_obj) and the child OST-object (llr_child)
 * \param[in] pla  attributes of the parent; its uid/gid are copied into
 *                 the attributes used to create the OST-object
 *
 * The re-creation is requested by LPF_CREATE_OSTOBJ in the bookmark. */
2678 static int lfsck_layout_repair_dangling(const struct lu_env *env,
2679 struct lfsck_component *com,
2680 struct lfsck_layout_req *llr,
2681 const struct lu_attr *pla)
2683 struct lfsck_thread_info *info = lfsck_env_info(env);
2684 struct filter_fid *pfid = &info->lti_new_pfid;
2685 struct dt_allocation_hint *hint = &info->lti_hint;
2686 struct lu_attr *cla = &info->lti_la2;
2687 struct dt_object *parent = llr->llr_parent->llo_obj;
2688 struct dt_object *child = llr->llr_child;
2689 struct dt_device *dev = lfsck_obj2dt_dev(child);
2690 const struct lu_fid *tfid = lu_object_fid(&parent->do_lu);
2691 struct thandle *handle;
2693 struct lustre_handle lh = { 0 };
2698 if (com->lc_lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ)
/* The new OST-object gets the owner of the parent MDT-object and is
 * created as a regular file with mode 0666. */
2706 memset(cla, 0, sizeof(*cla));
2707 cla->la_uid = pla->la_uid;
2708 cla->la_gid = pla->la_gid;
2709 cla->la_mode = S_IFREG | 0666;
2710 cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
2711 LA_ATIME | LA_MTIME | LA_CTIME;
/* Take the layout and xattr ibits lock on the parent before repairing. */
2713 rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2714 MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
/* The transaction is created on the device that holds the child. */
2719 handle = dt_trans_create(env, dev);
2721 GOTO(unlock1, rc = PTR_ERR(handle));
2723 hint->dah_parent = NULL;
/* Prepare the XATTR_NAME_FID value: the parent FID in little-endian form. */
2725 pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2726 pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2727 /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2728 * MDT-object's FID::f_ver, instead it is the OST-object index in its
2729 * parent MDT-object's layout EA. */
2730 pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2731 buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2733 rc = dt_declare_create(env, child, cla, hint, NULL, handle);
2737 rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2738 LU_XATTR_CREATE, handle);
2742 rc = dt_trans_start(env, dev, handle);
/* Under the parent's read lock: give up with rc = 1 if the parent has
 * become a dead object, otherwise create the child and set its
 * XATTR_NAME_FID xattr. */
2746 dt_read_lock(env, parent, 0);
2747 if (unlikely(lfsck_is_dead_obj(parent)))
2748 GOTO(unlock2, rc = 1);
2750 rc = dt_create(env, child, cla, hint, NULL, handle);
2754 rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2755 handle, BYPASS_CAPA);
2760 dt_read_unlock(env, parent);
2763 rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2766 lfsck_ibits_unlock(&lh, LCK_EX);
2769 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found dangling "
2770 "reference for: parent "DFID", child "DFID", OST-index %u, "
2771 "stripe-index %u, owner %u/%u. %s: rc = %d\n",
2772 lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2773 PFID(lfsck_dto2fid(child)), llr->llr_ost_idx,
2774 llr->llr_lov_idx, pla->la_uid, pla->la_gid,
2775 create ? "Create the lost OST-object as required" :
2776 "Keep the MDT-object there by default", rc);
2781 /* If the OST-object does not recognize the MDT-object as its parent, and
2782 * no other MDT-object claims to be its parent, then just trust the
2783 * given MDT-object as its parent. So update the OST-object filter_fid. */
/* \param[in] env  pointer to the thread context
 * \param[in] com  pointer to the layout LFSCK component
 * \param[in] llr  the request holding the parent MDT-object and the child
 *                 OST-object
 * \param[in] pla  attributes of the parent; its uid/gid are used when
 *                 declaring the owner update of the child */
2784 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
2785 struct lfsck_component *com,
2786 struct lfsck_layout_req *llr,
2787 const struct lu_attr *pla)
2789 struct lfsck_thread_info *info = lfsck_env_info(env);
2790 struct filter_fid *pfid = &info->lti_new_pfid;
2791 struct lu_attr *tla = &info->lti_la3;
2792 struct dt_object *parent = llr->llr_parent->llo_obj;
2793 struct dt_object *child = llr->llr_child;
2794 struct dt_device *dev = lfsck_obj2dt_dev(child);
2795 const struct lu_fid *tfid = lu_object_fid(&parent->do_lu);
2796 struct thandle *handle;
2798 struct lustre_handle lh = { 0 };
/* Take the layout and xattr ibits lock on the parent before repairing. */
2802 rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2803 MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2808 handle = dt_trans_create(env, dev);
2810 GOTO(unlock1, rc = PTR_ERR(handle));
/* Prepare the new XATTR_NAME_FID value that points back to the parent. */
2812 pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2813 pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2814 /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2815 * MDT-object's FID::f_ver, instead it is the OST-object index in its
2816 * parent MDT-object's layout EA. */
2817 pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2818 buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2820 rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
2824 tla->la_valid = LA_UID | LA_GID;
2825 tla->la_uid = pla->la_uid;
2826 tla->la_gid = pla->la_gid;
2827 rc = dt_declare_attr_set(env, child, tla, handle);
2831 rc = dt_trans_start(env, dev, handle);
/* Under the parent's write lock: give up with rc = 1 if the parent has
 * become a dead object, otherwise rewrite the child's XATTR_NAME_FID. */
2835 dt_write_lock(env, parent, 0);
2836 if (unlikely(lfsck_is_dead_obj(parent)))
2837 GOTO(unlock2, rc = 1);
2839 rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
2844 /* Get the latest parent's owner. */
2845 rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
/* Only the uid/gid just read from the parent are applied to the child. */
2849 tla->la_valid = LA_UID | LA_GID;
2850 rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
2855 dt_write_unlock(env, parent);
2858 rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2861 lfsck_ibits_unlock(&lh, LCK_EX);
2864 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired unmatched "
2865 "MDT-OST pair for: parent "DFID", child "DFID", OST-index %u, "
2866 "stripe-index %u, owner %u/%u: rc = %d\n",
2867 lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2868 PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
2869 pla->la_uid, pla->la_gid, rc);
2874 /* If more than one MDT-object claims to be the OST-object's parent,
2875 * and the OST-object only recognizes one of them, then we need to generate
2876 * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
/* \param[in] env  pointer to the thread context
 * \param[in] com  pointer to the layout LFSCK component
 * \param[in] llr  the request holding the parent MDT-object, the stripe
 *                 index (llr_lov_idx) and the OST index (llr_ost_idx)
 *
 * NOTE(review): the body also uses "la" and "buf"; presumably they are the
 * remaining parameters (attributes for the new object and the buffer for
 * the LOV EA) -- confirm against the full signature. */
2877 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
2878 struct lfsck_component *com,
2879 struct lfsck_layout_req *llr,
2883 struct lfsck_thread_info *info = lfsck_env_info(env);
2884 struct dt_allocation_hint *hint = &info->lti_hint;
2885 struct dt_object_format *dof = &info->lti_dof;
2886 struct dt_device *pdev = com->lc_lfsck->li_next;
2887 struct ost_id *oi = &info->lti_oi;
2888 struct dt_object *parent = llr->llr_parent->llo_obj;
2889 struct dt_device *cdev = lfsck_obj2dt_dev(llr->llr_child);
2890 struct dt_object *child = NULL;
2891 struct lu_device *d = &cdev->dd_lu_dev;
2892 struct lu_object *o = NULL;
2893 struct thandle *handle;
2894 struct lov_mds_md_v1 *lmm;
2895 struct lov_ost_data_v1 *objs;
2896 struct lustre_handle lh = { 0 };
2897 struct lu_buf ea_buf;
/* Take the layout and xattr ibits lock on the parent before repairing. */
2902 rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2903 MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
/* The transaction is created on pdev (li_next), not on the child device. */
2908 handle = dt_trans_create(env, pdev);
2910 GOTO(unlock1, rc = PTR_ERR(handle));
/* Get an anonymous object on the child's device, then locate the slice
 * that belongs to this device type to be used as the new child. */
2912 o = lu_object_anon(env, d, NULL);
2914 GOTO(stop, rc = PTR_ERR(o));
2916 child = container_of(o, struct dt_object, do_lu);
2917 o = lu_object_locate(o->lo_header, d->ld_type);
2918 if (unlikely(o == NULL))
2919 GOTO(stop, rc = -EINVAL);
2921 child = container_of(o, struct dt_object, do_lu);
2922 la->la_valid = LA_UID | LA_GID;
2923 hint->dah_parent = NULL;
2925 dof->dof_type = DFT_REGULAR;
2926 rc = dt_declare_create(env, child, la, NULL, NULL, handle);
2930 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2931 LU_XATTR_REPLACE, handle);
2935 rc = dt_trans_start(env, pdev, handle);
/* Under the parent's write lock, the repair is silently dropped (rc = 0)
 * when the parent is dead, when the LOV EA cannot be read back, or when
 * the layout generation has changed. */
2939 dt_write_lock(env, parent, 0);
2940 if (unlikely(lfsck_is_dead_obj(parent)))
2941 GOTO(unlock2, rc = 0);
2943 rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2944 if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
2945 GOTO(unlock2, rc = 0);
2948 /* Someone change layout during the LFSCK, no need to repair then. */
2949 if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
2950 GOTO(unlock2, rc = 0);
2952 rc = dt_create(env, child, la, hint, dof, handle);
2956 /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2957 * been verified in lfsck_layout_verify_header() already. If some
2958 * new magic introduced in the future, then layout LFSCK needs to
2959 * be updated also. */
2960 magic = le32_to_cpu(lmm->lmm_magic);
2961 if (magic == LOV_MAGIC_V1) {
2962 objs = &lmm->lmm_objects[0];
2964 LASSERT(magic == LOV_MAGIC_V3);
2965 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
/* Bump the layout generation and make the slot llr_lov_idx reference the
 * newly created child, then write the LOV EA back to the parent. */
2968 lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
2969 fid_to_ostid(lu_object_fid(&child->do_lu), oi);
2970 ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
2971 objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
2972 objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
2973 lfsck_buf_init(&ea_buf, lmm,
2974 lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count),
2976 rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV,
2977 LU_XATTR_REPLACE, handle, BYPASS_CAPA);
/* A successful xattr update is converted into rc = 1. */
2979 GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2982 dt_write_unlock(env, parent);
2986 lu_object_put(env, &child->do_lu);
2988 dt_trans_stop(env, pdev, handle);
2991 lfsck_ibits_unlock(&lh, LCK_EX);
2994 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired multiple "
2995 "references for: parent "DFID", OST-index %u, stripe-index %u, "
2996 "owner %u/%u: rc = %d\n",
2997 lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2998 llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid, rc);
3003 /* If the MDT-object and the OST-object have different owner information,
3004 * then trust the MDT-object, because the normal chown/chgrp handle order
3005 * is from MDT to OST, and it is possible that some chown/chgrp operation
3006 * is partly done. */
/* \param[in] env  pointer to the thread context
 * \param[in] com  pointer to the layout LFSCK component
 * \param[in] llr  the request holding the parent MDT-object and the child
 *                 OST-object
 * \param[in] pla  attributes of the parent as seen by the caller; the
 *                 repair is dropped if the parent's current uid/gid no
 *                 longer match them */
3007 static int lfsck_layout_repair_owner(const struct lu_env *env,
3008 struct lfsck_component *com,
3009 struct lfsck_layout_req *llr,
3010 struct lu_attr *pla)
3012 struct lfsck_thread_info *info = lfsck_env_info(env);
3013 struct lu_attr *tla = &info->lti_la3;
3014 struct dt_object *parent = llr->llr_parent->llo_obj;
3015 struct dt_object *child = llr->llr_child;
3016 struct dt_device *dev = lfsck_obj2dt_dev(child);
3017 struct thandle *handle;
/* The transaction is created on the device that holds the child. */
3021 handle = dt_trans_create(env, dev);
3023 GOTO(log, rc = PTR_ERR(handle));
3025 tla->la_uid = pla->la_uid;
3026 tla->la_gid = pla->la_gid;
3027 tla->la_valid = LA_UID | LA_GID;
3028 rc = dt_declare_attr_set(env, child, tla, handle);
3032 rc = dt_trans_start(env, dev, handle);
3036 /* Use the dt_object lock to serialize with destroy and attr_set. */
3037 dt_read_lock(env, parent, 0);
3038 if (unlikely(lfsck_is_dead_obj(parent)))
3039 GOTO(unlock, rc = 1);
3041 /* Get the latest parent's owner. */
3042 rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3046 /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
3047 if (unlikely(tla->la_uid != pla->la_uid ||
3048 tla->la_gid != pla->la_gid))
3049 GOTO(unlock, rc = 1);
/* Only the uid/gid are applied to the child. */
3051 tla->la_valid = LA_UID | LA_GID;
3052 rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3057 dt_read_unlock(env, parent);
3060 rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3063 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired inconsistent "
3064 "file owner for: parent "DFID", child "DFID", OST-index %u, "
3065 "stripe-index %u, owner %u/%u: rc = %d\n",
3066 lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3067 PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3068 pla->la_uid, pla->la_gid, rc);
3073 /* Check whether the OST-object correctly back points to the
3074 * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
/* \param[in] env     pointer to the thread context
 * \param[in] com     pointer to the layout LFSCK component
 * \param[in] parent  the MDT-object that references the OST-object
 * \param[in] pfid    the parent FID claimed by the OST-object
 * \param[in] cfid    the FID of the OST-object itself
 * \param[in] pla     attributes of @parent
 * \param[in] cla     attributes of the OST-object
 * \param[in] llr     the request; llr_lov_idx is compared against @idx
 * \param[in] idx     the stripe index claimed by the OST-object
 *
 * The results visible here are LLIT_INCONSISTENT_OWNER, LLIT_UNMATCHED_PAIR,
 * LLIT_MULTIPLE_REFERENCED, or a negative error number. */
3075 static int lfsck_layout_check_parent(const struct lu_env *env,
3076 struct lfsck_component *com,
3077 struct dt_object *parent,
3078 const struct lu_fid *pfid,
3079 const struct lu_fid *cfid,
3080 const struct lu_attr *pla,
3081 const struct lu_attr *cla,
3082 struct lfsck_layout_req *llr,
3083 struct lu_buf *lov_ea, __u32 idx)
3085 struct lfsck_thread_info *info = lfsck_env_info(env);
3086 struct lu_buf *buf = &info->lti_big_buf;
3087 struct dt_object *tobj;
3088 struct lov_mds_md_v1 *lmm;
3089 struct lov_ost_data_v1 *objs;
3090 struct lustre_handle lh = { 0 };
/* Case 1: the OST-object has no parent FID recorded at all. */
3097 if (fid_is_zero(pfid)) {
3098 /* client never wrote. */
3099 if (cla->la_size == 0 && cla->la_blocks == 0) {
3100 if (unlikely(cla->la_uid != pla->la_uid ||
3101 cla->la_gid != pla->la_gid))
3102 RETURN (LLIT_INCONSISTENT_OWNER);
3107 RETURN(LLIT_UNMATCHED_PAIR);
3110 if (unlikely(!fid_is_sane(pfid)))
3111 RETURN(LLIT_UNMATCHED_PAIR);
/* Case 2: the OST-object points back to @parent; the stripe index must
 * match as well. */
3113 if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
3114 if (llr->llr_lov_idx == idx)
3117 RETURN(LLIT_UNMATCHED_PAIR);
/* Case 3: the OST-object points to another object (tobj); find out
 * whether that object really references the OST-object. */
3120 tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
3122 RETURN(PTR_ERR(tobj));
3124 if (dt_object_exists(tobj) == 0 ||
3125 lfsck_is_dead_obj(tobj))
3126 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3128 if (!S_ISREG(lfsck_object_type(tobj)))
3129 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3131 /* Load the tobj's layout EA, in spite of it is a local MDT-object or
3132 * remote one on another MDT. Then check whether the given OST-object
3133 * is in such layout. If yes, it is multiple referenced, otherwise it
3134 * is unmatched referenced case. */
3135 rc = lfsck_layout_get_lovea(env, tobj, buf);
3136 if (rc == 0 || rc == -ENOENT)
3137 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
/* Locate the stripe array according to the layout magic. */
3143 magic = le32_to_cpu(lmm->lmm_magic);
3144 if (magic == LOV_MAGIC_V1) {
3145 objs = &lmm->lmm_objects[0];
3147 LASSERT(magic == LOV_MAGIC_V3);
3148 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
/* Walk every stripe of tobj's layout and compare its FID with @cfid. */
3151 count = le16_to_cpu(lmm->lmm_stripe_count);
3152 for (i = 0; i < count; i++, objs++) {
3153 struct lu_fid *tfid = &info->lti_fid2;
3154 struct ost_id *oi = &info->lti_oi;
3157 if (lovea_slot_is_dummy(objs))
3160 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3161 idx2 = le32_to_cpu(objs->l_ost_idx);
3162 rc = ostid_to_fid(tfid, oi, idx2);
3164 CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
3165 "invalid layout EA at the slot %d, index %u\n",
3166 lfsck_lfsck2name(com->lc_lfsck),
3167 PFID(pfid), i, idx2);
3169 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
/* tobj does reference the OST-object: lock tobj and re-check that it
 * is still alive before reporting a multiple reference. */
3172 if (lu_fid_eq(cfid, tfid)) {
3173 rc = lfsck_ibits_lock(env, com->lc_lfsck, tobj, &lh,
3174 MDS_INODELOCK_UPDATE |
3175 MDS_INODELOCK_LAYOUT |
3176 MDS_INODELOCK_XATTR,
3181 dt_read_lock(env, tobj, 0);
3183 /* For local MDT-object, re-check existence
3184 * after taken the lock. */
3185 if (!dt_object_remote(tobj)) {
3186 if (dt_object_exists(tobj) == 0 ||
3187 lfsck_is_dead_obj(tobj)) {
3188 rc = LLIT_UNMATCHED_PAIR;
3191 rc = LLIT_MULTIPLE_REFERENCED;
3197 /* For migration case, the new MDT-object and old
3198 * MDT-object may reference the same OST-object at
3199 * some migration internal time.
3201 * For remote MDT-object, the local MDT may not know
3202 * whether it has been removed or not. Try checking
3203 * for a non-existent xattr to check if this object
3204 * has been removed or not. */
3205 rc = dt_xattr_get(env, tobj, &LU_BUF_NULL,
3206 XATTR_NAME_DUMMY, BYPASS_CAPA);
3207 if (unlikely(rc == -ENOENT || rc >= 0)) {
3208 rc = LLIT_UNMATCHED_PAIR;
3209 } else if (rc == -ENODATA) {
3211 rc = LLIT_MULTIPLE_REFERENCED;
/* No stripe of tobj matched @cfid. */
3218 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
/* The read lock and the ibits lock are released only if the ibits lock
 * handle is in use. */
3221 if (lustre_handle_is_used(&lh)) {
3222 dt_read_unlock(env, tobj);
3223 lfsck_ibits_unlock(&lh, LCK_EX);
3227 lfsck_object_put(env, tobj);
/* Phase1 handler for one assistant request: compare the parent MDT-object
 * with the child OST-object of the request, decide the inconsistency type
 * and call the matching lfsck_layout_repair_*() function. The statistics in
 * the layout LFSCK file are then updated under com->lc_sem.
 *
 * \param[in] env  pointer to the thread context
 * \param[in] com  pointer to the layout LFSCK component
 * \param[in] lar  the generic request embedded in a lfsck_layout_req */
3232 static int lfsck_layout_assistant_handler_p1(const struct lu_env *env,
3233 struct lfsck_component *com,
3234 struct lfsck_assistant_req *lar)
3236 struct lfsck_layout_req *llr =
3237 container_of0(lar, struct lfsck_layout_req, llr_lar);
3238 struct lfsck_layout *lo = com->lc_file_ram;
3239 struct lfsck_thread_info *info = lfsck_env_info(env);
3240 struct filter_fid_old *pea = &info->lti_old_pfid;
3241 struct lu_fid *pfid = &info->lti_fid;
3242 struct lu_buf buf = { NULL };
3243 struct dt_object *parent = llr->llr_parent->llo_obj;
3244 struct dt_object *child = llr->llr_child;
3245 struct lu_attr *pla = &info->lti_la;
3246 struct lu_attr *cla = &info->lti_la2;
3247 struct lfsck_instance *lfsck = com->lc_lfsck;
3248 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
3249 enum lfsck_layout_inconsistency_type type = LLIT_NONE;
3254 if (unlikely(lfsck_is_dead_obj(parent)))
3257 rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
/* A child whose attributes cannot be found (-ENOENT) is a dangling
 * reference, unless the parent has become a dead object meanwhile. */
3261 rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
3262 if (rc == -ENOENT) {
3263 if (unlikely(lfsck_is_dead_obj(parent)))
3266 type = LLIT_DANGLING;
/* Read the child's XATTR_NAME_FID; a value whose size is neither that of
 * filter_fid_old nor that of filter_fid is an unmatched pair. */
3273 lfsck_buf_init(&buf, pea, sizeof(struct filter_fid_old));
3274 rc = dt_xattr_get(env, child, &buf, XATTR_NAME_FID, BYPASS_CAPA);
3275 if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
3276 rc != sizeof(struct filter_fid))) {
3277 type = LLIT_UNMATCHED_PAIR;
3281 if (rc < 0 && rc != -ENODATA)
3284 if (rc == -ENODATA) {
3287 fid_le_to_cpu(pfid, &pea->ff_parent);
3288 /* Currently, the filter_fid::ff_parent::f_ver is not the
3289 * real parent MDT-object's FID::f_ver, instead it is the
3290 * OST-object index in its parent MDT-object's layout EA. */
3291 idx = pfid->f_stripe_idx;
3295 rc = lfsck_layout_check_parent(env, com, parent, pfid,
3296 lu_object_fid(&child->do_lu),
3297 pla, cla, llr, &buf, idx);
3306 if (unlikely(cla->la_uid != pla->la_uid ||
3307 cla->la_gid != pla->la_gid)) {
3308 type = LLIT_INCONSISTENT_OWNER;
/* NOTE(review): presumably nothing is repaired in LPF_DRYRUN mode and the
 * inconsistency is only counted -- confirm against the full body. */
3313 if (bk->lb_param & LPF_DRYRUN) {
3314 if (type != LLIT_NONE)
3322 rc = lfsck_layout_repair_dangling(env, com, llr, pla);
3324 case LLIT_UNMATCHED_PAIR:
3325 rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla);
3327 case LLIT_MULTIPLE_REFERENCED:
3328 rc = lfsck_layout_repair_multiple_references(env, com, llr,
3331 case LLIT_INCONSISTENT_OWNER:
3332 rc = lfsck_layout_repair_owner(env, com, llr, pla);
/* Account for the result while holding com->lc_sem for writing. */
3342 down_write(&com->lc_sem);
3344 struct lfsck_assistant_data *lad = com->lc_data;
3346 if (unlikely(lad->lad_exit)) {
3348 } else if (rc == -ENOTCONN || rc == -ESHUTDOWN ||
3349 rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
3350 rc == -EHOSTUNREACH) {
3351 /* If cannot touch the target server,
3352 * mark the LFSCK as INCOMPLETE. */
3353 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant fail to "
3354 "talk with OST %x: rc = %d\n",
3355 lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
3356 lfsck_lad_set_bitmap(env, com, llr->llr_ost_idx);
3357 lo->ll_objs_skipped++;
3360 lfsck_layout_record_failure(env, lfsck, lo);
/* A positive rc counts as one repaired object of the detected type. */
3362 } else if (rc > 0) {
3363 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
3364 "unknown type = %d\n", type);
3366 lo->ll_objs_repaired[type - 1]++;
3367 if (bk->lb_param & LPF_DRYRUN &&
3368 unlikely(lo->ll_pos_first_inconsistent == 0))
3369 lo->ll_pos_first_inconsistent =
3370 lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
3373 up_write(&com->lc_sem);
/* Phase2 handler of the layout LFSCK assistant: drain the list
 * lad_ost_phase2_list and, when LPF_ALL_TGT is set in the bookmark, scan
 * the orphan OST-objects of every OST removed from that list.
 *
 * The list is protected by ltds->ltd_lock. The spinlock is dropped around
 * lfsck_layout_scan_orphan() and taken again before the next entry is
 * examined.
 *
 * \param[in] env  pointer to the thread context
 * \param[in] com  pointer to the layout LFSCK component */
3378 static int lfsck_layout_assistant_handler_p2(const struct lu_env *env,
3379 struct lfsck_component *com)
3381 struct lfsck_assistant_data *lad = com->lc_data;
3382 struct lfsck_instance *lfsck = com->lc_lfsck;
3383 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
3384 struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs;
3385 struct lfsck_tgt_desc *ltd;
3389 CDEBUG(D_LFSCK, "%s: layout LFSCK phase2 scan start\n",
3390 lfsck_lfsck2name(lfsck));
3392 spin_lock(&ltds->ltd_lock);
3393 while (!list_empty(&lad->lad_ost_phase2_list)) {
3394 ltd = list_entry(lad->lad_ost_phase2_list.next,
3395 struct lfsck_tgt_desc,
3396 ltd_layout_phase_list);
/* Unlink the target first so that the loop makes progress. */
3397 list_del_init(&ltd->ltd_layout_phase_list);
3398 if (bk->lb_param & LPF_ALL_TGT) {
/* The orphan scanning is not done under the spinlock. */
3399 spin_unlock(&ltds->ltd_lock);
3400 rc = lfsck_layout_scan_orphan(env, com, ltd);
3401 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
3404 if (unlikely(lad->lad_exit ||
3405 !thread_is_running(&lfsck->li_thread)))
3407 spin_lock(&ltds->ltd_lock);
3411 if (list_empty(&lad->lad_ost_phase1_list))
3415 spin_unlock(&ltds->ltd_lock);
3417 CDEBUG(D_LFSCK, "%s: layout LFSCK phase2 scan stop: rc = %d\n",
3418 lfsck_lfsck2name(lfsck), rc);
/* Reply interpreter installed by lfsck_layout_async_query() for the LFSCK
 * query RPC. It inspects the status reported in the reply, may remove the
 * target from the slave's master list via lfsck_layout_llst_del(), and
 * finally drops the references carried by the async arguments: the target
 * (llst), the component and the export. */
3424 lfsck_layout_slave_async_interpret(const struct lu_env *env,
3425 struct ptlrpc_request *req,
3428 struct lfsck_layout_slave_async_args *llsaa = args;
3429 struct obd_export *exp = llsaa->llsaa_exp;
3430 struct lfsck_component *com = llsaa->llsaa_com;
3431 struct lfsck_layout_slave_target *llst = llsaa->llsaa_llst;
3432 struct lfsck_layout_slave_data *llsd = com->lc_data;
3433 struct lfsck_reply *lr = NULL;
3437 /* It is quite probably caused by target crash,
3438 * to make the LFSCK can go ahead, assume that
3439 * the target finished the LFSCK processing. */
3442 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
/* Any status other than the two scanning phases is treated specially. */
3443 if (lr->lr_status != LS_SCANNING_PHASE1 &&
3444 lr->lr_status != LS_SCANNING_PHASE2)
/* When no reply was parsed (lr == NULL), rc is logged as the status. */
3449 CDEBUG(D_LFSCK, "%s: layout LFSCK slave gets the MDT %x "
3450 "status %d\n", lfsck_lfsck2name(com->lc_lfsck),
3451 llst->llst_index, lr != NULL ? lr->lr_status : rc);
3453 lfsck_layout_llst_del(llsd, llst);
3456 lfsck_layout_llst_put(llst);
3457 lfsck_component_put(env, com);
3458 class_export_put(exp);
/* Prepare an LFSCK_QUERY RPC for the import behind @exp and add it to @set
 * without waiting for the reply. The reply is handled by
 * lfsck_layout_slave_async_interpret().
 *
 * \param[in] env   pointer to the thread context
 * \param[in] com   pointer to the layout LFSCK component; a reference is
 *                  taken and stored in the async arguments
 * \param[in] exp   the export used to reach the target
 * \param[in] llst  the target being queried, stored in the async arguments
 * \param[in] lr    the request to be sent
 * \param[in] set   the request set the RPC is added to */
3463 static int lfsck_layout_async_query(const struct lu_env *env,
3464 struct lfsck_component *com,
3465 struct obd_export *exp,
3466 struct lfsck_layout_slave_target *llst,
3467 struct lfsck_request *lr,
3468 struct ptlrpc_request_set *set)
3470 struct lfsck_layout_slave_async_args *llsaa;
3471 struct ptlrpc_request *req;
3472 struct lfsck_request *tmp;
3476 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
3480 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
/* The request is freed here; presumably on pack failure -- confirm. */
3482 ptlrpc_request_free(req);
/* tmp is the request buffer inside the RPC. */
3486 tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3488 ptlrpc_request_set_replen(req);
/* Pass the export, component and target to the reply interpreter. */
3490 llsaa = ptlrpc_req_async_args(req);
3491 llsaa->llsaa_exp = exp;
3492 llsaa->llsaa_com = lfsck_component_get(com);
3493 llsaa->llsaa_llst = llst;
3494 req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
3495 ptlrpc_set_add_req(set, req);
/* Prepare an LFSCK_NOTIFY RPC for the import behind @exp and add it to @set
 * without waiting for the reply. No reply interpreter is installed here.
 *
 * \param[in] env  pointer to the thread context
 * \param[in] exp  the export used to reach the target
 * \param[in] lr   the request to be sent
 * \param[in] set  the request set the RPC is added to */
3500 static int lfsck_layout_async_notify(const struct lu_env *env,
3501 struct obd_export *exp,
3502 struct lfsck_request *lr,
3503 struct ptlrpc_request_set *set)
3505 struct ptlrpc_request *req;
3506 struct lfsck_request *tmp;
3510 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3514 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
/* The request is freed here; presumably on pack failure -- confirm. */
3516 ptlrpc_request_free(req);
/* tmp is the request buffer inside the RPC. */
3520 tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3522 ptlrpc_request_set_replen(req);
3523 ptlrpc_set_add_req(set, req);
/* Send an LE_QUERY request for the layout LFSCK to every target on
 * llsd_master_list, then wait for all the replies.
 *
 * Each entry is visited once per call: llsd_touch_gen is bumped first, and
 * every visited entry is stamped with the new generation and moved to the
 * list tail, so an entry at the head that already carries the current
 * generation has been handled.
 *
 * \param[in] env  pointer to the thread context
 * \param[in] com  pointer to the layout LFSCK component */
3529 lfsck_layout_slave_query_master(const struct lu_env *env,
3530 struct lfsck_component *com)
3532 struct lfsck_request *lr = &lfsck_env_info(env)->lti_lr;
3533 struct lfsck_instance *lfsck = com->lc_lfsck;
3534 struct lfsck_layout_slave_data *llsd = com->lc_data;
3535 struct lfsck_layout_slave_target *llst;
3536 struct obd_export *exp;
3537 struct ptlrpc_request_set *set;
3542 set = ptlrpc_prep_set();
3544 GOTO(log, rc = -ENOMEM);
3546 memset(lr, 0, sizeof(*lr));
3547 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3548 lr->lr_event = LE_QUERY;
3549 lr->lr_active = LFSCK_TYPE_LAYOUT;
3551 llsd->llsd_touch_gen++;
3552 spin_lock(&llsd->llsd_lock);
3553 while (!list_empty(&llsd->llsd_master_list)) {
3554 llst = list_entry(llsd->llsd_master_list.next,
3555 struct lfsck_layout_slave_target,
3557 if (llst->llst_gen == llsd->llsd_touch_gen)
3560 llst->llst_gen = llsd->llsd_touch_gen;
3561 list_move_tail(&llst->llst_list,
3562 &llsd->llsd_master_list);
/* Hold a reference on the target while the spinlock is released. */
3563 atomic_inc(&llst->llst_ref);
3564 spin_unlock(&llsd->llsd_lock);
3566 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
/* The target is removed from the list and its reference dropped;
 * presumably when no export was found -- confirm. */
3569 lfsck_layout_llst_del(llsd, llst);
3570 lfsck_layout_llst_put(llst);
3571 spin_lock(&llsd->llsd_lock);
3575 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
3577 CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
3578 "query %s for layout: rc = %d\n",
3579 lfsck_lfsck2name(lfsck),
3580 exp->exp_obd->obd_name, rc);
3583 lfsck_layout_llst_put(llst);
3584 class_export_put(exp);
3586 spin_lock(&llsd->llsd_lock);
3588 spin_unlock(&llsd->llsd_lock);
/* Wait for all the queued queries; rc1, if non-zero, takes precedence
 * over the result of the wait. */
3590 rc = ptlrpc_set_wait(set);
3591 ptlrpc_set_destroy(set);
3593 GOTO(log, rc = (rc1 != 0 ? rc1 : rc));
3596 CDEBUG(D_LFSCK, "%s: layout LFSCK slave queries master: rc = %d\n",
3597 lfsck_lfsck2name(com->lc_lfsck), rc);
/* Notify every target on llsd_master_list about @event with the status
 * @result, then wait for all the RPCs. The result of the wait is not used.
 *
 * The list is walked with the same generation scheme as the query path:
 * llsd_touch_gen is bumped, each visited entry is stamped and moved to the
 * list tail.
 *
 * \param[in] env     pointer to the thread context
 * \param[in] com     pointer to the layout LFSCK component
 * \param[in] event   the event to be reported (stored in lr_event)
 * \param[in] result  the status to be reported (stored in lr_status) */
3603 lfsck_layout_slave_notify_master(const struct lu_env *env,
3604 struct lfsck_component *com,
3605 enum lfsck_events event, int result)
3607 struct lfsck_layout *lo = com->lc_file_ram;
3608 struct lfsck_instance *lfsck = com->lc_lfsck;
3609 struct lfsck_layout_slave_data *llsd = com->lc_data;
3610 struct lfsck_request *lr = &lfsck_env_info(env)->lti_lr;
3611 struct lfsck_layout_slave_target *llst;
3612 struct obd_export *exp;
3613 struct ptlrpc_request_set *set;
3617 CDEBUG(D_LFSCK, "%s: layout LFSCK slave notifies master\n",
3618 lfsck_lfsck2name(com->lc_lfsck));
3620 set = ptlrpc_prep_set();
/* Build the notification; it is flagged as coming from an OST and carries
 * the flags of the layout LFSCK file. */
3624 memset(lr, 0, sizeof(*lr));
3625 lr->lr_event = event;
3626 lr->lr_flags = LEF_FROM_OST;
3627 lr->lr_status = result;
3628 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3629 lr->lr_active = LFSCK_TYPE_LAYOUT;
3630 lr->lr_flags2 = lo->ll_flags;
3631 llsd->llsd_touch_gen++;
3632 spin_lock(&llsd->llsd_lock);
3633 while (!list_empty(&llsd->llsd_master_list)) {
3634 llst = list_entry(llsd->llsd_master_list.next,
3635 struct lfsck_layout_slave_target,
3637 if (llst->llst_gen == llsd->llsd_touch_gen)
3640 llst->llst_gen = llsd->llsd_touch_gen;
3641 list_move_tail(&llst->llst_list,
3642 &llsd->llsd_master_list);
/* Hold a reference on the target while the spinlock is released. */
3643 atomic_inc(&llst->llst_ref);
3644 spin_unlock(&llsd->llsd_lock);
3646 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
/* The target is removed from the list and its reference dropped;
 * presumably when no export was found -- confirm. */
3649 lfsck_layout_llst_del(llsd, llst);
3650 lfsck_layout_llst_put(llst);
3651 spin_lock(&llsd->llsd_lock);
3655 rc = lfsck_layout_async_notify(env, exp, lr, set);
3657 CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
3658 "notify %s for layout: rc = %d\n",
3659 lfsck_lfsck2name(lfsck),
3660 exp->exp_obd->obd_name, rc);
3662 lfsck_layout_llst_put(llst);
3663 class_export_put(exp);
3664 spin_lock(&llsd->llsd_lock);
3666 spin_unlock(&llsd->llsd_lock);
3668 ptlrpc_set_wait(set);
3669 ptlrpc_set_destroy(set);
3675 * \ret -ENODATA: unrecognized stripe
3676 * \ret = 0 : recognized stripe
3677 * \ret < 0 : other failures
/* Check on the MDT side whether the object @pfid references the OST-object
 * @cfid in its layout EA at the stripe index carried in pfid->f_stripe_idx.
 *
 * \param[in] env   pointer to the thread context
 * \param[in] com   pointer to the layout LFSCK component
 * \param[in] cfid  the FID of the OST-object
 * \param[in] pfid  the claimed parent FID; f_stripe_idx holds the index */
3679 static int lfsck_layout_master_check_pairs(const struct lu_env *env,
3680 struct lfsck_component *com,
3681 struct lu_fid *cfid,
3682 struct lu_fid *pfid)
3684 struct lfsck_thread_info *info = lfsck_env_info(env);
3685 struct lu_buf *buf = &info->lti_big_buf;
3686 struct ost_id *oi = &info->lti_oi;
3687 struct dt_object *obj;
3688 struct lov_mds_md_v1 *lmm;
3689 struct lov_ost_data_v1 *objs;
3690 __u32 idx = pfid->f_stripe_idx;
3698 obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
3700 RETURN(PTR_ERR(obj));
/* All the checks below are done under the object's read lock. */
3702 dt_read_lock(env, obj, 0);
3703 if (unlikely(dt_object_exists(obj) == 0 ||
3704 lfsck_is_dead_obj(obj)))
3705 GOTO(unlock, rc = -ENOENT);
/* Only a regular file can be the parent of an OST-object. */
3707 if (!S_ISREG(lfsck_object_type(obj)))
3708 GOTO(unlock, rc = -ENODATA);
3710 rc = lfsck_layout_get_lovea(env, obj, buf);
3715 GOTO(unlock, rc = -ENODATA);
3718 rc = lfsck_layout_verify_header(lmm);
3722 /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3723 * been verified in lfsck_layout_verify_header() already. If some
3724 * new magic introduced in the future, then layout LFSCK needs to
3725 * be updated also. */
3726 magic = le32_to_cpu(lmm->lmm_magic);
3727 if (magic == LOV_MAGIC_V1) {
3728 objs = &lmm->lmm_objects[0];
3730 LASSERT(magic == LOV_MAGIC_V3);
3731 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
/* Search the stripes for the OST-object; a match is accepted only when it
 * sits at the expected stripe index. */
3734 fid_to_ostid(cfid, oi);
3735 count = le16_to_cpu(lmm->lmm_stripe_count);
3736 for (i = 0; i < count; i++, objs++) {
3739 ostid_le_to_cpu(&objs->l_ost_oi, &oi2);
3740 if (memcmp(oi, &oi2, sizeof(*oi)) == 0)
3741 GOTO(unlock, rc = (i != idx ? -ENODATA : 0));
/* No stripe references the OST-object. */
3744 GOTO(unlock, rc = -ENODATA);
3747 dt_read_unlock(env, obj);
3748 lu_object_put(env, &obj->do_lu);
3754 * The LFSCK-on-OST will ask the LFSCK-on-MDT to check whether the given
3755 * MDT-object/OST-object pairs match or not, to avoid transferring the
3756 * MDT-object layout EA from MDT to OST. On one hand, the OST does not need
3757 * to understand the layout EA structure; on the other hand, transferring a
3758 * large layout EA from MDT to OST via normal OUT RPC may cause trouble.
3760 * \ret > 0: unrecognized stripe
3761 * \ret = 0: recognized stripe
3762 * \ret < 0: other failures
/* Ask the MDT that owns the sequence of @pfid to verify the pair
 * (@cfid, @pfid) by sending a synchronous LFSCK_NOTIFY RPC with the event
 * LE_PAIRS_VERIFY.
 *
 * \param[in] env   pointer to the thread context
 * \param[in] com   pointer to the layout LFSCK component
 * \param[in] cfid  the FID of the OST-object itself
 * \param[in] pfid  the claimed parent FID */
3764 static int lfsck_layout_slave_check_pairs(const struct lu_env *env,
3765 struct lfsck_component *com,
3766 struct lu_fid *cfid,
3767 struct lu_fid *pfid)
3769 struct lfsck_instance *lfsck = com->lc_lfsck;
3770 struct obd_device *obd = lfsck->li_obd;
3771 struct seq_server_site *ss =
3772 lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
3773 struct obd_export *exp = NULL;
3774 struct ptlrpc_request *req = NULL;
3775 struct lfsck_request *lr;
3776 struct lu_seq_range *range = &lfsck_env_info(env)->lti_range;
3780 if (unlikely(fid_is_idif(pfid)))
/* Look up which server owns the sequence of @pfid; a missing range
 * (-ENOENT) is reported as 1. */
3783 fld_range_set_any(range);
3784 rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), range);
3786 RETURN(rc == -ENOENT ? 1 : rc);
3788 if (unlikely(!fld_range_is_mdt(range)))
3791 exp = lustre_find_lwp_by_index(obd->obd_name, range->lsr_index);
3792 if (unlikely(exp == NULL))
/* The peer must have negotiated OBD_CONNECT_LFSCK. */
3795 if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
3796 GOTO(out, rc = -EOPNOTSUPP);
3798 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3800 GOTO(out, rc = -ENOMEM);
3802 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3804 ptlrpc_request_free(req);
3809 lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3810 memset(lr, 0, sizeof(*lr));
3811 lr->lr_event = LE_PAIRS_VERIFY;
3812 lr->lr_active = LFSCK_TYPE_LAYOUT;
3813 lr->lr_fid = *cfid; /* OST-object itself FID. */
3814 lr->lr_fid2 = *pfid; /* The claimed parent FID. */
/* Send the request and wait for the reply. */
3816 ptlrpc_request_set_replen(req);
3817 rc = ptlrpc_queue_wait(req);
3818 ptlrpc_req_finished(req);
/* NOTE(review): -ENOENT/-ENODATA from the MDT presumably map to the
 * "unrecognized stripe" result -- confirm against the full body. */
3820 if (rc == -ENOENT || rc == -ENODATA)
3827 class_export_put(exp);
/* Rewrite the XATTR_NAME_FID xattr of the local object lr->lr_fid so that
 * it records lr->lr_fid2 as the parent FID.
 *
 * \param[in] env  pointer to the thread context
 * \param[in] com  pointer to the layout LFSCK component
 * \param[in] lr   the request: lr_fid names the object to be repaired,
 *                 lr_fid2 is the parent FID to be stored */
3832 static int lfsck_layout_slave_repair_pfid(const struct lu_env *env,
3833 struct lfsck_component *com,
3834 struct lfsck_request *lr)
3836 struct lfsck_thread_info *info = lfsck_env_info(env);
3837 struct filter_fid *ff = &info->lti_new_pfid;
3839 struct dt_device *dev = com->lc_lfsck->li_bottom;
3840 struct dt_object *obj;
3841 struct thandle *th = NULL;
3845 obj = lfsck_object_find_by_dev(env, dev, &lr->lr_fid);
3847 GOTO(log, rc = PTR_ERR(obj));
/* The parent FID is stored in little-endian form. */
3849 fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2);
3850 buf = lfsck_buf_get(env, ff, sizeof(*ff));
/* The object stays write locked for the whole transaction; nothing is
 * done (rc = 0) if it does not exist or is dead. */
3851 dt_write_lock(env, obj, 0);
3852 if (unlikely(dt_object_exists(obj) == 0 ||
3853 lfsck_is_dead_obj(obj)))
3854 GOTO(unlock, rc = 0);
3856 th = dt_trans_create(env, dev);
3858 GOTO(unlock, rc = PTR_ERR(th));
3860 rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th);
3864 rc = dt_trans_start_local(env, dev, th);
3868 rc = dt_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA);
3873 dt_trans_stop(env, dev, th);
3876 dt_write_unlock(env, obj);
3877 lu_object_put(env, &obj->do_lu);
3880 CDEBUG(D_LFSCK, "%s: layout LFSCK slave repaired pfid for "DFID
3881 ", parent "DFID": rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
3882 PFID(&lr->lr_fid), PFID(&lr->lr_fid2), rc);
/* Forward declaration: lfsck_layout_slave_quit() is called by functions
 * that appear before its definition later in this file. */
3889 static void lfsck_layout_slave_quit(const struct lu_env *env,
3890 struct lfsck_component *com);
/*
 * Reset the in-RAM layout LFSCK trace file (com->lc_file_ram) and store it
 * via lfsck_layout_store().
 *
 * Two clearing paths are visible: one zeroes the whole buffer, the other
 * zeroes it but carries ll_success_count and ll_time_last_complete across
 * the memset().  NOTE(review): the condition choosing between them is not
 * visible in this excerpt; presumably it is the "init" argument - confirm.
 *
 * Afterwards ll_magic is set to LFSCK_LAYOUT_MAGIC and ll_status to LS_INIT.
 * On a master instance (li_master) the assistant's lad_incomplete flag and
 * lad_bitmap are cleared as well.  The whole update runs with com->lc_sem
 * held for write; the result of the store is logged.
 */
3892 static int lfsck_layout_reset(const struct lu_env *env,
3893 struct lfsck_component *com, bool init)
3895 struct lfsck_layout *lo = com->lc_file_ram;
3898 down_write(&com->lc_sem);
3900 memset(lo, 0, com->lc_file_size);
/* Alternative path: keep the completion history across the wipe. */
3902 __u32 count = lo->ll_success_count;
3903 __u64 last_time = lo->ll_time_last_complete;
3905 memset(lo, 0, com->lc_file_size);
3906 lo->ll_success_count = count;
3907 lo->ll_time_last_complete = last_time;
3910 lo->ll_magic = LFSCK_LAYOUT_MAGIC;
3911 lo->ll_status = LS_INIT;
/* Only the master carries assistant data with an incomplete-bitmap. */
3913 if (com->lc_lfsck->li_master) {
3914 struct lfsck_assistant_data *lad = com->lc_data;
3916 lad->lad_incomplete = 0;
3917 CFS_RESET_BITMAP(lad->lad_bitmap);
3920 rc = lfsck_layout_store(env, com);
3921 up_write(&com->lc_sem);
3923 CDEBUG(D_LFSCK, "%s: layout LFSCK reset: rc = %d\n",
3924 lfsck_lfsck2name(com->lc_lfsck), rc);
/*
 * Record one failure for the layout LFSCK: with com->lc_sem held for
 * write, bump com->lc_new_checked and call lfsck_layout_record_failure()
 * on the in-RAM trace file.
 *
 * NOTE(review): a short line between down_write() and the increment is not
 * visible here; presumably the increment is guarded by "new_checked" -
 * confirm against the full file.
 */
3929 static void lfsck_layout_fail(const struct lu_env *env,
3930 struct lfsck_component *com, bool new_checked)
3932 struct lfsck_layout *lo = com->lc_file_ram;
3934 down_write(&com->lc_sem);
3936 com->lc_new_checked++;
3937 lfsck_layout_record_failure(env, com->lc_lfsck, lo);
3938 up_write(&com->lc_sem);
/*
 * Checkpoint handler for the layout LFSCK master.
 *
 * Calls lfsck_checkpoint_generic() first; on the visible early-return path
 * a positive rc is mapped to 0 and any other rc is returned unchanged.
 * Otherwise, with com->lc_sem held for write, the position fields are taken
 * from lfsck->li_pos_checkpoint.lp_oit_cookie, the time elapsed since
 * li_time_last_checkpoint (with HALF_SEC added before conversion to
 * seconds) is added to ll_run_time_phase1, com->lc_new_checked is folded
 * into ll_objs_checked_phase1 and reset, and the trace file is stored.
 *
 * NOTE(review): the branch deciding between updating ll_pos_latest_start
 * and ll_pos_last_checkpoint (plus the statistics) is not visible in this
 * excerpt; presumably it tests "init" - confirm.
 */
3941 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
3942 struct lfsck_component *com, bool init)
3944 struct lfsck_instance *lfsck = com->lc_lfsck;
3945 struct lfsck_layout *lo = com->lc_file_ram;
3949 rc = lfsck_checkpoint_generic(env, com);
3951 return rc > 0 ? 0 : rc;
3954 down_write(&com->lc_sem);
3956 lo->ll_pos_latest_start =
3957 lfsck->li_pos_checkpoint.lp_oit_cookie;
3959 lo->ll_pos_last_checkpoint =
3960 lfsck->li_pos_checkpoint.lp_oit_cookie;
3961 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3962 HALF_SEC - lfsck->li_time_last_checkpoint);
3963 lo->ll_time_last_checkpoint = cfs_time_current_sec();
/* Fold the objects checked since the last checkpoint into the total. */
3964 lo->ll_objs_checked_phase1 += com->lc_new_checked;
3965 com->lc_new_checked = 0;
3968 rc = lfsck_layout_store(env, com);
3969 up_write(&com->lc_sem);
3971 CDEBUG(D_LFSCK, "%s: layout LFSCK master checkpoint at the pos ["
3972 LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
3973 lfsck->li_pos_current.lp_oit_cookie, rc);
/*
 * Checkpoint handler for the layout LFSCK slave.
 *
 * A test for "nothing new checked and not init" is visible at the top.
 * NOTE(review): the statement it guards is not visible; presumably an
 * early return - confirm.
 *
 * With com->lc_sem held for write, the position fields are taken from
 * lfsck->li_pos_checkpoint.lp_oit_cookie, the time elapsed since
 * li_time_last_checkpoint (with HALF_SEC added before conversion to
 * seconds) is added to ll_run_time_phase1, com->lc_new_checked is folded
 * into ll_objs_checked_phase1 and reset, and the trace file is stored.
 *
 * NOTE(review): the branch deciding between updating ll_pos_latest_start
 * and ll_pos_last_checkpoint is not visible in this excerpt; presumably
 * it tests "init" - confirm.
 */
3978 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
3979 struct lfsck_component *com, bool init)
3981 struct lfsck_instance *lfsck = com->lc_lfsck;
3982 struct lfsck_layout *lo = com->lc_file_ram;
3985 if (com->lc_new_checked == 0 && !init)
3988 down_write(&com->lc_sem);
3990 lo->ll_pos_latest_start =
3991 lfsck->li_pos_checkpoint.lp_oit_cookie;
3993 lo->ll_pos_last_checkpoint =
3994 lfsck->li_pos_checkpoint.lp_oit_cookie;
3995 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3996 HALF_SEC - lfsck->li_time_last_checkpoint);
3997 lo->ll_time_last_checkpoint = cfs_time_current_sec();
/* Fold the objects checked since the last checkpoint into the total. */
3998 lo->ll_objs_checked_phase1 += com->lc_new_checked;
3999 com->lc_new_checked = 0;
4002 rc = lfsck_layout_store(env, com);
4003 up_write(&com->lc_sem);
4005 CDEBUG(D_LFSCK, "%s: layout LFSCK slave checkpoint at the pos ["
4006 LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
4007 lfsck->li_pos_current.lp_oit_cookie, rc);
/*
 * Common preparation for a layout LFSCK run; called by both
 * lfsck_layout_slave_prep() and lfsck_layout_master_prep().
 *
 * Clears the directory part of the start position (com->lc_pos_start).
 * If the previous run ended as LS_COMPLETED or LS_PARTIAL, or the caller
 * asked for orphan handling (LPF_OST_ORPHAN in start->ls_flags), the
 * component is reset with lfsck_layout_reset() and lfsck_set_param() is
 * called; a failure there is logged.
 *
 * Then, with com->lc_sem held for write and lfsck->li_lock taken, the new
 * ll_status and the object-table start cookie (pos->lp_oit_cookie) are
 * chosen from LF_SCANNED_ONCE, li_drop_dryrun and
 * ll_pos_first_inconsistent.
 *
 * NOTE(review): the "else" lines separating the branches below and the
 * final return are not visible in this excerpt.
 */
4012 static int lfsck_layout_prep(const struct lu_env *env,
4013 struct lfsck_component *com,
4014 struct lfsck_start *start)
4016 struct lfsck_instance *lfsck = com->lc_lfsck;
4017 struct lfsck_layout *lo = com->lc_file_ram;
4018 struct lfsck_position *pos = &com->lc_pos_start;
4020 fid_zero(&pos->lp_dir_parent);
4021 pos->lp_dir_cookie = 0;
4022 if (lo->ll_status == LS_COMPLETED ||
4023 lo->ll_status == LS_PARTIAL ||
4024 /* To handle orphan, must scan from the beginning. */
4025 (start != NULL && start->ls_flags & LPF_OST_ORPHAN)) {
4028 rc = lfsck_layout_reset(env, com, false);
4030 rc = lfsck_set_param(env, lfsck, start, true);
4033 CDEBUG(D_LFSCK, "%s: layout LFSCK prep failed: "
4034 "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
4040 down_write(&com->lc_sem);
4041 lo->ll_time_latest_start = cfs_time_current_sec();
4042 spin_lock(&lfsck->li_lock);
4043 if (lo->ll_flags & LF_SCANNED_ONCE) {
4044 if (!lfsck->li_drop_dryrun ||
4045 lo->ll_pos_first_inconsistent == 0) {
/* First-stage scan already done: queue the component for phase 2. */
4046 lo->ll_status = LS_SCANNING_PHASE2;
4047 list_move_tail(&com->lc_link,
4048 &lfsck->li_list_double_scan);
4049 pos->lp_oit_cookie = 0;
/* Phase 1 is redone from the first inconsistency with counters cleared. */
4053 lo->ll_status = LS_SCANNING_PHASE1;
4054 lo->ll_run_time_phase1 = 0;
4055 lo->ll_run_time_phase2 = 0;
4056 lo->ll_objs_checked_phase1 = 0;
4057 lo->ll_objs_checked_phase2 = 0;
4058 lo->ll_objs_failed_phase1 = 0;
4059 lo->ll_objs_failed_phase2 = 0;
4060 for (i = 0; i < LLIT_MAX; i++)
4061 lo->ll_objs_repaired[i] = 0;
4063 pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4064 fid_zero(&com->lc_fid_latest_scanned_phase2);
/* Phase 1 start position: after the last checkpoint, or at the first
 * inconsistency. */
4067 lo->ll_status = LS_SCANNING_PHASE1;
4068 if (!lfsck->li_drop_dryrun ||
4069 lo->ll_pos_first_inconsistent == 0)
4070 pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
4072 pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4074 spin_unlock(&lfsck->li_lock);
4075 up_write(&com->lc_sem);
/*
 * Slave-side preparation for a layout LFSCK run.
 *
 * Runs the common lfsck_layout_prep().  If LAST_ID is flagged as crashed
 * (LF_CRASHED_LASTID) and llsd_master_list is empty, the registered
 * li_out_notify callback is invoked with LE_LASTID_REBUILDING.  The master
 * index from lsp->lsp_index is then added with lfsck_layout_llst_add(),
 * and for an orphan scan (LPF_OST_ORPHAN) the rbtree is set up under
 * llsd_rb_lock held for write.  The chosen start position is logged.
 *
 * NOTE(review): the statements guarded by the rc check after
 * lfsck_layout_prep() and by "!lsp->lsp_index_valid" are not visible in
 * this excerpt; presumably both return early - confirm.
 */
4080 static int lfsck_layout_slave_prep(const struct lu_env *env,
4081 struct lfsck_component *com,
4082 struct lfsck_start_param *lsp)
4084 struct lfsck_layout_slave_data *llsd = com->lc_data;
4085 struct lfsck_instance *lfsck = com->lc_lfsck;
4086 struct lfsck_layout *lo = com->lc_file_ram;
4087 struct lfsck_start *start = lsp->lsp_start;
4090 rc = lfsck_layout_prep(env, com, start);
4094 if (lo->ll_flags & LF_CRASHED_LASTID &&
4095 list_empty(&llsd->llsd_master_list)) {
4096 LASSERT(lfsck->li_out_notify != NULL);
4098 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4099 LE_LASTID_REBUILDING);
4102 if (!lsp->lsp_index_valid)
4105 rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
4106 if (rc == 0 && start != NULL && start->ls_flags & LPF_OST_ORPHAN) {
/* The rbtree must not be set up already when an orphan scan starts. */
4107 LASSERT(!llsd->llsd_rbtree_valid);
4109 write_lock(&llsd->llsd_rb_lock);
4110 rc = lfsck_rbtree_setup(env, com);
4111 write_unlock(&llsd->llsd_rb_lock);
4114 CDEBUG(D_LFSCK, "%s: layout LFSCK slave prep done, start pos ["
4115 LPU64"]\n", lfsck_lfsck2name(lfsck),
4116 com->lc_pos_start.lp_oit_cookie);
/*
 * Master-side preparation for a layout LFSCK run.
 *
 * Visible calls, in order: lfsck_layout_load_bitmap(),
 * lfsck_layout_reset(), lfsck_set_param(), the common lfsck_layout_prep()
 * and finally lfsck_start_assistant().  The chosen start position is
 * logged at the end.
 *
 * NOTE(review): the conditions linking these calls (for example whether
 * the reset only happens when loading the bitmap fails) are not visible in
 * this excerpt - confirm against the full file.
 */
4121 static int lfsck_layout_master_prep(const struct lu_env *env,
4122 struct lfsck_component *com,
4123 struct lfsck_start_param *lsp)
4128 rc = lfsck_layout_load_bitmap(env, com);
4130 rc = lfsck_layout_reset(env, com, false);
4132 rc = lfsck_set_param(env, com->lc_lfsck,
4133 lsp->lsp_start, true);
4139 rc = lfsck_layout_prep(env, com, lsp->lsp_start);
4143 rc = lfsck_start_assistant(env, com, lsp);
4148 CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos ["
4149 LPU64"]\n", lfsck_lfsck2name(com->lc_lfsck),
4150 com->lc_pos_start.lp_oit_cookie);
4155 /* Pre-fetch the attribute for each stripe in the given layout EA. */
/*
 * For every stripe slot of "lmm" (LOV_MAGIC_V1 or LOV_MAGIC_V3):
 *  - wait on the master thread's waitqueue until the async window
 *    (bk->lb_async_windows vs. lad->lad_prefetched) has room, or one of
 *    the threads is no longer running;
 *  - convert the slot's ost_id and OST index into a FID;
 *  - look up the OST target; a missing target is logged and recorded via
 *    lfsck_lad_set_bitmap();
 *  - find the child object without blocking (see the LU-5395 note below)
 *    and declare the attribute and XATTR_NAME_FID reads;
 *  - build an lfsck_layout_req and, under lad->lad_lock, append it to
 *    lad->lad_req_list, bump lad_prefetched and wake the assistant thread.
 *    If the assistant already failed (lad_assistant_status < 0) the
 *    request is released and that status is returned.
 *
 * com->lc_new_checked is incremented under com->lc_sem for the stripe
 * accounting.
 *
 * NOTE(review): many short lines (loop "continue"/"break"/"goto", error
 * checks, labels, the final return) are not visible in this excerpt, so
 * the exact control flow between the steps above must be confirmed against
 * the full file.
 */
4156 static int lfsck_layout_scan_stripes(const struct lu_env *env,
4157 struct lfsck_component *com,
4158 struct dt_object *parent,
4159 struct lov_mds_md_v1 *lmm)
4161 struct lfsck_thread_info *info = lfsck_env_info(env);
4162 struct lfsck_instance *lfsck = com->lc_lfsck;
4163 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
4164 struct lfsck_layout *lo = com->lc_file_ram;
4165 struct lfsck_assistant_data *lad = com->lc_data;
4166 struct lfsck_layout_object *llo = NULL;
4167 struct lov_ost_data_v1 *objs;
4168 struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs;
4169 struct ptlrpc_thread *mthread = &lfsck->li_thread;
4170 struct ptlrpc_thread *athread = &lad->lad_thread;
4171 struct l_wait_info lwi = { 0 };
/* Buffer sized for the old-format filter_fid, used when declaring the
 * XATTR_NAME_FID read below. */
4180 lfsck_buf_init(&buf, &info->lti_old_pfid,
4181 sizeof(struct filter_fid_old));
4182 count = le16_to_cpu(lmm->lmm_stripe_count);
4183 gen = le16_to_cpu(lmm->lmm_layout_gen);
4184 /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4185 * been verified in lfsck_layout_verify_header() already. If some
4186 * new magic introduced in the future, then layout LFSCK needs to
4187 * be updated also. */
4188 magic = le32_to_cpu(lmm->lmm_magic);
4189 if (magic == LOV_MAGIC_V1) {
4190 objs = &lmm->lmm_objects[0];
4192 LASSERT(magic == LOV_MAGIC_V3);
4193 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4196 for (i = 0; i < count; i++, objs++) {
4197 struct lu_fid *fid = &info->lti_fid;
4198 struct ost_id *oi = &info->lti_oi;
4199 struct lfsck_layout_req *llr;
4200 struct lfsck_tgt_desc *tgt = NULL;
4201 struct dt_object *cobj = NULL;
4203 bool wakeup = false;
4205 if (unlikely(lovea_slot_is_dummy(objs)))
4208 l_wait_event(mthread->t_ctl_waitq,
4209 bk->lb_async_windows == 0 ||
4210 lad->lad_prefetched < bk->lb_async_windows ||
4211 !thread_is_running(mthread) ||
4212 thread_is_stopped(athread),
4215 if (unlikely(!thread_is_running(mthread)) ||
4216 thread_is_stopped(athread))
4219 if (unlikely(lfsck_is_dead_obj(parent)))
4222 ostid_le_to_cpu(&objs->l_ost_oi, oi);
4223 index = le32_to_cpu(objs->l_ost_idx);
4224 rc = ostid_to_fid(fid, oi, index);
4226 CDEBUG(D_LFSCK, "%s: get invalid layout EA for "DFID
4227 ": "DOSTID", idx:%u\n", lfsck_lfsck2name(lfsck),
4228 PFID(lfsck_dto2fid(parent)), POSTID(oi), index);
4232 tgt = lfsck_tgt_get(ltds, index);
4233 if (unlikely(tgt == NULL)) {
4234 CDEBUG(D_LFSCK, "%s: cannot talk with OST %x which "
4235 "did not join the layout LFSCK\n",
4236 lfsck_lfsck2name(lfsck), index);
4237 lfsck_lad_set_bitmap(env, com, index);
4241 /* There is potential deadlock race condition between object
4242 * destroy and layout LFSCK. Consider the following scenario:
4244 * 1) The LFSCK thread obtained the parent object firstly, at
4245 * that time, the parent object has not been destroyed yet.
4247 * 2) One RPC service thread destroyed the parent and all its
4248 * children objects. Because the LFSCK is referencing the
4249 * parent object, then the parent object will be marked as
4250 * dying in RAM. On the other hand, the parent object is
4251 * referencing all its children objects, then all children
4252 * objects will be marked as dying in RAM also.
4254 * 3) The LFSCK thread tries to find some child object with
4255 * the parent object referenced. Then it will find that the
4256 * child object is dying. According to the object visibility
4257 * rules: the object with dying flag cannot be returned to
4258 * others. So the LFSCK thread has to wait until the dying
4259 * object has been purged from RAM, then it can allocate a
4260 * new object (with the same FID) in RAM. Unfortunately, the
4261 * LFSCK thread itself is referencing the parent object, and
4262 * cause the parent object cannot be purged, then cause the
4263 * child object cannot be purged also. So the LFSCK thread
4264 * will fall into deadlock.
4266 * We introduce non-blocked version lu_object_find() to allow
4267 * the LFSCK thread to return failure immediately (instead of
4268 * wait) when it finds dying (child) object, then the LFSCK
4269 * thread can check whether the parent object is dying or not.
4270 * So avoid above deadlock. LU-5395 */
4271 cobj = lfsck_object_find_by_dev_nowait(env, tgt->ltd_tgt, fid);
4273 if (lfsck_is_dead_obj(parent)) {
4283 rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
4287 rc = dt_declare_xattr_get(env, cobj, &buf, XATTR_NAME_FID,
4293 llo = lfsck_layout_object_init(env, parent,
4294 lfsck->li_pos_current.lp_oit_cookie, gen);
4301 llr = lfsck_layout_assistant_req_init(llo, cobj, index, i);
/* Hand the request to the assistant thread, unless it already failed. */
4308 spin_lock(&lad->lad_lock);
4309 if (lad->lad_assistant_status < 0) {
4310 spin_unlock(&lad->lad_lock);
4311 lfsck_layout_assistant_req_fini(env, &llr->llr_lar);
4313 RETURN(lad->lad_assistant_status);
4316 list_add_tail(&llr->llr_lar.lar_list, &lad->lad_req_list);
4317 if (lad->lad_prefetched == 0)
4320 lad->lad_prefetched++;
4321 spin_unlock(&lad->lad_lock);
4323 wake_up_all(&athread->t_ctl_waitq);
4326 down_write(&com->lc_sem);
4327 com->lc_new_checked++;
4329 lfsck_layout_record_failure(env, lfsck, lo);
4330 up_write(&com->lc_sem);
/* Drop the per-stripe references taken above. */
4332 if (cobj != NULL && !IS_ERR(cobj))
4333 lu_object_put(env, &cobj->do_lu);
4335 if (likely(tgt != NULL))
4338 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
4345 if (llo != NULL && !IS_ERR(llo))
4346 lfsck_layout_object_put(env, llo);
4351 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
4352 * the OST-object's attribute and generate a structure lfsck_layout_req on the
4353 * list ::lad_req_list.
4355 * For each request on above list, the lfsck_layout_assistant thread compares
4356 * the OST side attribute with local attribute, if inconsistent, then repair it.
4358 * All above processing is async mode with pipeline. */
/*
 * Visible steps of this function:
 *  - a check on S_ISREG() and a check for a negative
 *    lad->lad_assistant_status (the latter leaves with rc = -ESRCH);
 *  - the expected lmm_oi is derived from the object's FID, converted to
 *    little-endian, and compared byte-wise with lmm->lmm_oi from the LOV EA
 *    read by lfsck_layout_get_lovea();
 *  - on mismatch, dry-run mode (LPF_DRYRUN) only counts the object under
 *    ll_objs_repaired[LLIT_OTHERS - 1]; otherwise an LCK_EX ibits lock
 *    (LAYOUT | XATTR) is taken and XATTR_NAME_LOV is rewritten with
 *    LU_XATTR_REPLACE inside a local transaction;
 *  - the stripes are then passed to lfsck_layout_scan_stripes() and
 *    com->lc_new_checked is incremented under com->lc_sem.
 *
 * NOTE(review): many short lines (error checks, labels such as "out",
 * the "again" style re-check after relocking, the guard around the
 * scan_stripes call and the final return) are not visible in this excerpt;
 * confirm the exact control flow against the full file.
 */
4359 static int lfsck_layout_master_exec_oit(const struct lu_env *env,
4360 struct lfsck_component *com,
4361 struct dt_object *obj)
4363 struct lfsck_thread_info *info = lfsck_env_info(env);
4364 struct ost_id *oi = &info->lti_oi;
4365 struct lfsck_layout *lo = com->lc_file_ram;
4366 struct lfsck_assistant_data *lad = com->lc_data;
4367 struct lfsck_instance *lfsck = com->lc_lfsck;
4368 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
4369 struct thandle *handle = NULL;
4370 struct lu_buf *buf = &info->lti_big_buf;
4371 struct lov_mds_md_v1 *lmm = NULL;
4372 struct dt_device *dev = lfsck->li_bottom;
4373 struct lustre_handle lh = { 0 };
4374 struct lu_buf ea_buf = { NULL };
4377 bool locked = false;
4378 bool stripe = false;
4379 bool bad_oi = false;
4382 if (!S_ISREG(lfsck_object_type(obj)))
4385 if (lad->lad_assistant_status < 0)
4386 GOTO(out, rc = -ESRCH);
/* Expected on-disk lmm_oi for this object, in little-endian form. */
4388 fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
4389 lmm_oi_cpu_to_le(oi, oi);
4390 dt_read_lock(env, obj, 0);
4394 if (dt_object_exists(obj) == 0 ||
4395 lfsck_is_dead_obj(obj))
4398 rc = lfsck_layout_get_lovea(env, obj, buf);
4404 rc = lfsck_layout_verify_header(lmm);
4405 /* If the LOV EA crashed, then it is possible to be rebuilt later
4406 * when handle orphan OST-objects. */
4410 if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
4411 GOTO(out, stripe = true);
4413 /* Inconsistent lmm_oi, should be repaired. */
4417 if (bk->lb_param & LPF_DRYRUN) {
4418 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4420 GOTO(out, stripe = true);
/* The ibits lock is taken with the object read lock dropped. */
4423 if (!lustre_handle_is_used(&lh)) {
4424 dt_read_unlock(env, obj);
4426 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
4427 MDS_INODELOCK_LAYOUT |
4428 MDS_INODELOCK_XATTR, LCK_EX);
4432 handle = dt_trans_create(env, dev);
4434 GOTO(out, rc = PTR_ERR(handle));
4436 lfsck_buf_init(&ea_buf, lmm, size);
4437 rc = dt_declare_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4438 LU_XATTR_REPLACE, handle);
4442 rc = dt_trans_start_local(env, dev, handle);
4446 dt_write_lock(env, obj, 0);
4452 rc = dt_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4453 LU_XATTR_REPLACE, handle, BYPASS_CAPA);
4457 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4459 GOTO(out, stripe = true);
/* Release whichever object lock is held, then the transaction and the
 * ibits lock. */
4463 if (lustre_handle_is_used(&lh))
4464 dt_write_unlock(env, obj);
4466 dt_read_unlock(env, obj);
4469 if (handle != NULL && !IS_ERR(handle))
4470 dt_trans_stop(env, dev, handle);
4472 lfsck_ibits_unlock(&lh, LCK_EX);
4475 CDEBUG(D_LFSCK, "%s: layout LFSCK master %s bad lmm_oi for "
4476 DFID": rc = %d\n", lfsck_lfsck2name(lfsck),
4477 bk->lb_param & LPF_DRYRUN ? "found" : "repaired",
4478 PFID(lfsck_dto2fid(obj)), rc);
4481 rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
4483 down_write(&com->lc_sem);
4484 com->lc_new_checked++;
4486 lfsck_layout_record_failure(env, lfsck, lo);
4487 up_write(&com->lc_sem);
/*
 * Slave-side handler for one object found by the object-table iteration.
 *
 * Updates the rbtree bitmap for the object's FID, then, with com->lc_sem
 * held for write, looks up the lfsck_layout_seq for the object's sequence
 * (a new one is initialised, loaded with lfsck_layout_lastid_load() and
 * inserted when needed) and tracks the highest object ID seen in
 * lls_lastid_known.  If the ID is beyond lls_lastid and LAST_ID is not yet
 * flagged as crashed, LAST_ID is reloaded; if the ID is still beyond it,
 * li_out_notify is called with LE_LASTID_REBUILDING, LF_CRASHED_LASTID is
 * set and lls_lastid is set to the ID.
 *
 * FIDs that are neither IDIF nor normal OST-object FIDs, and LAST_ID
 * objects themselves, leave with rc = 0.
 *
 * NOTE(review): several short lines (how "seq" and "oid" are computed in
 * the non-IDIF case, the allocation of a new lls, error branches and the
 * final return) are not visible in this excerpt.
 */
4493 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
4494 struct lfsck_component *com,
4495 struct dt_object *obj)
4497 struct lfsck_instance *lfsck = com->lc_lfsck;
4498 struct lfsck_layout *lo = com->lc_file_ram;
4499 const struct lu_fid *fid = lfsck_dto2fid(obj);
4500 struct lfsck_layout_slave_data *llsd = com->lc_data;
4501 struct lfsck_layout_seq *lls;
4507 LASSERT(llsd != NULL);
/* Fault-injection hook (OBD_FAIL_LFSCK_DELAY5): wait on the LFSCK
 * thread's waitqueue, with a timeout, when armed for this device index. */
4509 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY5) &&
4510 cfs_fail_val == lfsck_dev_idx(lfsck->li_bottom)) {
4511 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(1),
4513 struct ptlrpc_thread *thread = &lfsck->li_thread;
4515 l_wait_event(thread->t_ctl_waitq,
4516 !thread_is_running(thread),
4520 lfsck_rbtree_update_bitmap(env, com, fid, false);
4522 down_write(&com->lc_sem);
4523 if (fid_is_idif(fid))
4525 else if (!fid_is_norm(fid) ||
4526 !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
4527 GOTO(unlock, rc = 0);
4530 com->lc_new_checked++;
4532 lls = lfsck_layout_seq_lookup(llsd, seq);
4535 if (unlikely(lls == NULL))
4536 GOTO(unlock, rc = -ENOMEM);
4538 INIT_LIST_HEAD(&lls->lls_list);
4540 rc = lfsck_layout_lastid_load(env, com, lls);
4542 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4543 "load LAST_ID for "LPX64": rc = %d\n",
4544 lfsck_lfsck2name(com->lc_lfsck), seq, rc);
4545 lo->ll_objs_failed_phase1++;
4550 lfsck_layout_seq_insert(llsd, lls);
4553 if (unlikely(fid_is_last_id(fid)))
4554 GOTO(unlock, rc = 0);
4556 if (fid_is_idif(fid))
4557 oid = fid_idif_id(fid_seq(fid), fid_oid(fid), fid_ver(fid));
/* Remember the largest object ID seen so far in this sequence. */
4561 if (oid > lls->lls_lastid_known)
4562 lls->lls_lastid_known = oid;
4564 if (oid > lls->lls_lastid) {
4565 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
4566 /* OFD may create new objects during LFSCK scanning. */
4567 rc = lfsck_layout_lastid_reload(env, com, lls);
4568 if (unlikely(rc != 0)) {
4569 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4570 "reload LAST_ID for "LPX64": rc = %d\n",
4571 lfsck_lfsck2name(com->lc_lfsck),
4577 if (oid <= lls->lls_lastid ||
4578 lo->ll_flags & LF_CRASHED_LASTID)
4579 GOTO(unlock, rc = 0);
4581 LASSERT(lfsck->li_out_notify != NULL);
4583 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4584 LE_LASTID_REBUILDING);
4585 lo->ll_flags |= LF_CRASHED_LASTID;
4587 CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
4588 "LAST_ID file (2) for the sequence "LPX64
4589 ", old value "LPU64", known value "LPU64"\n",
4590 lfsck_lfsck2name(lfsck), lls->lls_seq,
4591 lls->lls_lastid, oid);
4594 lls->lls_lastid = oid;
4598 GOTO(unlock, rc = 0);
4601 up_write(&com->lc_sem);
/*
 * Directory-entry handler of the layout LFSCK component.
 *
 * NOTE(review): only the signature is visible in this excerpt; the body is
 * not shown, so its behaviour must be confirmed against the full file.
 */
4606 static int lfsck_layout_exec_dir(const struct lu_env *env,
4607 struct lfsck_component *com,
4608 struct lu_dirent *ent, __u16 type)
/*
 * Post-processing for the layout LFSCK master once the first-stage scan
 * ends with "result".
 *
 * After lfsck_post_generic(), with com->lc_sem held for write and
 * lfsck->li_lock taken, the status is updated.  The visible branches:
 *  - set LS_PARTIAL (when LF_INCOMPLETE is set) or LS_SCANNING_PHASE2,
 *    mark LF_SCANNED_ONCE, clear LF_UPGRADE and move the component to
 *    li_list_double_scan;
 *  - for result == 0, take the status from li_status or use LS_STOPPED,
 *    and move the component to li_list_idle unless the status is LS_PAUSED;
 *  - otherwise set LS_FAILED and move the component to li_list_idle.
 * Run time and checked-object statistics are then updated and the trace
 * file is stored.
 *
 * NOTE(review): the condition of the first branch and the guards around
 * the position/statistics updates (presumably "result > 0" and "!init")
 * are not visible in this excerpt - confirm.
 */
4613 static int lfsck_layout_master_post(const struct lu_env *env,
4614 struct lfsck_component *com,
4615 int result, bool init)
4617 struct lfsck_instance *lfsck = com->lc_lfsck;
4618 struct lfsck_layout *lo = com->lc_file_ram;
4622 lfsck_post_generic(env, com, &result);
4624 down_write(&com->lc_sem);
4625 spin_lock(&lfsck->li_lock);
4627 lo->ll_pos_last_checkpoint =
4628 lfsck->li_pos_checkpoint.lp_oit_cookie;
4631 if (lo->ll_flags & LF_INCOMPLETE)
4632 lo->ll_status = LS_PARTIAL;
4634 lo->ll_status = LS_SCANNING_PHASE2;
4635 lo->ll_flags |= LF_SCANNED_ONCE;
4636 lo->ll_flags &= ~LF_UPGRADE;
4637 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
4638 } else if (result == 0) {
4639 if (lfsck->li_status != 0)
4640 lo->ll_status = lfsck->li_status;
4642 lo->ll_status = LS_STOPPED;
4643 if (lo->ll_status != LS_PAUSED)
4644 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4646 lo->ll_status = LS_FAILED;
4647 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4649 spin_unlock(&lfsck->li_lock);
/* Account the time and objects since the last checkpoint. */
4652 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4653 HALF_SEC - lfsck->li_time_last_checkpoint);
4654 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4655 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4656 com->lc_new_checked = 0;
4659 rc = lfsck_layout_store(env, com);
4660 up_write(&com->lc_sem);
4662 CDEBUG(D_LFSCK, "%s: layout LFSCK master post done: rc = %d\n",
4663 lfsck_lfsck2name(lfsck), rc);
/*
 * Post-processing for the layout LFSCK slave once the first-stage scan
 * ends with "result".
 *
 * Stores the LAST_ID files via lfsck_layout_lastid_store(), then, with
 * com->lc_sem held for write and lfsck->li_lock taken, updates the status.
 * The visible branches:
 *  - set LS_SCANNING_PHASE2 and LF_SCANNED_ONCE, clear LF_CRASHED_LASTID
 *    (logging that the LAST_ID files were rebuilt) and LF_UPGRADE, and move
 *    the component to li_list_double_scan;
 *  - for result == 0, take the status from li_status or use LS_STOPPED,
 *    and move the component to li_list_idle unless the status is LS_PAUSED;
 *  - otherwise set LS_FAILED and move the component to li_list_idle.
 * After dropping li_lock the li_out_notify callback may be invoked, the
 * statistics are updated, the trace file is stored and the master is told
 * LE_PHASE1_DONE together with "result".
 *
 * NOTE(review): the condition of the first branch, the guard and event
 * argument of the li_out_notify call, and the guards around the
 * position/statistics updates are not visible in this excerpt - confirm.
 */
4668 static int lfsck_layout_slave_post(const struct lu_env *env,
4669 struct lfsck_component *com,
4670 int result, bool init)
4672 struct lfsck_instance *lfsck = com->lc_lfsck;
4673 struct lfsck_layout *lo = com->lc_file_ram;
4677 rc = lfsck_layout_lastid_store(env, com);
4681 LASSERT(lfsck->li_out_notify != NULL);
4683 down_write(&com->lc_sem);
4684 spin_lock(&lfsck->li_lock);
4686 lo->ll_pos_last_checkpoint =
4687 lfsck->li_pos_checkpoint.lp_oit_cookie;
4690 lo->ll_status = LS_SCANNING_PHASE2;
4691 lo->ll_flags |= LF_SCANNED_ONCE;
4692 if (lo->ll_flags & LF_CRASHED_LASTID) {
4694 lo->ll_flags &= ~LF_CRASHED_LASTID;
4696 CDEBUG(D_LFSCK, "%s: layout LFSCK has rebuilt "
4697 "crashed LAST_ID files successfully\n",
4698 lfsck_lfsck2name(lfsck));
4700 lo->ll_flags &= ~LF_UPGRADE;
4701 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
4702 } else if (result == 0) {
4703 if (lfsck->li_status != 0)
4704 lo->ll_status = lfsck->li_status;
4706 lo->ll_status = LS_STOPPED;
4707 if (lo->ll_status != LS_PAUSED)
4708 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4710 lo->ll_status = LS_FAILED;
4711 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4713 spin_unlock(&lfsck->li_lock);
4716 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
/* Account the time and objects since the last checkpoint. */
4720 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4721 HALF_SEC - lfsck->li_time_last_checkpoint);
4722 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4723 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4724 com->lc_new_checked = 0;
4727 rc = lfsck_layout_store(env, com);
4728 up_write(&com->lc_sem);
4730 lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
4732 CDEBUG(D_LFSCK, "%s: layout LFSCK slave post done: rc = %d\n",
4733 lfsck_lfsck2name(lfsck), rc);
/*
 * Print the layout LFSCK state into the seq_file "m".
 *
 * With com->lc_sem held for read, the function emits: the component name
 * and status, the flag and parameter bit names, the three "time since"
 * values, the start/checkpoint/first-failure positions, the success count
 * and the per-type repaired/skipped/failed counters.  The trailing section
 * depends on ll_status:
 *  - LS_SCANNING_PHASE1: checked counts, run times, average and real-time
 *    phase 1 speed, plus the current object-table iteration position;
 *  - LS_SCANNING_PHASE2: the same for phase 2, with the latest scanned FID
 *    as current position;
 *  - any other status: stored totals and average speeds only.
 *
 * NOTE(review): several short lines (error checks after the helper calls,
 * zero-divisor guards before some do_div() calls, some seq_printf()
 * arguments and the final return) are not visible in this excerpt.
 */
4738 static int lfsck_layout_dump(const struct lu_env *env,
4739 struct lfsck_component *com, struct seq_file *m)
4741 struct lfsck_instance *lfsck = com->lc_lfsck;
4742 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
4743 struct lfsck_layout *lo = com->lc_file_ram;
4746 down_read(&com->lc_sem);
4747 seq_printf(m, "name: lfsck_layout\n"
4753 lfsck_status2names(lo->ll_status));
4755 rc = lfsck_bits_dump(m, lo->ll_flags, lfsck_flags_names, "flags");
4759 rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
4763 rc = lfsck_time_dump(m, lo->ll_time_last_complete,
4764 "time_since_last_completed");
4768 rc = lfsck_time_dump(m, lo->ll_time_latest_start,
4769 "time_since_latest_start");
4773 rc = lfsck_time_dump(m, lo->ll_time_last_checkpoint,
4774 "time_since_last_checkpoint");
4778 seq_printf(m, "latest_start_position: "LPU64"\n"
4779 "last_checkpoint_position: "LPU64"\n"
4780 "first_failure_position: "LPU64"\n",
4781 lo->ll_pos_latest_start,
4782 lo->ll_pos_last_checkpoint,
4783 lo->ll_pos_first_inconsistent);
/* ll_objs_repaired[] is indexed by inconsistency type minus one. */
4785 seq_printf(m, "success_count: %u\n"
4786 "repaired_dangling: "LPU64"\n"
4787 "repaired_unmatched_pair: "LPU64"\n"
4788 "repaired_multiple_referenced: "LPU64"\n"
4789 "repaired_orphan: "LPU64"\n"
4790 "repaired_inconsistent_owner: "LPU64"\n"
4791 "repaired_others: "LPU64"\n"
4792 "skipped: "LPU64"\n"
4793 "failed_phase1: "LPU64"\n"
4794 "failed_phase2: "LPU64"\n",
4795 lo->ll_success_count,
4796 lo->ll_objs_repaired[LLIT_DANGLING - 1],
4797 lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
4798 lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
4799 lo->ll_objs_repaired[LLIT_ORPHAN - 1],
4800 lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
4801 lo->ll_objs_repaired[LLIT_OTHERS - 1],
4802 lo->ll_objs_skipped,
4803 lo->ll_objs_failed_phase1,
4804 lo->ll_objs_failed_phase2);
/* Phase 1 in progress: include the not-yet-checkpointed counters. */
4806 if (lo->ll_status == LS_SCANNING_PHASE1) {
4808 const struct dt_it_ops *iops;
4809 cfs_duration_t duration = cfs_time_current() -
4810 lfsck->li_time_last_checkpoint;
4811 __u64 checked = lo->ll_objs_checked_phase1 +
4812 com->lc_new_checked;
4813 __u64 speed = checked;
4814 __u64 new_checked = com->lc_new_checked *
4815 msecs_to_jiffies(MSEC_PER_SEC);
4816 __u32 rtime = lo->ll_run_time_phase1 +
4817 cfs_duration_sec(duration + HALF_SEC);
4820 do_div(new_checked, duration);
4822 do_div(speed, rtime);
4823 seq_printf(m, "checked_phase1: "LPU64"\n"
4824 "checked_phase2: "LPU64"\n"
4825 "run_time_phase1: %u seconds\n"
4826 "run_time_phase2: %u seconds\n"
4827 "average_speed_phase1: "LPU64" items/sec\n"
4828 "average_speed_phase2: N/A\n"
4829 "real-time_speed_phase1: "LPU64" items/sec\n"
4830 "real-time_speed_phase2: N/A\n",
4832 lo->ll_objs_checked_phase2,
4834 lo->ll_run_time_phase2,
4838 LASSERT(lfsck->li_di_oit != NULL);
4840 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
4842 /* The low layer otable-based iteration position may NOT
4843 * exactly match the layout-based directory traversal
4844 * cookie. Generally, it is not a serious issue. But the
4845 * caller should NOT make assumption on that. */
4846 pos = iops->store(env, lfsck->li_di_oit);
4847 if (!lfsck->li_current_oit_processed)
4849 seq_printf(m, "current_position: "LPU64"\n", pos);
/* Phase 2 in progress: phase 1 figures come from the stored totals. */
4851 } else if (lo->ll_status == LS_SCANNING_PHASE2) {
4852 cfs_duration_t duration = cfs_time_current() -
4853 lfsck->li_time_last_checkpoint;
4854 __u64 checked = lo->ll_objs_checked_phase2 +
4855 com->lc_new_checked;
4856 __u64 speed1 = lo->ll_objs_checked_phase1;
4857 __u64 speed2 = checked;
4858 __u64 new_checked = com->lc_new_checked *
4859 msecs_to_jiffies(MSEC_PER_SEC);
4860 __u32 rtime = lo->ll_run_time_phase2 +
4861 cfs_duration_sec(duration + HALF_SEC);
4864 do_div(new_checked, duration);
4865 if (lo->ll_run_time_phase1 != 0)
4866 do_div(speed1, lo->ll_run_time_phase1);
4868 do_div(speed2, rtime);
4869 rc = seq_printf(m, "checked_phase1: "LPU64"\n"
4870 "checked_phase2: "LPU64"\n"
4871 "run_time_phase1: %u seconds\n"
4872 "run_time_phase2: %u seconds\n"
4873 "average_speed_phase1: "LPU64" items/sec\n"
4874 "average_speed_phase2: "LPU64" items/sec\n"
4875 "real-time_speed_phase1: N/A\n"
4876 "real-time_speed_phase2: "LPU64" items/sec\n"
4877 "current_position: "DFID"\n",
4878 lo->ll_objs_checked_phase1,
4880 lo->ll_run_time_phase1,
4885 PFID(&com->lc_fid_latest_scanned_phase2));
/* Not scanning: only the stored totals are reported. */
4890 __u64 speed1 = lo->ll_objs_checked_phase1;
4891 __u64 speed2 = lo->ll_objs_checked_phase2;
4893 if (lo->ll_run_time_phase1 != 0)
4894 do_div(speed1, lo->ll_run_time_phase1);
4895 if (lo->ll_run_time_phase2 != 0)
4896 do_div(speed2, lo->ll_run_time_phase2);
4897 seq_printf(m, "checked_phase1: "LPU64"\n"
4898 "checked_phase2: "LPU64"\n"
4899 "run_time_phase1: %u seconds\n"
4900 "run_time_phase2: %u seconds\n"
4901 "average_speed_phase1: "LPU64" items/sec\n"
4902 "average_speed_phase2: "LPU64" objs/sec\n"
4903 "real-time_speed_phase1: N/A\n"
4904 "real-time_speed_phase2: N/A\n"
4905 "current_position: N/A\n",
4906 lo->ll_objs_checked_phase1,
4907 lo->ll_objs_checked_phase2,
4908 lo->ll_run_time_phase1,
4909 lo->ll_run_time_phase2,
4914 up_read(&com->lc_sem);
/*
 * Second-stage scan entry for the layout LFSCK master.
 *
 * Calls lfsck_double_scan_generic() with the current ll_status.  If the
 * assistant thread has stopped, the request list and the OST/MDT phase1
 * lists must already be empty (asserted), and every target still linked on
 * the OST/MDT phase2 lists is unlinked under the matching ltd_lock.
 *
 * NOTE(review): short lines (braces, the declaration of "rc" and the final
 * return) are not visible in this excerpt.
 */
4919 static int lfsck_layout_master_double_scan(const struct lu_env *env,
4920 struct lfsck_component *com)
4922 struct lfsck_layout *lo = com->lc_file_ram;
4923 struct lfsck_assistant_data *lad = com->lc_data;
4924 struct lfsck_instance *lfsck = com->lc_lfsck;
4925 struct lfsck_tgt_descs *ltds;
4926 struct lfsck_tgt_desc *ltd;
4927 struct lfsck_tgt_desc *next;
4930 rc = lfsck_double_scan_generic(env, com, lo->ll_status);
4932 if (thread_is_stopped(&lad->lad_thread)) {
4933 LASSERT(list_empty(&lad->lad_req_list));
4934 LASSERT(list_empty(&lad->lad_ost_phase1_list));
4935 LASSERT(list_empty(&lad->lad_mdt_phase1_list));
/* Drop the OST targets that never reported the end of phase 2. */
4937 ltds = &lfsck->li_ost_descs;
4938 spin_lock(&ltds->ltd_lock);
4939 list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
4940 ltd_layout_phase_list) {
4941 list_del_init(&ltd->ltd_layout_phase_list);
4943 spin_unlock(&ltds->ltd_lock);
/* Same for the MDT targets. */
4945 ltds = &lfsck->li_mdt_descs;
4946 spin_lock(&ltds->ltd_lock);
4947 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
4948 ltd_layout_phase_list) {
4949 list_del_init(&ltd->ltd_layout_phase_list);
4951 spin_unlock(&ltds->ltd_lock);
/*
 * Second-stage scan entry for the layout LFSCK slave.
 *
 * After logging the start and checking LF_INCOMPLETE, the function bumps
 * lfsck->li_double_scan_count, resets the per-component counters and
 * checkpoint times, and then repeatedly queries the master(s) with
 * lfsck_layout_slave_query_master() and waits (30 second timeout) on the
 * LFSCK thread's waitqueue until the thread stops running or
 * llsd_master_list becomes empty.
 *
 * On the way out the result is recorded with
 * lfsck_layout_double_scan_result(), the master is told LE_PHASE2_DONE
 * (a positive rc is reported as 0 when LF_INCOMPLETE is set), the slave
 * state is torn down with lfsck_layout_slave_quit(), and the last
 * finisher of the double scan wakes up the LFSCK thread's waiters.
 *
 * NOTE(review): the loop header, several GOTO()/continue statements, the
 * "done" label and the final return are not visible in this excerpt.
 */
4957 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
4958 struct lfsck_component *com)
4960 struct lfsck_instance *lfsck = com->lc_lfsck;
4961 struct lfsck_layout_slave_data *llsd = com->lc_data;
4962 struct lfsck_layout *lo = com->lc_file_ram;
4963 struct ptlrpc_thread *thread = &lfsck->li_thread;
4967 CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan start\n",
4968 lfsck_lfsck2name(lfsck));
4970 if (lo->ll_flags & LF_INCOMPLETE)
4973 atomic_inc(&lfsck->li_double_scan_count);
4975 com->lc_new_checked = 0;
4976 com->lc_new_scanned = 0;
4977 com->lc_time_last_checkpoint = cfs_time_current();
4978 com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
4979 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
4982 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
4985 rc = lfsck_layout_slave_query_master(env, com);
4986 if (list_empty(&llsd->llsd_master_list)) {
4987 if (unlikely(!thread_is_running(thread)))
4998 rc = l_wait_event(thread->t_ctl_waitq,
4999 !thread_is_running(thread) ||
5000 list_empty(&llsd->llsd_master_list),
5002 if (unlikely(!thread_is_running(thread)))
5005 if (rc == -ETIMEDOUT)
5008 GOTO(done, rc = (rc < 0 ? rc : 1));
5012 rc = lfsck_layout_double_scan_result(env, com, rc);
5013 lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE,
5014 (rc > 0 && lo->ll_flags & LF_INCOMPLETE) ? 0 : rc);
5015 lfsck_layout_slave_quit(env, com);
/* The last component leaving the double scan wakes the waiters. */
5016 if (atomic_dec_and_test(&lfsck->li_double_scan_count))
5017 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5019 CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan finished, "
5020 "status %d: rc = %d\n",
5021 lfsck_lfsck2name(lfsck), lo->ll_status, rc);
/*
 * Release the master's assistant data attached to the component.
 *
 * Asserts that the assistant thread is in the init or stopped state and
 * that no request is pending, detaches the data from com->lc_data, then
 * unlinks every OST and MDT target from the phase1, phase2 and per-type
 * target lists under the matching ltd_lock.  The bitmap is freed when
 * present.
 *
 * NOTE(review): short lines (braces, one list-member argument line per
 * target list, and whatever follows the bitmap release) are not visible in
 * this excerpt.
 */
5026 static void lfsck_layout_master_data_release(const struct lu_env *env,
5027 struct lfsck_component *com)
5029 struct lfsck_assistant_data *lad = com->lc_data;
5030 struct lfsck_instance *lfsck = com->lc_lfsck;
5031 struct lfsck_tgt_descs *ltds;
5032 struct lfsck_tgt_desc *ltd;
5033 struct lfsck_tgt_desc *next;
5035 LASSERT(lad != NULL);
5036 LASSERT(thread_is_init(&lad->lad_thread) ||
5037 thread_is_stopped(&lad->lad_thread));
5038 LASSERT(list_empty(&lad->lad_req_list));
5040 com->lc_data = NULL;
/* Unlink all OST targets from the assistant's lists. */
5042 ltds = &lfsck->li_ost_descs;
5043 spin_lock(&ltds->ltd_lock);
5044 list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase1_list,
5045 ltd_layout_phase_list) {
5046 list_del_init(&ltd->ltd_layout_phase_list);
5048 list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
5049 ltd_layout_phase_list) {
5050 list_del_init(&ltd->ltd_layout_phase_list);
5052 list_for_each_entry_safe(ltd, next, &lad->lad_ost_list,
5054 list_del_init(&ltd->ltd_layout_list);
5056 spin_unlock(&ltds->ltd_lock);
/* Unlink all MDT targets from the assistant's lists. */
5058 ltds = &lfsck->li_mdt_descs;
5059 spin_lock(&ltds->ltd_lock);
5060 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
5061 ltd_layout_phase_list) {
5062 list_del_init(&ltd->ltd_layout_phase_list);
5064 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
5065 ltd_layout_phase_list) {
5066 list_del_init(&ltd->ltd_layout_phase_list);
5068 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
5070 list_del_init(&ltd->ltd_layout_list);
5072 spin_unlock(&ltds->ltd_lock);
5074 if (likely(lad->lad_bitmap != NULL))
5075 CFS_FREE_BITMAP(lad->lad_bitmap);
/*
 * Release the slave's private data attached to the component: tear down
 * the slave state with lfsck_layout_slave_quit() and detach the data from
 * com->lc_data.
 *
 * NOTE(review): the line(s) after the detach are not visible in this
 * excerpt; presumably "llsd" is freed there - confirm.
 */
5080 static void lfsck_layout_slave_data_release(const struct lu_env *env,
5081 struct lfsck_component *com)
5083 struct lfsck_layout_slave_data *llsd = com->lc_data;
5085 lfsck_layout_slave_quit(env, com);
5086 com->lc_data = NULL;
/*
 * Quit handler for the layout LFSCK master.
 *
 * Runs lfsck_quit_generic(), asserts that the assistant thread is then in
 * the init or stopped state with no pending request, and unlinks every OST
 * and MDT target from the phase1 and phase2 lists under the matching
 * ltd_lock.  Unlike lfsck_layout_master_data_release(), the assistant data
 * stays attached to the component.
 *
 * NOTE(review): brace-only lines are not visible in this excerpt.
 */
5090 static void lfsck_layout_master_quit(const struct lu_env *env,
5091 struct lfsck_component *com)
5093 struct lfsck_assistant_data *lad = com->lc_data;
5094 struct lfsck_instance *lfsck = com->lc_lfsck;
5095 struct lfsck_tgt_descs *ltds;
5096 struct lfsck_tgt_desc *ltd;
5097 struct lfsck_tgt_desc *next;
5099 LASSERT(lad != NULL);
5101 lfsck_quit_generic(env, com);
5103 LASSERT(thread_is_init(&lad->lad_thread) ||
5104 thread_is_stopped(&lad->lad_thread));
5105 LASSERT(list_empty(&lad->lad_req_list));
/* Unlink the OST targets from both phase lists. */
5107 ltds = &lfsck->li_ost_descs;
5108 spin_lock(&ltds->ltd_lock);
5109 list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase1_list,
5110 ltd_layout_phase_list) {
5111 list_del_init(&ltd->ltd_layout_phase_list);
5113 list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
5114 ltd_layout_phase_list) {
5115 list_del_init(&ltd->ltd_layout_phase_list);
5117 spin_unlock(&ltds->ltd_lock);
/* Unlink the MDT targets from both phase lists. */
5119 ltds = &lfsck->li_mdt_descs;
5120 spin_lock(&ltds->ltd_lock);
5121 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
5122 ltd_layout_phase_list) {
5123 list_del_init(&ltd->ltd_layout_phase_list);
5125 list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
5126 ltd_layout_phase_list) {
5127 list_del_init(&ltd->ltd_layout_phase_list);
5129 spin_unlock(&ltds->ltd_lock);
/*
 * Quit handler for the layout LFSCK slave (installed as .lfsck_quit in
 * lfsck_layout_slave_ops; also called from
 * lfsck_layout_slave_data_release()).
 *
 * Unlinks every lfsck_layout_seq from llsd_seq_list and drops the
 * reference on its LAST_ID object, drains llsd_master_list, then cleans
 * up the rbtree via lfsck_rbtree_cleanup().
 */
5132 static void lfsck_layout_slave_quit(const struct lu_env *env,
5133 struct lfsck_component *com)
5135 struct lfsck_layout_slave_data *llsd = com->lc_data;
5136 struct lfsck_layout_seq *lls;
5137 struct lfsck_layout_seq *next;
5138 struct lfsck_layout_slave_target *llst;
5140 LASSERT(llsd != NULL);
5142 list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
5144 list_del_init(&lls->lls_list);
5145 lfsck_object_put(env, lls->lls_lastid_obj);
/*
 * Pop the master targets one by one. llsd_lock is released around
 * lfsck_layout_llst_put().
 * NOTE(review): the loop presumably re-takes the lock before testing
 * the list again - that line is not shown here; confirm.
 */
5149 spin_lock(&llsd->llsd_lock);
5150 while (!list_empty(&llsd->llsd_master_list)) {
5151 llst = list_entry(llsd->llsd_master_list.next,
5152 struct lfsck_layout_slave_target, llst_list);
5153 list_del_init(&llst->llst_list);
5154 spin_unlock(&llsd->llsd_lock);
5155 lfsck_layout_llst_put(llst);
5157 spin_unlock(&llsd->llsd_lock);
5159 lfsck_rbtree_cleanup(env, com);
/*
 * Handle an incoming notification on the layout LFSCK master (installed
 * as .lfsck_in_notify in lfsck_layout_master_ops).
 *
 * LE_PAIRS_VERIFY is delegated to lfsck_layout_master_check_pairs(). Apart
 * from that, only LE_PHASE1_DONE, LE_PHASE2_DONE and LE_PEER_EXIT are
 * tested for; the handling of any other event is not shown here.
 *
 * For those three events the sending target is looked up by lr->lr_index
 * in the OST or MDT descriptor table (chosen by LEF_FROM_OST) and its list
 * membership is updated under that table's ltd_lock. Afterwards the LFSCK
 * is either stopped (fail-out mode) or the assistant thread is woken.
 *
 * NOTE(review): "<ds->" and "<d->" below look like extraction damage of
 * "&ltds->" and "&ltd->" - confirm against the pristine source.
 */
5162 static int lfsck_layout_master_in_notify(const struct lu_env *env,
5163 struct lfsck_component *com,
5164 struct lfsck_request *lr,
5167 struct lfsck_instance *lfsck = com->lc_lfsck;
5168 struct lfsck_layout *lo = com->lc_file_ram;
5169 struct lfsck_assistant_data *lad = com->lc_data;
5170 struct lfsck_tgt_descs *ltds;
5171 struct lfsck_tgt_desc *ltd;
5175 if (lr->lr_event == LE_PAIRS_VERIFY) {
5178 rc = lfsck_layout_master_check_pairs(env, com, &lr->lr_fid,
5184 CDEBUG(D_LFSCK, "%s: layout LFSCK master handles notify %u "
5185 "from %s %x, status %d, flags %x, flags2 %x\n",
5186 lfsck_lfsck2name(lfsck), lr->lr_event,
5187 (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5188 lr->lr_index, lr->lr_status, lr->lr_flags, lr->lr_flags2);
5190 if (lr->lr_event != LE_PHASE1_DONE &&
5191 lr->lr_event != LE_PHASE2_DONE &&
5192 lr->lr_event != LE_PEER_EXIT)
/* Pick the descriptor table that owns the sender. */
5195 if (lr->lr_flags & LEF_FROM_OST)
5196 ltds = &lfsck->li_ost_descs;
5198 ltds = &lfsck->li_mdt_descs;
5199 spin_lock(<ds->ltd_lock);
5200 ltd = LTD_TGT(ltds, lr->lr_index);
5202 spin_unlock(<ds->ltd_lock);
/* The target always leaves its current phase list first. */
5207 list_del_init(<d->ltd_layout_phase_list);
5208 switch (lr->lr_event) {
5209 case LE_PHASE1_DONE:
/*
 * Non-positive status or an incomplete scan: the target is marked done
 * and removed from the layout list. For an incomplete scan reported by
 * an OST the lad bitmap is updated.
 * NOTE(review): the LF_INCOMPLETE assignment below presumably sits in
 * the matching else branch; the branch structure is not shown here.
 */
5210 if (lr->lr_status <= 0 || lr->lr_flags2 & LF_INCOMPLETE) {
5211 if (lr->lr_flags2 & LF_INCOMPLETE) {
5212 if (lr->lr_flags & LEF_FROM_OST)
5213 lfsck_lad_set_bitmap(env, com,
5216 lo->ll_flags |= LF_INCOMPLETE;
5218 ltd->ltd_layout_done = 1;
5219 list_del_init(<d->ltd_layout_list);
/*
 * Otherwise the target is queued on the phase2 list of its kind, and
 * put back on the per-kind layout list if it is not linked there.
 */
5224 if (lr->lr_flags & LEF_FROM_OST) {
5225 if (list_empty(<d->ltd_layout_list))
5226 list_add_tail(<d->ltd_layout_list,
5227 &lad->lad_ost_list);
5228 list_add_tail(<d->ltd_layout_phase_list,
5229 &lad->lad_ost_phase2_list);
5231 if (list_empty(<d->ltd_layout_list))
5232 list_add_tail(<d->ltd_layout_list,
5233 &lad->lad_mdt_list);
5234 list_add_tail(<d->ltd_layout_phase_list,
5235 &lad->lad_mdt_phase2_list);
5238 case LE_PHASE2_DONE:
5239 ltd->ltd_layout_done = 1;
5240 list_del_init(<d->ltd_layout_list);
/*
 * NOTE(review): the case label for the lines below is not shown;
 * given the event filter above it is presumably LE_PEER_EXIT.
 * When fail-out is not requested and the peer is not an OST, the
 * layout scan is flagged incomplete.
 */
5244 ltd->ltd_layout_done = 1;
5245 list_del_init(<d->ltd_layout_list);
5246 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) &&
5247 !(lr->lr_flags & LEF_FROM_OST))
5248 lo->ll_flags |= LF_INCOMPLETE;
5253 spin_unlock(<ds->ltd_lock);
/*
 * In fail-out mode a failure stops the whole LFSCK, forwarding the
 * status and the parameters with LPF_BROADCAST cleared. Otherwise wake
 * the assistant thread once phase2 can proceed.
 */
5255 if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5256 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5258 memset(stop, 0, sizeof(*stop));
5259 stop->ls_status = lr->lr_status;
5260 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5261 lfsck_stop(env, lfsck->li_bottom, stop);
5262 } else if (lfsck_phase2_next_ready(lad)) {
5263 wake_up_all(&lad->lad_thread.t_ctl_waitq);
/*
 * Handle an incoming notification on the layout LFSCK slave (installed as
 * .lfsck_in_notify in lfsck_layout_slave_ops).
 *
 * Events handled in the switch below:
 *  - LE_FID_ACCESSED: update the rbtree bitmap for lr->lr_fid.
 *  - LE_CONDITIONAL_DESTROY: lfsck_layout_slave_conditional_destroy().
 *  - LE_PAIRS_VERIFY: staged parent/child verification. lr->lr_status
 *    moves through LPVS_INIT, LPVS_INCONSISTENT and
 *    LPVS_INCONSISTENT_TOFIX, ending with lfsck_layout_slave_repair_pfid().
 *  - LE_PHASE1_DONE: on LF_INCOMPLETE the local layout is flagged
 *    incomplete and the sending master is dropped from llsd_master_list.
 *  - LE_PHASE2_DONE (and possibly further labels that are not shown):
 *    the sending master is dropped from llsd_master_list; the LFSCK may
 *    then be stopped on LE_PEER_EXIT.
 */
5269 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
5270 struct lfsck_component *com,
5271 struct lfsck_request *lr,
5274 struct lfsck_instance *lfsck = com->lc_lfsck;
5275 struct lfsck_layout_slave_data *llsd = com->lc_data;
5276 struct lfsck_layout_slave_target *llst;
5280 switch (lr->lr_event) {
5281 case LE_FID_ACCESSED:
5282 lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
5284 case LE_CONDITIONAL_DESTROY:
5285 rc = lfsck_layout_slave_conditional_destroy(env, com, lr);
5287 case LE_PAIRS_VERIFY: {
5288 lr->lr_status = LPVS_INIT;
5289 /* Firstly, if the MDT-object which is claimed via OST-object
5290 * local stored PFID xattr recognizes the OST-object, then it
5291 * must be that the client given PFID is wrong. */
5292 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5297 lr->lr_status = LPVS_INCONSISTENT;
5298 /* The OST-object local stored PFID xattr is stale. We need to
5299 * check whether the MDT-object that is claimed via the client
5300 * given PFID information recognizes the OST-object or not. If
5301 * matches, then need to update the OST-object's PFID xattr. */
5302 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5305 * We are not sure whether the client given PFID information
5306 * is correct or not, do nothing to avoid improper fixing.
5309 * The client given PFID information is also invalid, we can
5310 * NOT fix the OST-object inconsistency.
5315 lr->lr_status = LPVS_INCONSISTENT_TOFIX;
5316 rc = lfsck_layout_slave_repair_pfid(env, com, lr);
5320 case LE_PHASE1_DONE: {
5321 if (lr->lr_flags2 & LF_INCOMPLETE) {
5322 struct lfsck_layout *lo = com->lc_file_ram;
5324 lo->ll_flags |= LF_INCOMPLETE;
5325 llst = lfsck_layout_llst_find_and_del(llsd,
5329 lfsck_layout_llst_put(llst);
/* When no master is left in the list, wake the waiters on
 * the main LFSCK thread's control waitq. */
5330 if (list_empty(&llsd->llsd_master_list))
5332 &lfsck->li_thread.t_ctl_waitq);
5338 case LE_PHASE2_DONE:
5340 CDEBUG(D_LFSCK, "%s: layout LFSCK slave handle notify %u "
5341 "from MDT %x, status %d\n", lfsck_lfsck2name(lfsck),
5342 lr->lr_event, lr->lr_index, lr->lr_status);
/* Remove the sending master (keyed by lr->lr_index) from
 * llsd_master_list and drop the reference returned by the lookup. */
5348 llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
5352 lfsck_layout_llst_put(llst);
5353 if (list_empty(&llsd->llsd_master_list))
5354 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
/*
 * A peer exit stops the local LFSCK either unconditionally in fail-out
 * mode, or when no master is left and the peer reported LS_STOPPED or
 * LS_CO_STOPPED.
 */
5356 if (lr->lr_event == LE_PEER_EXIT &&
5357 (lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT ||
5358 (list_empty(&llsd->llsd_master_list) &&
5359 (lr->lr_status == LS_STOPPED ||
5360 lr->lr_status == LS_CO_STOPPED)))) {
5361 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5363 memset(stop, 0, sizeof(*stop));
5364 stop->ls_status = lr->lr_status;
5365 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5366 lfsck_stop(env, lfsck->li_bottom, stop);
/*
 * Report the current layout LFSCK status (installed as .lfsck_query in
 * both the master and the slave operation tables).
 *
 * Returns the ll_status field of the in-RAM layout file, com->lc_file_ram.
 */
5372 static int lfsck_layout_query(const struct lu_env *env,
5373 struct lfsck_component *com)
5375 struct lfsck_layout *lo = com->lc_file_ram;
5377 return lo->ll_status;
5380 /* with lfsck::li_lock held */
/*
 * Let another master join a layout LFSCK slave (installed as .lfsck_join
 * in lfsck_layout_slave_ops).
 *
 * Three preconditions are tested: the start request exists and carries
 * LPF_OST_ORPHAN, lsp->lsp_index_valid is set, and the rbtree is valid.
 * NOTE(review): the value returned when a precondition fails is not
 * shown here.
 *
 * li_lock is dropped around lfsck_layout_llst_add() and around the
 * rollback below, and is held again afterwards in both cases.
 */
5381 static int lfsck_layout_slave_join(const struct lu_env *env,
5382 struct lfsck_component *com,
5383 struct lfsck_start_param *lsp)
5385 struct lfsck_instance *lfsck = com->lc_lfsck;
5386 struct lfsck_layout_slave_data *llsd = com->lc_data;
5387 struct lfsck_layout_slave_target *llst;
5388 struct lfsck_start *start = lsp->lsp_start;
5392 if (start == NULL || !(start->ls_flags & LPF_OST_ORPHAN))
5395 if (!lsp->lsp_index_valid)
5398 /* If someone is running the LFSCK without orphan handling,
5399 * it will not maintain the object accessing rbtree. So we
5400 * cannot join it for orphan handling. */
5401 if (!llsd->llsd_rbtree_valid)
5404 spin_unlock(&lfsck->li_lock);
5405 rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
5406 spin_lock(&lfsck->li_lock);
/* The LFSCK thread stopped running while the lock was dropped:
 * undo the registration made just above. */
5407 if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
5408 spin_unlock(&lfsck->li_lock);
5409 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index,
5412 lfsck_layout_llst_put(llst);
5413 spin_lock(&lfsck->li_lock);
/*
 * Component operations for the layout LFSCK master.
 * lfsck_layout_setup() installs this table when lfsck->li_master is set.
 * Unlike the slave table it has no .lfsck_join entry.
 */
5420 static struct lfsck_operations lfsck_layout_master_ops = {
5421 .lfsck_reset = lfsck_layout_reset,
5422 .lfsck_fail = lfsck_layout_fail,
5423 .lfsck_checkpoint = lfsck_layout_master_checkpoint,
5424 .lfsck_prep = lfsck_layout_master_prep,
5425 .lfsck_exec_oit = lfsck_layout_master_exec_oit,
5426 .lfsck_exec_dir = lfsck_layout_exec_dir,
5427 .lfsck_post = lfsck_layout_master_post,
5428 .lfsck_dump = lfsck_layout_dump,
5429 .lfsck_double_scan = lfsck_layout_master_double_scan,
5430 .lfsck_data_release = lfsck_layout_master_data_release,
5431 .lfsck_quit = lfsck_layout_master_quit,
5432 .lfsck_in_notify = lfsck_layout_master_in_notify,
5433 .lfsck_query = lfsck_layout_query,
/*
 * Component operations for the layout LFSCK slave.
 * lfsck_layout_setup() installs this table when lfsck->li_master is not
 * set. reset, fail, exec_dir, dump and query are shared with the master
 * table; .lfsck_join is provided only here.
 */
5436 static struct lfsck_operations lfsck_layout_slave_ops = {
5437 .lfsck_reset = lfsck_layout_reset,
5438 .lfsck_fail = lfsck_layout_fail,
5439 .lfsck_checkpoint = lfsck_layout_slave_checkpoint,
5440 .lfsck_prep = lfsck_layout_slave_prep,
5441 .lfsck_exec_oit = lfsck_layout_slave_exec_oit,
5442 .lfsck_exec_dir = lfsck_layout_exec_dir,
5443 .lfsck_post = lfsck_layout_slave_post,
5444 .lfsck_dump = lfsck_layout_dump,
5445 .lfsck_double_scan = lfsck_layout_slave_double_scan,
5446 .lfsck_data_release = lfsck_layout_slave_data_release,
5447 .lfsck_quit = lfsck_layout_slave_quit,
5448 .lfsck_in_notify = lfsck_layout_slave_in_notify,
5449 .lfsck_query = lfsck_layout_query,
5450 .lfsck_join = lfsck_layout_slave_join,
/*
 * Fill the checkpoint position from the assistant's pending requests
 * (installed as .la_fill_pos in lfsck_layout_assistant_ops).
 *
 * The request list is tested for emptiness first. Otherwise
 * pos->lp_oit_cookie is set to one less than the llo_cookie of the parent
 * of the first request on lad_req_list.
 * NOTE(review): the empty-list branch is not shown here - presumably it
 * returns without touching pos; confirm.
 */
5453 static void lfsck_layout_assistant_fill_pos(const struct lu_env *env,
5454 struct lfsck_component *com,
5455 struct lfsck_position *pos)
5457 struct lfsck_assistant_data *lad = com->lc_data;
5458 struct lfsck_layout_req *llr;
5460 if (list_empty(&lad->lad_req_list))
5463 llr = list_entry(lad->lad_req_list.next,
5464 struct lfsck_layout_req,
5466 pos->lp_oit_cookie = llr->llr_parent->llo_cookie - 1;
/*
 * Assistant thread callbacks for the layout LFSCK.
 * lfsck_layout_setup() passes this table to lfsck_assistant_data_init()
 * on the master side. The table has external linkage (it is not static).
 */
5469 struct lfsck_assistant_operations lfsck_layout_assistant_ops = {
5470 .la_handler_p1 = lfsck_layout_assistant_handler_p1,
5471 .la_handler_p2 = lfsck_layout_assistant_handler_p2,
5472 .la_fill_pos = lfsck_layout_assistant_fill_pos,
5473 .la_double_scan_result = lfsck_layout_double_scan_result,
5474 .la_req_fini = lfsck_layout_assistant_req_fini,
5475 .la_sync_failures = lfsck_layout_assistant_sync_failures,
/*
 * Create and register the layout LFSCK component for an LFSCK instance.
 *
 * Steps shown below:
 *  1. Initialise the component and select master or slave ops and private
 *     data according to lfsck->li_master.
 *  2. Allocate the in-RAM and on-disk images of the layout file, each
 *     sizeof(struct lfsck_layout) bytes.
 *  3. Locate the local root directory and find or create the backing
 *     object, then load it. A missing file (-ENOENT) is initialised with
 *     lfsck_layout_init(); a reset is done under a condition that is not
 *     shown here.
 *  4. Link the component on li_list_idle or li_list_scan depending on
 *     lo->ll_status. A status of LS_SCANNING_PHASE1 or LS_SCANNING_PHASE2
 *     is rewritten to LS_CRASHED.
 *
 * Allocation failures take the "out" path with rc = -ENOMEM; failures of
 * dt_locate() and local_file_find_or_create() propagate PTR_ERR().
 * NOTE(review): the cleanup and error log at the end presumably run only
 * when rc is non-zero; the guarding condition is not shown here.
 */
5478 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
5480 struct lfsck_component *com;
5481 struct lfsck_layout *lo;
5482 struct dt_object *root = NULL;
5483 struct dt_object *obj;
5491 INIT_LIST_HEAD(&com->lc_link);
5492 INIT_LIST_HEAD(&com->lc_link_dir);
5493 init_rwsem(&com->lc_sem);
5494 atomic_set(&com->lc_ref, 1);
5495 com->lc_lfsck = lfsck;
5496 com->lc_type = LFSCK_TYPE_LAYOUT;
/* Master: private data is the generic assistant data. */
5497 if (lfsck->li_master) {
5498 com->lc_ops = &lfsck_layout_master_ops;
5499 com->lc_data = lfsck_assistant_data_init(
5500 &lfsck_layout_assistant_ops,
5502 if (com->lc_data == NULL)
5503 GOTO(out, rc = -ENOMEM);
/* Slave: private data is a freshly initialised lfsck_layout_slave_data. */
5505 struct lfsck_layout_slave_data *llsd;
5507 com->lc_ops = &lfsck_layout_slave_ops;
5508 OBD_ALLOC_PTR(llsd);
5510 GOTO(out, rc = -ENOMEM);
5512 INIT_LIST_HEAD(&llsd->llsd_seq_list);
5513 INIT_LIST_HEAD(&llsd->llsd_master_list);
5514 spin_lock_init(&llsd->llsd_lock);
5515 llsd->llsd_rb_root = RB_ROOT;
5516 rwlock_init(&llsd->llsd_rb_lock);
5517 com->lc_data = llsd;
5519 com->lc_file_size = sizeof(*lo);
5520 OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
5521 if (com->lc_file_ram == NULL)
5522 GOTO(out, rc = -ENOMEM);
5524 OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
5525 if (com->lc_file_disk == NULL)
5526 GOTO(out, rc = -ENOMEM);
5528 root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
5530 GOTO(out, rc = PTR_ERR(root));
5532 if (unlikely(!dt_try_as_dir(env, root)))
5533 GOTO(out, rc = -ENOTDIR);
5535 obj = local_file_find_or_create(env, lfsck->li_los, root,
5537 S_IFREG | S_IRUGO | S_IWUSR);
5539 GOTO(out, rc = PTR_ERR(obj));
5542 rc = lfsck_layout_load(env, com);
5544 rc = lfsck_layout_reset(env, com, true);
5545 else if (rc == -ENOENT)
5546 rc = lfsck_layout_init(env, com);
/* Queue the component according to the status that was loaded. */
5551 lo = com->lc_file_ram;
5552 switch (lo->ll_status) {
5558 spin_lock(&lfsck->li_lock);
5559 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5560 spin_unlock(&lfsck->li_lock);
5563 CERROR("%s: unknown lfsck_layout status %d\n",
5564 lfsck_lfsck2name(lfsck), lo->ll_status);
5566 case LS_SCANNING_PHASE1:
5567 case LS_SCANNING_PHASE2:
5568 /* No need to store the status to disk right now.
5569 * If the system crashed before the status stored,
5570 * it will be loaded back when next time. */
5571 lo->ll_status = LS_CRASHED;
5572 if (!lfsck->li_master)
5573 lo->ll_flags |= LF_INCOMPLETE;
5580 spin_lock(&lfsck->li_lock);
5581 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
5582 spin_unlock(&lfsck->li_lock);
/* A crashed LAST_ID is reported through the out-notify hook, which
 * is asserted to be set. */
5586 if (lo->ll_flags & LF_CRASHED_LASTID) {
5587 LASSERT(lfsck->li_out_notify != NULL);
5589 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5590 LE_LASTID_REBUILDING);
/* Drop the root reference only if dt_locate() returned a real object. */
5596 if (root != NULL && !IS_ERR(root))
5597 lu_object_put(env, &root->do_lu);
5600 lfsck_component_cleanup(env, com);
5601 CERROR("%s: fail to init layout LFSCK component: rc = %d\n",
5602 lfsck_lfsck2name(lfsck), rc);
/*
 * Iterator state for walking orphan OST-objects; the lfsck_orphan_it_*
 * callbacks cast their dt_it argument to this type.
 */
5608 struct lfsck_orphan_it {
/* Owning layout LFSCK component; fini() drops a reference on it. */
5609 struct lfsck_component *loi_com;
5610 struct lfsck_rbtree_node *loi_lrn;
/* Slave target found by lfsck_orphan_it_init(); fini() and load()
 * exchange the saved position (llst_fid, llst_hash) with it. */
5611 struct lfsck_layout_slave_target *loi_llst;
/* Current key: FID of the OST-object under the cursor. */
5612 struct lu_fid loi_key;
/* Current record, copied out by lfsck_orphan_it_rec(). */
5613 struct lu_orphan_rec loi_rec;
5615 unsigned int loi_over:1;
/*
 * Check whether the sequence of a FID is served by the MDT with index idx.
 *
 * Non-normal FIDs take a separate path (see the comment below). Otherwise
 * the FID's sequence is looked up through the server FLD of the local
 * site; the resulting range is tested for being an MDT range and its
 * lsr_index is compared with idx.
 *
 * NOTE(review): the return value of each branch is not shown here. The
 * caller comments in lfsck_orphan_it_next() suggest a tri-state result
 * (match / no match / unknown) - confirm.
 */
5618 static int lfsck_fid_match_idx(const struct lu_env *env,
5619 struct lfsck_instance *lfsck,
5620 const struct lu_fid *fid, int idx)
5622 struct seq_server_site *ss;
5623 struct lu_server_fld *sf;
5624 struct lu_seq_range *range = &lfsck_env_info(env)->lti_range;
5627 /* All abnormal cases will be returned to MDT0. */
5628 if (!fid_is_norm(fid)) {
5635 ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
5636 if (unlikely(ss == NULL))
5639 sf = ss->ss_server_fld;
5640 LASSERT(sf != NULL);
/* Accept any range type in the lookup; the type is tested afterwards. */
5642 fld_range_set_any(range);
5643 rc = fld_server_lookup(env, sf, fid_seq(fid), range);
5647 if (!fld_range_is_mdt(range))
5650 if (range->lsr_index == idx)
/*
 * Destroy an orphan OST-object inside a local transaction.
 *
 * The transaction declares a reference drop and a destroy, and is started
 * with dt_trans_start_local(). With the object write-locked, the
 * reference is dropped and the object destroyed. The transaction is then
 * stopped and the outcome logged with the object's FID.
 *
 * NOTE(review): the error checks between the steps are not shown here;
 * presumably each failure skips the remaining steps - confirm.
 */
5656 static void lfsck_layout_destroy_orphan(const struct lu_env *env,
5657 struct dt_device *dev,
5658 struct dt_object *obj)
5660 struct thandle *handle;
5664 handle = dt_trans_create(env, dev);
5668 rc = dt_declare_ref_del(env, obj, handle);
5672 rc = dt_declare_destroy(env, obj, handle);
5676 rc = dt_trans_start_local(env, dev, handle);
5680 dt_write_lock(env, obj, 0);
5681 rc = dt_ref_del(env, obj, handle);
5683 rc = dt_destroy(env, obj, handle);
5684 dt_write_unlock(env, obj);
5689 dt_trans_stop(env, dev, handle);
5691 CDEBUG(D_LFSCK, "destroy orphan OST-object "DFID": rc = %d\n",
5692 PFID(lfsck_dto2fid(obj)), rc);
/*
 * Lookup callback of the orphan index (wired as .dio_lookup in
 * lfsck_orphan_index_ops).
 * NOTE(review): the body is not shown here; presumably the operation is
 * unsupported on this index - confirm the returned value.
 */
5697 static int lfsck_orphan_index_lookup(const struct lu_env *env,
5698 struct dt_object *dt,
5700 const struct dt_key *key,
5701 struct lustre_capa *capa)
/*
 * Declare-insert callback of the orphan index (wired as
 * .dio_declare_insert in lfsck_orphan_index_ops).
 * NOTE(review): the body is not shown here; presumably the operation is
 * unsupported on this index - confirm the returned value.
 */
5706 static int lfsck_orphan_index_declare_insert(const struct lu_env *env,
5707 struct dt_object *dt,
5708 const struct dt_rec *rec,
5709 const struct dt_key *key,
5710 struct thandle *handle)
/*
 * Insert callback of the orphan index (wired as .dio_insert in
 * lfsck_orphan_index_ops).
 * NOTE(review): the body is not shown here; presumably the operation is
 * unsupported on this index - confirm the returned value.
 */
5715 static int lfsck_orphan_index_insert(const struct lu_env *env,
5716 struct dt_object *dt,
5717 const struct dt_rec *rec,
5718 const struct dt_key *key,
5719 struct thandle *handle,
5720 struct lustre_capa *capa,
/*
 * Declare-delete callback of the orphan index (wired as
 * .dio_declare_delete in lfsck_orphan_index_ops).
 * NOTE(review): the body is not shown here; presumably the operation is
 * unsupported on this index - confirm the returned value.
 */
5726 static int lfsck_orphan_index_declare_delete(const struct lu_env *env,
5727 struct dt_object *dt,
5728 const struct dt_key *key,
5729 struct thandle *handle)
/*
 * Delete callback of the orphan index (wired as .dio_delete in
 * lfsck_orphan_index_ops).
 * NOTE(review): the body is not shown here; presumably the operation is
 * unsupported on this index - confirm the returned value.
 */
5734 static int lfsck_orphan_index_delete(const struct lu_env *env,
5735 struct dt_object *dt,
5736 const struct dt_key *key,
5737 struct thandle *handle,
5738 struct lustre_capa *capa)
/*
 * Create an orphan iterator (wired as .init in lfsck_orphan_index_ops).
 *
 * The LFSCK instance is found from the device that backs dt, then its
 * layout component. "attr" is used as the index for the slave target
 * lookup (lfsck_layout_llst_find_and_del() with its last argument false).
 *
 * Errors set below:
 *   -ENXIO   no LFSCK instance on the device, or no matching slave target
 *   -ENOENT  no layout component
 *   -ESRCH   the layout scan is flagged LF_INCOMPLETE, or the rbtree is
 *            not valid
 *   -ENOMEM  presumably the iterator allocation failed (the allocation
 *            itself is not shown here)
 *
 * On the first iteration (dev->dd_record_fid_accessed still set) the whole
 * rbtree is pruned under the write lock. The rbtree is then read-locked;
 * per the comment below, the read lock is held until fini.
 *
 * NOTE(review): failures are presumably returned as ERR_PTR(rc); the
 * condition that guards that conversion is not shown here.
 */
5743 static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
5744 struct dt_object *dt,
5746 struct lustre_capa *capa)
5748 struct dt_device *dev = lu2dt_dev(dt->do_lu.lo_dev);
5749 struct lfsck_instance *lfsck;
5750 struct lfsck_component *com = NULL;
5751 struct lfsck_layout_slave_data *llsd;
5752 struct lfsck_orphan_it *it = NULL;
5753 struct lfsck_layout *lo;
5757 lfsck = lfsck_instance_find(dev, true, false);
5758 if (unlikely(lfsck == NULL))
5759 RETURN(ERR_PTR(-ENXIO));
5761 com = lfsck_component_find(lfsck, LFSCK_TYPE_LAYOUT);
5762 if (unlikely(com == NULL))
5763 GOTO(out, rc = -ENOENT);
5765 lo = com->lc_file_ram;
5766 if (lo->ll_flags & LF_INCOMPLETE)
5767 GOTO(out, rc = -ESRCH);
5769 llsd = com->lc_data;
5770 if (!llsd->llsd_rbtree_valid)
5771 GOTO(out, rc = -ESRCH);
5775 GOTO(out, rc = -ENOMEM);
5777 it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false);
5778 if (it->loi_llst == NULL)
5779 GOTO(out, rc = -ENXIO);
/* The flag is tested once without the lock and again under the write
 * lock, so the pruning below runs only while it is still set. */
5781 if (dev->dd_record_fid_accessed) {
5782 /* The first iteration against the rbtree, scan the whole rbtree
5783 * to remove the nodes which do NOT need to be handled. */
5784 write_lock(&llsd->llsd_rb_lock);
5785 if (dev->dd_record_fid_accessed) {
5786 struct rb_node *node;
5787 struct rb_node *next;
5788 struct lfsck_rbtree_node *lrn;
5790 /* No need to record the fid accessing anymore. */
5791 dev->dd_record_fid_accessed = 0;
5793 node = rb_first(&llsd->llsd_rb_root);
5794 while (node != NULL) {
/* Fetch the successor first: node may be erased below. */
5795 next = rb_next(node);
5796 lrn = rb_entry(node, struct lfsck_rbtree_node,
/* Erase nodes whose known count does not exceed the accessed
 * count. */
5798 if (atomic_read(&lrn->lrn_known_count) <=
5799 atomic_read(&lrn->lrn_accessed_count)) {
5800 rb_erase(node, &llsd->llsd_rb_root);
5801 lfsck_rbtree_free(lrn);
5806 write_unlock(&llsd->llsd_rb_lock);
5809 /* read lock the rbtree when init, and unlock when fini */
5810 read_lock(&llsd->llsd_rb_lock);
5818 lfsck_component_put(env, com);
5820 CDEBUG(D_LFSCK, "%s: init the orphan iteration: rc = %d\n",
5821 lfsck_lfsck2name(lfsck), rc);
5823 lfsck_instance_put(env, lfsck);
5828 it = (struct lfsck_orphan_it *)ERR_PTR(rc);
5831 return (struct dt_it *)it;
/*
 * Finish an orphan iteration (wired as .fini in lfsck_orphan_index_ops).
 *
 * Releases the rbtree read lock, saves the current key and hash into the
 * slave target so that a later load() can resume, then drops the
 * references on the slave target and on the component.
 * NOTE(review): freeing of the iterator itself is not shown here.
 */
5834 static void lfsck_orphan_it_fini(const struct lu_env *env,
5837 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
5838 struct lfsck_component *com = it->loi_com;
5839 struct lfsck_layout_slave_data *llsd;
5840 struct lfsck_layout_slave_target *llst;
5843 CDEBUG(D_LFSCK, "%s: fini the orphan iteration\n",
5844 lfsck_lfsck2name(com->lc_lfsck));
5846 llsd = com->lc_data;
5847 read_unlock(&llsd->llsd_rb_lock);
5848 llst = it->loi_llst;
5849 LASSERT(llst != NULL);
5851 /* Save the key and hash for iterate next. */
5852 llst->llst_fid = it->loi_key;
5853 llst->llst_hash = it->loi_hash;
5854 lfsck_layout_llst_put(llst);
5855 lfsck_component_put(env, com);
5861 * \retval +1: the iteration finished
5862 * \retval 0: on success, not finished
5863 * \retval -ve: on error
/*
 * Move the orphan iterator to the next candidate OST-object (wired as
 * .next in lfsck_orphan_index_ops; also called by get() and load()).
 *
 * Outline of the visible logic:
 *  - search the slave rbtree for the node matching it->loi_key;
 *  - skip nodes whose known count does not exceed their accessed count;
 *  - scan lrn_known_bitmap from the current position and test each hit
 *    against lrn_accessed_bitmap;
 *  - load the object for the candidate FID, read its attributes and its
 *    XATTR_NAME_FID xattr, and fill it->loi_rec (parent FID, uid, gid).
 *
 * The per-thread buffers lti_old_pfid and lti_la are used as scratch
 * space. idx is the index of the slave target bound to this iterator.
 */
5865 static int lfsck_orphan_it_next(const struct lu_env *env,
5868 struct lfsck_thread_info *info = lfsck_env_info(env);
5869 struct filter_fid_old *pfid = &info->lti_old_pfid;
5870 struct lu_attr *la = &info->lti_la;
5871 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
5872 struct lu_fid *key = &it->loi_key;
5873 struct lu_orphan_rec *rec = &it->loi_rec;
5874 struct lfsck_component *com = it->loi_com;
5875 struct lfsck_instance *lfsck = com->lc_lfsck;
5876 struct lfsck_layout_slave_data *llsd = com->lc_data;
5877 struct dt_object *obj;
5878 struct lfsck_rbtree_node *lrn;
5882 __u32 idx = it->loi_llst->llst_index;
5892 lrn = lfsck_rbtree_search(llsd, key, &exact);
5900 key->f_seq = lrn->lrn_seq;
5901 key->f_oid = lrn->lrn_first_oid;
5906 if (unlikely(key->f_oid == 0)) {
5913 lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) {
/* This node has no known-but-unaccessed object: walk forward to the
 * first node whose known count exceeds its accessed count. */
5919 if (unlikely(atomic_read(&lrn->lrn_known_count) <=
5920 atomic_read(&lrn->lrn_accessed_count))) {
5921 struct rb_node *next = rb_next(&lrn->lrn_node);
5923 while (next != NULL) {
5924 lrn = rb_entry(next, struct lfsck_rbtree_node,
5926 if (atomic_read(&lrn->lrn_known_count) >
5927 atomic_read(&lrn->lrn_accessed_count))
5929 next = rb_next(next);
5938 key->f_seq = lrn->lrn_seq;
5939 key->f_oid = lrn->lrn_first_oid;
/* Bit position of the current key inside this node's bitmaps. */
5943 pos = key->f_oid - lrn->lrn_first_oid;
5946 pos = find_next_bit(lrn->lrn_known_bitmap,
5947 LFSCK_RBTREE_BITMAP_WIDTH, pos);
/* No further known object in this node: the key is moved past the
 * node's range; the check that follows detects f_oid wrapping around. */
5948 if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) {
5949 key->f_oid = lrn->lrn_first_oid + pos;
5950 if (unlikely(key->f_oid < lrn->lrn_first_oid)) {
/* NOTE(review): an already accessed object is presumably skipped and
 * the scan continues; the branch body is not shown here. */
5958 if (test_bit(pos, lrn->lrn_accessed_bitmap)) {
5963 key->f_oid = lrn->lrn_first_oid + pos;
5964 obj = lfsck_object_find(env, lfsck, key);
5967 if (rc == -ENOENT) {
5974 dt_read_lock(env, obj, 0);
5975 if (dt_object_exists(obj) == 0 ||
5976 lfsck_is_dead_obj(obj)) {
5977 dt_read_unlock(env, obj);
5978 lfsck_object_put(env, obj);
5983 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
/* Read the parent FID xattr into the per-thread pfid buffer. */
5987 rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)),
5988 XATTR_NAME_FID, BYPASS_CAPA);
5989 if (rc == -ENODATA) {
5990 /* For the pre-created OST-object, update the bitmap to avoid
5991 * others LFSCK (second phase) iteration to touch it again. */
5992 if (la->la_ctime == 0) {
5993 if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap))
5994 atomic_inc(&lrn->lrn_accessed_count);
5996 /* For the race between repairing dangling referenced
5997 * MDT-object and unlink the file, it may left orphan
5998 * OST-object there. Destroy it now! */
5999 if (unlikely(!(la->la_mode & S_ISUID))) {
6000 dt_read_unlock(env, obj);
6001 lfsck_layout_destroy_orphan(env,
6004 lfsck_object_put(env, obj);
6008 } else if (idx == 0) {
6009 /* If the orphan OST-object has no parent information,
6010 * regard it as referenced by the MDT-object on MDT0. */
6011 fid_zero(&rec->lor_fid);
6012 rec->lor_uid = la->la_uid;
6013 rec->lor_gid = la->la_gid;
6017 dt_read_unlock(env, obj);
6018 lfsck_object_put(env, obj);
/* The xattr size must equal one of the two filter_fid structure
 * sizes; anything else is rejected with -EINVAL. */
6026 if (rc != sizeof(struct filter_fid) &&
6027 rc != sizeof(struct filter_fid_old))
6028 GOTO(out, rc = -EINVAL);
6030 fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
6031 /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
6032 * MDT-object's FID::f_ver, instead it is the OST-object index in its
6033 * parent MDT-object's layout EA. */
/* Save the stripe index and clear f_ver for the MDT match; the saved
 * index is put back after the match. */
6034 save = rec->lor_fid.f_stripe_idx;
6035 rec->lor_fid.f_ver = 0;
6036 rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
6037 /* If the orphan OST-object does not claim the MDT, then next.
6039 * If we do not know whether it matches or not, then return it
6040 * to the MDT for further check. */
6042 dt_read_unlock(env, obj);
6043 lfsck_object_put(env, obj);
6048 rec->lor_fid.f_stripe_idx = save;
6049 rec->lor_uid = la->la_uid;
6050 rec->lor_gid = la->la_gid;
6052 CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n",
6053 lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid),
6054 rec->lor_uid, rec->lor_gid);
6059 dt_read_unlock(env, obj);
6060 lfsck_object_put(env, obj);
6068 * \retval +1: locate to the exactly position
6069 * \retval 0: cannot locate to the exactly position,
6070 * call next() to move to a valid position.
6071 * \retval -ve: on error
/*
 * Position the iterator at the given key (wired as .get in
 * lfsck_orphan_index_ops).
 *
 * The key is interpreted as a struct lu_fid and copied into it->loi_key,
 * then lfsck_orphan_it_next() is called to land on a valid record.
 * NOTE(review): how the result of next() is translated into this
 * function's return value is not shown here.
 */
6073 static int lfsck_orphan_it_get(const struct lu_env *env,
6075 const struct dt_key *key)
6077 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6080 it->loi_key = *(struct lu_fid *)key;
6081 rc = lfsck_orphan_it_next(env, di);
/*
 * Put callback of the orphan iterator (wired as .put in
 * lfsck_orphan_index_ops).
 * NOTE(review): the body is not shown here; presumably empty - confirm.
 */
6091 static void lfsck_orphan_it_put(const struct lu_env *env,
/*
 * Return the current key of the iterator (wired as .key in
 * lfsck_orphan_index_ops): a pointer to it->loi_key, cast to dt_key.
 * The pointer refers to storage inside the iterator itself.
 */
6096 static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env,
6097 const struct dt_it *di)
6099 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6101 return (struct dt_key *)&it->loi_key;
/*
 * Return the key size of the orphan index (wired as .key_size in
 * lfsck_orphan_index_ops): always sizeof(struct lu_fid).
 */
6104 static int lfsck_orphan_it_key_size(const struct lu_env *env,
6105 const struct dt_it *di)
6107 return sizeof(struct lu_fid);
/*
 * Copy the current record out of the iterator (wired as .rec in
 * lfsck_orphan_index_ops).
 *
 * it->loi_rec is copied by value into the caller's buffer, so rec must
 * point to storage large enough for a struct lu_orphan_rec.
 * NOTE(review): the return value is not shown here.
 */
6110 static int lfsck_orphan_it_rec(const struct lu_env *env,
6111 const struct dt_it *di,
6115 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6117 *(struct lu_orphan_rec *)rec = it->loi_rec;
/*
 * Return the iterator's current position cookie (wired as .store in
 * lfsck_orphan_index_ops): the value of it->loi_hash.
 */
6122 static __u64 lfsck_orphan_it_store(const struct lu_env *env,
6123 const struct dt_it *di)
6125 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6127 return it->loi_hash;
6131 * \retval +1: locate to the exactly position
6132 * \retval 0: cannot locate to the exactly position,
6133 * call next() to move to a valid position.
6134 * \retval -ve: on error
/*
 * Reposition the iterator from a saved hash (wired as .load in
 * lfsck_orphan_index_ops).
 *
 * If the given hash differs from the one saved in the slave target by
 * fini(), the saved position is reset (zero FID, zero hash), so the
 * iteration restarts from the beginning. The iterator key and hash are
 * then taken from the slave target and lfsck_orphan_it_next() is called
 * to land on a valid record.
 * NOTE(review): how the result of next() is translated into this
 * function's return value is not shown here.
 */
6136 static int lfsck_orphan_it_load(const struct lu_env *env,
6137 const struct dt_it *di,
6140 struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6141 struct lfsck_layout_slave_target *llst = it->loi_llst;
6144 LASSERT(llst != NULL);
6146 if (hash != llst->llst_hash) {
6147 CDEBUG(D_LFSCK, "%s: the given hash "LPU64" for orphan "
6148 "iteration does not match the one when fini "
6149 LPU64", to be reset.\n",
6150 lfsck_lfsck2name(it->loi_com->lc_lfsck), hash,
6152 fid_zero(&llst->llst_fid);
6153 llst->llst_hash = 0;
6156 it->loi_key = llst->llst_fid;
6157 it->loi_hash = llst->llst_hash;
6158 rc = lfsck_orphan_it_next(env, (struct dt_it *)di);
/*
 * key_rec callback of the orphan iterator (wired as .key_rec in
 * lfsck_orphan_index_ops).
 * NOTE(review): the remaining parameters and the body are not shown
 * here - confirm the behaviour against the full source.
 */
6168 static int lfsck_orphan_it_key_rec(const struct lu_env *env,
6169 const struct dt_it *di,
/*
 * Index operations exposing orphan OST-objects as a dt index.
 * The dio_* entries point at the lfsck_orphan_index_* callbacks, and the
 * iterator entries (.init through .key_rec) at the lfsck_orphan_it_*
 * callbacks.
 * NOTE(review): the iterator entries presumably sit in a nested
 * initializer (e.g. .dio_it) whose opening line is not shown here.
 */
6175 const struct dt_index_operations lfsck_orphan_index_ops = {
6176 .dio_lookup = lfsck_orphan_index_lookup,
6177 .dio_declare_insert = lfsck_orphan_index_declare_insert,
6178 .dio_insert = lfsck_orphan_index_insert,
6179 .dio_declare_delete = lfsck_orphan_index_declare_delete,
6180 .dio_delete = lfsck_orphan_index_delete,
6182 .init = lfsck_orphan_it_init,
6183 .fini = lfsck_orphan_it_fini,
6184 .get = lfsck_orphan_it_get,
6185 .put = lfsck_orphan_it_put,
6186 .next = lfsck_orphan_it_next,
6187 .key = lfsck_orphan_it_key,
6188 .key_size = lfsck_orphan_it_key_size,
6189 .rec = lfsck_orphan_it_rec,
6190 .store = lfsck_orphan_it_store,
6191 .load = lfsck_orphan_it_load,
6192 .key_rec = lfsck_orphan_it_key_rec,