4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2017, Intel Corporation.
26 * lustre/osd-zfs/osd_scrub.c
28 * Top-level entry points into osd module
30 * The OI scrub is used for rebuilding Object Index files when restores MDT from
33 * The otable based iterator scans ZFS objects to feed up layer LFSCK.
35 * Author: Fan Yong <fan.yong@intel.com>
38 #define DEBUG_SUBSYSTEM S_LFSCK
40 #include <linux/kthread.h>
41 #include <uapi/linux/lustre/lustre_idl.h>
42 #include <lustre_disk.h>
43 #include <dt_object.h>
44 #include <linux/xattr.h>
45 #include <lustre_scrub.h>
46 #include <obd_class.h>
47 #include <lustre_nodemap.h>
48 #include <sys/dsl_dataset.h>
49 #include <sys/zap_impl.h>
51 #include <sys/zap_leaf.h>
53 #include "osd_internal.h"
55 #define OSD_OTABLE_MAX_HASH ((1ULL << 48) - 1)
56 #define OTABLE_PREFETCH 256
/*
 * True while the otable iterator's prefetch window still has room,
 * i.e. fewer than OTABLE_PREFETCH objects are queued ahead of the
 * consumer.  Used to throttle the scrub producer.
 * NOTE(review): sampled extract — the brace lines of this helper are
 * not visible here.
 */
58 static inline bool osd_scrub_has_window(struct osd_otable_it *it)
60 	return it->ooi_prefetched < OTABLE_PREFETCH;
/*
 * Refresh one OI (Object Index) mapping @fid => @oid according to @ops.
 *
 * All three operations run inside a single DMU transaction: the ZAP that
 * backs the OI file is held via osd_tx_hold_zap() (add-mode only for
 * DTO_INDEX_INSERT), the tx is assigned with TXG_WAIT, then the matching
 * zap_update / osd_zap_add / osd_zap_remove is issued.  When the scrub
 * runs in dry-run mode (SP_DRYRUN) the update is skipped unless @force.
 *
 * \param[in] env	execution environment
 * \param[in] dev	OSD device owning the OI files
 * \param[in] fid	the FID whose mapping is refreshed
 * \param[in] oid	dnode number the FID should map to
 * \param[in] ops	DTO_INDEX_UPDATE / DTO_INDEX_INSERT / DTO_INDEX_DELETE
 * \param[in] force	apply the change even under SP_DRYRUN
 * \param[in] name	entry name, for logging only (may be NULL)
 *
 * \retval 1, changed nothing
 * \retval 0, changed successfully
 * \retval -ve, on error
 *
 * NOTE(review): sampled extract — local declarations (tx, zapid, dn, rc),
 * the switch statement itself, tx commit/abort and the exit label are in
 * the elided lines; confirm against the full file.
 */
70 int osd_scrub_refresh_mapping(const struct lu_env *env,
71 			      struct osd_device *dev,
72 			      const struct lu_fid *fid,
73 			      uint64_t oid, enum dt_txn_op ops,
74 			      bool force, const char *name)
76 	struct osd_thread_info *info = osd_oti_get(env);
77 	struct zpl_direntry *zde = &info->oti_zde.lzd_reg;
78 	char *buf = info->oti_str;
85 	if (dev->od_scrub.os_file.sf_param & SP_DRYRUN && !force)
88 	tx = dmu_tx_create(dev->od_os);
90 		GOTO(log, rc = -ENOMEM);
92 	zapid = osd_get_name_n_idx(env, dev, fid, buf,
93 				   sizeof(info->oti_str), &dn);
94 	osd_tx_hold_zap(tx, zapid, dn,
95 			ops == DTO_INDEX_INSERT ? TRUE : FALSE, NULL);
96 	rc = -dmu_tx_assign(tx, TXG_WAIT);
103 	case DTO_INDEX_UPDATE:
105 		zde->zde_dnode = oid;
106 		zde->zde_type = 0; /* The type in OI mapping is useless. */
107 		rc = -zap_update(dev->od_os, zapid, buf, 8, sizeof(*zde) / 8,
109 		if (unlikely(rc == -ENOENT)) {
110 			/* Some unlink thread may removed the OI mapping. */
114 	case DTO_INDEX_INSERT:
116 		zde->zde_dnode = oid;
117 		zde->zde_type = 0; /* The type in OI mapping is useless. */
118 		rc = osd_zap_add(dev, zapid, dn, buf, 8, sizeof(*zde) / 8,
120 		if (unlikely(rc == -EEXIST))
123 	case DTO_INDEX_DELETE:
124 		rc = osd_zap_remove(dev, zapid, dn, buf, tx);
126 		/* It is normal that the unlink thread has removed the
127 		 * OI mapping already.
133 		LASSERTF(0, "Unexpected ops %d\n", ops);
142 	CDEBUG(D_LFSCK, "%s: refresh OI map for scrub, op %d, force %s, "
143 	       DFID" => %llu (%s): rc = %d\n", osd_name(dev), ops,
144 	       force ? "yes" : "no", PFID(fid), oid, name ? name : "null", rc);
/*
 * Check one scanned object and, if its OI mapping is stale or missing,
 * refresh it.
 *
 * Runs under scrub->os_rwsem (write).  Visible flow:
 * - if the scrub is handling a "prior" item (os_in_prior), take the head
 *   of os_inconsistent_items so it can be removed afterwards;
 * - objects below sf_pos_latest_start that are not prior items are skipped;
 * - osd_fid_lookup() decides between DTO_INDEX_INSERT (no mapping) and
 *   DTO_INDEX_UPDATE (mapping points at a different dnode);
 * - when the OI maps the FID to a different live dnode (oid != oid2), the
 *   other object's LMA is loaded to detect two objects claiming one FID;
 *   inconsistency forces full-speed scrub and sets SF_INCONSISTENT;
 * - on MDT-hosted OST devices (od_is_ost), a regular file with nlink > 1
 *   flags scrub->os_has_ml_file for the later multi-link scan;
 * - on failure, sf_items_failed and sf_pos_first_inconsistent are updated;
 * - if an insert raced with unlink (dn->dn_free_txg set), the freshly
 *   added mapping is deleted again.
 *
 * Returns rc when SP_FAILOUT is set, otherwise 0.
 *
 * NOTE(review): sampled extract — several declarations (rc, oid2, dn, hdl,
 * size), GOTO labels and the cleanup paths are in elided lines.
 */
150 osd_scrub_check_update(const struct lu_env *env, struct osd_device *dev,
151 		       const struct lu_fid *fid, uint64_t oid, int val)
153 	struct lustre_scrub *scrub = &dev->od_scrub;
154 	struct scrub_file *sf = &scrub->os_file;
155 	struct osd_inconsistent_item *oii = NULL;
156 	nvlist_t *nvbuf = NULL;
159 	int ops = DTO_INDEX_UPDATE;
163 	down_write(&scrub->os_rwsem);
164 	scrub->os_new_checked++;
168 	if (scrub->os_in_prior)
169 		oii = list_first_entry(&scrub->os_inconsistent_items,
170 			struct osd_inconsistent_item, oii_list);
172 	if (oid < sf->sf_pos_latest_start && !oii)
175 	if (oii && oii->oii_insert) {
176 		ops = DTO_INDEX_INSERT;
180 	rc = osd_fid_lookup(env, dev, fid, &oid2);
185 		ops = DTO_INDEX_INSERT;
188 		rc = __osd_obj2dnode(dev->od_os, oid, &dn);
190 			/* Someone removed the object by race. */
191 			if (rc == -ENOENT || rc == -EEXIST)
196 		spin_lock(&scrub->os_lock);
197 		scrub->os_full_speed = 1;
198 		spin_unlock(&scrub->os_lock);
200 		sf->sf_flags |= SF_INCONSISTENT;
201 	} else if (oid == oid2) {
204 		struct lustre_mdt_attrs *lma = NULL;
207 		rc = __osd_xattr_load_by_oid(dev, oid2, &nvbuf);
208 		if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
213 		rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
214 				(uchar_t **)&lma, &size);
215 		if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
220 		lustre_lma_swab(lma);
221 		if (unlikely(lu_fid_eq(&lma->lma_self_fid, fid))) {
224 			      "%s: the FID "DFID" is used by two objects: %llu and %llu (in OI): rc = %d\n",
225 			      osd_name(dev), PFID(fid), oid, oid2, rc);
231 		spin_lock(&scrub->os_lock);
232 		scrub->os_full_speed = 1;
233 		spin_unlock(&scrub->os_lock);
234 		sf->sf_flags |= SF_INCONSISTENT;
237 	rc = osd_scrub_refresh_mapping(env, dev, fid, oid, ops, false, NULL);
239 		if (scrub->os_in_prior)
240 			sf->sf_items_updated_prior++;
242 			sf->sf_items_updated++;
248 		if (dev->od_is_ost) {
250 			uint64_t nlink, mode;
252 			rc = -sa_handle_get(dev->od_os, oid, NULL, SA_HDL_PRIVATE,
257 			rc = -sa_lookup(hdl, SA_ZPL_MODE(dev), &mode, sizeof(mode));
258 			if (rc || !S_ISREG(mode)) {
259 				sa_handle_destroy(hdl);
263 			rc = -sa_lookup(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink));
264 			if (rc == 0 && nlink > 1)
265 				scrub->os_has_ml_file = 1;
267 			sa_handle_destroy(hdl);
275 		sf->sf_items_failed++;
276 		if (sf->sf_pos_first_inconsistent == 0 ||
277 		    sf->sf_pos_first_inconsistent > oid)
278 			sf->sf_pos_first_inconsistent = oid;
283 	/* There may be conflict unlink during the OI scrub,
284 	 * if happend, then remove the new added OI mapping.
286 	if (ops == DTO_INDEX_INSERT && dn && dn->dn_free_txg)
287 		osd_scrub_refresh_mapping(env, dev, fid, oid,
288 					  DTO_INDEX_DELETE, false, NULL);
289 	up_write(&scrub->os_rwsem);
295 		spin_lock(&scrub->os_lock);
296 		if (likely(!list_empty(&oii->oii_list)))
297 			list_del(&oii->oii_list);
298 		spin_unlock(&scrub->os_lock);
302 	RETURN(sf->sf_param & SP_FAILOUT ? rc : 0);
305 /* iteration engine */
/*
 * Wake-up predicate for the scrub thread, evaluated under os_lock.
 * The scrub should run (os_waiting = 0) when the prefetch window has
 * room, there are queued inconsistent items, the otable iterator is
 * waiting on us, or the thread was asked to stop; otherwise it parks
 * with os_waiting = 1.  Returns true when the scrub may proceed.
 */
308 osd_scrub_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
310 	spin_lock(&scrub->os_lock);
311 	if (osd_scrub_has_window(it) ||
312 	    !list_empty(&scrub->os_inconsistent_items) ||
313 	    it->ooi_waiting || kthread_should_stop())
314 		scrub->os_waiting = 0;
316 		scrub->os_waiting = 1;
317 	spin_unlock(&scrub->os_lock);
319 	return !scrub->os_waiting;
/*
 * Produce the next (FID, dnode) pair for the scrub engine.
 *
 * Order of precedence visible here:
 * 1. fault-injection hooks (OBD_FAIL_OSD_SCRUB_DELAY/CRASH/FATAL) for
 *    testing, returning the matching SCRUB_NEXT_* codes;
 * 2. the head of os_inconsistent_items (priority items reported by other
 *    threads) — sets os_in_prior so the caller accounts it separately;
 * 3. otherwise wait for window room via osd_scrub_wakeup() when not in
 *    full-speed mode, then advance with dmu_object_next() and read the
 *    object's LMA xattr to recover its self FID.  Objects flagged
 *    LMAC_NOT_IN_OI or LMAI_AGENT are not returned as scrub targets.
 * -ESRCH from dmu_object_next() maps to SCRUB_NEXT_BREAK (end of pool).
 * The tail bumps ooi_prefetched and pokes a waiting iterator.
 *
 * NOTE(review): sampled extract — the declarations of rc/size, several
 * GOTO labels and the nvlist cleanup are in elided lines.
 */
322 static int osd_scrub_next(const struct lu_env *env, struct osd_device *dev,
323 			  struct lu_fid *fid, uint64_t *oid)
325 	struct lustre_scrub *scrub = &dev->od_scrub;
326 	struct osd_otable_it *it = dev->od_otable_it;
327 	struct lustre_mdt_attrs *lma = NULL;
328 	nvlist_t *nvbuf = NULL;
333 	if (CFS_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_DELAY) && cfs_fail_val > 0) {
334 		wait_var_event_timeout(
336 			!list_empty(&scrub->os_inconsistent_items) ||
337 			kthread_should_stop(),
338 			cfs_time_seconds(cfs_fail_val));
340 		if (kthread_should_stop())
341 			RETURN(SCRUB_NEXT_EXIT);
344 	if (CFS_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_CRASH)) {
345 		spin_lock(&scrub->os_lock);
346 		scrub->os_running = 0;
347 		spin_unlock(&scrub->os_lock);
348 		RETURN(SCRUB_NEXT_CRASH);
351 	if (CFS_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_FATAL))
352 		RETURN(SCRUB_NEXT_FATAL);
361 	if (!list_empty(&scrub->os_inconsistent_items)) {
362 		spin_lock(&scrub->os_lock);
363 		if (likely(!list_empty(&scrub->os_inconsistent_items))) {
364 			struct osd_inconsistent_item *oii;
366 			oii = list_first_entry(&scrub->os_inconsistent_items,
367 				struct osd_inconsistent_item,
369 			*fid = oii->oii_cache.oic_fid;
370 			*oid = oii->oii_cache.oic_dnode;
371 			scrub->os_in_prior = 1;
372 			spin_unlock(&scrub->os_lock);
376 		spin_unlock(&scrub->os_lock);
379 	if (!scrub->os_full_speed && !osd_scrub_has_window(it))
380 		wait_var_event(scrub, osd_scrub_wakeup(scrub, it));
382 	if (kthread_should_stop())
383 		GOTO(out, rc = SCRUB_NEXT_EXIT);
385 	rc = -dmu_object_next(dev->od_os, &scrub->os_pos_current, B_FALSE, 0);
387 		GOTO(out, rc = (rc == -ESRCH ? SCRUB_NEXT_BREAK : rc));
389 	rc = __osd_xattr_load_by_oid(dev, scrub->os_pos_current, &nvbuf);
390 	if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
396 	LASSERT(nvbuf != NULL);
397 	rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
398 				       (uchar_t **)&lma, &size);
400 		lustre_lma_swab(lma);
401 		if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
402 			   !(lma->lma_incompat & LMAI_AGENT))) {
403 			*fid = lma->lma_self_fid;
404 			*oid = scrub->os_pos_current;
410 	if (!scrub->os_full_speed) {
411 		spin_lock(&scrub->os_lock);
412 		it->ooi_prefetched++;
413 		if (it->ooi_waiting) {
417 		spin_unlock(&scrub->os_lock);
/*
 * Process one object produced by osd_scrub_next(): delegate the OI check
 * to osd_scrub_check_update(), then maintain the producer/consumer
 * bookkeeping — bump ooi_prefetched and wake a waiting iterator when not
 * at full speed, or clear os_in_prior after a priority item — and finally
 * write a periodic checkpoint via scrub_checkpoint().  A checkpoint
 * failure is only logged; the scrub keeps going.
 *
 * NOTE(review): sampled extract — the return paths and the condition
 * guarding the checkpoint are in elided lines.
 */
429 static int osd_scrub_exec(const struct lu_env *env, struct osd_device *dev,
430 			  const struct lu_fid *fid, uint64_t oid, int rc)
432 	struct lustre_scrub *scrub = &dev->od_scrub;
433 	struct osd_otable_it *it = dev->od_otable_it;
435 	rc = osd_scrub_check_update(env, dev, fid, oid, rc);
436 	if (!scrub->os_in_prior) {
437 		if (!scrub->os_full_speed) {
438 			spin_lock(&scrub->os_lock);
439 			it->ooi_prefetched++;
440 			if (it->ooi_waiting) {
444 			spin_unlock(&scrub->os_lock);
447 		spin_lock(&scrub->os_lock);
448 		scrub->os_in_prior = 0;
449 		spin_unlock(&scrub->os_lock);
455 	rc = scrub_checkpoint(env, scrub);
457 		CDEBUG(D_LFSCK, "%s: fail to checkpoint, pos = %llu: rc = %d\n",
458 		       scrub->os_name, scrub->os_pos_current, rc);
459 	/* Continue, as long as the scrub itself can go ahead. */
/* Forward declaration: the multi-link file scan run by osd_scrub_main()
 * after the main pass when os_has_ml_file was set (defined later in the
 * full file).
 */
465 static int osd_scan_ml_file_main(const struct lu_env *env,
466 				 struct osd_device *dev);
/*
 * Main body of the OI scrub kthread.
 *
 * Visible flow: init a local lu_env, run scrub_thread_prep(); when not in
 * full-speed mode, wait for the otable iterator user to become ready and
 * inherit its position; then loop osd_scrub_next()/osd_scrub_exec() until
 * a terminal SCRUB_NEXT_* code or kthread stop.  Afterwards optionally run
 * the multi-link file scan, post via scrub_thread_post(), drain any
 * remaining os_inconsistent_items, clear os_running, and synchronize with
 * a concurrent scrub_stop() through the os_task xchg / wait_var_event
 * handshake.
 *
 * NOTE(review): sampled extract — variable declarations (env, fid, oid,
 * rc, ret), the switch framing and several GOTO labels are in elided
 * lines.
 */
468 static int osd_scrub_main(void *args)
471 	struct osd_device *dev = (struct osd_device *)args;
472 	struct lustre_scrub *scrub = &dev->od_scrub;
478 	rc = lu_env_init(&env, LCT_LOCAL | LCT_DT_THREAD);
480 		CDEBUG(D_LFSCK, "%s: OI scrub fail to init env: rc = %d\n",
485 	rc = scrub_thread_prep(&env, scrub, dev->od_uuid, 1);
487 		CDEBUG(D_LFSCK, "%s: OI scrub fail to scrub prep: rc = %d\n",
492 	if (!scrub->os_full_speed) {
493 		struct osd_otable_it *it = dev->od_otable_it;
495 		wait_var_event(scrub,
496 			       it->ooi_user_ready ||
497 			       kthread_should_stop());
499 		if (kthread_should_stop())
502 		scrub->os_pos_current = it->ooi_pos;
505 	CDEBUG(D_LFSCK, "%s: OI scrub start, flags = 0x%x, pos = %llu\n",
506 	       scrub->os_name, scrub->os_start_flags,
507 	       scrub->os_pos_current);
509 	fid = &osd_oti_get(&env)->oti_fid;
510 	while (!rc && !kthread_should_stop()) {
511 		rc = osd_scrub_next(&env, dev, fid, &oid);
513 		case SCRUB_NEXT_EXIT:
515 		case SCRUB_NEXT_CRASH:
516 			spin_lock(&scrub->os_lock);
517 			scrub->os_running = 0;
518 			spin_unlock(&scrub->os_lock);
519 			GOTO(out, rc = -EINVAL);
520 		case SCRUB_NEXT_FATAL:
521 			GOTO(post, rc = -EINVAL);
522 		case SCRUB_NEXT_BREAK:
526 		rc = osd_scrub_exec(&env, dev, fid, oid, rc);
532 	if (scrub->os_has_ml_file) {
533 		ret = osd_scan_ml_file_main(&env, dev);
538 	rc = scrub_thread_post(&env, &dev->od_scrub, rc);
539 	CDEBUG(D_LFSCK, "%s: OI scrub: stop, pos = %llu: rc = %d\n",
540 	       scrub->os_name, scrub->os_pos_current, rc);
543 	while (!list_empty(&scrub->os_inconsistent_items)) {
544 		struct osd_inconsistent_item *oii;
546 		oii = list_first_entry(&scrub->os_inconsistent_items,
547 				       struct osd_inconsistent_item, oii_list);
548 		list_del_init(&oii->oii_list);
555 	spin_lock(&scrub->os_lock);
556 	scrub->os_running = 0;
557 	spin_unlock(&scrub->os_lock);
558 	if (xchg(&scrub->os_task, NULL) == NULL)
559 		/* scrub_stop is waiting, we need to synchronize */
560 		wait_var_event(scrub, kthread_should_stop());
565 /* initial OI scrub */
/*
 * Callback types for the initial OI scrub directory walk:
 * - handle_dirent_t processes one directory entry
 *   (name, parent dnode, entry dnode, flags, is_dir);
 * - scan_dir_t iterates one directory (parent dnode) applying a
 *   handle_dirent_t to each entry.
 */
569 typedef int (*handle_dirent_t)(const struct lu_env *, struct osd_device *,
570 			       const char *, uint64_t, uint64_t,
571 			       enum osd_lf_flags, bool);
572 static int osd_ios_varfid_hd(const struct lu_env *, struct osd_device *,
573 			     const char *, uint64_t, uint64_t,
574 			     enum osd_lf_flags, bool);
575 static int osd_ios_uld_hd(const struct lu_env *, struct osd_device *,
576 			  const char *, uint64_t, uint64_t,
577 			  enum osd_lf_flags, bool);
579 typedef int (*scan_dir_t)(const struct lu_env *, struct osd_device *,
580 			  uint64_t, handle_dirent_t, enum osd_lf_flags);
581 static int osd_ios_general_sd(const struct lu_env *, struct osd_device *,
582 			      uint64_t, handle_dirent_t, enum osd_lf_flags);
583 static int osd_ios_ROOT_sd(const struct lu_env *, struct osd_device *,
584 			   uint64_t, handle_dirent_t, enum osd_lf_flags);
/* Fields of struct osd_lf_map (the struct opening and olm_name member are
 * in elided lines of this sampled extract): known FID, scan flags, and the
 * scan/dirent callbacks for one well-known local file.
 */
588 	struct lu_fid olm_fid;
589 	enum osd_lf_flags olm_flags;
590 	scan_dir_t olm_scan_dir;
591 	handle_dirent_t olm_handle_dirent;
594 /* Add the new introduced local files in the list in the future. */
/*
 * Table of well-known local files/directories under the OSD root.  Each
 * entry names the object, optionally pins its known FID, and selects how
 * the initial OI scrub handles it: OLF_SCAN_SUBITEMS entries are pushed
 * onto the scan queue with the given scan_dir/handle_dirent callbacks,
 * OLF_NOT_BACKUP excludes the subtree from index backup registration,
 * OLF_IDX_IN_FID means f_oid is derived from the device index.
 * NOTE(review): sampled extract — entry braces and some olm_name lines
 * are elided; entry boundaries follow the original line numbering.
 */
595 static const struct osd_lf_map osd_lf_maps[] = {
598 		.olm_name	= MOUNT_CONFIGS_DIR,
600 			.f_seq	= FID_SEQ_LOCAL_FILE,
601 			.f_oid	= MGS_CONFIGS_OID,
603 		.olm_flags	= OLF_SCAN_SUBITEMS,
604 		.olm_scan_dir	= osd_ios_general_sd,
605 		.olm_handle_dirent = osd_ios_varfid_hd,
608 	/* NIDTBL_VERSIONS */
610 		.olm_name	= MGS_NIDTBL_DIR,
611 		.olm_flags	= OLF_SCAN_SUBITEMS,
612 		.olm_scan_dir	= osd_ios_general_sd,
613 		.olm_handle_dirent = osd_ios_varfid_hd,
618 		.olm_name	= MDT_ORPHAN_DIR,
625 			.f_seq	= FID_SEQ_ROOT,
626 			.f_oid	= FID_OID_ROOT,
628 		.olm_flags	= OLF_SCAN_SUBITEMS,
629 		.olm_scan_dir	= osd_ios_ROOT_sd,
636 			.f_seq	= FID_SEQ_LOCAL_FILE,
637 			.f_oid	= FLD_INDEX_OID,
641 	/* changelog_catalog */
643 		.olm_name	= CHANGELOG_CATALOG,
646 	/* changelog_users */
648 		.olm_name	= CHANGELOG_USERS,
654 		.olm_flags	= OLF_SCAN_SUBITEMS,
655 		.olm_scan_dir	= osd_ios_general_sd,
656 		.olm_handle_dirent = osd_ios_varfid_hd,
662 		.olm_flags	= OLF_SCAN_SUBITEMS,
663 		.olm_scan_dir	= osd_ios_general_sd,
664 		.olm_handle_dirent = osd_ios_varfid_hd,
669 		.olm_name	= LFSCK_DIR,
670 		.olm_flags	= OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
671 		.olm_scan_dir	= osd_ios_general_sd,
672 		.olm_handle_dirent = osd_ios_varfid_hd,
677 		.olm_name	= LFSCK_BOOKMARK,
682 		.olm_name	= LFSCK_LAYOUT,
685 	/* lfsck_namespace */
687 		.olm_name	= LFSCK_NAMESPACE,
690 	/* OSP update logs update_log{_dir} use f_seq = FID_SEQ_UPDATE_LOG{_DIR}
691 	 * and f_oid = index for their log files. See lu_update_log{_dir}_fid()
697 		.olm_name	= "update_log",
699 			.f_seq	= FID_SEQ_UPDATE_LOG,
701 		.olm_flags	= OLF_IDX_IN_FID,
706 		.olm_name	= "update_log_dir",
708 			.f_seq	= FID_SEQ_UPDATE_LOG_DIR,
710 		.olm_flags	= OLF_SCAN_SUBITEMS | OLF_IDX_IN_FID,
711 		.olm_scan_dir	= osd_ios_general_sd,
712 		.olm_handle_dirent = osd_ios_uld_hd,
717 		.olm_name	= HSM_ACTIONS,
722 		.olm_name	= LUSTRE_NODEMAP_NAME,
727 		.olm_name	= INDEX_BACKUP_DIR,
729 			.f_seq	= FID_SEQ_LOCAL_FILE,
730 			.f_oid	= INDEX_BACKUP_OID,
732 		.olm_flags	= OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
733 		.olm_scan_dir	= osd_ios_general_sd,
734 		.olm_handle_dirent = osd_ios_varfid_hd,
742 /* Add the new introduced files under .lustre/ in the list in the future. */
/*
 * Table of well-known entries that live under /ROOT/.lustre, consumed by
 * osd_ios_ROOT_sd(): the OBF (open-by-FID) directory and lost+found,
 * each pinned to its fixed FID_SEQ_DOT_LUSTRE FID.
 */
743 static const struct osd_lf_map osd_dl_maps[] = {
748 			.f_seq	= FID_SEQ_DOT_LUSTRE,
749 			.f_oid	= FID_OID_DOT_LUSTRE_OBF,
753 	/* .lustre/lost+found */
755 		.olm_name	= "lost+found",
757 			.f_seq	= FID_SEQ_DOT_LUSTRE,
758 			.f_oid	= FID_OID_DOT_LUSTRE_LPF,
/*
 * One queued directory for the initial OI scrub: linked on
 * dev->od_ios_list and processed by osd_initial_OI_scrub() using the
 * stored scan/dirent callbacks.  (The oii_parent dnode field is in an
 * elided line of this sampled extract.)
 */
767 struct osd_ios_item {
768 	struct list_head oii_list;
770 	enum osd_lf_flags oii_flags;
771 	scan_dir_t oii_scan_dir;
772 	handle_dirent_t oii_handle_dirent;
/*
 * Allocate an osd_ios_item for directory @parent and append it to
 * dev->od_ios_list so the initial OI scrub will scan it later with the
 * given callbacks.  Allocation failure is only warned about (the scan of
 * that subtree is then skipped).
 * NOTE(review): the allocation itself and the return are in elided lines.
 */
775 static int osd_ios_new_item(struct osd_device *dev, uint64_t parent,
776 			    enum osd_lf_flags flags, scan_dir_t scan_dir,
777 			    handle_dirent_t handle_dirent)
779 	struct osd_ios_item *item;
785 		CWARN("%s: initial OI scrub failed to add item for %llu: rc = %d\n",
786 		      osd_name(dev), parent, rc);
790 	INIT_LIST_HEAD(&item->oii_list);
791 	item->oii_parent = parent;
792 	item->oii_flags = flags;
793 	item->oii_scan_dir = scan_dir;
794 	item->oii_handle_dirent = handle_dirent;
795 	list_add_tail(&item->oii_list, &dev->od_ios_list);
/*
 * Probe the ZAP @oid with a serialized cursor to decide whether the index
 * object is damaged and must be re-created from its backup: a retrieve
 * error other than -ENOENT (empty index) indicates corruption.
 * NOTE(review): sampled extract — the cursor fini and the final return
 * are in elided lines.
 */
800 static bool osd_index_need_recreate(const struct lu_env *env,
801 				    struct osd_device *dev, uint64_t oid)
803 	struct osd_thread_info *info = osd_oti_get(env);
804 	zap_attribute_t *za = &info->oti_za2;
805 	zap_cursor_t *zc = &info->oti_zc2;
809 	zap_cursor_init_serialized(zc, dev->od_os, oid, 0);
810 	rc = -zap_cursor_retrieve(zc, za);
812 	if (rc && rc != -ENOENT)
/*
 * If object @oid looks like a Lustre index, register its key/record sizes
 * for index backup.  Checks visible here: the dnode must be a ZAP, a
 * regular file by ZPL mode, non-empty, a fat ZAP (not micro) with
 * ZAP_FLAG_UINT64_KEY; the key size comes from the first leaf entry
 * (le_name_numints * 8) and the record size from the cursor attribute.
 * Registration failure is logged with CWARN, success with CDEBUG.
 * NOTE(review): sampled extract — dn/hdl/mode/keysize/recsize
 * declarations, GOTO labels and cleanup are in elided lines.
 */
818 static void osd_ios_index_register(const struct lu_env *env,
819 				   struct osd_device *osd,
820 				   const struct lu_fid *fid, uint64_t oid)
822 	struct osd_thread_info *info = osd_oti_get(env);
823 	zap_attribute_t *za = &info->oti_za2;
824 	zap_cursor_t *zc = &info->oti_zc2;
825 	struct zap_leaf_entry *le;
834 	rc = __osd_obj2dnode(osd->od_os, oid, &dn);
835 	if (rc == -EEXIST || rc == -ENOENT)
841 	if (!osd_object_is_zap(dn))
844 	rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &hdl);
848 	rc = -sa_lookup(hdl, SA_ZPL_MODE(osd), &mode, sizeof(mode));
849 	sa_handle_destroy(hdl);
856 	zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
857 	rc = -zap_cursor_retrieve(zc, za);
859 		/* Skip empty index object */
860 		GOTO(fini, rc = (rc == -ENOENT ? 1 : rc));
862 	if (zc->zc_zap->zap_ismicro ||
863 	    !(zap_f_phys(zc->zc_zap)->zap_flags & ZAP_FLAG_UINT64_KEY))
866 	le = ZAP_LEAF_ENTRY(zc->zc_leaf, 0);
867 	keysize = le->le_name_numints * 8;
868 	recsize = za->za_integer_length * za->za_num_integers;
869 	if (likely(keysize && recsize))
870 		rc = osd_index_register(osd, fid, keysize, recsize);
881 		CWARN("%s: failed to register index "DFID" (%u/%u): rc = %d\n",
882 		      osd_name(osd), PFID(fid), keysize, recsize, rc);
884 		CDEBUG(D_LFSCK, "%s: registered index "DFID" (%u/%u)\n",
885 		       osd_name(osd), PFID(fid), keysize, recsize);
/*
 * Restore one damaged index object from its backup: look up the backup
 * entry in the od_index_backup_id ZAP by the target FID (lustre_fid2lbx
 * key), resolve the backup object's FID, cache the target's OI mapping in
 * RAM only (the on-disk mapping will be rebuilt with the index), and call
 * the generic lustre_index_restore().  The result is logged at
 * D_WARNING.
 * NOTE(review): sampled extract — rc declaration, GOTO labels and the
 * bufsize parameter line are elided.
 */
888 static void osd_index_restore(const struct lu_env *env, struct osd_device *dev,
889 			      struct lustre_index_restore_unit *liru, void *buf,
892 	struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
893 	struct lu_fid *tgt_fid = &liru->liru_cfid;
894 	struct lu_fid bak_fid;
898 	lustre_fid2lbx(buf, tgt_fid, bufsize);
899 	rc = -zap_lookup(dev->od_os, dev->od_index_backup_id, buf, 8,
900 			 sizeof(*zde) / 8, (void *)zde);
904 	rc = osd_get_fid_by_oid(env, dev, zde->lzd_reg.zde_dnode, &bak_fid);
908 	/* The OI mapping for index may be invalid, since it will be
909 	 * re-created, not update the OI mapping, just cache it in RAM.
911 	rc = osd_idc_find_and_init_with_oid(env, dev, tgt_fid,
914 		rc = lustre_index_restore(env, &dev->od_dt_dev,
915 					  &liru->liru_pfid, tgt_fid, &bak_fid,
916 					  liru->liru_name, &dev->od_index_backup_list,
917 					  &dev->od_lock, buf, bufsize);
921 	CDEBUG(D_WARNING, "%s: restore index '%s' with "DFID": rc = %d\n",
922 	       osd_name(dev), liru->liru_name, PFID(tgt_fid), rc);
/*
 * Verify the FID-in-LMA and OI entry for one object during the initial OI
 * scrub (ios = Initial OI Scrub).
 *
 * Visible flow: load the object's LMA xattr; objects with no xattr or no
 * FID-in-LMA are skipped (the caller may pass a known @fid instead).
 * LMAC_NOT_IN_OI objects are exempt from OI mapping.  An LMAC_IDX_BACKUP
 * object whose ZAP is damaged (osd_index_need_recreate()) is queued on
 * od_index_restore_list; otherwise, unless OLF_NOT_BACKUP, its key/rec
 * sizes are registered via osd_ios_index_register().  The FID is then
 * looked up in the OI: a missing mapping means SF_RECREATED +
 * DTO_INDEX_INSERT, a mismatched one SF_INCONSISTENT + DTO_INDEX_UPDATE;
 * the scrub file is flag-reset/stored on first occurrence and the mapping
 * refreshed with force = true.  OLF_IDX_IN_FID substitutes the device
 * index into tfid.f_oid.
 * NOTE(review): sampled extract — declarations (tfid, oid2, flag, op,
 * size, rc), several GOTO labels and branch framing are elided.
 */
926 * verify FID-in-LMA and OI entry for one object
928 * ios: Initial OI Scrub.
930 static int osd_ios_scan_one(const struct lu_env *env, struct osd_device *dev,
931 			    const struct lu_fid *fid, uint64_t parent,
932 			    uint64_t oid, const char *name,
933 			    enum osd_lf_flags flags)
935 	struct lustre_scrub *scrub = &dev->od_scrub;
936 	struct scrub_file *sf = &scrub->os_file;
937 	struct lustre_mdt_attrs *lma = NULL;
938 	nvlist_t *nvbuf = NULL;
947 	rc = __osd_xattr_load_by_oid(dev, oid, &nvbuf);
948 	if (unlikely(rc == -ENOENT || rc == -EEXIST))
951 	if (rc && rc != -ENODATA) {
952 		CWARN("%s: initial OI scrub failed to get lma for %llu: rc = %d\n",
953 		      osd_name(dev), oid, rc);
959 		LASSERT(nvbuf != NULL);
960 		rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
961 					       (uchar_t **)&lma, &size);
962 		if (rc || size == 0) {
963 			LASSERT(lma == NULL);
966 			LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
967 			lustre_lma_swab(lma);
968 			if (lma->lma_compat & LMAC_NOT_IN_OI) {
973 			if (lma->lma_compat & LMAC_IDX_BACKUP &&
974 			    osd_index_need_recreate(env, dev, oid)) {
975 				if (parent == dev->od_root) {
976 					lu_local_obj_fid(&tfid,
979 					rc = osd_get_fid_by_oid(env, dev,
987 				rc = lustre_liru_new(
988 						&dev->od_index_restore_list,
989 						&tfid, &lma->lma_self_fid, oid,
995 			tfid = lma->lma_self_fid;
996 			if (!(flags & OLF_NOT_BACKUP))
997 				osd_ios_index_register(env, dev, &tfid, oid);
1002 	if (rc == -ENODATA) {
1004 			/* Skip the object without FID-in-LMA */
1005 			CDEBUG(D_LFSCK, "%s: %llu has no FID-in-LMA, skip it\n",
1006 			       osd_name(dev), oid);
1011 		LASSERT(!fid_is_zero(fid));
1014 	if (flags & OLF_IDX_IN_FID) {
1015 		LASSERT(dev->od_index >= 0);
1017 		tfid.f_oid = dev->od_index;
1021 	rc = osd_fid_lookup(env, dev, &tfid, &oid2);
1023 		if (rc != -ENOENT) {
1024 			CWARN("%s: initial OI scrub failed to lookup fid for "DFID"=>%llu: rc = %d\n",
1025 			      osd_name(dev), PFID(&tfid), oid, rc);
1029 		flag = SF_RECREATED;
1030 		op = DTO_INDEX_INSERT;
1035 		flag = SF_INCONSISTENT;
1036 		op = DTO_INDEX_UPDATE;
1039 	if (!(sf->sf_flags & flag)) {
1040 		scrub_file_reset(scrub, dev->od_uuid, flag);
1041 		rc = scrub_file_store(env, scrub);
1046 	rc = osd_scrub_refresh_mapping(env, dev, &tfid, oid, op, true, name);
1048 	RETURN(rc > 0 ? 0 : rc);
/*
 * handle_dirent_t for entries whose FID comes from their own LMA (fid
 * passed as NULL to osd_ios_scan_one()).  Directories are queued for a
 * recursive general scan.  NOTE(review): the is_dir check guarding the
 * osd_ios_new_item() call is in an elided line.
 */
1051 static int osd_ios_varfid_hd(const struct lu_env *env, struct osd_device *dev,
1052 			     const char *name, uint64_t parent, uint64_t oid,
1053 			     enum osd_lf_flags flags, bool is_dir)
1058 	rc = osd_ios_scan_one(env, dev, NULL, parent, oid, name, 0);
1060 		rc = osd_ios_new_item(dev, oid, flags, osd_ios_general_sd,
/*
 * handle_dirent_t for update_log_dir entries, whose names encode a FID in
 * "[seq:oid:ver]" form: parse it with sscanf(SFID) past the leading '[',
 * and scan the object only when the parsed FID is sane.  Names not in
 * DFID format are skipped.
 */
1066 static int osd_ios_uld_hd(const struct lu_env *env, struct osd_device *dev,
1067 			  const char *name, uint64_t parent, uint64_t oid,
1068 			  enum osd_lf_flags flags, bool is_dir)
1074 	/* skip any non-DFID format name */
1078 	/* skip the start '[' */
1079 	sscanf(&name[1], SFID, RFID(&tfid));
1080 	if (fid_is_sane(&tfid))
1081 		rc = osd_ios_scan_one(env, dev, &tfid, parent, oid, name, 0);
/*
 * General directory scanner for the initial OI scrub (everything except
 * /ROOT): walk the parent ZAP with a serialized cursor, skip entries
 * starting with '.', re-look up each name to get the full luz_direntry,
 * and hand it to @handle_dirent with the directory bit derived from
 * zde_type.  Lookup/handle failures are logged and the walk continues.
 * NOTE(review): sampled extract — the loop framing, rc declaration and
 * some GOTO labels are elided; "execpt" is a typo in the original
 * comment.
 */
1089 * General scanner for the directories execpt /ROOT during initial OI scrub.
1090 * It scans the name entries under the given directory one by one. For each
1091 * entry, verifies its OI mapping via the given @handle_dirent.
1093 static int osd_ios_general_sd(const struct lu_env *env, struct osd_device *dev,
1094 			      uint64_t parent, handle_dirent_t handle_dirent,
1095 			      enum osd_lf_flags flags)
1097 	struct osd_thread_info *info = osd_oti_get(env);
1098 	struct luz_direntry *zde = &info->oti_zde;
1099 	zap_attribute_t *za = &info->oti_za;
1100 	zap_cursor_t *zc = &info->oti_zc;
1104 	zap_cursor_init_serialized(zc, dev->od_os, parent, 0);
1105 	rc = -zap_cursor_retrieve(zc, za);
1107 		zap_cursor_advance(zc);
1112 		rc = -zap_cursor_retrieve(zc, za);
1114 			GOTO(log, rc = (rc == -ENOENT ? 0 : rc));
1116 		/* skip the entry started with '.' */
1117 		if (likely(za->za_name[0] != '.')) {
1118 			rc = osd_zap_lookup(dev, parent, NULL, za->za_name,
1119 					    za->za_integer_length,
1120 					    sizeof(*zde) / za->za_integer_length,
1123 				CWARN("%s: initial OI scrub failed to lookup %s under %llu: rc = %d\n",
1124 				      osd_name(dev), za->za_name, parent, rc);
1128 			rc = handle_dirent(env, dev, za->za_name, parent,
1129 					   zde->lzd_reg.zde_dnode, flags,
1130 					   S_ISDIR(DTTOIF(zde->lzd_reg.zde_type)) ?
1133 			       "%s: initial OI scrub handled %s under %llu: rc = %d\n",
1134 			       osd_name(dev), za->za_name, parent, rc);
1137 		zap_cursor_advance(zc);
1142 		CWARN("%s: initial OI scrub failed to scan the directory %llu: rc = %d\n",
1143 		      osd_name(dev), parent, rc);
1144 	zap_cursor_fini(zc);
/*
 * Scanner for the /ROOT directory: only ".lustre" and the osd_dl_maps
 * entries beneath it are checked during the initial OI scrub.  A missing
 * .lustre is tolerated (it will be re-created at MDT start).  For each
 * map entry: a failed lookup other than -ENOENT is warned about; -ENOENT
 * with a known FID triggers removal of any stale OI mapping; otherwise
 * the entry is verified with osd_ios_scan_one().
 * NOTE(review): sampled extract — oid/rc declarations, loop framing and
 * return paths are elided.
 */
1150 * The scanner for /ROOT directory. It is not all the items under /ROOT will
1151 * be scanned during the initial OI scrub, instead, only the .lustre and the
1152 * sub-items under .lustre will be handled.
1154 static int osd_ios_ROOT_sd(const struct lu_env *env, struct osd_device *dev,
1155 			   uint64_t parent, handle_dirent_t handle_dirent,
1156 			   enum osd_lf_flags flags)
1158 	struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1159 	const struct osd_lf_map *map;
1165 	rc = osd_zap_lookup(dev, parent, NULL, dot_lustre_name, 8,
1166 			    sizeof(*zde) / 8, (void *)zde);
1167 	if (rc == -ENOENT) {
1168 		/* The .lustre directory is lost. That is not fatal. It can
1169 		 * be re-created in the subsequent MDT start processing.
1175 		CWARN("%s: initial OI scrub failed to find .lustre: rc = %d\n",
1181 	oid = zde->lzd_reg.zde_dnode;
1182 	rc = osd_ios_scan_one(env, dev, &LU_DOT_LUSTRE_FID, parent, oid,
1183 			      dot_lustre_name, 0);
1187 	for (map = osd_dl_maps; map->olm_name; map++) {
1188 		rc = osd_zap_lookup(dev, oid, NULL, map->olm_name, 8,
1189 				    sizeof(*zde) / 8, (void *)zde);
1192 				CWARN("%s: initial OI scrub failed to find the entry %s under .lustre: rc = %d\n",
1193 				      osd_name(dev), map->olm_name, rc);
1194 			else if (!fid_is_zero(&map->olm_fid))
1195 				/* Try to remove the stale OI mapping. */
1196 				osd_scrub_refresh_mapping(env, dev,
1198 						DTO_INDEX_DELETE, true,
1203 		rc = osd_ios_scan_one(env, dev, &map->olm_fid, oid,
1204 				      zde->lzd_reg.zde_dnode, map->olm_name,
/*
 * Run the synchronous initial OI scrub at mount:
 * 1. verify every well-known local file from osd_lf_maps under the root
 *    (missing entries with known FIDs get their stale OI mappings
 *    removed), queueing OLF_SCAN_SUBITEMS directories;
 * 2. drain od_ios_list, scanning each queued directory with its
 *    scan_dir/handle_dirent callbacks;
 * 3. if any damaged indexes were queued on od_index_restore_list, restore
 *    them one by one with osd_index_restore() using a shared
 *    INDEX_BACKUP_BUFSIZE buffer (allocation failure only logs and skips
 *    restoration).
 * NOTE(review): sampled extract — rc declaration, item freeing and some
 * branch framing are elided.
 */
1213 static void osd_initial_OI_scrub(const struct lu_env *env,
1214 				 struct osd_device *dev)
1216 	struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1217 	const struct osd_lf_map *map;
1221 	for (map = osd_lf_maps; map->olm_name; map++) {
1222 		rc = osd_zap_lookup(dev, dev->od_root, NULL, map->olm_name, 8,
1223 				    sizeof(*zde) / 8, (void *)zde);
1226 				CWARN("%s: initial OI scrub failed to find the entry %s: rc = %d\n",
1227 				      osd_name(dev), map->olm_name, rc);
1228 			else if (!fid_is_zero(&map->olm_fid))
1229 				/* Try to remove the stale OI mapping. */
1230 				osd_scrub_refresh_mapping(env, dev,
1232 						DTO_INDEX_DELETE, true,
1237 		rc = osd_ios_scan_one(env, dev, &map->olm_fid, dev->od_root,
1238 				      zde->lzd_reg.zde_dnode, map->olm_name,
1240 		if (!rc && map->olm_flags & OLF_SCAN_SUBITEMS)
1241 			osd_ios_new_item(dev, zde->lzd_reg.zde_dnode,
1242 					 map->olm_flags, map->olm_scan_dir,
1243 					 map->olm_handle_dirent);
1246 	while (!list_empty(&dev->od_ios_list)) {
1247 		struct osd_ios_item *item;
1249 		item = list_first_entry(&dev->od_ios_list,
1250 					struct osd_ios_item, oii_list);
1251 		list_del_init(&item->oii_list);
1252 		item->oii_scan_dir(env, dev, item->oii_parent,
1253 				   item->oii_handle_dirent, item->oii_flags);
1257 	if (!list_empty(&dev->od_index_restore_list)) {
1260 		OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1262 			CERROR("%s: not enough RAM for rebuild index: rc = %d\n",
1263 			       osd_name(dev), -ENOMEM);
1265 		while (!list_empty(&dev->od_index_restore_list)) {
1266 			struct lustre_index_restore_unit *liru;
1268 			liru = list_first_entry(&dev->od_index_restore_list,
1269 						struct lustre_index_restore_unit,
1271 			list_del(&liru->liru_link);
1273 				osd_index_restore(env, dev, liru, buf,
1274 						  INDEX_BACKUP_BUFSIZE);
1275 			OBD_FREE(liru, liru->liru_len);
1279 			OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1285 /* OI scrub start/stop */
/*
 * Start the OI scrub thread with @flags (refuses on a read-only device).
 * od_otable_sem serializes concurrent start/stop; -EALREADY (thread
 * already running) is reported as success.
 * NOTE(review): the rdonly early-return body and flags parameter line are
 * in elided lines of this sampled extract.
 */
1287 int osd_scrub_start(const struct lu_env *env, struct osd_device *dev,
1293 	if (dev->od_dt_dev.dd_rdonly)
1296 	/* od_otable_sem: prevent concurrent start/stop */
1297 	down(&dev->od_otable_sem);
1298 	rc = scrub_start(osd_scrub_main, &dev->od_scrub, dev, flags);
1299 	up(&dev->od_otable_sem);
1301 	RETURN(rc == -EALREADY ? 0 : rc);
/*
 * Stop the OI scrub thread: mark the scrub paused under os_lock, then
 * (in an elided line) call scrub_stop() before releasing od_otable_sem,
 * which serializes against osd_scrub_start().
 */
1304 void osd_scrub_stop(struct osd_device *dev)
1306 	struct lustre_scrub *scrub = &dev->od_scrub;
1309 	/* od_otable_sem: prevent concurrent start/stop */
1310 	down(&dev->od_otable_sem);
1311 	spin_lock(&scrub->os_lock);
1312 	scrub->os_paused = 1;
1313 	spin_unlock(&scrub->os_lock);
1315 	up(&dev->od_otable_sem);
1320 /* OI scrub setup/cleanup */
1322 static const char osd_scrub_name[] = "OI_scrub";
/*
 * Set up the OI scrub at device start.
 *
 * Visible flow: derive od_uuid from the dataset guid; zero and init the
 * lustre_scrub state; pick an internal-only FID for the "OI_scrub"
 * object (its value is unimportant — the object has no OI mapping and is
 * only visible inside the OSD); find-or-create that object and load the
 * scrub file from it.  -ENOENT/-EFAULT re-initialize the file; a changed
 * UUID resets it with SF_INCONSISTENT; SS_SCANNING from a previous run
 * becomes SS_CRASHED; a non-power-of-two or oversized sf_oi_count is
 * reset to osd_oi_count.  The resume position comes from the last
 * checkpoint (+1) or starts at 1.  After storing the file, OI files are
 * initialized (osd_oi_init with @resetoi), the initial OI scrub runs
 * unless read-only, and an automatic full scrub is kicked off when the
 * previous state (paused/crashed/init with RECREATED|INCONSISTENT|
 * UPGRADE|AUTO flags) warrants it and auto-scrub is not disabled.
 * Cleanup paths drop the OI table and the scrub object.
 *
 * NOTE(review): sampled extract — oid/rc declarations, several GOTO
 * labels, and the interval/resetoi plumbing details sit in elided lines.
 */
1324 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev,
1325 		    time64_t interval, bool resetoi)
1327 	struct osd_thread_info *info = osd_oti_get(env);
1328 	struct lustre_scrub *scrub = &dev->od_scrub;
1329 	struct scrub_file *sf = &scrub->os_file;
1330 	struct lu_fid *fid = &info->oti_fid;
1331 	struct dt_object *obj;
1337 	memcpy(dev->od_uuid.b,
1338 	       &dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid,
1339 	       sizeof(dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid));
1340 	memset(&dev->od_scrub, 0, sizeof(struct lustre_scrub));
1341 	init_rwsem(&scrub->os_rwsem);
1342 	spin_lock_init(&scrub->os_lock);
1343 	INIT_LIST_HEAD(&scrub->os_inconsistent_items);
1344 	scrub->os_name = osd_name(dev);
1345 	scrub->os_auto_scrub_interval = interval;
1347 	/* 'What the @fid is' is not imporatant, because the object
1348 	 * has no OI mapping, and only is visible inside the OSD.
1350 	fid->f_seq = FID_SEQ_IGIF_MAX;
1352 		fid->f_oid = ((1 << 31) | dev->od_index) + 1;
1354 		fid->f_oid = dev->od_index + 1;
1356 	rc = osd_obj_find_or_create(env, dev, dev->od_root,
1357 				    osd_scrub_name, &oid, fid, false);
1361 	rc = osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1365 	obj = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1366 	if (IS_ERR_OR_NULL(obj))
1367 		RETURN(obj ? PTR_ERR(obj) : -ENOENT);
1369 	obj->do_body_ops = &osd_body_scrub_ops;
1370 	scrub->os_obj = obj;
1371 	rc = scrub_file_load(env, scrub);
1372 	if (rc == -ENOENT || rc == -EFAULT) {
1373 		scrub_file_init(scrub, dev->od_uuid);
1375 	} else if (rc < 0) {
1376 		GOTO(cleanup_obj, rc);
1378 		if (!guid_equal(&sf->sf_uuid, &dev->od_uuid)) {
1380 			      "%s: UUID has been changed from %pU to %pU\n",
1381 			      osd_name(dev), &sf->sf_uuid, &dev->od_uuid);
1382 			scrub_file_reset(scrub, dev->od_uuid, SF_INCONSISTENT);
1384 		} else if (sf->sf_status == SS_SCANNING) {
1385 			sf->sf_status = SS_CRASHED;
1389 		if (unlikely((sf->sf_oi_count & (sf->sf_oi_count - 1)) != 0 ||
1390 			     sf->sf_oi_count > OSD_OI_FID_NR_MAX)) {
1391 			LCONSOLE_WARN("%s: invalid OI count %u, reset to %u\n",
1392 				      osd_name(dev), sf->sf_oi_count,
1394 			sf->sf_oi_count = osd_oi_count;
1399 	if (sf->sf_pos_last_checkpoint != 0)
1400 		scrub->os_pos_current = sf->sf_pos_last_checkpoint + 1;
1402 		scrub->os_pos_current = 1;
1405 		rc = scrub_file_store(env, scrub);
1407 			GOTO(cleanup_obj, rc);
1410 	/* Initialize OI files. */
1411 	rc = osd_oi_init(env, dev, resetoi);
1413 		GOTO(cleanup_obj, rc);
1415 	if (!dev->od_dt_dev.dd_rdonly)
1416 		osd_initial_OI_scrub(env, dev);
1418 	if (!dev->od_dt_dev.dd_rdonly &&
1419 	    scrub->os_auto_scrub_interval != AS_NEVER &&
1420 	    ((sf->sf_status == SS_PAUSED) ||
1421 	     (sf->sf_status == SS_CRASHED &&
1422 	      sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1423 			      SF_UPGRADE | SF_AUTO)) ||
1424 	     (sf->sf_status == SS_INIT &&
1425 	      sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1427 		rc = osd_scrub_start(env, dev, SS_AUTO_FULL);
1430 		GOTO(cleanup_oi, rc);
1435 	osd_oi_fini(env, dev);
1437 	dt_object_put_nocache(env, scrub->os_obj);
1438 	scrub->os_obj = NULL;
/*
 * Tear down the OI scrub at device shutdown: the otable iterator must be
 * gone already (asserted); stop the scrub thread, release the scrub
 * object, and finalize the OI table if it was set up.
 */
1443 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev)
1445 	struct lustre_scrub *scrub = &dev->od_scrub;
1447 	LASSERT(!dev->od_otable_it);
1449 	if (scrub->os_obj) {
1450 		osd_scrub_stop(dev);
1451 		dt_object_put_nocache(env, scrub->os_obj);
1452 		scrub->os_obj = NULL;
1455 	if (dev->od_oi_table)
1456 		osd_oi_fini(env, dev);
1459 /* object table based iteration APIs */
/*
 * dt_it init for the otable-based iterator that feeds the layered LFSCK.
 *
 * The @attr word packs flags (upper bits, DOIF_*) and their validity mask
 * (lower bits, DOIV_*).  Refuses on read-only devices; od_otable_sem
 * guards against concurrent init/fini and a second live iterator
 * (-EALREADY).  Flags translate into scrub start options (failout /
 * dryrun set-or-clear, reset).  A txg sync is forced first because
 * dmu_object_next() cannot see dnodes allocated in the current
 * uncommitted txg.  The scrub thread is then (re)started without
 * SS_AUTO_PARTIAL, and the iterator position is seeded from
 * os_pos_current.
 * NOTE(review): sampled extract — the allocation of @it, the start-flag
 * variable declaration, and some branch bodies are elided.
 */
1461 static struct dt_it *osd_otable_it_init(const struct lu_env *env,
1462 					struct dt_object *dt, __u32 attr)
1464 	enum dt_otable_it_flags flags = attr >> DT_OTABLE_IT_FLAGS_SHIFT;
1465 	enum dt_otable_it_valid valid = attr & ~DT_OTABLE_IT_FLAGS_MASK;
1466 	struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1467 	struct lustre_scrub *scrub = &dev->od_scrub;
1468 	struct osd_otable_it *it;
1473 	if (dev->od_dt_dev.dd_rdonly)
1474 		RETURN(ERR_PTR(-EROFS));
1476 	/* od_otable_sem: prevent concurrent init/fini */
1477 	down(&dev->od_otable_sem);
1478 	if (dev->od_otable_it)
1479 		GOTO(out, it = ERR_PTR(-EALREADY));
1483 		GOTO(out, it = ERR_PTR(-ENOMEM));
1485 	if (flags & DOIF_OUTUSED)
1486 		it->ooi_used_outside = 1;
1488 	if (flags & DOIF_RESET)
1491 	if (valid & DOIV_ERROR_HANDLE) {
1492 		if (flags & DOIF_FAILOUT)
1493 			start |= SS_SET_FAILOUT;
1495 			start |= SS_CLEAR_FAILOUT;
1498 	if (valid & DOIV_DRYRUN) {
1499 		if (flags & DOIF_DRYRUN)
1500 			start |= SS_SET_DRYRUN;
1502 			start |= SS_CLEAR_DRYRUN;
1505 	/* XXX: dmu_object_next() does NOT find dnodes allocated
1506 	 * in the current non-committed txg, so we force txg
1507 	 * commit to find all existing dnodes ...
1509 	txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1511 	dev->od_otable_it = it;
1513 	rc = scrub_start(osd_scrub_main, scrub, dev, start & ~SS_AUTO_PARTIAL);
1514 	if (rc == -EALREADY) {
1516 	} else if (rc < 0) {
1517 		dev->od_otable_it = NULL;
1521 		it->ooi_pos = scrub->os_pos_current;
1527 	up(&dev->od_otable_sem);
1528 	return (struct dt_it *)it;
/*
 * dt_it fini: under od_otable_sem, stop the scrub thread, detach the
 * iterator from the device (it must be the registered one), and free it
 * (the OBD_FREE is in an elided line of this sampled extract).
 */
1531 static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1533 	struct osd_otable_it *it = (struct osd_otable_it *)di;
1534 	struct osd_device *dev = it->ooi_dev;
1536 	/* od_otable_sem: prevent concurrent init/fini */
1537 	down(&dev->od_otable_sem);
1538 	scrub_stop(&dev->od_scrub);
1539 	LASSERT(dev->od_otable_it == it);
1541 	dev->od_otable_it = NULL;
1542 	up(&dev->od_otable_sem);
/* get() hook of the otable iterator (dt_index_operations::get). */
1546 static int osd_otable_it_get(const struct lu_env *env,
1547 struct dt_it *di, const struct dt_key *key)
/* put() hook of the otable iterator (dt_index_operations::put). */
1552 static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
/*
 * osd_otable_it_preload() - asynchronously prefetch dnodes ahead of the
 * iterator position so that osd_otable_it_next() rarely blocks on reads.
 * Refills the window only once fewer than half of OTABLE_PREFETCH
 * prefetches remain outstanding.
 */
1556 static void osd_otable_it_preload(const struct lu_env *env,
1557 struct osd_otable_it *it)
1559 struct osd_device *dev = it->ooi_dev;
1562 /* can go negative on the very first access to the iterator
1563 * or if some non-Lustre objects were found
1565 if (unlikely(it->ooi_prefetched < 0))
1566 it->ooi_prefetched = 0;
/* more than half of the window is still queued: nothing to do yet */
1568 if (it->ooi_prefetched >= (OTABLE_PREFETCH >> 1))
/* (re)start prefetching from the current iterator position */
1571 if (it->ooi_prefetched_dnode == 0)
1572 it->ooi_prefetched_dnode = it->ooi_pos;
1574 while (it->ooi_prefetched < OTABLE_PREFETCH) {
1575 rc = -dmu_object_next(dev->od_os, &it->ooi_prefetched_dnode,
/* issue an async read for the whole dnode (level 0, all blocks) */
1580 dmu_prefetch(dev->od_os, it->ooi_prefetched_dnode,
1581 0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
1582 it->ooi_prefetched++;
/*
 * osd_otable_it_wakeup() - predicate for wait_var_event() in
 * osd_otable_it_next(): under os_lock decide whether the iterator may
 * proceed (the scrub has advanced past the iterator position, or
 * os_waiting is set).  Returns true when waiting is no longer needed.
 */
1587 osd_otable_it_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
1589 spin_lock(&scrub->os_lock);
1590 if (it->ooi_pos < scrub->os_pos_current || scrub->os_waiting ||
1592 it->ooi_waiting = 0;
1594 it->ooi_waiting = 1;
1595 spin_unlock(&scrub->os_lock);
1597 return !it->ooi_waiting;
/*
 * osd_otable_it_next() - advance to the next Lustre-visible dnode and
 * load its self FID from the LMA xattr into it->ooi_fid.
 *
 * When not in full-speed mode the iterator trails the scrub position and
 * may block until the scrub catches up.  Non-Lustre objects, OSD-internal
 * objects (no/empty LMA) and agent entries are skipped.  On -ESRCH from
 * dmu_object_next() the whole object table has been consumed.
 */
1600 static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di)
1602 struct osd_otable_it *it = (struct osd_otable_it *)di;
1603 struct osd_device *dev = it->ooi_dev;
1604 struct lustre_scrub *scrub = &dev->od_scrub;
1605 struct lustre_mdt_attrs *lma = NULL;
1606 nvlist_t *nvbuf = NULL;
/* load() must have been called before the first next() */
1611 LASSERT(it->ooi_user_ready);
1612 fid_zero(&it->ooi_fid);
1614 if (unlikely(it->ooi_all_cached))
/* do not run ahead of the scrub position; wait for it to pass us */
1625 if (it->ooi_pos >= scrub->os_pos_current)
1626 wait_var_event(scrub,
1627 osd_otable_it_wakeup(scrub, it));
1629 if (!scrub->os_running && !it->ooi_used_outside)
1632 rc = -dmu_object_next(dev->od_os, &it->ooi_pos, B_FALSE, 0);
/* -ESRCH: no more allocated dnodes, iteration is complete */
1634 if (unlikely(rc == -ESRCH)) {
1635 it->ooi_all_cached = 1;
/* fetch all xattrs of the dnode to locate its LMA */
1642 rc = __osd_xattr_load_by_oid(dev, it->ooi_pos, &nvbuf);
1645 if (!scrub->os_full_speed) {
1646 spin_lock(&scrub->os_lock);
/* one prefetched dnode has now been consumed */
1649 it->ooi_prefetched--;
1650 if (!scrub->os_full_speed) {
1651 if (scrub->os_waiting) {
1652 scrub->os_waiting = 0;
1657 spin_unlock(&scrub->os_lock);
/* transient/skippable lookup results: move on to the next dnode */
1659 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
1665 LASSERT(nvbuf != NULL);
1666 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1667 (uchar_t **)&lma, &size);
1668 if (rc || size == 0)
1669 /* It is either non-Lustre object or OSD internal object,
1670 * ignore it, go ahead
1674 LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
/* LMA is stored little-endian on disk; swab to CPU order */
1675 lustre_lma_swab(lma);
1676 if (unlikely(lma->lma_compat & LMAC_NOT_IN_OI ||
1677 lma->lma_incompat & LMAI_AGENT))
1680 it->ooi_fid = lma->lma_self_fid;
/* in full-speed mode keep the prefetch window topped up */
1688 if (!rc && scrub->os_full_speed)
1689 osd_otable_it_preload(env, it);
/* key() hook of the otable iterator (dt_index_operations::key). */
1694 static struct dt_key *osd_otable_it_key(const struct lu_env *env,
1695 const struct dt_it *di)
/* key_size() hook: otable iterator keys are 64-bit object numbers. */
1700 static int osd_otable_it_key_size(const struct lu_env *env,
1701 const struct dt_it *di)
1703 return sizeof(__u64);
/*
 * rec() hook: hand the current record back to the caller; @rec is
 * interpreted as a lu_fid (presumably filled from it->ooi_fid — the
 * copy itself is below, in elided lines).
 */
1706 static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
1707 struct dt_rec *rec, __u32 attr)
1709 struct osd_otable_it *it = (struct osd_otable_it *)di;
1710 struct lu_fid *fid = (struct lu_fid *)rec;
/* store() hook: return the iterator's resume cookie for checkpointing. */
1716 static __u64 osd_otable_it_store(const struct lu_env *env,
1717 const struct dt_it *di)
1719 struct osd_otable_it *it = (struct osd_otable_it *)di;
1725 * Set the OSD layer iteration start position as the specified hash.
1727 static int osd_otable_it_load(const struct lu_env *env,
1728 const struct dt_it *di, __u64 hash)
1730 struct osd_otable_it *it = (struct osd_otable_it *)di;
1731 struct osd_device *dev = it->ooi_dev;
1732 struct lustre_scrub *scrub = &dev->od_scrub;
1736 /* Forbid to set iteration position after iteration started. */
1737 if (it->ooi_user_ready)
/* clamp the cookie into the valid 48-bit hash range */
1740 if (hash > OSD_OTABLE_MAX_HASH)
1741 hash = OSD_OTABLE_MAX_HASH;
1743 /* The hash is the last checkpoint position, start from the next one. */
1744 it->ooi_pos = hash + 1;
/* reset the prefetch state for the new start position */
1745 it->ooi_prefetched = 0;
1746 it->ooi_prefetched_dnode = 0;
1747 it->ooi_user_ready = 1;
1748 if (!scrub->os_full_speed)
1751 /* Unplug OSD layer iteration by the first next() call. */
1752 rc = osd_otable_it_next(env, (struct dt_it *)it);
/* key_rec() hook of the otable iterator (dt_index_operations::key_rec). */
1757 static int osd_otable_it_key_rec(const struct lu_env *env,
1758 const struct dt_it *di, void *key_rec)
/*
 * dt_index_operations vector exported for object-table based iteration;
 * wired to the osd_otable_it_* hooks above.
 */
1763 const struct dt_index_operations osd_otable_ops = {
1765 .init = osd_otable_it_init,
1766 .fini = osd_otable_it_fini,
1767 .get = osd_otable_it_get,
1768 .put = osd_otable_it_put,
1769 .next = osd_otable_it_next,
1770 .key = osd_otable_it_key,
1771 .key_size = osd_otable_it_key_size,
1772 .rec = osd_otable_it_rec,
1773 .store = osd_otable_it_store,
1774 .load = osd_otable_it_load,
1775 .key_rec = osd_otable_it_key_rec,
1779 /* high priority inconsistent items list APIs */
/*
 * osd_oii_insert() - queue a high-priority inconsistent OI mapping
 * (@fid <-> @oid; @insert distinguishes insert from update) for the
 * running scrub thread to repair out of band.
 *
 * The item is dropped if the scrub is not running; the wakeup flag is
 * prepared when the item lands on a previously empty list.
 */
1781 int osd_oii_insert(const struct lu_env *env, struct osd_device *dev,
1782 const struct lu_fid *fid, uint64_t oid, bool insert)
1784 struct lustre_scrub *scrub = &dev->od_scrub;
1785 struct osd_inconsistent_item *oii;
1786 bool wakeup = false;
/* prime the idc cache so the scrub can resolve this fid quickly */
1789 osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1794 INIT_LIST_HEAD(&oii->oii_list);
1795 oii->oii_cache.oic_dev = dev;
1796 oii->oii_cache.oic_fid = *fid;
1797 oii->oii_cache.oic_dnode = oid;
1798 oii->oii_insert = insert;
1800 spin_lock(&scrub->os_lock);
/* nobody to consume the item when the scrub is not running */
1801 if (!scrub->os_running) {
1802 spin_unlock(&scrub->os_lock);
1807 if (list_empty(&scrub->os_inconsistent_items))
1809 list_add_tail(&oii->oii_list, &scrub->os_inconsistent_items);
1810 spin_unlock(&scrub->os_lock);
/*
 * osd_oii_lookup() - linear search of the in-memory inconsistent-items
 * list for @fid; on a hit the cached dnode number is stored via @oid.
 */
1818 int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
1821 struct lustre_scrub *scrub = &dev->od_scrub;
1822 struct osd_inconsistent_item *oii;
1826 spin_lock(&scrub->os_lock);
1827 list_for_each_entry(oii, &scrub->os_inconsistent_items, oii_list) {
1828 if (lu_fid_eq(fid, &oii->oii_cache.oic_fid)) {
1829 *oid = oii->oii_cache.oic_dnode;
1834 spin_unlock(&scrub->os_lock);
/* callback invoked by osd_scan_dir() for each ZAP entry it visits */
1839 typedef int (*scan_dir_helper_t)(const struct lu_env *env,
1840 struct osd_device *dev, uint64_t dir_oid,
1841 struct osd_zap_it *ozi);
/*
 * osd_scan_dir() - walk every entry of ZAP object @id, invoking @cb for
 * each regular entry.  "." and ".." are skipped, as are entries whose
 * ZAP integer size is not 8 (i.e. not luz_direntry-shaped records).
 */
1843 static int osd_scan_dir(const struct lu_env *env, struct osd_device *dev,
1844 uint64_t id, scan_dir_helper_t cb)
1846 struct osd_zap_it *it;
1847 struct luz_direntry *zde;
1848 zap_attribute_t *za;
1853 OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
1857 rc = osd_zap_cursor_init(&it->ozi_zc, dev->od_os, id, 0);
1864 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1872 if (name_is_dot_or_dotdot(za->za_name, strlen(za->za_name))) {
1873 zap_cursor_advance(it->ozi_zc);
/* NOTE(review): strncpy() does not NUL-terminate when za_name fills
 * ozi_name completely — confirm ozi_name is sized above the max ZAP
 * name length, or terminate explicitly. */
1877 strncpy(it->ozi_name, za->za_name, sizeof(it->ozi_name));
1878 if (za->za_integer_length != 8) {
/* re-read the entry as a full luz_direntry for the callback */
1883 rc = osd_zap_lookup(dev, it->ozi_zc->zc_zapobj, NULL,
1884 za->za_name, za->za_integer_length,
1885 sizeof(*zde) / za->za_integer_length, zde);
1889 rc = cb(env, dev, id, it);
1893 zap_cursor_advance(it->ozi_zc);
1895 osd_zap_cursor_fini(it->ozi_zc);
1898 OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
/*
 * osd_remove_ml_file() - delete one redundant name entry of a
 * multi-linked OST object: update the on-disk link count
 * (SA_ZPL_LINKS) and remove the ZAP entry from directory @dir in a
 * single dmu transaction, with the object pinned and oo_guard held
 * to keep the update stable.
 */
1902 static int osd_remove_ml_file(const struct lu_env *env, struct osd_device *dev,
1903 uint64_t dir, uint64_t id, struct lu_fid *fid,
1906 struct osd_thread_info *info = osd_oti_get(env);
1907 struct dt_object *dt;
1908 struct osd_object *obj = NULL;
/* attach an SA handle so the link count can be read/updated */
1914 rc = -sa_handle_get(dev->od_os, id, NULL, SA_HDL_PRIVATE, &hdl);
/* pin the object in the lu cache while we modify it */
1918 dt = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1920 RETURN(PTR_ERR(dt));
1923 obj = osd_dt_obj(dt);
1924 down_read(&obj->oo_guard);
1927 rc = -sa_lookup(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink));
1933 CERROR("%s: multi-link file O/%s/%s/%s has nlink %llu: rc = %d\n",
1934 osd_name(dev), info->oti_seq_name, info->oti_dir_name,
1939 tx = dmu_tx_create(dev->od_os);
1942 CERROR("%s: fail to create tx to remove multi-link file!: rc = %d\n",
/* declare the ZAP removal before assigning the tx */
1947 dmu_tx_hold_zap(tx, dir, FALSE, NULL);
1948 rc = -dmu_tx_assign(tx, TXG_WAIT);
1953 rc = -sa_update(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink), tx);
1957 rc = -zap_remove(dev->od_os, dir, name, tx);
1969 up_read(&obj->oo_guard);
1970 dt_object_put_nocache(env, dt);
1973 sa_handle_destroy(hdl);
/*
 * osd_scan_ml_file() - verify that a name entry under O/<seq>/d<N>/
 * matches the FID stored in the object it references.  The expected
 * seq name, d-subdir and object-id name are regenerated from the FID;
 * when seq and subdir match but the object-id name differs, the entry
 * is considered corrupted and removed via osd_remove_ml_file().
 */
1977 static int osd_scan_ml_file(const struct lu_env *env, struct osd_device *dev,
1978 uint64_t dir_oid, struct osd_zap_it *ozi)
1980 struct osd_thread_info *info = osd_oti_get(env);
1981 struct lu_fid *fid = &info->oti_fid;
1982 struct ost_id *ostid = &info->oti_ostid;
/* resolve the FID recorded in the referenced dnode's LMA */
1989 rc = osd_get_fid_by_oid(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode, fid);
1994 fid_to_ostid(fid, ostid);
/* rebuild the expected O/<seq> directory name from the FID sequence */
1996 snprintf(name, sizeof(name), (fid_seq_is_rsvd(seq) ||
1997 fid_seq_is_mdt0(seq)) ? "%llu" : "%llx",
1998 fid_seq_is_idif(seq) ? 0 : seq);
1999 if (strcmp(info->oti_seq_name, name) != 0)
/* rebuild the expected d<N> hash-subdirectory name */
2002 snprintf(name, sizeof(name), "d%d",
2003 (int)ostid_id(ostid) % OSD_OST_MAP_SIZE);
2004 if (strcmp(info->oti_dir_name, name) != 0)
/* matching object-id name: the entry is consistent, nothing to do */
2007 snprintf(name, sizeof(name), "%llu", ostid_id(ostid));
2008 if (strcmp(ozi->ozi_name, name) == 0)
2012 CDEBUG(D_LFSCK, "%s: the file O/%s/%s/%s is corrupted\n",
2013 osd_name(dev), info->oti_seq_name, info->oti_dir_name,
2016 rc = osd_remove_ml_file(env, dev, dir_oid,
2017 ozi->ozi_zde.lzd_reg.zde_dnode, fid,
/*
 * osd_scan_ml_file_dir() - descend into one d<N> subdirectory of a
 * sequence directory and scan its entries with osd_scan_ml_file().
 * Non-directory entries are ignored.
 */
2022 static int osd_scan_ml_file_dir(const struct lu_env *env,
2023 struct osd_device *dev, uint64_t dir_oid,
2024 struct osd_zap_it *ozi)
2026 struct osd_thread_info *info = osd_oti_get(env);
/* NOTE(review): cpu_to_le16() wrapped around DTTOIF() looks odd for a
 * big-endian host — confirm the intended byte order of zde_type. */
2028 if (!S_ISDIR(cpu_to_le16(DTTOIF(ozi->ozi_zde.lzd_reg.zde_type))))
2031 info->oti_dir_name = ozi->ozi_name;
2032 return osd_scan_dir(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode,
/*
 * osd_scan_ml_file_seq() - descend into one O/<seq> directory and scan
 * its d<N> subdirectories via osd_scan_ml_file_dir().  Non-directory
 * entries are ignored.
 */
2036 static int osd_scan_ml_file_seq(const struct lu_env *env,
2037 struct osd_device *dev, uint64_t dir_oid,
2038 struct osd_zap_it *ozi)
2040 struct osd_thread_info *info = osd_oti_get(env);
/* NOTE(review): same cpu_to_le16(DTTOIF()) pattern as in
 * osd_scan_ml_file_dir() — confirm zde_type byte order. */
2042 if (!S_ISDIR(cpu_to_le16(DTTOIF(ozi->ozi_zde.lzd_reg.zde_type))))
2045 info->oti_seq_name = ozi->ozi_name;
2046 return osd_scan_dir(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode,
2047 osd_scan_ml_file_dir);
/*
 * osd_scan_ml_file_main() - entry point: walk the whole O/ tree
 * (sequences -> d<N> subdirs -> entries) looking for corrupted
 * multi-link OST object name entries.
 */
2050 static int osd_scan_ml_file_main(const struct lu_env *env,
2051 struct osd_device *dev)
2053 return osd_scan_dir(env, dev, dev->od_O_id, osd_scan_ml_file_seq);