4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2013, 2016, Intel Corporation.
26 * lustre/lfsck/lfsck_lib.c
28 * Author: Fan, Yong <fan.yong@intel.com>
31 #define DEBUG_SUBSYSTEM S_LFSCK
33 #include <linux/kthread.h>
34 #include <linux/sched.h>
35 #include <linux/list.h>
36 #include <lu_object.h>
37 #include <dt_object.h>
38 #include <md_object.h>
39 #include <lustre_fld.h>
40 #include <lustre_lib.h>
41 #include <lustre_net.h>
42 #include <lustre_lfsck.h>
43 #include <lustre/lustre_lfsck_user.h>
45 #include "lfsck_internal.h"
47 #define LFSCK_CHECKPOINT_SKIP 1
49 /* define lfsck thread key */
/* NOTE(review): presumably generates the key-init hook that allocates one
 * struct lfsck_thread_info per context; the macro body is not visible in
 * this file, so confirm against its definition. */
50 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
/*
 * Finalizer for the lfsck context key.
 *
 * @data is treated as the struct lfsck_thread_info attached to the
 * context; its three lu_buf members are released with lu_buf_free().
 *
 * NOTE(review): only part of the body is visible in this view; whether
 * @info itself is freed afterwards cannot be confirmed from here.
 */
52 static void lfsck_key_fini(const struct lu_context *ctx,
53 struct lu_context_key *key, void *data)
55 struct lfsck_thread_info *info = data;
57 lu_buf_free(&info->lti_linkea_buf);
58 lu_buf_free(&info->lti_linkea_buf2);
59 lu_buf_free(&info->lti_big_buf);
/* Define the lfsck context key with the LCT_MD_THREAD and LCT_DT_THREAD
 * tags, then instantiate the generic key helpers for it.
 * NOTE(review): what the two macros expand to is not visible in this file. */
63 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
64 LU_KEY_INIT_GENERIC(lfsck);
/* File-scope lists and the spinlock used with them.
 * lfsck_add_target_from_orphan() walks the OST orphan list while holding
 * lfsck_instance_lock and also references the MDT orphan list.
 * NOTE(review): the users of lfsck_instance_list are not visible in this
 * view. */
66 static struct list_head lfsck_instance_list;
67 static struct list_head lfsck_ost_orphan_list;
68 static struct list_head lfsck_mdt_orphan_list;
69 static DEFINE_SPINLOCK(lfsck_instance_lock);
/* String tables of LFSCK flag names and LFSCK parameter names.
 * NOTE(review): the initializer entries and closing braces are not visible
 * in this view, so the index-to-name mapping cannot be documented here. */
71 const char *lfsck_flags_names[] = {
80 const char *lfsck_param_names[] = {
/* Tells the lost+found verification code where the child FID under test
 * was found; lfsck_update_lpf_entry() and lfsck_verify_lpf_pairs() branch
 * on it.
 * NOTE(review): the LVLT_BY_BOOKMARK enumerator used by those functions is
 * not visible in this view; only LVLT_BY_NAMEENTRY (= 1) is. */
92 enum lfsck_verify_lpf_types {
94 LVLT_BY_NAMEENTRY = 1,
/*
 * Reset the per-component state kept in a target descriptor.
 *
 * For LFSCK_TYPE_LAYOUT the layout status is set to LS_MAX and the layout
 * repaired counter to 0. The namespace status/counter are reset the same
 * way by the two trailing assignments.
 *
 * NOTE(review): the line separating the two pairs of assignments is not
 * visible here; presumably it is the "else" branch - confirm.
 */
98 lfsck_reset_ltd_status(struct lfsck_tgt_desc *ltd, enum lfsck_type type)
100 if (type == LFSCK_TYPE_LAYOUT) {
101 ltd->ltd_layout_status = LS_MAX;
102 ltd->ltd_layout_repaired = 0;
104 ltd->ltd_namespace_status = LS_MAX;
105 ltd->ltd_namespace_repaired = 0;
/*
 * Initialize a target-descriptor table: its spinlock, its rw-semaphore and
 * its orphan list head, then allocate a bitmap of BITS_PER_LONG bits.
 *
 * NOTE(review): the return statements are not visible in this view; the
 * NULL check on the bitmap suggests an error is returned when the
 * allocation fails - confirm the exact value.
 * NOTE(review): "<ds" below looks like a mangled "&ltds" - confirm against
 * the pristine source.
 */
109 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
111 spin_lock_init(<ds->ltd_lock);
112 init_rwsem(<ds->ltd_rw_sem);
113 INIT_LIST_HEAD(<ds->ltd_orphan);
114 ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
115 if (ltds->ltd_tgts_bitmap == NULL)
/*
 * Tear down a target-descriptor table.
 *
 * NOTE(review): several short lines (declarations, closing braces, early
 * exit, descriptor release) are not visible in this view; the comments
 * below describe only the statements that are.
 * NOTE(review): "<ds" / "<d" below look like mangled "&ltds" / "&ltd".
 */
121 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
123 struct lfsck_tgt_desc *ltd;
124 struct lfsck_tgt_desc *next;
/* Everything below runs with the table semaphore held for write. */
127 down_write(<ds->ltd_rw_sem);
/* Unlink every descriptor still queued on the table's orphan list. */
129 list_for_each_entry_safe(ltd, next, <ds->ltd_orphan,
131 list_del_init(<d->ltd_orphan_list);
/* No bitmap: the semaphore is dropped here (presumably followed by an
 * early return, which is not visible). */
135 if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
136 up_write(<ds->ltd_rw_sem);
/* For each set bit: assert the descriptor is on none of the layout or
 * namespace lists, then clear its bit and empty its table slot. */
141 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
142 ltd = lfsck_ltd2tgt(ltds, idx);
143 if (likely(ltd != NULL)) {
144 LASSERT(list_empty(<d->ltd_layout_list));
145 LASSERT(list_empty(<d->ltd_layout_phase_list));
146 LASSERT(list_empty(<d->ltd_namespace_list));
147 LASSERT(list_empty(<d->ltd_namespace_phase_list));
150 cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
151 lfsck_assign_tgt(ltds, NULL, idx);
/* After the walk the target count is expected to be zero. */
156 LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
/* Free every allocated index block. */
159 for (idx = 0; idx < TGT_PTRS; idx++) {
160 if (ltds->ltd_tgts_idx[idx] != NULL) {
161 OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
162 ltds->ltd_tgts_idx[idx] = NULL;
/* Release the bitmap and clear the pointer before dropping the lock. */
166 CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
167 ltds->ltd_tgts_bitmap = NULL;
168 up_write(<ds->ltd_rw_sem);
/*
 * Register the target descriptor @ltd, at index ltd->ltd_index, in either
 * the OST table (li_ost_descs) or the MDT table (li_mdt_descs) of @lfsck.
 *
 * Visible failure codes: -ENOMEM when a bitmap or index block cannot be
 * allocated, -EEXIST when the index is already set in the bitmap; rc is
 * set to 0 on the success path.
 *
 * NOTE(review): the condition that selects the table (presumably @for_ost)
 * and the way @locked gates the down_write()/up_write() pair are not
 * visible in this view - confirm.
 * NOTE(review): "<ds" below looks like a mangled "&ltds".
 */
171 static int __lfsck_add_target(const struct lu_env *env,
172 struct lfsck_instance *lfsck,
173 struct lfsck_tgt_desc *ltd,
174 bool for_ost, bool locked)
176 struct lfsck_tgt_descs *ltds;
177 __u32 index = ltd->ltd_index;
182 ltds = &lfsck->li_ost_descs;
184 ltds = &lfsck->li_mdt_descs;
187 down_write(<ds->ltd_rw_sem);
189 LASSERT(ltds->ltd_tgts_bitmap != NULL);
/* The index does not fit in the current bitmap: size a new one starting
 * from max(current size, BITS_PER_LONG), grown until it covers the index
 * (the loop body is not visible here). */
191 if (index >= ltds->ltd_tgts_bitmap->size) {
192 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
193 (__u32)BITS_PER_LONG);
194 struct cfs_bitmap *old_bitmap = ltds->ltd_tgts_bitmap;
195 struct cfs_bitmap *new_bitmap;
197 while (newsize < index + 1)
200 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
201 if (new_bitmap == NULL)
202 GOTO(unlock, rc = -ENOMEM);
/* Old bits are copied only when at least one target is registered; the
 * new bitmap is installed before the old one is freed. */
204 if (ltds->ltd_tgtnr > 0)
205 cfs_bitmap_copy(new_bitmap, old_bitmap);
206 ltds->ltd_tgts_bitmap = new_bitmap;
207 CFS_FREE_BITMAP(old_bitmap);
/* Refuse to register the same index twice. */
210 if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
211 CERROR("%s: the device %s (%u) is registered already\n",
212 lfsck_lfsck2name(lfsck),
213 ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
214 GOTO(unlock, rc = -EEXIST);
/* Allocate the index block that covers this index on first use. */
217 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
218 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
219 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
220 GOTO(unlock, rc = -ENOMEM);
/* Publish the descriptor in its slot, then mark the index as in use. */
223 lfsck_assign_tgt(ltds, ltd, index);
224 cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
227 GOTO(unlock, rc = 0);
231 up_write(<ds->ltd_rw_sem);
/*
 * Adopt orphan target descriptors that belong to this LFSCK instance.
 *
 * Descriptors on the global orphan list whose ltd_key equals
 * lfsck->li_bottom are moved off it under lfsck_instance_lock; the
 * instance's ltd_orphan list is then drained, registering each entry via
 * __lfsck_add_target() with locked == true.
 *
 * The OST table and OST orphan list are used first; the last two
 * assignments switch to the MDT table and MDT orphan list.
 *
 * NOTE(review): the destination of list_move_tail(), the handling of rc,
 * and the control flow that repeats the work for MDTs are not visible in
 * this view - confirm.
 * NOTE(review): "<ds" / "<d" below look like mangled "&ltds" / "&ltd".
 */
236 static int lfsck_add_target_from_orphan(const struct lu_env *env,
237 struct lfsck_instance *lfsck)
239 struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs;
240 struct lfsck_tgt_desc *ltd;
241 struct lfsck_tgt_desc *next;
242 struct list_head *head = &lfsck_ost_orphan_list;
/* The global orphan list is only touched under the instance spinlock. */
247 spin_lock(&lfsck_instance_lock);
248 list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
249 if (ltd->ltd_key == lfsck->li_bottom)
250 list_move_tail(<d->ltd_orphan_list,
253 spin_unlock(&lfsck_instance_lock);
/* Drain the per-instance orphan list one entry at a time. */
255 down_write(<ds->ltd_rw_sem);
256 while (!list_empty(<ds->ltd_orphan)) {
257 ltd = list_entry(ltds->ltd_orphan.next,
258 struct lfsck_tgt_desc,
260 list_del_init(<d->ltd_orphan_list);
261 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
262 /* Do not hold the semaphore for too long time. */
263 up_write(<ds->ltd_rw_sem);
267 down_write(<ds->ltd_rw_sem);
269 up_write(<ds->ltd_rw_sem);
272 ltds = &lfsck->li_mdt_descs;
273 head = &lfsck_mdt_orphan_list;
/*
 * Walk @list (linked through lc_link) looking for a component whose
 * lc_type equals @type.
 *
 * NOTE(review): the return statements are not visible in this view;
 * presumably the matching component is returned, or NULL when none
 * matches - confirm. No lock is taken in the visible lines.
 */
281 static inline struct lfsck_component *
282 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
283 struct list_head *list)
285 struct lfsck_component *com;
287 list_for_each_entry(com, list, lc_link) {
288 if (com->lc_type == type)
/*
 * Find the component of the given @type on this LFSCK instance.
 *
 * Under li_lock, the scan list, the double-scan list and the idle list are
 * searched in that order, and lfsck_component_get() is called on the
 * result before the lock is dropped.
 *
 * NOTE(review): the checks between the three searches, the guard around
 * lfsck_component_get() and the return statement are not visible in this
 * view.
 */
294 struct lfsck_component *
295 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
297 struct lfsck_component *com;
299 spin_lock(&lfsck->li_lock);
300 com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
304 com = __lfsck_component_find(lfsck, type,
305 &lfsck->li_list_double_scan);
309 com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
313 lfsck_component_get(com);
314 spin_unlock(&lfsck->li_lock);
/*
 * Detach a component from the lists it is linked on and drop a reference.
 *
 * The component is unlinked from lc_link and from lc_link_dir when it is
 * currently on those lists, then lfsck_component_put() is called.
 *
 * NOTE(review): no lock is taken in the visible lines; presumably the
 * caller provides any needed serialization - confirm.
 */
318 void lfsck_component_cleanup(const struct lu_env *env,
319 struct lfsck_component *com)
321 if (!list_empty(&com->lc_link))
322 list_del_init(&com->lc_link);
323 if (!list_empty(&com->lc_link_dir))
324 list_del_init(&com->lc_link_dir);
326 lfsck_component_put(env, com);
/*
 * Allocate a new FID from the instance's sequence client (li_seq) into
 * @fid, remember it as lb_last_fid in the in-RAM bookmark and store the
 * bookmark.
 *
 * NOTE(review): the conditions around mutex_lock()/mutex_unlock() are not
 * visible in this view; presumably li_mutex is taken here only when
 * @locked is false - confirm. The checks on rc are likewise not visible.
 */
329 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
330 struct lu_fid *fid, bool locked)
332 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
337 mutex_lock(&lfsck->li_mutex);
339 rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
341 bk->lb_last_fid = *fid;
342 /* We do not care about whether the subsequent sub-operations
343 * failed or not. The worst case is that one FID is lost that
344 * is not a big issue for the LFSCK since it is relative rare
345 * for LFSCK create. */
346 rc = lfsck_bookmark_store(env, lfsck);
350 mutex_unlock(&lfsck->li_mutex);
/*
 * Enqueue an inodebits (LDLM_IBITS) lock with the given @bits on the
 * resource @resid and return the handle through @lh.
 *
 * A remote object goes through dt_object_lock() with an enqueue-info
 * structure; the other call enqueues locally on lfsck->li_namespace. Both
 * use ldlm_blocking_ast and ldlm_completion_ast as callbacks.
 *
 * NOTE(review): the last parameter (the lock mode, used below as "mode"),
 * the "else" separating the two enqueue paths and the handling after the
 * ELDLM_OK check are not visible in this view; the memset() presumably
 * clears @lh on failure - confirm.
 */
355 static int __lfsck_ibits_lock(const struct lu_env *env,
356 struct lfsck_instance *lfsck,
357 struct dt_object *obj, struct ldlm_res_id *resid,
358 struct lustre_handle *lh, __u64 bits,
361 struct lfsck_thread_info *info = lfsck_env_info(env);
362 union ldlm_policy_data *policy = &info->lti_policy;
363 __u64 flags = LDLM_FL_ATOMIC_CB;
366 LASSERT(lfsck->li_namespace != NULL);
/* The policy lives in per-thread storage, so clear it before use. */
368 memset(policy, 0, sizeof(*policy));
369 policy->l_inodebits.bits = bits;
370 if (dt_object_remote(obj)) {
371 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
373 memset(einfo, 0, sizeof(*einfo));
374 einfo->ei_type = LDLM_IBITS;
375 einfo->ei_mode = mode;
376 einfo->ei_cb_bl = ldlm_blocking_ast;
377 einfo->ei_cb_cp = ldlm_completion_ast;
378 einfo->ei_res_id = resid;
380 rc = dt_object_lock(env, obj, lh, einfo, policy);
382 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
383 LDLM_IBITS, policy, mode,
384 &flags, ldlm_blocking_ast,
385 ldlm_completion_ast, NULL, NULL,
386 0, LVB_T_NONE, NULL, lh);
389 if (rc == ELDLM_OK) {
392 memset(lh, 0, sizeof(*lh));
400 * Request the specified ibits lock for the given object.
402 * Before the LFSCK modifies a namespace-visible object,
403 * it needs to acquire the related ibits ldlm lock.
405 * \param[in] env pointer to the thread context
406 * \param[in] lfsck pointer to the lfsck instance
407 * \param[in] obj pointer to the dt_object to be locked
408 * \param[out] lh pointer to the lock handle
409 * \param[in] bits the bits for the ldlm lock to be acquired
410 * \param[in] mode the mode for the ldlm lock to be acquired
412 * \retval 0 for success
413 * \retval negative error number on failure
415 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
416 struct dt_object *obj, struct lustre_handle *lh,
417 __u64 bits, enum ldlm_mode mode)
/* The resource id is built in per-thread storage (lti_resid). */
419 struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
/* The caller must pass a handle that does not already hold a lock. */
421 LASSERT(!lustre_handle_is_used(lh));
/* Derive the resource name from the object's FID, then delegate. */
423 fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
424 return __lfsck_ibits_lock(env, lfsck, obj, resid, lh, bits, mode);
428 * Release the specified ibits lock.
430 * If the lock has been acquired before, release it
431 * and cleanup the handle. Otherwise, do nothing.
433 * \param[in] lh pointer to the lock handle
434 * \param[in] mode the mode for the ldlm lock to be released
436 void lfsck_ibits_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
/* Only a handle reported as in use is released; it is zeroed afterwards. */
438 if (lustre_handle_is_used(lh)) {
439 ldlm_lock_decref(lh, mode);
440 memset(lh, 0, sizeof(*lh));
445 * Request compound ibits locks for the given <obj, name> pairs.
447 * Before the LFSCK modifies a namespace-visible object, it needs to
448 * acquire the related ibits ldlm lock. Usually, lfsck_ibits_lock can be
449 * used for this purpose. But the simple lfsck_ibits_lock for directory-based
450 * modifications (such as inserting a name entry into the directory) may be
451 * too coarse-grained and inefficient.
453 * The lfsck_lock() will request compound ibits locks on the specified
454 * <obj, name> pairs: the PDO (Parallel Directory Operations) ibits (UPDATE)
455 * lock on the directory object, and the regular ibits lock on the name hash.
457 * \param[in] env pointer to the thread context
458 * \param[in] lfsck pointer to the lfsck instance
459 * \param[in] obj pointer to the dt_object to be locked
460 * \param[in] name used for building the PDO lock resource
461 * \param[out] llh pointer to the lfsck_lock_handle
462 * \param[in] bits the bits for the ldlm lock to be acquired
463 * \param[in] mode the mode for the ldlm lock to be acquired
465 * \retval 0 for success
466 * \retval negative error number on failure
468 int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
469 struct dt_object *obj, const char *name,
470 struct lfsck_lock_handle *llh, __u64 bits, enum ldlm_mode mode)
472 struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
/* Preconditions: @obj is a directory, @name is a non-empty string and
 * neither handle in @llh already holds a lock. */
475 LASSERT(S_ISDIR(lfsck_object_type(obj)));
476 LASSERT(name != NULL);
477 LASSERT(name[0] != 0);
478 LASSERT(!lustre_handle_is_used(&llh->llh_pdo_lh));
479 LASSERT(!lustre_handle_is_used(&llh->llh_reg_lh));
/* The PDO lock mode is set to LCK_EX, LCK_CW or LCK_CR; any other case is
 * logged as unexpected.
 * NOTE(review): the selecting statement and its labels are not visible in
 * this view (presumably a switch on @mode) - confirm the mapping. */
483 llh->llh_pdo_mode = LCK_EX;
486 llh->llh_pdo_mode = LCK_CW;
489 llh->llh_pdo_mode = LCK_CR;
492 CDEBUG(D_LFSCK, "%s: unexpected PDO lock mode %u on the obj "
493 DFID"\n", lfsck_lfsck2name(lfsck), mode,
494 PFID(lfsck_dto2fid(obj)));
/* First lock: the UPDATE bit on the directory's own resource. */
498 fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
499 rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_pdo_lh,
500 MDS_INODELOCK_UPDATE, llh->llh_pdo_mode);
/* Second lock: same resource with the name hash stored in the
 * LUSTRE_RES_ID_HSH_OFF slot, using the caller's @bits and @mode. */
504 llh->llh_reg_mode = mode;
505 resid->name[LUSTRE_RES_ID_HSH_OFF] = full_name_hash(name, strlen(name));
506 LASSERT(resid->name[LUSTRE_RES_ID_HSH_OFF] != 0);
507 rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_reg_lh,
508 bits, llh->llh_reg_mode);
/* The PDO lock is released again here; presumably only when the second
 * enqueue failed (the guarding check is not visible) - confirm. */
510 lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
516 * Release the compound ibits locks.
518 * \param[in] llh pointer to the lfsck_lock_handle to be released
520 void lfsck_unlock(struct lfsck_lock_handle *llh)
/* Released in the reverse of the order lfsck_lock() enqueues them:
 * the regular lock first, then the PDO lock. */
522 lfsck_ibits_unlock(&llh->llh_reg_lh, llh->llh_reg_mode);
523 lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
/*
 * Map @fid to the index of the MDT that holds it.
 *
 * FIDs in the FID_SEQ_LOCAL_FILE sequence are resolved without an FLD
 * lookup: the global root FID is special-cased, and the other visible
 * return is the local device index. Any other FID is looked up through
 * the server FLD with the range marked as MDT, and rc is then taken from
 * the range's lsr_index.
 *
 * NOTE(review): the value returned for the global root (the existing
 * comment says MDT0) and the rc check after fld_server_lookup() are not
 * visible in this view; presumably a negative errno is returned when the
 * lookup fails - confirm.
 */
526 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
527 struct lfsck_instance *lfsck,
528 const struct lu_fid *fid)
530 struct seq_server_site *ss = lfsck_dev_site(lfsck);
531 struct lu_seq_range *range = &lfsck_env_info(env)->lti_range;
534 if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
535 /* "ROOT" is always on the MDT0. */
536 if (lu_fid_eq(fid, &lfsck->li_global_root_fid))
539 return lfsck_dev_idx(lfsck);
542 fld_range_set_mdt(range);
543 rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
545 rc = range->lsr_index;
/* Directory-entry names used by the lost+found handling in this file.
 * dot and dotdot have external linkage; dotlustre and lostfound are
 * file-local. */
550 const char dot[] = ".";
551 const char dotdot[] = "..";
552 static const char dotlustre[] = ".lustre";
553 static const char lostfound[] = "lost+found";
556 * Remove the name entry from the .lustre/lost+found directory.
558 * No need to care about the object referenced by the name entry,
559 * either the name entry is invalid or redundant, or the referenced
560 * object has been processed or will be handled by others.
562 * \param[in] env pointer to the thread context
563 * \param[in] lfsck pointer to the lfsck instance
564 * \param[in] name the name for the name entry to be removed
566 * \retval 0 for success
567 * \retval negative error number on failure
/* NOTE(review): the "name" parameter line, some declarations, the rc
 * checks and the labels are not visible in this view. */
569 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
570 struct lfsck_instance *lfsck,
573 struct dt_object *parent = lfsck->li_lpf_root_obj;
574 struct dt_device *dev = lfsck_obj2dev(parent);
576 struct lfsck_lock_handle *llh = &lfsck_env_info(env)->lti_llh;
/* Take the compound PW lock on <parent, name> before changing the entry. */
580 rc = lfsck_lock(env, lfsck, parent, name, llh,
581 MDS_INODELOCK_UPDATE, LCK_PW);
/* Declare phase: the entry deletion and the parent nlink decrement. */
585 th = dt_trans_create(env, dev);
587 GOTO(unlock, rc = PTR_ERR(th));
589 rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
593 rc = dt_declare_ref_del(env, parent, th);
/* Execute phase, in the same order as declared. */
597 rc = dt_trans_start_local(env, dev, th);
601 rc = dt_delete(env, parent, (const struct dt_key *)name, th);
/* The nlink decrement is done under the parent's write lock. */
605 dt_write_lock(env, parent, 0);
606 rc = dt_ref_del(env, parent, th);
607 dt_write_unlock(env, parent);
612 dt_trans_stop(env, dev, th);
617 CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
618 lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
/*
 * Create the lost+found sub-directory @child and link it under the
 * lost+found root (li_lpf_root_obj) inside one transaction on the child's
 * device.
 *
 * The work is declared as steps 1a-8a and then executed as steps 1b-8b:
 * create the child, bump its nlink, insert "." and "..", set its linkEA,
 * insert the name into the parent, bump the parent nlink, and write the
 * bookmark with the child's FID recorded as lb_lpf_fid.
 *
 * NOTE(review): the "la" and "name" parameter lines, several declarations
 * (rc, pos), the rc checks, the rec_fid assignments for "." and the name
 * entry, and the labels are not visible in this view.
 */
623 static int lfsck_create_lpf_local(const struct lu_env *env,
624 struct lfsck_instance *lfsck,
625 struct dt_object *child,
627 struct dt_object_format *dof,
630 struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec;
631 struct dt_object *parent = lfsck->li_lpf_root_obj;
632 struct dt_device *dev = lfsck_obj2dev(child);
633 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
634 struct dt_object *bk_obj = lfsck->li_bookmark_obj;
635 const struct lu_fid *cfid = lfsck_dto2fid(child);
636 struct thandle *th = NULL;
637 struct linkea_data ldata = { NULL };
638 struct lu_buf linkea_buf;
639 const struct lu_name *cname;
641 int len = sizeof(struct lfsck_bookmark);
/* Build the linkEA content (name under the parent's FID) in the
 * per-thread lti_linkea_buf2 buffer before opening the transaction. */
645 rc = linkea_data_new(&ldata,
646 &lfsck_env_info(env)->lti_linkea_buf2);
650 cname = lfsck_name_get_const(env, name, strlen(name));
651 rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
655 th = dt_trans_create(env, dev);
659 /* 1a. create child */
660 rc = dt_declare_create(env, child, la, NULL, dof, th);
664 if (!dt_try_as_dir(env, child))
665 GOTO(stop, rc = -ENOTDIR);
667 /* 2a. increase child nlink */
668 rc = dt_declare_ref_add(env, child, th);
672 /* 3a. insert dot into child dir */
673 rec->rec_type = S_IFDIR;
675 rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
676 (const struct dt_key *)dot, th);
680 /* 4a. insert dotdot into child dir */
681 rec->rec_fid = &LU_LPF_FID;
682 rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
683 (const struct dt_key *)dotdot, th);
687 /* 5a. insert linkEA for child */
688 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
689 ldata.ld_leh->leh_len);
690 rc = dt_declare_xattr_set(env, child, &linkea_buf,
691 XATTR_NAME_LINK, 0, th);
695 /* 6a. insert name into parent dir */
696 rec->rec_type = S_IFDIR;
698 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
699 (const struct dt_key *)name, th);
703 /* 7a. increase parent nlink */
704 rc = dt_declare_ref_add(env, parent, th);
708 /* 8a. update bookmark */
709 rc = dt_declare_record_write(env, bk_obj,
710 lfsck_buf_get(env, bk, len), 0, th);
/* All declarations done: start the transaction and apply steps 1b-8b. */
714 rc = dt_trans_start_local(env, dev, th);
/* The child stays write-locked from its creation until its linkEA is
 * set; a second unlock near the end presumably serves the error path. */
718 dt_write_lock(env, child, 0);
719 /* 1b. create child */
720 rc = dt_create(env, child, la, NULL, dof, th);
724 /* 2b. increase child nlink */
725 rc = dt_ref_add(env, child, th);
729 /* 3b. insert dot into child dir */
731 rc = dt_insert(env, child, (const struct dt_rec *)rec,
732 (const struct dt_key *)dot, th, 1);
736 /* 4b. insert dotdot into child dir */
737 rec->rec_fid = &LU_LPF_FID;
738 rc = dt_insert(env, child, (const struct dt_rec *)rec,
739 (const struct dt_key *)dotdot, th, 1);
743 /* 5b. insert linkEA for child. */
744 rc = dt_xattr_set(env, child, &linkea_buf,
745 XATTR_NAME_LINK, 0, th);
746 dt_write_unlock(env, child);
750 /* 6b. insert name into parent dir */
752 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
753 (const struct dt_key *)name, th, 1);
757 dt_write_lock(env, parent, 0);
758 /* 7b. increase parent nlink */
759 rc = dt_ref_add(env, parent, th);
760 dt_write_unlock(env, parent);
/* Record the child's FID in the in-RAM bookmark and refresh the
 * little-endian copy in li_bookmark_disk.
 * NOTE(review): the write below passes bk (the in-RAM copy), not the
 * li_bookmark_disk copy that was just converted - confirm this is
 * intended. */
764 bk->lb_lpf_fid = *cfid;
765 lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
767 /* 8b. update bookmark */
768 rc = dt_record_write(env, bk_obj,
769 lfsck_buf_get(env, bk, len), &pos, th);
774 dt_write_unlock(env, child);
777 dt_trans_stop(env, dev, th);
/*
 * Create the lost+found sub-directory @child when the lost+found root may
 * live on another MDT, using two separate transactions (see the XXX
 * comment below for the rationale):
 *
 *   I.  on the child's device: create the child, bump its nlink, insert
 *       "." and "..", set its linkEA and write the bookmark with the
 *       child's FID as lb_lpf_fid;
 *   II. on the parent's device: insert the name into the parent and bump
 *       the parent nlink.
 *
 * When the function ends with rc != 0 while dev refers to the parent's
 * device, the partially created state is logged as an inconsistency to be
 * repaired by a later LFSCK run.
 *
 * NOTE(review): the "la" and "name" parameter lines, several declarations
 * (rc, pos), the rc checks, some rec_fid assignments and the labels are
 * not visible in this view.
 */
782 static int lfsck_create_lpf_remote(const struct lu_env *env,
783 struct lfsck_instance *lfsck,
784 struct dt_object *child,
786 struct dt_object_format *dof,
789 struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec;
790 struct dt_object *parent = lfsck->li_lpf_root_obj;
791 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
792 struct dt_object *bk_obj = lfsck->li_bookmark_obj;
793 const struct lu_fid *cfid = lfsck_dto2fid(child);
794 struct thandle *th = NULL;
795 struct linkea_data ldata = { NULL };
796 struct lu_buf linkea_buf;
797 const struct lu_name *cname;
798 struct dt_device *dev;
800 int len = sizeof(struct lfsck_bookmark);
/* Build the linkEA content (name under the parent's FID) in the
 * per-thread lti_linkea_buf2 buffer before opening any transaction. */
804 rc = linkea_data_new(&ldata,
805 &lfsck_env_info(env)->lti_linkea_buf2);
809 cname = lfsck_name_get_const(env, name, strlen(name));
810 rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
814 /* Create .lustre/lost+found/MDTxxxx. */
816 /* XXX: Currently, cross-MDT create operation needs to create the child
817 * object firstly, then insert name into the parent directory. For
818 * this case, the child object resides on current MDT (local), but
819 * the parent ".lustre/lost+found" may be on remote MDT. It is not
820 * easy to contain all the sub-modifications orderly within single
823 * To avoid more inconsistency, we split the create operation into
826 * 1) create the child and update the lfsck_bookmark::lb_lpf_fid
828 * 2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
831 * If 1) done, but 2) failed, then go ahead, the LFSCK will try to
832 * repair such inconsistency when LFSCK run next time. */
834 /* Transaction I: locally */
836 dev = lfsck_obj2dev(child);
837 th = dt_trans_create(env, dev);
841 /* 1a. create child */
842 rc = dt_declare_create(env, child, la, NULL, dof, th);
846 if (!dt_try_as_dir(env, child))
847 GOTO(stop, rc = -ENOTDIR);
849 /* 2a. increase child nlink */
850 rc = dt_declare_ref_add(env, child, th);
854 /* 3a. insert dot into child dir */
855 rec->rec_type = S_IFDIR;
857 rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
858 (const struct dt_key *)dot, th);
862 /* 4a. insert dotdot into child dir */
863 rec->rec_fid = &LU_LPF_FID;
864 rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
865 (const struct dt_key *)dotdot, th);
869 /* 5a. insert linkEA for child */
870 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
871 ldata.ld_leh->leh_len);
872 rc = dt_declare_xattr_set(env, child, &linkea_buf,
873 XATTR_NAME_LINK, 0, th);
877 /* 6a. update bookmark */
878 rc = dt_declare_record_write(env, bk_obj,
879 lfsck_buf_get(env, bk, len), 0, th);
883 rc = dt_trans_start_local(env, dev, th);
/* The child is write-locked for the whole execution of transaction I. */
887 dt_write_lock(env, child, 0);
888 /* 1b. create child */
889 rc = dt_create(env, child, la, NULL, dof, th);
893 /* 2b. increase child nlink */
894 rc = dt_ref_add(env, child, th);
898 /* 3b. insert dot into child dir */
899 rec->rec_type = S_IFDIR;
901 rc = dt_insert(env, child, (const struct dt_rec *)rec,
902 (const struct dt_key *)dot, th, 1);
906 /* 4b. insert dotdot into child dir */
907 rec->rec_fid = &LU_LPF_FID;
908 rc = dt_insert(env, child, (const struct dt_rec *)rec,
909 (const struct dt_key *)dotdot, th, 1);
913 /* 5b. insert linkEA for child */
914 rc = dt_xattr_set(env, child, &linkea_buf,
915 XATTR_NAME_LINK, 0, th);
/* Record the child's FID in the in-RAM bookmark and refresh the
 * little-endian copy in li_bookmark_disk.
 * NOTE(review): the write below passes bk (the in-RAM copy), not the
 * li_bookmark_disk copy that was just converted - confirm this is
 * intended. */
919 bk->lb_lpf_fid = *cfid;
920 lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
922 /* 6b. update bookmark */
923 rc = dt_record_write(env, bk_obj,
924 lfsck_buf_get(env, bk, len), &pos, th);
926 dt_write_unlock(env, child);
927 dt_trans_stop(env, dev, th);
931 /* Transaction II: remotely */
933 dev = lfsck_obj2dev(parent);
934 th = dt_trans_create(env, dev);
939 /* 5a. insert name into parent dir */
941 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
942 (const struct dt_key *)name, th);
946 /* 6a. increase parent nlink */
947 rc = dt_declare_ref_add(env, parent, th);
951 rc = dt_trans_start_local(env, dev, th);
955 /* 5b. insert name into parent dir */
956 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
957 (const struct dt_key *)name, th, 1);
961 dt_write_lock(env, parent, 0);
962 /* 6b. increase parent nlink */
963 rc = dt_ref_add(env, parent, th);
964 dt_write_unlock(env, parent);
969 dt_write_unlock(env, child);
971 dt_trans_stop(env, dev, th);
/* dev was switched to the parent's device for transaction II, so this
 * condition is true only for failures after that switch. */
973 if (rc != 0 && dev == lfsck_obj2dev(parent))
974 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
975 "for orphans, but failed to insert the name %s "
976 "to the .lustre/lost+found/. Such inconsistency "
977 "will be repaired when LFSCK run next time: rc = %d\n",
978 lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
984 * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
986 * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
987 * orphans and other uncertain inconsistent objects found during the
988 * LFSCK. Such directory will be created by the LFSCK engine on the
989 * local MDT before the LFSCK scanning.
991 * \param[in] env pointer to the thread context
992 * \param[in] lfsck pointer to the lfsck instance
994 * \retval 0 for success
995 * \retval negative error number on failure
/* NOTE(review): the declarations of "name" and "rc", the rc checks, the
 * branch conditions and the labels are not visible in this view; the
 * comments below describe only the statements that are. */
997 static int lfsck_create_lpf(const struct lu_env *env,
998 struct lfsck_instance *lfsck)
1000 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1001 struct lfsck_thread_info *info = lfsck_env_info(env);
1002 struct lu_fid *cfid = &info->lti_fid2;
1003 struct lu_attr *la = &info->lti_la;
1004 struct dt_object_format *dof = &info->lti_dof;
1005 struct dt_object *parent = lfsck->li_lpf_root_obj;
1006 struct dt_object *child = NULL;
1007 struct lfsck_lock_handle *llh = &info->lti_llh;
1009 int node = lfsck_dev_idx(lfsck);
/* Preconditions: a master instance with a lost+found root object and no
 * lost+found child object attached yet. */
1013 LASSERT(lfsck->li_master);
1014 LASSERT(parent != NULL);
1015 LASSERT(lfsck->li_lpf_obj == NULL);
/* The name is "MDT" followed by the device index as zero-padded hex of
 * at least four digits, with output bounded to 8 bytes. */
1017 snprintf(name, 8, "MDT%04x", node);
1018 rc = lfsck_lock(env, lfsck, parent, name, llh,
1019 MDS_INODELOCK_UPDATE, LCK_PW);
1023 if (fid_is_zero(&bk->lb_lpf_fid)) {
1024 /* There is corner case that: in former LFSCK scanning we have
1025 * created the .lustre/lost+found/MDTxxxx but failed to update
1026 * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
1027 * it from MDT0 firstly. */
1028 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1029 (const struct dt_key *)name);
1030 if (rc != 0 && rc != -ENOENT)
/* Either the looked-up FID is saved into the bookmark, or a fresh FID is
 * allocated; which rc selects which path is not visible here. */
1034 bk->lb_lpf_fid = *cfid;
1035 rc = lfsck_bookmark_store(env, lfsck);
1037 rc = lfsck_fid_alloc(env, lfsck, cfid, true);
/* Bookmark already holds a FID: use it as the child FID. */
1042 *cfid = bk->lb_lpf_fid;
1045 child = lfsck_object_find_bottom_new(env, lfsck, cfid);
1047 GOTO(unlock, rc = PTR_ERR(child));
/* The object already exists: it must be usable as a directory, and is
 * then attached as li_lpf_obj without creating anything. */
1049 if (dt_object_exists(child) != 0) {
1050 if (unlikely(!dt_try_as_dir(env, child)))
1053 lfsck->li_lpf_obj = child;
/* Attributes for the new directory: current time for atime/mtime/ctime
 * and mode S_IFDIR | S_IRWXU. */
1058 memset(la, 0, sizeof(*la));
1059 la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
1060 la->la_mode = S_IFDIR | S_IRWXU;
1061 la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1063 memset(dof, 0, sizeof(*dof));
1064 dof->dof_type = dt_mode_to_dft(S_IFDIR);
/* One of the two creation helpers is used; the selecting condition is not
 * visible (presumably whether this node holds the lost+found root). */
1067 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
1069 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
1071 lfsck->li_lpf_obj = child;
/* On failure the reference on a successfully found child is dropped. */
1077 if (rc != 0 && child != NULL && !IS_ERR(child))
1078 lfsck_object_put(env, child);
1084 * Scan .lustre/lost+found for bad name entries and remove them.
1086 * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
1087 * index in the system. Any other formatted name is invalid and should be
1090 * \param[in] env pointer to the thread context
1091 * \param[in] lfsck pointer to the lfsck instance
1093 * \retval 0 for success
1094 * \retval negative error number on failure
/* NOTE(review): the declarations of "it", "rc" and "off", the loop
 * construct, the "continue"/"goto" statements and the second argument
 * line of lfsck_lpf_remove_name_entry() are not visible in this view. */
1096 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
1097 struct lfsck_instance *lfsck)
1099 struct dt_object *parent = lfsck->li_lpf_root_obj;
/* The per-thread lti_key buffer is reused as storage for the dirent. */
1100 struct lu_dirent *ent =
1101 (struct lu_dirent *)lfsck_env_info(env)->lti_key;
1102 const struct dt_it_ops *iops = &parent->do_index_ops->dio_it;
/* Open an iterator over the lost+found root and position it at 0. */
1107 it = iops->init(env, parent, LUDA_64BITHASH);
1109 RETURN(PTR_ERR(it));
1111 rc = iops->load(env, it, 0);
1113 rc = iops->next(env, it);
1120 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
/* The name length is converted from little-endian in place. */
1124 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1125 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1128 /* name length must be strlen("MDTxxxx") */
1129 if (ent->lde_namelen != 7)
/* Expected shape: the "MDT" prefix followed by hex digits up to
 * offset 7. */
1132 if (memcmp(ent->lde_name, "MDT", off) != 0)
1135 while (off < 7 && isxdigit(ent->lde_name[off]))
1141 rc = lfsck_lpf_remove_name_entry(env, lfsck,
1148 rc = iops->next(env, it);
1152 iops->fini(env, it);
/* A positive rc is reported to the caller as success (0); presumably it
 * marks the end of the iteration - confirm. */
1154 RETURN(rc > 0 ? 0 : rc);
/*
 * Make the name entry and the bookmark agree on the lost+found child.
 *
 * LVLT_BY_BOOKMARK: the bookmark is the trusted source, so the name entry
 * under @parent is updated to point at @child's FID as a directory.
 * Otherwise (LVLT_BY_NAMEENTRY): the name entry is the trusted source, so
 * @child's FID is written into lb_lpf_fid and the bookmark is stored; the
 * result is logged.
 *
 * NOTE(review): the "name" parameter line, the rc declaration and the
 * return statement are not visible in this view.
 */
1157 static int lfsck_update_lpf_entry(const struct lu_env *env,
1158 struct lfsck_instance *lfsck,
1159 struct dt_object *parent,
1160 struct dt_object *child,
1162 enum lfsck_verify_lpf_types type)
1166 if (type == LVLT_BY_BOOKMARK) {
1167 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1168 lfsck_dto2fid(child), S_IFDIR);
1169 } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1170 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1171 rc = lfsck_bookmark_store(env, lfsck);
1173 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1174 " in the bookmark file: rc = %d\n",
1175 lfsck_lfsck2name(lfsck),
1176 PFID(lfsck_dto2fid(child)), rc);
1183 * Check whether the @child back references the @parent.
1186 * 1) The child's FID is stored in the bookmark file. If the child back
1187 * references the parent (LU_LPF_FID object) via its ".." entry, then
1188 * insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1189 * the child back references another parent2, then:
1190 * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1191 * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1192 * references the child. So keep them there. As the LFSCK proceeds,
1193 * the parent3 may be found; then when the LFSCK runs next time, the
1194 * inconsistency can be repaired.
1196 * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1197 * entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1198 * via its ".." entry, then update the bookmark file, otherwise, if the child
1199 * back references another parent2, then:
1200 * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1201 * from .lustre/lost+found/;
1202 * 2.2) Otherwise, if the parent2 does not recognize the child, trust the
1203 * sub-directory name entry and update the child;
1204 * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1205 * or not, then keep them there.
1207 * \param[in] env pointer to the thread context
1208 * \param[in] lfsck pointer to the lfsck instance
1209 * \param[in] child pointer to the lost+found sub-directory object
1210 * \param[in] name the name for lost+found sub-directory object
1211 * \param[out] fid pointer to the buffer to hold the FID of the object
1212 * (called it as parent2) that is referenced via the
1213 * child's dotdot entry; it also can be the FID that
1214 * is referenced by the name entry under the parent2.
1215 * \param[in] type to indicate where the child's FID is stored in
1217 * \retval positive number for uncertain inconsistency
1218 * \retval 0 for success
1219 * \retval negative error number on failure
/* NOTE(review): the "fid" parameter line, the rc declaration, most rc
 * checks, the labels (linkea, out_put, out_done) and the trailing
 * arguments of some calls are not visible in this view; the comments
 * below describe only the statements that are. */
1221 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1222 struct lfsck_instance *lfsck,
1223 struct dt_object *child, const char *name,
1225 enum lfsck_verify_lpf_types type)
1227 struct dt_object *parent = lfsck->li_lpf_root_obj;
1228 struct lfsck_thread_info *info = lfsck_env_info(env);
1229 char *name2 = info->lti_key;
1230 struct lu_fid *fid2 = &info->lti_fid3;
1231 struct dt_object *parent2 = NULL;
1232 struct lustre_handle lh = { 0 };
/* Read the child's ".." entry into @fid. */
1237 rc = dt_lookup(env, child, (struct dt_rec *)fid,
1238 (const struct dt_key *)dotdot);
1242 if (!fid_is_sane(fid))
1243 GOTO(linkea, rc = -EINVAL);
/* ".." points at the lost+found root: adopt the child as li_lpf_obj
 * (taking a reference) when none is attached, then verify its linkEA
 * and update the name entry or the bookmark. */
1245 if (lu_fid_eq(fid, &LU_LPF_FID)) {
1246 const struct lu_name *cname;
1248 if (lfsck->li_lpf_obj == NULL) {
1249 lu_object_get(&child->do_lu);
1250 lfsck->li_lpf_obj = child;
1253 cname = lfsck_name_get_const(env, name, strlen(name));
1254 rc = lfsck_verify_linkea(env, child, cname, &LU_LPF_FID);
1256 rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
/* ".." points elsewhere: resolve that object as parent2. When it cannot be
 * found, does not exist or is not a directory, parent2 becomes an ERR_PTR
 * and the linkEA check below is still performed. */
1262 parent2 = lfsck_object_find_bottom(env, lfsck, fid);
1263 if (IS_ERR(parent2))
1264 GOTO(linkea, parent2);
1266 if (!dt_object_exists(parent2)) {
1267 lfsck_object_put(env, parent2);
1269 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1272 if (!dt_try_as_dir(env, parent2)) {
1273 lfsck_object_put(env, parent2);
1275 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1279 /* To prevent rename/unlink race */
1280 rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1281 MDS_INODELOCK_UPDATE, LCK_PR);
/* Read the first linkEA record of the child into <name2, fid2> while
 * holding the child's read lock. */
1285 dt_read_lock(env, child, 0);
1286 rc = lfsck_links_get_first(env, child, name2, fid2);
1288 dt_read_unlock(env, child);
1289 lfsck_ibits_unlock(&lh, LCK_PR);
1291 GOTO(out_put, rc = 1);
1294 /* It is almost impossible that the bookmark file (or the name entry)
1295 * and the linkEA hit the same data corruption. Trust the linkEA. */
1296 if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1297 dt_read_unlock(env, child);
1298 lfsck_ibits_unlock(&lh, LCK_PR);
1301 if (lfsck->li_lpf_obj == NULL) {
1302 lu_object_get(&child->do_lu);
1303 lfsck->li_lpf_obj = child;
1306 /* Update the child's dotdot entry */
1307 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1308 &LU_LPF_FID, S_IFDIR);
1310 rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
/* No usable parent2: nothing more can be decided, so the result is the
 * positive "uncertain" value 1. */
1316 if (parent2 == NULL || IS_ERR(parent2)) {
1317 dt_read_unlock(env, child);
1318 lfsck_ibits_unlock(&lh, LCK_PR);
1320 GOTO(out_done, rc = 1);
/* Ask parent2 whether it has an entry <name2> and which FID it holds;
 * @fid is overwritten with that FID. */
1323 rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1324 (const struct dt_key *)name2);
1325 dt_read_unlock(env, child);
1326 lfsck_ibits_unlock(&lh, LCK_PR);
1327 if (rc != 0 && rc != -ENOENT)
/* parent2 does not reference the child: uncertain when verifying by
 * bookmark, otherwise the name entry wins and ".." is rewritten. */
1330 if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1331 if (type == LVLT_BY_BOOKMARK)
1332 GOTO(out_put, rc = 1);
1334 /* Trust the name entry, update the child's dotdot entry. */
1335 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1336 &LU_LPF_FID, S_IFDIR);
1341 if (type == LVLT_BY_BOOKMARK) {
1342 /* Invalid FID record in the bookmark file, reset it. */
1343 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1344 rc = lfsck_bookmark_store(env, lfsck);
1346 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1347 " in the bookmark file: rc = %d\n",
1348 lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1349 } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1350 /* The name entry is wrong, remove it. */
1351 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
/* Drop the reference on parent2 only when it is a real object pointer. */
1357 if (parent2 != NULL && !IS_ERR(parent2))
1358 lfsck_object_put(env, parent2);
1365 * Verify the /ROOT/.lustre/lost+found/ directory.
1367 * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1368 * the LFSCK does not exactly know how to handle, such as orphans. So before
1369 * the LFSCK scans the system, the consistency of this directory needs to
1370 * be verified first, so that users can rely on it during the LFSCK.
1372 * \param[in] env pointer to the thread context
1373 * \param[in] lfsck pointer to the lfsck instance
1375 * \retval positive number for uncertain inconsistency
1376 * \retval 0 for success
1377 * \retval negative error number on failure
/*
 * Reconcile the two records that identify this device's lost+found
 * sub-directory: the FID kept in the bookmark (child1) and the FID stored
 * under the name entry "MDT%04x" in the lost+found root (child2).
 * Must run on a master LFSCK instance (asserted below).
 */
1379 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1381 struct lfsck_thread_info *info = lfsck_env_info(env);
1382 struct lu_fid *pfid = &info->lti_fid;
1383 struct lu_fid *cfid = &info->lti_fid2;
1384 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1385 struct dt_object *parent;
1386 /* child1's FID is in the bookmark file. */
1387 struct dt_object *child1 = NULL;
1388 /* child2's FID is in the name entry MDTxxxx. */
1389 struct dt_object *child2 = NULL;
1390 const struct lu_name *cname;
1392 int node = lfsck_dev_idx(lfsck);
1396 LASSERT(lfsck->li_master);
/* li_lpf_root_obj is assigned further down in this function, so a
 * non-NULL value means the root object is already cached. */
1398 if (lfsck->li_lpf_root_obj != NULL)
/* Locate the lost+found root: either through the local bottom device or
 * through the device of MDT target index 0 (ltd_tgt). */
1402 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
1405 struct lfsck_tgt_desc *ltd;
1407 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1408 if (unlikely(ltd == NULL))
1411 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1417 RETURN(PTR_ERR(parent));
1419 LASSERT(dt_object_exists(parent));
/* The root must be usable as a directory for the lookups below. */
1421 if (unlikely(!dt_try_as_dir(env, parent))) {
1422 lfsck_object_put(env, parent);
1424 GOTO(put, rc = -ENOTDIR);
/* Cache the root object on the instance. */
1427 lfsck->li_lpf_root_obj = parent;
1429 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1431 CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1432 "for bad sub-directories: rc = %d\n",
1433 lfsck_lfsck2name(lfsck), rc);
/* Sub-directory name: "MDT" plus the device index as 4 hex digits;
 * its FID is read into cfid. */
1437 snprintf(name, 8, "MDT%04x", node);
1438 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1439 (const struct dt_key *)name);
1440 if (rc == -ENOENT) {
1448 /* Invalid FID in the name entry, remove the name entry. */
1449 if (!fid_is_norm(cfid)) {
1450 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
/* Load the object the name entry refers to. */
1457 child2 = lfsck_object_find_bottom(env, lfsck, cfid);
1459 GOTO(put, rc = PTR_ERR(child2));
/* A missing, remote or non-directory target makes the name entry
 * unusable, so the entry is removed. */
1461 if (unlikely(!dt_object_exists(child2) ||
1462 dt_object_remote(child2)) ||
1463 !S_ISDIR(lfsck_object_type(child2))) {
1464 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1471 if (unlikely(!dt_try_as_dir(env, child2))) {
1472 lfsck_object_put(env, child2);
/* A zero FID means the bookmark holds no lost+found record. */
1478 if (fid_is_zero(&bk->lb_lpf_fid))
/* Expected case: name entry and bookmark agree. Adopt child2 as
 * li_lpf_obj (with its own reference) and check its linkEA against
 * LU_LPF_FID. */
1481 if (likely(lu_fid_eq(cfid, &bk->lb_lpf_fid))) {
1482 if (lfsck->li_lpf_obj == NULL) {
1483 lu_object_get(&child2->do_lu);
1484 lfsck->li_lpf_obj = child2;
1487 cname = lfsck_name_get_const(env, name, strlen(name));
1488 rc = lfsck_verify_linkea(env, child2, cname, &LU_LPF_FID);
1493 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
/* Keep a copy of the bad FID so it can still be logged after the reset. */
1494 struct lu_fid tfid = bk->lb_lpf_fid;
1496 /* Invalid FID record in the bookmark file, reset it. */
1497 fid_zero(&bk->lb_lpf_fid);
1498 rc = lfsck_bookmark_store(env, lfsck);
1500 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1501 " in the bookmark file: rc = %d\n",
1502 lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
/* Load the object named by the bookmark record. */
1510 child1 = lfsck_object_find_bottom(env, lfsck, &bk->lb_lpf_fid);
1511 if (IS_ERR(child1)) {
1516 if (unlikely(!dt_object_exists(child1) ||
1517 dt_object_remote(child1)) ||
1518 !S_ISDIR(lfsck_object_type(child1))) {
1519 /* Invalid FID record in the bookmark file, reset it. */
1520 fid_zero(&bk->lb_lpf_fid);
1521 rc = lfsck_bookmark_store(env, lfsck);
1523 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1524 " in the bookmark file: rc = %d\n",
1525 lfsck_lfsck2name(lfsck),
1526 PFID(lfsck_dto2fid(child1)), rc);
1531 lfsck_object_put(env, child1);
1536 if (unlikely(!dt_try_as_dir(env, child1))) {
1537 lfsck_object_put(env, child1);
/* Check each candidate with lfsck_verify_lpf_pairs(): the bookmark
 * candidate here, the name-entry candidate (LVLT_BY_NAMEENTRY) below. */
1543 rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name, pfid,
1545 if (lu_fid_eq(pfid, &LU_LPF_FID))
1550 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1551 pfid, LVLT_BY_NAMEENTRY);
/* If an object was selected it must be usable as a directory, otherwise
 * it is dropped; with nothing selected and rc == 0 a new one is created. */
1556 if (lfsck->li_lpf_obj != NULL) {
1557 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
1558 lfsck_object_put(env, lfsck->li_lpf_obj);
1559 lfsck->li_lpf_obj = NULL;
1562 } else if (rc == 0) {
1563 rc = lfsck_create_lpf(env, lfsck);
/* Drop the local references held on both candidates. */
1566 if (child2 != NULL && !IS_ERR(child2))
1567 lfsck_object_put(env, child2);
1568 if (child1 != NULL && !IS_ERR(child1))
1569 lfsck_object_put(env, child1);
/*
 * Allocate lfsck->li_seq and initialise it as a LUSTRE_SEQ_METADATA
 * sequence client named "lfsck-<instance name>".
 */
1574 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1576 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1577 struct seq_server_site *ss = lfsck_dev_site(lfsck);
1582 if (unlikely(ss == NULL))
1585 OBD_ALLOC_PTR(lfsck->li_seq);
1586 if (lfsck->li_seq == NULL)
/* Scratch buffer for the client name; it is freed right after
 * seq_client_init() returns. */
1589 OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1591 GOTO(out, rc = -ENOMEM);
1593 snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1594 rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1596 OBD_FREE(prefix, MAX_OBD_NAME + 7);
/* Seed the client with the last FID saved in the bookmark when sane. */
1600 if (fid_is_sane(&bk->lb_last_fid))
1601 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
/* Release li_seq again; presumably the "out" failure path (label not
 * visible in this listing). */
1606 OBD_FREE_PTR(lfsck->li_seq);
1607 lfsck->li_seq = NULL;
/*
 * Tear down the sequence client in lfsck->li_seq, if one exists, and
 * clear the pointer so a repeated call does nothing.
 */
1612 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1614 if (lfsck->li_seq != NULL) {
1615 seq_client_fini(lfsck->li_seq);
1616 OBD_FREE_PTR(lfsck->li_seq);
1617 lfsck->li_seq = NULL;
/*
 * Release everything owned by an LFSCK instance and free the instance.
 * The instance must already be off the global list and its main thread
 * must be in the init or stopped state (both asserted).
 */
1621 void lfsck_instance_cleanup(const struct lu_env *env,
1622 struct lfsck_instance *lfsck)
1624 struct ptlrpc_thread *thread = &lfsck->li_thread;
1625 struct lfsck_component *com;
1626 struct lfsck_component *next;
1627 struct lfsck_lmv_unit *llu;
1628 struct lfsck_lmv_unit *llu_next;
1629 struct lfsck_lmv *llmv;
1632 LASSERT(list_empty(&lfsck->li_link));
1633 LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1635 if (lfsck->li_obj_oit != NULL) {
1636 lfsck_object_put(env, lfsck->li_obj_oit);
1637 lfsck->li_obj_oit = NULL;
1640 LASSERT(lfsck->li_obj_dir == NULL);
1641 LASSERT(lfsck->li_lmv == NULL);
/* Every LMV unit still on the list must be down to one reference, which
 * is dropped here. */
1643 list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1644 llmv = &llu->llu_lmv;
1646 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1647 "still in using: %u\n",
1648 atomic_read(&llmv->ll_ref));
1650 lfsck_lmv_put(env, llmv);
/* Clean up the components on the scan, double-scan and idle lists. */
1653 list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1654 lfsck_component_cleanup(env, com);
1657 LASSERT(list_empty(&lfsck->li_list_dir));
1659 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1661 lfsck_component_cleanup(env, com);
1664 list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1665 lfsck_component_cleanup(env, com);
1668 lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1669 lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
/* Drop the cached objects; each pointer is cleared after its put. */
1671 if (lfsck->li_lfsck_dir != NULL) {
1672 lfsck_object_put(env, lfsck->li_lfsck_dir);
1673 lfsck->li_lfsck_dir = NULL;
1676 if (lfsck->li_bookmark_obj != NULL) {
1677 lfsck_object_put(env, lfsck->li_bookmark_obj);
1678 lfsck->li_bookmark_obj = NULL;
1681 if (lfsck->li_lpf_obj != NULL) {
1682 lfsck_object_put(env, lfsck->li_lpf_obj);
1683 lfsck->li_lpf_obj = NULL;
1686 if (lfsck->li_lpf_root_obj != NULL) {
1687 lfsck_object_put(env, lfsck->li_lpf_root_obj);
1688 lfsck->li_lpf_root_obj = NULL;
1691 if (lfsck->li_los != NULL) {
1692 local_oid_storage_fini(env, lfsck->li_los);
1693 lfsck->li_los = NULL;
1696 lfsck_fid_fini(lfsck);
/* The instance itself goes last. */
1698 OBD_FREE_PTR(lfsck);
/*
 * Search lfsck_instance_list for the instance whose li_bottom equals key.
 * On a match the visible code can take a reference and unlink the
 * instance; presumably these are gated by ref and unlink (the guards are
 * not visible in this listing). No lock is taken here:
 * lfsck_instance_find() calls this under lfsck_instance_lock.
 */
1701 static inline struct lfsck_instance *
1702 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1704 struct lfsck_instance *lfsck;
1706 list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1707 if (lfsck->li_bottom == key) {
1709 lfsck_instance_get(lfsck);
1711 list_del_init(&lfsck->li_link);
/*
 * Locked wrapper around __lfsck_instance_find(): the lookup runs with
 * lfsck_instance_lock held.
 */
1720 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1723 struct lfsck_instance *lfsck;
1725 spin_lock(&lfsck_instance_lock);
1726 lfsck = __lfsck_instance_find(key, ref, unlink);
1727 spin_unlock(&lfsck_instance_lock);
/*
 * Append an instance to lfsck_instance_list under lfsck_instance_lock.
 * If an instance with the same li_bottom is already listed, the lock is
 * dropped early; the value returned in that case is not visible here.
 */
1732 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1734 struct lfsck_instance *tmp;
1736 spin_lock(&lfsck_instance_lock);
1737 list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1738 if (lfsck->li_bottom == tmp->li_bottom) {
1739 spin_unlock(&lfsck_instance_lock);
1744 list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1745 spin_unlock(&lfsck_instance_lock);
/*
 * Write "<prefix>:" to the seq_file followed by names taken from names[].
 * With bits == 0 the header line ends immediately with a newline.
 */
1749 void lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
1754 bool newline = (bits != 0 ? false : true);
1756 seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
/* Walk bit positions until bits reaches zero; the statement that clears
 * bits is not visible in this listing. */
1758 for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
1761 if (names[i] != NULL) {
/* Names are separated by ','; the one printed with newline set ends the
 * line instead. */
1765 seq_printf(m, "%s%c", names[i],
1766 newline ? '\n' : ',');
/*
 * Print two lines for a timestamp: "<name>_time" and "time_since_<name>".
 * One branch prints "N/A" for both; the other prints the raw value and
 * the seconds elapsed according to cfs_time_current_sec(). The condition
 * selecting the branch is not visible in this listing.
 */
1775 void lfsck_time_dump(struct seq_file *m, __u64 time, const char *name)
1778 seq_printf(m, "%s_time: N/A\n", name);
1779 seq_printf(m, "time_since_%s: N/A\n", name);
1781 seq_printf(m, "%s_time: %llu\n", name, time);
1782 seq_printf(m, "time_since_%s: %llu seconds\n",
1783 name, cfs_time_current_sec() - time);
/*
 * Print a position as "<prefix>: <oit cookie>, <dir parent FID>,
 * <dir cookie>". Missing parts are shown as "N/A".
 */
1787 void lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
/* A zero parent FID means there is no directory part; a zero OIT cookie
 * on top of that means there is no position at all. */
1790 if (fid_is_zero(&pos->lp_dir_parent)) {
1791 if (pos->lp_oit_cookie == 0) {
1792 seq_printf(m, "%s: N/A, N/A, N/A\n", prefix);
1795 seq_printf(m, "%s: %llu, N/A, N/A\n",
1796 prefix, pos->lp_oit_cookie);
1798 seq_printf(m, "%s: %llu, "DFID", %#llx\n",
1799 prefix, pos->lp_oit_cookie,
1800 PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
/*
 * Capture the current scan position of the instance into pos, using the
 * store() methods of the OIT iterator and, if active, the directory
 * iterator.
 */
1804 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1805 struct lfsck_position *pos, bool init)
1807 const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
/* Without an OIT iterator the position is reported as all zeroes. */
1809 if (unlikely(lfsck->li_di_oit == NULL)) {
1810 memset(pos, 0, sizeof(*pos));
1814 pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
/* If the current OIT entry is not yet processed and this is not the
 * initial fill, record the previous cookie; presumably so that a resume
 * revisits the entry. */
1815 if (!lfsck->li_current_oit_processed && !init)
1816 pos->lp_oit_cookie--;
1818 LASSERT(pos->lp_oit_cookie > 0);
1820 if (lfsck->li_di_dir != NULL) {
1821 struct dt_object *dto = lfsck->li_obj_dir;
1823 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
/* A cookie at or beyond MDS_DIR_END_OFF is reported as "no directory
 * position"; otherwise the directory's FID is recorded. */
1826 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1827 fid_zero(&pos->lp_dir_parent);
1828 pos->lp_dir_cookie = 0;
1830 pos->lp_dir_parent = *lfsck_dto2fid(dto);
/* Clear the directory part of the position. */
1833 fid_zero(&pos->lp_dir_parent);
1834 pos->lp_dir_cookie = 0;
/*
 * Convert a speed limit into the (li_sleep_rate, li_sleep_jif) pair and
 * record the limit in the in-memory bookmark when it differs.
 * lfsck_set_speed() stores the bookmark when this returns true, so the
 * return value presumably reports whether the bookmark changed.
 */
1838 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1842 if (limit != LFSCK_SPEED_NO_LIMIT) {
/* Limit above jiffies-per-second: sleep one jiffy after every
 * limit / jiffies-per-second units of work. */
1843 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1844 lfsck->li_sleep_rate = limit /
1845 msecs_to_jiffies(MSEC_PER_SEC);
1846 lfsck->li_sleep_jif = 1;
/* Otherwise sleep after every unit, for several jiffies (the divisor is
 * on a line not visible in this listing). */
1848 lfsck->li_sleep_rate = 1;
1849 lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
/* Both fields zero disables throttling: lfsck_control_speed() only
 * sleeps when li_sleep_jif > 0. */
1853 lfsck->li_sleep_jif = 0;
1854 lfsck->li_sleep_rate = 0;
1857 if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1858 lfsck->li_bookmark_ram.lb_speed_limit = limit;
/*
 * Throttle the main LFSCK thread. When throttling is enabled
 * (li_sleep_jif > 0) and li_new_scanned has reached li_sleep_rate, wait
 * on the thread's control waitqueue with a timeout of li_sleep_jif, or
 * until the thread is no longer running.
 */
1865 void lfsck_control_speed(struct lfsck_instance *lfsck)
1867 struct ptlrpc_thread *thread = &lfsck->li_thread;
1868 struct l_wait_info lwi;
1870 if (lfsck->li_sleep_jif > 0 &&
1871 lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1872 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1873 LWI_ON_SIGNAL_NOOP, NULL);
1875 l_wait_event(thread->t_ctl_waitq,
1876 !thread_is_running(thread),
/* Restart the scanned-items counter. */
1878 lfsck->li_new_scanned = 0;
/*
 * Same throttling as lfsck_control_speed(), but driven by the component's
 * own counter (lc_new_scanned) rather than the instance-wide one. The
 * wait still uses the main thread's control waitqueue.
 */
1882 void lfsck_control_speed_by_self(struct lfsck_component *com)
1884 struct lfsck_instance *lfsck = com->lc_lfsck;
1885 struct ptlrpc_thread *thread = &lfsck->li_thread;
1886 struct l_wait_info lwi;
1888 if (lfsck->li_sleep_jif > 0 &&
1889 com->lc_new_scanned >= lfsck->li_sleep_rate) {
1890 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1891 LWI_ON_SIGNAL_NOOP, NULL);
1893 l_wait_event(thread->t_ctl_waitq,
1894 !thread_is_running(thread),
/* Restart the component's scanned-items counter. */
1896 com->lc_new_scanned = 0;
/*
 * Build the argument bundle handed to an LFSCK thread: it gets its own
 * lu_env (LCT_MD_THREAD | LCT_DT_THREAD) plus references on the instance
 * and on the component. Returns ERR_PTR(-ENOMEM) on the visible failure
 * path.
 */
1900 static struct lfsck_thread_args *
1901 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1902 struct lfsck_component *com,
1903 struct lfsck_start_param *lsp)
1905 struct lfsck_thread_args *lta;
1910 return ERR_PTR(-ENOMEM);
/* The environment is embedded in lta, so its address is passed. */
1912 rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
/* These references are dropped again by lfsck_thread_args_fini(). */
1918 lta->lta_lfsck = lfsck_instance_get(lfsck);
1920 lta->lta_com = lfsck_component_get(com);
/*
 * Undo lfsck_thread_args_init(): drop the component reference (if any)
 * and the instance reference using the embedded environment, then
 * finalise that environment. The env is torn down last because both put
 * calls use it.
 */
1927 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1929 if (lta->lta_com != NULL)
1930 lfsck_component_put(&lta->lta_env, lta->lta_com);
1931 lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1932 lu_env_fini(&lta->lta_env);
/*
 * Set up a struct lfsck_assistant_data: a BITS_PER_LONG-bit bitmap, the
 * request list, the lock, the OST/MDT target lists with their phase1 and
 * phase2 lists, and the assistant thread's control waitqueue.
 * The allocation of lad itself is not visible in this listing.
 */
1936 struct lfsck_assistant_data *
1937 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1940 struct lfsck_assistant_data *lad;
1944 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1945 if (lad->lad_bitmap == NULL) {
1950 INIT_LIST_HEAD(&lad->lad_req_list);
1951 spin_lock_init(&lad->lad_lock);
1952 INIT_LIST_HEAD(&lad->lad_ost_list);
1953 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1954 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1955 INIT_LIST_HEAD(&lad->lad_mdt_list);
1956 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1957 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1958 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
/* The name pointer is stored as-is, not copied. */
1960 lad->lad_name = name;
/*
 * Create an lfsck_assistant_object holding a copy of the FID, a copy of
 * the attributes, the OIT cookie and an initial reference count of 1.
 * Returns ERR_PTR(-ENOMEM) on the visible failure path.
 */
1966 struct lfsck_assistant_object *
1967 lfsck_assistant_object_init(const struct lu_env *env, const struct lu_fid *fid,
1968 const struct lu_attr *attr, __u64 cookie,
1971 struct lfsck_assistant_object *lso;
1975 return ERR_PTR(-ENOMEM);
1977 lso->lso_fid = *fid;
1979 lso->lso_attr = *attr;
1981 atomic_set(&lso->lso_ref, 1);
1982 lso->lso_oit_cookie = cookie;
/* Marks the object as a directory; the condition guarding this line is
 * not visible in this listing. */
1984 lso->lso_is_dir = 1;
/*
 * Look up the dt_object for lso->lso_fid on the bottom device.
 * Returns ERR_PTR(-ENOENT) when the object does not exist or is dead, and
 * ERR_PTR(-ENOTDIR) when lso is flagged as a directory but the object
 * cannot be used as one. The reference from the lookup is dropped on both
 * error paths.
 */
1990 lfsck_assistant_object_load(const struct lu_env *env,
1991 struct lfsck_instance *lfsck,
1992 struct lfsck_assistant_object *lso)
1994 struct dt_object *obj;
1996 obj = lfsck_object_find_bottom(env, lfsck, &lso->lso_fid);
2000 if (unlikely(!dt_object_exists(obj) || lfsck_is_dead_obj(obj))) {
2002 lfsck_object_put(env, obj);
2004 return ERR_PTR(-ENOENT);
2007 if (lso->lso_is_dir && unlikely(!dt_try_as_dir(env, obj))) {
2008 lfsck_object_put(env, obj);
2010 return ERR_PTR(-ENOTDIR);
2017 * Generic LFSCK asynchronous communication interpreter function.
2018 * The LFSCK RPC reply for both the event notification and status
2019 * querying will be handled here.
2021 * \param[in] env pointer to the thread context
2022 * \param[in] req pointer to the LFSCK request
2023 * \param[in] args pointer to the lfsck_async_interpret_args
2024 * \param[in] rc the result for handling the LFSCK request
2026 * \retval 0 for success
2027 * \retval negative error number on failure
/*
 * Reply handler shared by the LFSCK notify and query RPCs. It dispatches
 * on lr->lr_event and maintains the per-target list linkage (target list
 * plus phase list) kept in the assistant data. All list changes are made
 * under ltds->ltd_lock. Only valid on a master instance (asserted).
 */
2029 int lfsck_async_interpret_common(const struct lu_env *env,
2030 struct ptlrpc_request *req,
2033 struct lfsck_async_interpret_args *laia = args;
2034 struct lfsck_component *com = laia->laia_com;
2035 struct lfsck_assistant_data *lad = com->lc_data;
2036 struct lfsck_tgt_descs *ltds = laia->laia_ltds;
2037 struct lfsck_tgt_desc *ltd = laia->laia_ltd;
2038 struct lfsck_request *lr = laia->laia_lr;
2040 LASSERT(com->lc_lfsck->li_master);
2042 switch (lr->lr_event) {
/* The target could not be notified: log it and mark the component's
 * result as LF_INCOMPLETE. */
2045 CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
2047 lfsck_lfsck2name(com->lc_lfsck),
2048 (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2049 ltd->ltd_index, lad->lad_name, rc);
2051 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2052 struct lfsck_layout *lo = com->lc_file_ram;
2054 if (lr->lr_flags & LEF_TO_OST)
2055 lfsck_lad_set_bitmap(env, com,
2058 lo->ll_flags |= LF_INCOMPLETE;
2060 struct lfsck_namespace *ns = com->lc_file_ram;
2062 /* If some MDT does not join the namespace
2063 * LFSCK, then we cannot know whether there
2064 * is some name entry on such MDT that with
2065 * the referenced MDT-object on this MDT or
2066 * not. So the namespace LFSCK on this MDT
2067 * cannot handle orphan MDT-objects properly.
2068 * So we mark the LFSCK as LF_INCOMPLETE and
2069 * skip orphan MDT-objects handling. */
2070 ns->ln_flags |= LF_INCOMPLETE;
/* Link the target into the assistant's lists unless it is dead or
 * already done for this component type. */
2075 spin_lock(&ltds->ltd_lock);
2076 if (ltd->ltd_dead) {
2077 spin_unlock(&ltds->ltd_lock);
2081 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2082 struct list_head *list;
2083 struct list_head *phase_list;
2085 if (ltd->ltd_layout_done) {
2086 spin_unlock(&ltds->ltd_lock);
/* Layout LFSCK tracks OSTs and MDTs on separate lists. */
2090 if (lr->lr_flags & LEF_TO_OST) {
2091 list = &lad->lad_ost_list;
2092 phase_list = &lad->lad_ost_phase1_list;
2094 list = &lad->lad_mdt_list;
2095 phase_list = &lad->lad_mdt_phase1_list;
/* list_empty() on the target's own link means "not linked yet". */
2098 if (list_empty(&ltd->ltd_layout_list))
2099 list_add_tail(&ltd->ltd_layout_list, list);
2100 if (list_empty(&ltd->ltd_layout_phase_list))
2101 list_add_tail(&ltd->ltd_layout_phase_list,
2104 if (ltd->ltd_namespace_done) {
2105 spin_unlock(&ltds->ltd_lock);
2109 if (list_empty(&ltd->ltd_namespace_list))
2110 list_add_tail(&ltd->ltd_namespace_list,
2111 &lad->lad_mdt_list);
2112 if (list_empty(&ltd->ltd_namespace_phase_list))
2113 list_add_tail(&ltd->ltd_namespace_phase_list,
2114 &lad->lad_mdt_phase1_list);
2116 spin_unlock(&ltds->ltd_lock);
2119 case LE_PHASE1_DONE:
2120 case LE_PHASE2_DONE:
/* -EALREADY is not treated as a failure worth logging. */
2122 if (rc != 0 && rc != -EALREADY)
2123 CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
2124 "event = %d, rc = %d\n",
2125 lfsck_lfsck2name(com->lc_lfsck),
2126 (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2127 ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
2130 struct lfsck_reply *reply;
2131 struct list_head *list;
2132 struct list_head *phase_list;
/* Select the target's own links for this component type. */
2134 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2135 list = &ltd->ltd_layout_list;
2136 phase_list = &ltd->ltd_layout_phase_list;
2138 list = &ltd->ltd_namespace_list;
2139 phase_list = &ltd->ltd_namespace_phase_list;
/* For LEF_QUERY_ALL requests the cached per-target status is reset;
 * the unlink below takes the target off both lists. */
2143 if (lr->lr_flags & LEF_QUERY_ALL) {
2144 lfsck_reset_ltd_status(ltd, com->lc_type);
2148 spin_lock(&ltds->ltd_lock);
2149 list_del_init(phase_list);
2150 list_del_init(list);
2151 spin_unlock(&ltds->ltd_lock);
2155 reply = req_capsule_server_get(&req->rq_pill,
/* A reply without the expected buffer is handled the same way. */
2157 if (reply == NULL) {
2159 CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
2160 "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
2163 if (lr->lr_flags & LEF_QUERY_ALL) {
2164 lfsck_reset_ltd_status(ltd, com->lc_type);
2168 spin_lock(&ltds->ltd_lock);
2169 list_del_init(phase_list);
2170 list_del_init(list);
2171 spin_unlock(&ltds->ltd_lock);
/* LEF_QUERY_ALL: only record the reported status and repaired count. */
2175 if (lr->lr_flags & LEF_QUERY_ALL) {
2176 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2177 ltd->ltd_layout_status = reply->lr_status;
2178 ltd->ltd_layout_repaired = reply->lr_repaired;
2180 ltd->ltd_namespace_status = reply->lr_status;
2181 ltd->ltd_namespace_repaired =
2187 switch (reply->lr_status) {
2188 case LS_SCANNING_PHASE1:
/* Target reports phase 2: move it from its current phase list to the
 * matching phase2 list, unless it is dead or already done. */
2190 case LS_SCANNING_PHASE2:
2191 spin_lock(&ltds->ltd_lock);
2192 list_del_init(phase_list);
2193 if (ltd->ltd_dead) {
2194 spin_unlock(&ltds->ltd_lock);
2198 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2199 if (ltd->ltd_layout_done) {
2200 spin_unlock(&ltds->ltd_lock);
2204 if (lr->lr_flags & LEF_TO_OST)
2205 list_add_tail(phase_list,
2206 &lad->lad_ost_phase2_list);
2208 list_add_tail(phase_list,
2209 &lad->lad_mdt_phase2_list);
2211 if (ltd->ltd_namespace_done) {
2212 spin_unlock(&ltds->ltd_lock);
2216 list_add_tail(phase_list,
2217 &lad->lad_mdt_phase2_list);
2219 spin_unlock(&ltds->ltd_lock);
/* Take the target off both lists. */
2222 spin_lock(&ltds->ltd_lock);
2223 list_del_init(phase_list);
2224 list_del_init(list);
2225 spin_unlock(&ltds->ltd_lock);
/* NOTE(review): the message says "rc" but the value printed is
 * lr->lr_event. */
2231 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
2232 lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
/* For args that are not shared between components, drop the component
 * reference here; lfsck_async_request() takes one when laia_com is set. */
2236 if (!laia->laia_shared) {
2238 lfsck_component_put(env, com);
/*
 * Apply one RPC result to every component on the instance's scan and
 * double-scan lists. The args must be shared ones that are not yet bound
 * to a component (both asserted). laia_com is repointed for each
 * component in turn, and the whole walk runs under lfsck->li_lock.
 */
2244 static void lfsck_interpret(const struct lu_env *env,
2245 struct lfsck_instance *lfsck,
2246 struct ptlrpc_request *req, void *args, int result)
2248 struct lfsck_async_interpret_args *laia = args;
2249 struct lfsck_component *com;
2251 LASSERT(laia->laia_com == NULL);
2252 LASSERT(laia->laia_shared);
2254 spin_lock(&lfsck->li_lock);
2255 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2256 laia->laia_com = com;
2257 lfsck_async_interpret_common(env, req, laia, result);
2260 list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2261 laia->laia_com = com;
2262 lfsck_async_interpret_common(env, req, laia, result);
2264 spin_unlock(&lfsck->li_lock);
/*
 * Tell one target that this LFSCK instance is leaving: unlink the target
 * from the component's lists and send it an LE_PEER_EXIT notification
 * with status LS_CO_PAUSED, then wait for the request set to complete.
 * Only valid on a master instance (asserted).
 */
2267 static int lfsck_stop_notify(const struct lu_env *env,
2268 struct lfsck_instance *lfsck,
2269 struct lfsck_tgt_descs *ltds,
2270 struct lfsck_tgt_desc *ltd, __u16 type)
2272 struct lfsck_component *com;
2276 LASSERT(lfsck->li_master);
/* Find the component of this type on the scan list or the double-scan
 * list, and hold a reference on it once li_lock is released. */
2278 spin_lock(&lfsck->li_lock);
2279 com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2281 com = __lfsck_component_find(lfsck, type,
2282 &lfsck->li_list_double_scan);
2284 lfsck_component_get(com);
2285 spin_unlock(&lfsck->li_lock);
2288 struct lfsck_thread_info *info = lfsck_env_info(env);
2289 struct lfsck_async_interpret_args *laia = &info->lti_laia;
2290 struct lfsck_request *lr = &info->lti_lr;
2291 struct lfsck_assistant_data *lad = com->lc_data;
2292 struct list_head *list;
2293 struct list_head *phase_list;
2294 struct ptlrpc_request_set *set;
2296 set = ptlrpc_prep_set();
2298 lfsck_component_put(env, com);
/* Select the target's own links for this component type. */
2303 if (type == LFSCK_TYPE_LAYOUT) {
2304 list = &ltd->ltd_layout_list;
2305 phase_list = &ltd->ltd_layout_phase_list;
2307 list = &ltd->ltd_namespace_list;
2308 phase_list = &ltd->ltd_namespace_phase_list;
/* A target that is not linked needs no notification; the request set is
 * destroyed on that path. */
2311 spin_lock(&ltds->ltd_lock);
2312 if (list_empty(list)) {
2313 LASSERT(list_empty(phase_list));
2314 spin_unlock(&ltds->ltd_lock);
2315 ptlrpc_set_destroy(set);
2320 list_del_init(phase_list);
2321 list_del_init(list);
2322 spin_unlock(&ltds->ltd_lock);
/* Build the LE_PEER_EXIT request in the per-thread scratch area. */
2324 memset(lr, 0, sizeof(*lr));
2325 lr->lr_index = lfsck_dev_idx(lfsck);
2326 lr->lr_event = LE_PEER_EXIT;
2327 lr->lr_active = type;
2328 lr->lr_status = LS_CO_PAUSED;
2329 if (ltds == &lfsck->li_ost_descs)
2330 lr->lr_flags = LEF_TO_OST;
2332 memset(laia, 0, sizeof(*laia));
2333 laia->laia_com = com;
2334 laia->laia_ltds = ltds;
/* Extra target reference for the in-flight request. */
2335 atomic_inc(&ltd->ltd_ref);
2336 laia->laia_ltd = ltd;
2339 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2340 lfsck_async_interpret_common,
2341 laia, LFSCK_NOTIFY);
2343 CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2344 "co-stop for %s: rc = %d\n",
2345 lfsck_lfsck2name(lfsck),
2346 (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2347 ltd->ltd_index, lad->lad_name, rc);
2350 rc = ptlrpc_set_wait(set);
/* Release the request set and the component reference taken above. */
2353 ptlrpc_set_destroy(set);
2354 lfsck_component_put(env, com);
/*
 * Reply callback for requests whose args are shared by all components
 * (passed as the interpreter by lfsck_stop_all()). It recovers the owning
 * instance from laia_ltds, feeds the result to every component through
 * lfsck_interpret(), and drops the target reference carried by the args.
 */
2360 static int lfsck_async_interpret(const struct lu_env *env,
2361 struct ptlrpc_request *req,
2364 struct lfsck_async_interpret_args *laia = args;
2365 struct lfsck_instance *lfsck;
2367 lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2369 lfsck_interpret(env, lfsck, req, laia, rc);
2370 lfsck_tgt_put(laia->laia_ltd);
/* Record the failure unless -EALREADY is already stored in laia_result. */
2371 if (rc != 0 && laia->laia_result != -EALREADY)
2372 laia->laia_result = rc;
/*
 * Build an LFSCK notify or query RPC for the export and add it to the
 * request set. The caller's args are copied into the request's own async
 * args, so the caller may reuse its buffer afterwards. An unknown request
 * code is logged with -EINVAL.
 */
2377 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2378 struct lfsck_request *lr,
2379 struct ptlrpc_request_set *set,
2380 ptlrpc_interpterer_t interpreter,
2381 void *args, int request)
2383 struct lfsck_async_interpret_args *laia;
2384 struct ptlrpc_request *req;
2385 struct lfsck_request *tmp;
2386 struct req_format *format;
/* Choose the wire format for the request code; the case labels are not
 * visible in this listing. */
2391 format = &RQF_LFSCK_NOTIFY;
2394 format = &RQF_LFSCK_QUERY;
2397 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2398 exp->exp_obd->obd_name, request, -EINVAL);
2402 req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2406 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
/* Packing failed: free the request again. */
2408 ptlrpc_request_free(req);
2413 tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2415 ptlrpc_request_set_replen(req);
/* Copy the args by value and take a component reference for the request
 * when a component is attached. */
2417 laia = ptlrpc_req_async_args(req);
2418 *laia = *(struct lfsck_async_interpret_args *)args;
2419 if (laia->laia_com != NULL)
2420 lfsck_component_get(laia->laia_com);
2421 req->rq_interpret_reply = interpreter;
2422 req->rq_allow_intr = 1;
2423 ptlrpc_set_add_req(set, req);
/*
 * Send an LE_QUERY request flagged LEF_QUERY_ALL for this component to
 * every target set in the target bitmap, then wait for all replies.
 * MDT targets are queried first; for the layout component the OST targets
 * are selected next with LEF_TO_OST set.
 */
2428 int lfsck_query_all(const struct lu_env *env, struct lfsck_component *com)
2430 struct lfsck_thread_info *info = lfsck_env_info(env);
2431 struct lfsck_request *lr = &info->lti_lr;
2432 struct lfsck_async_interpret_args *laia = &info->lti_laia;
2433 struct lfsck_instance *lfsck = com->lc_lfsck;
2434 struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2435 struct lfsck_tgt_desc *ltd;
2436 struct ptlrpc_request_set *set;
2441 memset(lr, 0, sizeof(*lr));
2442 lr->lr_event = LE_QUERY;
2443 lr->lr_active = com->lc_type;
2444 lr->lr_flags = LEF_QUERY_ALL;
2446 memset(laia, 0, sizeof(*laia));
2447 laia->laia_com = com;
2450 set = ptlrpc_prep_set();
2455 laia->laia_ltds = ltds;
/* ltd_rw_sem is held while walking the bitmap, released around each
 * request and re-acquired before the next bit. */
2456 down_read(&ltds->ltd_rw_sem);
2457 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2458 ltd = lfsck_tgt_get(ltds, idx);
2459 LASSERT(ltd != NULL);
2461 laia->laia_ltd = ltd;
2462 up_read(&ltds->ltd_rw_sem);
2463 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2464 lfsck_async_interpret_common,
2467 struct lfsck_assistant_data *lad = com->lc_data;
/* The request could not be sent: log it and reset the cached status of
 * this target. */
2469 CDEBUG(D_LFSCK, "%s: Fail to query %s %x for stat %s: "
2470 "rc = %d\n", lfsck_lfsck2name(lfsck),
2471 (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2472 ltd->ltd_index, lad->lad_name, rc);
2473 lfsck_reset_ltd_status(ltd, com->lc_type);
2476 down_read(&ltds->ltd_rw_sem);
2478 up_read(&ltds->ltd_rw_sem);
/* Layout LFSCK also involves OSTs: switch to the OST descriptors once. */
2480 if (com->lc_type == LFSCK_TYPE_LAYOUT && !(lr->lr_flags & LEF_TO_OST)) {
2481 ltds = &lfsck->li_ost_descs;
2482 lr->lr_flags |= LEF_TO_OST;
2486 rc = ptlrpc_set_wait(set);
2487 ptlrpc_set_destroy(set);
/*
 * Reset the assistant state of a component and start its assistant
 * kernel thread (lfsck_assistant_engine). After a successful start the
 * caller waits until the assistant is either running or stopped; if it
 * is not running, lad_assistant_status becomes the result.
 */
2492 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2493 struct lfsck_start_param *lsp)
2495 struct lfsck_instance *lfsck = com->lc_lfsck;
2496 struct lfsck_assistant_data *lad = com->lc_data;
2497 struct ptlrpc_thread *mthread = &lfsck->li_thread;
2498 struct ptlrpc_thread *athread = &lad->lad_thread;
2499 struct lfsck_thread_args *lta;
2500 struct task_struct *task;
/* Clear the state left from any previous run. */
2504 lad->lad_assistant_status = 0;
2505 lad->lad_post_result = 0;
2506 lad->lad_to_post = 0;
2507 lad->lad_to_double_scan = 0;
2508 lad->lad_in_double_scan = 0;
2510 lad->lad_advance_lock = false;
2511 thread_set_flags(athread, 0);
2513 lta = lfsck_thread_args_init(lfsck, com, lsp);
2515 RETURN(PTR_ERR(lta));
/* The thread is named after the assistant (lad_name). */
2517 task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
/* Start failed: the thread args are released here. */
2520 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2521 "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2522 lfsck_thread_args_fini(lta);
2524 struct l_wait_info lwi = { 0 };
/* Wait on the main thread's waitqueue for the assistant to report
 * running or stopped. */
2526 l_wait_event(mthread->t_ctl_waitq,
2527 thread_is_running(athread) ||
2528 thread_is_stopped(athread),
2530 if (unlikely(!thread_is_running(athread)))
2531 rc = lad->lad_assistant_status;
/*
 * Wait until the assistant's request list is empty, the main thread is
 * no longer running, or the assistant has stopped. Returns
 * LFSCK_CHECKPOINT_SKIP in the latter two cases; the return value for
 * the first case is not visible in this listing.
 */
2539 int lfsck_checkpoint_generic(const struct lu_env *env,
2540 struct lfsck_component *com)
2542 struct lfsck_assistant_data *lad = com->lc_data;
2543 struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2544 struct ptlrpc_thread *athread = &lad->lad_thread;
2545 struct l_wait_info lwi = { 0 };
2547 l_wait_event(mthread->t_ctl_waitq,
2548 list_empty(&lad->lad_req_list) ||
2549 !thread_is_running(mthread) ||
2550 thread_is_stopped(athread),
2553 if (!thread_is_running(mthread) || thread_is_stopped(athread))
2554 return LFSCK_CHECKPOINT_SKIP;
/*
 * Hand the phase-1 result to the assistant thread and wait for it to
 * finish post-processing. *result is both input and output: it is
 * replaced by the assistant's status when that status is negative.
 */
2559 void lfsck_post_generic(const struct lu_env *env,
2560 struct lfsck_component *com, int *result)
2562 struct lfsck_assistant_data *lad = com->lc_data;
2563 struct ptlrpc_thread *athread = &lad->lad_thread;
2564 struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2565 struct l_wait_info lwi = { 0 };
/* Publish the result, then raise the "to post" flag for the assistant. */
2567 lad->lad_post_result = *result;
2570 lad->lad_to_post = 1;
2572 CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s post, rc = %d\n",
2573 lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
/* Wake the assistant. Then wait until it has stopped, or, for a positive
 * result, until its request list has drained. */
2575 wake_up_all(&athread->t_ctl_waitq);
2576 l_wait_event(mthread->t_ctl_waitq,
2577 (*result > 0 && list_empty(&lad->lad_req_list)) ||
2578 thread_is_stopped(athread),
2581 if (lad->lad_assistant_status < 0)
2582 *result = lad->lad_assistant_status;
2584 CDEBUG(D_LFSCK, "%s: the assistant has done %s post, rc = %d\n",
2585 lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
/*
 * Ask the assistant thread to enter double-scan and wait until it
 * reports being in double-scan or has stopped. A negative assistant
 * status is returned to the caller. The handling of a status other than
 * LS_SCANNING_PHASE2 is not visible in this listing.
 */
2588 int lfsck_double_scan_generic(const struct lu_env *env,
2589 struct lfsck_component *com, int status)
2591 struct lfsck_assistant_data *lad = com->lc_data;
2592 struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2593 struct ptlrpc_thread *athread = &lad->lad_thread;
2594 struct l_wait_info lwi = { 0 };
2596 if (status != LS_SCANNING_PHASE2)
/* Raise the "to double scan" flag for the assistant. */
2599 lad->lad_to_double_scan = 1;
2601 CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s double_scan, "
2603 lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, status);
2605 wake_up_all(&athread->t_ctl_waitq);
2606 l_wait_event(mthread->t_ctl_waitq,
2607 lad->lad_in_double_scan ||
2608 thread_is_stopped(athread),
2611 CDEBUG(D_LFSCK, "%s: the assistant has done %s double_scan, "
2612 "status %d\n", lfsck_lfsck2name(com->lc_lfsck), lad->lad_name,
2613 lad->lad_assistant_status);
2615 if (lad->lad_assistant_status < 0)
2616 return lad->lad_assistant_status;
/*
 * Wake the assistant thread and wait on the main thread's waitqueue
 * until the assistant is back in the init state or has stopped.
 * NOTE(review): the line numbering suggests a statement between the
 * declarations and wake_up_all() that is not shown here.
 */
2621 void lfsck_quit_generic(const struct lu_env *env,
2622 struct lfsck_component *com)
2624 struct lfsck_assistant_data *lad = com->lc_data;
2625 struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2626 struct ptlrpc_thread *athread = &lad->lad_thread;
2627 struct l_wait_info lwi = { 0 };
2630 wake_up_all(&athread->t_ctl_waitq);
2631 l_wait_event(mthread->t_ctl_waitq,
2632 thread_is_init(athread) ||
2633 thread_is_stopped(athread),
2637 /* external interfaces */
/*
 * Print the speed limit stored in the in-memory bookmark of the LFSCK
 * instance bound to the device key. A temporary lu_env is set up for the
 * call, and the instance reference taken by the lookup is dropped after
 * printing.
 */
2639 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2642 struct lfsck_instance *lfsck;
2646 rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
/* ref = true, unlink = false: take a reference, leave it on the list. */
2650 lfsck = lfsck_instance_find(key, true, false);
2651 if (likely(lfsck != NULL)) {
2652 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2653 lfsck_instance_put(&env, lfsck);
2662 EXPORT_SYMBOL(lfsck_get_speed);
/*
 * Change the speed limit of the LFSCK instance bound to the device key.
 * The update runs under li_mutex, and the bookmark is written back only
 * when __lfsck_set_speed() returns true.
 */
2664 int lfsck_set_speed(struct dt_device *key, __u32 val)
2667 struct lfsck_instance *lfsck;
2671 rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2675 lfsck = lfsck_instance_find(key, true, false);
2676 if (likely(lfsck != NULL)) {
2677 mutex_lock(&lfsck->li_mutex);
2678 if (__lfsck_set_speed(lfsck, val))
2679 rc = lfsck_bookmark_store(&env, lfsck);
2680 mutex_unlock(&lfsck->li_mutex);
/* Drop the reference taken by lfsck_instance_find(). */
2681 lfsck_instance_put(&env, lfsck);
2690 EXPORT_SYMBOL(lfsck_set_speed);
/*
 * Print the async windows size stored in the in-memory bookmark of the
 * LFSCK instance bound to the device key.
 */
2692 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2695 struct lfsck_instance *lfsck;
2699 rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
/* ref = true, unlink = false: take a reference, leave it on the list. */
2703 lfsck = lfsck_instance_find(key, true, false);
2704 if (likely(lfsck != NULL)) {
2705 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2706 lfsck_instance_put(&env, lfsck);
2715 EXPORT_SYMBOL(lfsck_get_windows);
/*
 * Change the async windows size of the LFSCK instance bound to the
 * device key. Values outside [1, LFSCK_ASYNC_WIN_MAX] are rejected with
 * a warning. An accepted value that differs from the current one is
 * stored in the bookmark under li_mutex.
 */
2717 int lfsck_set_windows(struct dt_device *key, int val)
2720 struct lfsck_instance *lfsck;
2724 rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2728 lfsck = lfsck_instance_find(key, true, false);
2729 if (likely(lfsck != NULL)) {
2730 if (val < 1 || val > LFSCK_ASYNC_WIN_MAX) {
2731 CWARN("%s: invalid async windows size that may "
2732 "cause memory issues. The valid range is "
2734 lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
/* The comparison with the current value is made before li_mutex is
 * taken. */
2736 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2737 mutex_lock(&lfsck->li_mutex);
2738 lfsck->li_bookmark_ram.lb_async_windows = val;
2739 rc = lfsck_bookmark_store(&env, lfsck);
2740 mutex_unlock(&lfsck->li_mutex);
/* Drop the reference taken by lfsck_instance_find(). */
2742 lfsck_instance_put(&env, lfsck);
2751 EXPORT_SYMBOL(lfsck_set_windows);
/**
 * Dump the state of one LFSCK component into a seq_file.
 *
 * The visible code looks up the instance for \a key, then the component of
 * the requested \a type via lfsck_component_find(), and delegates the
 * output to that component's lc_ops->lfsck_dump() method.  Both the
 * component and the instance references are dropped afterwards.
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * some statements (local declarations, the branches taken when the instance
 * or component is missing, the return) are not visible in this listing -
 * confirm them against the complete source.
 *
 * \param[in] m		seq_file that receives the dump
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] type	LFSCK component type to dump
 */
2753 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2756 struct lfsck_instance *lfsck;
2757 struct lfsck_component *com;
2761 rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2765 lfsck = lfsck_instance_find(key, true, false);
2766 if (likely(lfsck != NULL)) {
2767 com = lfsck_component_find(lfsck, type);
2768 if (likely(com != NULL)) {
/* The component knows its own on-disk/in-RAM layout. */
2769 com->lc_ops->lfsck_dump(&env, com, m);
2770 lfsck_component_put(&env, com);
2775 lfsck_instance_put(&env, lfsck);
2784 EXPORT_SYMBOL(lfsck_dump);
/**
 * Broadcast an LE_STOP request to the MDTs known to this LFSCK instance.
 *
 * The request is built in the per-thread lfsck_request (lti_lr) with the
 * local device index, the caller's stop status/flags, the bookmark version
 * and LFSCK_TYPES_ALL as the active set.  While holding the MDT descriptor
 * table's ltd_rw_sem for read, one async request per bit set in
 * ltd_tgts_bitmap is queued on a ptlrpc request set, with
 * lfsck_async_interpret() as the reply handler.  After the table lock is
 * dropped the set is waited for and destroyed, and laia_result is read
 * back as the result.
 *
 * The caller must have LPF_BROADCAST set in \a stop->ls_flags (asserted).
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * some statements (declarations of idx/rc/rc1, several conditions and
 * early returns) are not visible in this listing - confirm them against
 * the complete source.
 *
 * \param[in] env	execution environment
 * \param[in] lfsck	LFSCK instance that issues the broadcast
 * \param[in] stop	stop parameters (status and flags)
 *
 * \retval		rc when it is non-zero, otherwise rc1
 */
2786 static int lfsck_stop_all(const struct lu_env *env,
2787 struct lfsck_instance *lfsck,
2788 struct lfsck_stop *stop)
2790 struct lfsck_thread_info *info = lfsck_env_info(env);
2791 struct lfsck_request *lr = &info->lti_lr;
2792 struct lfsck_async_interpret_args *laia = &info->lti_laia;
2793 struct ptlrpc_request_set *set;
2794 struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2795 struct lfsck_tgt_desc *ltd;
2796 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2802 LASSERT(stop->ls_flags & LPF_BROADCAST);
2804 set = ptlrpc_prep_set();
2805 if (unlikely(set == NULL))
/* Build the LE_STOP request that is sent to every MDT. */
2808 memset(lr, 0, sizeof(*lr));
2809 lr->lr_event = LE_STOP;
2810 lr->lr_index = lfsck_dev_idx(lfsck);
2811 lr->lr_status = stop->ls_status;
2812 lr->lr_version = bk->lb_version;
2813 lr->lr_active = LFSCK_TYPES_ALL;
2814 lr->lr_param = stop->ls_flags;
2816 memset(laia, 0, sizeof(*laia));
2817 laia->laia_ltds = ltds;
2819 laia->laia_shared = 1;
/* Hold the target table for read while walking its bitmap. */
2821 down_read(&ltds->ltd_rw_sem);
2822 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2823 ltd = lfsck_tgt_get(ltds, idx);
2824 LASSERT(ltd != NULL);
2826 laia->laia_ltd = ltd;
2827 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2828 lfsck_async_interpret, laia,
2831 lfsck_interpret(env, lfsck, NULL, laia, rc);
2833 CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2834 "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2838 up_read(&ltds->ltd_rw_sem);
/* Wait for all queued requests, then collect the aggregated result. */
2840 rc = ptlrpc_set_wait(set);
2841 ptlrpc_set_destroy(set);
2844 rc = laia->laia_result;
2846 if (rc == -EALREADY)
2850 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2851 lfsck_lfsck2name(lfsck), rc);
2853 RETURN(rc != 0 ? rc : rc1);
/**
 * Broadcast an LE_START request to the MDTs known to this LFSCK instance.
 *
 * The request is built in the per-thread lfsck_request (lti_lr) from the
 * bookmark (speed limit, version, async windows) and from \a start (active
 * types and flags).  While holding the MDT descriptor table's ltd_rw_sem
 * for read, the per-target progress fields (ltd_layout_done,
 * ltd_namespace_done, ltd_synced_failures) are cleared and one async
 * request per bit set in ltd_tgts_bitmap is queued on a ptlrpc request
 * set, with lfsck_async_interpret() as the reply handler.
 *
 * When the aggregated result indicates a failure other than -EALREADY, a
 * stop with LS_FAILED and LPF_ALL_TGT | LPF_BROADCAST is broadcast through
 * lfsck_stop_all().
 *
 * The caller must have LPF_BROADCAST set in \a start->ls_flags (asserted).
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * some statements (declarations of idx/rc, several conditions, the tail of
 * the lr_valid mask and the return) are not visible in this listing -
 * confirm them against the complete source.
 *
 * \param[in] env	execution environment
 * \param[in] lfsck	LFSCK instance that issues the broadcast
 * \param[in] start	start parameters (active types and flags)
 */
2856 static int lfsck_start_all(const struct lu_env *env,
2857 struct lfsck_instance *lfsck,
2858 struct lfsck_start *start)
2860 struct lfsck_thread_info *info = lfsck_env_info(env);
2861 struct lfsck_request *lr = &info->lti_lr;
2862 struct lfsck_async_interpret_args *laia = &info->lti_laia;
2863 struct ptlrpc_request_set *set;
2864 struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2865 struct lfsck_tgt_desc *ltd;
2866 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2871 LASSERT(start->ls_flags & LPF_BROADCAST);
2873 set = ptlrpc_prep_set();
2874 if (unlikely(set == NULL))
/* Build the LE_START request that is sent to every MDT. */
2877 memset(lr, 0, sizeof(*lr));
2878 lr->lr_event = LE_START;
2879 lr->lr_index = lfsck_dev_idx(lfsck);
2880 lr->lr_speed = bk->lb_speed_limit;
2881 lr->lr_version = bk->lb_version;
2882 lr->lr_active = start->ls_active;
2883 lr->lr_param = start->ls_flags;
2884 lr->lr_async_windows = bk->lb_async_windows;
2885 lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2886 LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2889 memset(laia, 0, sizeof(*laia));
2890 laia->laia_ltds = ltds;
2892 laia->laia_shared = 1;
/* Hold the target table for read while walking its bitmap. */
2894 down_read(&ltds->ltd_rw_sem);
2895 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2896 ltd = lfsck_tgt_get(ltds, idx);
2897 LASSERT(ltd != NULL);
2899 laia->laia_ltd = ltd;
/* Forget the progress recorded for this target by a previous run. */
2900 ltd->ltd_layout_done = 0;
2901 ltd->ltd_namespace_done = 0;
2902 ltd->ltd_synced_failures = 0;
2903 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2904 lfsck_async_interpret, laia,
2907 lfsck_interpret(env, lfsck, NULL, laia, rc);
2909 CERROR("%s: cannot notify MDT %x for LFSCK "
2910 "start, failout: rc = %d\n",
2911 lfsck_lfsck2name(lfsck), idx, rc);
2915 up_read(&ltds->ltd_rw_sem);
2918 ptlrpc_set_destroy(set);
2923 rc = ptlrpc_set_wait(set);
2924 ptlrpc_set_destroy(set);
2927 rc = laia->laia_result;
2930 struct lfsck_stop *stop = &info->lti_stop;
2932 CERROR("%s: cannot start LFSCK on some MDTs, "
2933 "stop all: rc = %d\n",
2934 lfsck_lfsck2name(lfsck), rc);
/* Roll back: tell every target to stop unless LFSCK was already running. */
2935 if (rc != -EALREADY) {
2936 stop->ls_status = LS_FAILED;
2937 stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2938 lfsck_stop_all(env, lfsck, stop);
/**
 * Start (or join) LFSCK on the instance registered for \a key.
 *
 * Outline of the visible code:
 * - look up the instance; fail with -EAGAIN while li_namespace is NULL;
 * - a NULL lsp->lsp_start means "auto trigger a paused LFSCK"; that case
 *   is filtered out when li_list_scan is empty or when the
 *   OBD_FAIL_LFSCK_NO_AUTO fail-check fires;
 * - with li_mutex and li_lock held, if the main thread is neither in the
 *   init nor in the stopped state, each requested component type is looked
 *   up on the scan / double-scan lists and its optional lfsck_join()
 *   method is called;
 * - otherwise the per-run state is reset, LPF_BROADCAST is refused with
 *   -EPERM on a non-master instance, unsupported types are refused with
 *   -ENOTSUPP, the requested components are moved from li_list_idle to
 *   li_list_scan, the speed limit and other parameters are applied, and
 *   with DOIF_RESET the components and all MDT/OST target descriptors are
 *   reset;
 * - the object-table iteration arguments are assembled, the
 *   "lfsck" kthread running lfsck_master_engine() is started, and the
 *   caller waits until that thread is running or stopped;
 * - for a broadcast start, li_mutex is released around lfsck_start_all();
 *   if the local thread is still alive after a failed broadcast it is
 *   asked to stop (SVC_STOPPING) and waited for.
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * many statements (declarations of rc/valid/flags/type/idx, labels,
 * else-branches, GOTO targets) are not visible in this listing - confirm
 * the exact control flow against the complete source.
 *
 * \param[in] env	execution environment
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] lsp	start parameters; lsp->lsp_start may be NULL
 *
 * \retval		rc when it is negative, otherwise 0
 */
2945 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2946 struct lfsck_start_param *lsp)
2948 struct lfsck_start *start = lsp->lsp_start;
2949 struct lfsck_instance *lfsck;
2950 struct lfsck_bookmark *bk;
2951 struct ptlrpc_thread *thread;
2952 struct lfsck_component *com;
2953 struct l_wait_info lwi = { 0 };
2954 struct lfsck_thread_args *lta;
2955 struct task_struct *task;
2956 struct lfsck_tgt_descs *ltds;
2957 struct lfsck_tgt_desc *ltd;
2965 lfsck = lfsck_instance_find(key, true, false);
2966 if (unlikely(lfsck == NULL))
2969 /* System is not ready, try again later. */
2970 if (unlikely(lfsck->li_namespace == NULL))
2971 GOTO(put, rc = -EAGAIN);
2973 /* start == NULL means auto trigger paused LFSCK. */
2974 if ((start == NULL) &&
2975 (list_empty(&lfsck->li_list_scan) ||
2976 OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2979 bk = &lfsck->li_bookmark_ram;
2980 thread = &lfsck->li_thread;
2981 mutex_lock(&lfsck->li_mutex);
2982 spin_lock(&lfsck->li_lock);
/* The main thread already exists: try to join the running components. */
2983 if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2985 if (unlikely(start == NULL)) {
2986 spin_unlock(&lfsck->li_lock);
2990 while (start->ls_active != 0) {
2991 if (!(type & start->ls_active)) {
2996 com = __lfsck_component_find(lfsck, type,
2997 &lfsck->li_list_scan);
2999 com = __lfsck_component_find(lfsck, type,
3000 &lfsck->li_list_double_scan);
3006 if (com->lc_ops->lfsck_join != NULL) {
3007 rc = com->lc_ops->lfsck_join(env, com, lsp);
3008 if (rc != 0 && rc != -EALREADY)
3011 start->ls_active &= ~type;
3014 spin_unlock(&lfsck->li_lock);
3017 spin_unlock(&lfsck->li_lock);
/* Fresh start: clear the state left over from any previous run. */
3019 lfsck->li_status = 0;
3020 lfsck->li_oit_over = 0;
3021 lfsck->li_start_unplug = 0;
3022 lfsck->li_drop_dryrun = 0;
3023 lfsck->li_new_scanned = 0;
3025 /* For auto trigger. */
3029 if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
3030 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
3031 lfsck_lfsck2name(lfsck));
3033 GOTO(out, rc = -EPERM);
3036 start->ls_version = bk->lb_version;
3038 if (start->ls_active != 0) {
3039 struct lfsck_component *next;
3041 if (start->ls_active == LFSCK_TYPES_ALL)
3042 start->ls_active = LFSCK_TYPES_SUPPORTED;
/* Report back only the unsupported bits together with -ENOTSUPP. */
3044 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
3045 start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
3046 GOTO(out, rc = -ENOTSUPP);
3049 list_for_each_entry_safe(com, next,
3050 &lfsck->li_list_scan, lc_link) {
3051 if (!(com->lc_type & start->ls_active)) {
3052 rc = com->lc_ops->lfsck_post(env, com, 0,
3059 while (start->ls_active != 0) {
3060 if (type & start->ls_active) {
3061 com = __lfsck_component_find(lfsck, type,
3062 &lfsck->li_list_idle);
3064 /* The component status will be updated
3065 * when its prep() is called later by
3066 * the LFSCK main engine. */
3067 list_move_tail(&com->lc_link,
3068 &lfsck->li_list_scan);
3069 start->ls_active &= ~type;
3075 if (list_empty(&lfsck->li_list_scan)) {
3076 /* The speed limit will be used to control both the LFSCK and
3077 * low layer scrub (if applied), need to be handled firstly. */
3078 if (start->ls_valid & LSV_SPEED_LIMIT) {
3079 if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
3080 rc = lfsck_bookmark_store(env, lfsck);
3089 if (start->ls_flags & LPF_RESET)
3090 flags |= DOIF_RESET;
3092 rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
3096 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3097 start->ls_active |= com->lc_type;
3098 if (flags & DOIF_RESET) {
3099 rc = com->lc_ops->lfsck_reset(env, com, false);
/* Reset the per-target state of every known MDT ... */
3105 ltds = &lfsck->li_mdt_descs;
3106 down_read(&ltds->ltd_rw_sem);
3107 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
3108 ltd = lfsck_ltd2tgt(ltds, idx);
3109 LASSERT(ltd != NULL);
3111 ltd->ltd_layout_done = 0;
3112 ltd->ltd_namespace_done = 0;
3113 ltd->ltd_synced_failures = 0;
3114 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_NAMESPACE);
3115 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3116 list_del_init(&ltd->ltd_layout_phase_list);
3117 list_del_init(&ltd->ltd_layout_list);
3118 list_del_init(&ltd->ltd_namespace_phase_list);
3119 list_del_init(&ltd->ltd_namespace_list);
3121 up_read(&ltds->ltd_rw_sem);
/* ... and of every known OST (layout state only). */
3123 ltds = &lfsck->li_ost_descs;
3124 down_read(&ltds->ltd_rw_sem);
3125 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
3126 ltd = lfsck_ltd2tgt(ltds, idx);
3127 LASSERT(ltd != NULL);
3129 ltd->ltd_layout_done = 0;
3130 ltd->ltd_synced_failures = 0;
3131 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3132 list_del_init(&ltd->ltd_layout_phase_list);
3133 list_del_init(&ltd->ltd_layout_list);
3135 up_read(&ltds->ltd_rw_sem);
/* Assemble the directory and object-table iteration arguments. */
3138 lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
3139 if (bk->lb_param & LPF_DRYRUN)
3140 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
3142 if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
3143 valid |= DOIV_ERROR_HANDLE;
3144 if (start->ls_flags & LPF_FAILOUT)
3145 flags |= DOIF_FAILOUT;
3148 if (start != NULL && start->ls_valid & LSV_DRYRUN) {
3149 valid |= DOIV_DRYRUN;
3150 if (start->ls_flags & LPF_DRYRUN)
3151 flags |= DOIF_DRYRUN;
3154 if (!list_empty(&lfsck->li_list_scan))
3155 flags |= DOIF_OUTUSED;
3157 lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
3158 thread_set_flags(thread, 0);
3159 lta = lfsck_thread_args_init(lfsck, NULL, lsp);
3161 GOTO(out, rc = PTR_ERR(lta));
3163 __lfsck_set_speed(lfsck, bk->lb_speed_limit);
3164 task = kthread_run(lfsck_master_engine, lta, "lfsck");
3167 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
3168 lfsck_lfsck2name(lfsck), rc);
3169 lfsck_thread_args_fini(lta);
/* Wait until the engine thread reports running or stopped. */
3174 l_wait_event(thread->t_ctl_waitq,
3175 thread_is_running(thread) ||
3176 thread_is_stopped(thread),
3178 if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
3179 lfsck->li_start_unplug = 1;
3180 wake_up_all(&thread->t_ctl_waitq);
3185 /* release lfsck::li_mutex to avoid deadlock. */
3186 mutex_unlock(&lfsck->li_mutex);
3187 rc = lfsck_start_all(env, lfsck, start);
3189 spin_lock(&lfsck->li_lock);
3190 if (thread_is_stopped(thread)) {
3191 spin_unlock(&lfsck->li_lock);
/* Mark the run failed and ask the local engine thread to stop. */
3193 lfsck->li_status = LS_FAILED;
3194 lfsck->li_flags = 0;
3195 thread_set_flags(thread, SVC_STOPPING);
3196 spin_unlock(&lfsck->li_lock);
3198 lfsck->li_start_unplug = 1;
3199 wake_up_all(&thread->t_ctl_waitq);
3200 l_wait_event(thread->t_ctl_waitq,
3201 thread_is_stopped(thread),
3205 lfsck->li_start_unplug = 1;
3206 wake_up_all(&thread->t_ctl_waitq);
3212 mutex_unlock(&lfsck->li_mutex);
3215 lfsck_instance_put(env, lfsck);
3217 return rc < 0 ? rc : 0;
3219 EXPORT_SYMBOL(lfsck_start);
/**
 * Stop LFSCK on the instance registered for \a key.
 *
 * Outline of the visible code:
 * - when \a stop carries LPF_BROADCAST, a non-master instance is refused
 *   with -EPERM; otherwise the stop is first broadcast through
 *   lfsck_stop_all() and its result is kept in rc1;
 * - with li_mutex and li_lock held, nothing more is done if the main
 *   thread is in the init or stopped state;
 * - li_status / li_flags are taken from \a stop, or set to LS_STOPPED / 0
 *   (the selecting condition is on a line not visible here);
 * - the thread is flagged SVC_STOPPING; on a master instance SIGINT is
 *   sent to the lad_task of the components on li_list_scan and
 *   li_list_double_scan, under each assistant's lad_lock;
 * - the control waitqueue is woken and the caller waits until the thread
 *   is stopped.
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * some statements (declarations of rc/rc1, labels, the assignment of lad,
 * else-branches) are not visible in this listing - confirm them against
 * the complete source.
 *
 * \param[in] env	execution environment
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] stop	stop parameters; may be NULL (checked before use in
 *			the broadcast test)
 *
 * \retval		rc when it is non-zero, otherwise rc1
 */
3221 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
3222 struct lfsck_stop *stop)
3224 struct lfsck_instance *lfsck;
3225 struct ptlrpc_thread *thread;
3226 struct l_wait_info lwi = { 0 };
3231 lfsck = lfsck_instance_find(key, true, false);
3232 if (unlikely(lfsck == NULL))
3235 thread = &lfsck->li_thread;
3236 /* Broadcast before taking lfsck::li_mutex to avoid deadlock. */
3237 if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
3238 if (!lfsck->li_master) {
3239 CERROR("%s: only allow to specify '-A' via MDS\n",
3240 lfsck_lfsck2name(lfsck));
3242 GOTO(out, rc = -EPERM);
3245 rc1 = lfsck_stop_all(env, lfsck, stop);
3248 mutex_lock(&lfsck->li_mutex);
3249 spin_lock(&lfsck->li_lock);
3250 /* no error if LFSCK is already stopped, or was never started */
3251 if (thread_is_init(thread) || thread_is_stopped(thread)) {
3252 spin_unlock(&lfsck->li_lock);
3257 lfsck->li_status = stop->ls_status;
3258 lfsck->li_flags = stop->ls_flags;
3260 lfsck->li_status = LS_STOPPED;
3261 lfsck->li_flags = 0;
3264 thread_set_flags(thread, SVC_STOPPING);
/* On the master, interrupt the assistant tasks of the active components. */
3266 if (lfsck->li_master) {
3267 struct lfsck_component *com;
3268 struct lfsck_assistant_data *lad;
3270 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3272 spin_lock(&lad->lad_lock);
3273 if (lad->lad_task != NULL)
3274 force_sig(SIGINT, lad->lad_task);
3275 spin_unlock(&lad->lad_lock);
3278 list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
3280 spin_lock(&lad->lad_lock);
3281 if (lad->lad_task != NULL)
3282 force_sig(SIGINT, lad->lad_task);
3283 spin_unlock(&lad->lad_lock);
3287 spin_unlock(&lfsck->li_lock);
/* Kick the engine thread and wait for it to reach the stopped state. */
3289 wake_up_all(&thread->t_ctl_waitq);
3290 l_wait_event(thread->t_ctl_waitq,
3291 thread_is_stopped(thread),
3297 mutex_unlock(&lfsck->li_mutex);
3298 lfsck_instance_put(env, lfsck);
3300 return rc != 0 ? rc : rc1;
3302 EXPORT_SYMBOL(lfsck_stop);
/**
 * Handle an incoming LFSCK notification described by \a lr.
 *
 * The function dispatches on lr->lr_event; rc starts as -EOPNOTSUPP.
 * - One branch rebuilds a struct lfsck_start from the request (valid mask,
 *   speed, version, active types, async windows, and the flags with
 *   LPF_BROADCAST cleared) and calls lfsck_start() with the sender's index
 *   marked valid.
 * - One branch rebuilds a struct lfsck_stop (status, and flags with
 *   LPF_BROADCAST cleared) and calls lfsck_stop().
 * - The listed LE_* events are forwarded to the lfsck_in_notify() method
 *   of the component selected by lr->lr_active.
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * some statements (the case labels of the first two branches - presumably
 * LE_START and LE_STOP -, a few more case labels, break statements, the
 * default branch and the return) are not visible in this listing -
 * confirm them against the complete source.
 *
 * \param[in] env	execution environment
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] lr	the received LFSCK request
 * \param[in] th	transaction handle passed through to the component
 */
3304 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
3305 struct lfsck_request *lr, struct thandle *th)
3307 int rc = -EOPNOTSUPP;
3310 switch (lr->lr_event) {
3312 struct lfsck_start *start = &lfsck_env_info(env)->lti_start;
3313 struct lfsck_start_param lsp;
3315 memset(start, 0, sizeof(*start));
3316 start->ls_valid = lr->lr_valid;
3317 start->ls_speed_limit = lr->lr_speed;
3318 start->ls_version = lr->lr_version;
3319 start->ls_active = lr->lr_active;
/* A forwarded request must not be broadcast again by the receiver. */
3320 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3321 start->ls_async_windows = lr->lr_async_windows;
3323 lsp.lsp_start = start;
3324 lsp.lsp_index = lr->lr_index;
3325 lsp.lsp_index_valid = 1;
3326 rc = lfsck_start(env, key, &lsp);
3330 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3332 memset(stop, 0, sizeof(*stop));
3333 stop->ls_status = lr->lr_status;
3334 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3335 rc = lfsck_stop(env, key, stop);
3338 case LE_PHASE1_DONE:
3339 case LE_PHASE2_DONE:
3340 case LE_FID_ACCESSED:
3342 case LE_CONDITIONAL_DESTROY:
3343 case LE_SKIP_NLINK_DECLARE:
3345 case LE_SET_LMV_MASTER:
3346 case LE_SET_LMV_SLAVE:
3347 case LE_PAIRS_VERIFY: {
3348 struct lfsck_instance *lfsck;
3349 struct lfsck_component *com;
3351 lfsck = lfsck_instance_find(key, true, false);
3352 if (unlikely(lfsck == NULL))
/* Component-level events are handled by the component itself. */
3355 com = lfsck_component_find(lfsck, lr->lr_active);
3356 if (likely(com != NULL)) {
3357 rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
3358 lfsck_component_put(env, com);
3361 lfsck_instance_put(env, lfsck);
3370 EXPORT_SYMBOL(lfsck_in_notify);
/**
 * Query the LFSCK status of one or more components.
 *
 * Outline of the visible code:
 * - que->lu_types == LFSCK_TYPES_ALL is mapped to
 *   LFSCK_TYPES_SUPPORTED & ~LFSCK_TYPE_SCRUB; any unsupported bit makes
 *   the call fail with -ENOTSUPP after que->lu_types has been reduced to
 *   the unsupported bits;
 * - for every requested type bit below LFSCK_TYPE_BITS the component is
 *   looked up (-ENOTSUPP if missing), the per-type MDT/OST counters and
 *   the repaired count in \a que are zeroed, and the component's
 *   lfsck_query() method is called;
 * - when que->lu_flags has LPF_WAIT and any requested type still reports
 *   targets in LS_SCANNING_PHASE1 or LS_SCANNING_PHASE2, the caller sleeps
 *   on li_thread.t_ctl_waitq with a 3 second interruptible timeout and,
 *   on -ETIMEDOUT, presumably repeats the query (the jump is not visible);
 * - a separate path queries only the component named by req->lr_active.
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * some statements (declarations of i/type/rc, labels, the condition that
 * selects between the \a que path and the \a req path, continue/goto
 * statements, the return) are not visible in this listing - confirm them
 * against the complete source.
 *
 * \param[in] env	execution environment
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] req	request passed to the component's query method
 * \param[out] rep	reply passed to the component's query method
 * \param[in,out] que	query descriptor holding the type mask, flags and
 *			the result counters
 */
3372 int lfsck_query(const struct lu_env *env, struct dt_device *key,
3373 struct lfsck_request *req, struct lfsck_reply *rep,
3374 struct lfsck_query *que)
3376 struct lfsck_instance *lfsck;
3377 struct lfsck_component *com;
3383 lfsck = lfsck_instance_find(key, true, false);
3384 if (unlikely(lfsck == NULL))
3388 if (que->lu_types == LFSCK_TYPES_ALL)
3390 LFSCK_TYPES_SUPPORTED & ~LFSCK_TYPE_SCRUB;
/* Hand the unsupported bits back to the caller together with the error. */
3392 if (que->lu_types & ~LFSCK_TYPES_SUPPORTED) {
3393 que->lu_types &= ~LFSCK_TYPES_SUPPORTED;
3395 GOTO(out, rc = -ENOTSUPP);
3398 for (i = 0, type = 1 << i; i < LFSCK_TYPE_BITS;
3399 i++, type = 1 << i) {
3400 if (!(que->lu_types & type))
3404 com = lfsck_component_find(lfsck, type);
3405 if (unlikely(com == NULL))
3406 GOTO(out, rc = -ENOTSUPP);
/* Start from clean counters for this type before asking the component. */
3408 memset(que->lu_mdts_count[i], 0,
3409 sizeof(__u32) * (LS_MAX + 1));
3410 memset(que->lu_osts_count[i], 0,
3411 sizeof(__u32) * (LS_MAX + 1));
3412 que->lu_repaired[i] = 0;
3413 rc = com->lc_ops->lfsck_query(env, com, req, rep,
3415 lfsck_component_put(env, com);
3420 if (!(que->lu_flags & LPF_WAIT))
3423 for (i = 0, type = 1 << i; i < LFSCK_TYPE_BITS;
3424 i++, type = 1 << i) {
3425 if (!(que->lu_types & type))
3428 if (que->lu_mdts_count[i][LS_SCANNING_PHASE1] != 0 ||
3429 que->lu_mdts_count[i][LS_SCANNING_PHASE2] != 0 ||
3430 que->lu_osts_count[i][LS_SCANNING_PHASE1] != 0 ||
3431 que->lu_osts_count[i][LS_SCANNING_PHASE2] != 0) {
3432 struct l_wait_info lwi;
3434 /* If it is required to wait, then sleep
3435 * 3 seconds and try to query again. */
3436 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(3),
3440 rc = l_wait_event(lfsck->li_thread.t_ctl_waitq,
3442 if (rc == -ETIMEDOUT)
3447 com = lfsck_component_find(lfsck, req->lr_active);
3448 if (likely(com != NULL)) {
3449 rc = com->lc_ops->lfsck_query(env, com, req, rep,
3451 lfsck_component_put(env, com);
3460 lfsck_instance_put(env, lfsck);
3463 EXPORT_SYMBOL(lfsck_query);
/**
 * Attach an LDLM namespace to the LFSCK instance registered for \a key.
 *
 * When the instance is found, \a ns is stored in lfsck::li_namespace and
 * the instance reference is dropped again.  lfsck_start() refuses to run
 * with -EAGAIN while li_namespace is still NULL.
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * some statements (the declaration of rc, the not-found branch and the
 * return) are not visible in this listing - confirm them against the
 * complete source.
 *
 * \param[in] env	execution environment
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] ns	namespace to record in the instance
 */
3465 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3466 struct ldlm_namespace *ns)
3468 struct lfsck_instance *lfsck;
3471 lfsck = lfsck_instance_find(key, true, false);
3472 if (likely(lfsck != NULL)) {
3473 lfsck->li_namespace = ns;
3474 lfsck_instance_put(env, lfsck);
3480 EXPORT_SYMBOL(lfsck_register_namespace);
/**
 * Create and register the LFSCK instance for the device \a key.
 *
 * Outline of the visible code:
 * - an already registered instance for \a key is detected first (the
 *   action taken is on a line not visible here);
 * - a new instance is allocated and its mutex, spinlock, lists, reference
 *   count (1), double-scan counter (0) and control waitqueue are
 *   initialized; the notify callback, next/bottom devices and obd are
 *   recorded;
 * - the OST and MDT target descriptor tables are initialized;
 * - local OID storage is set up, the local root is located and must be
 *   usable as a directory; its FID is saved in li_local_root_fid;
 * - on the master path li_master is set, and on device index 0 the "ROOT"
 *   entry is looked up, then the dotlustre and lostfound entries below it,
 *   each followed by lfsck_verify_linkea();
 * - the object-table iteration object (FID_SEQ_LOCAL_FILE /
 *   OTABLE_IT_OID) is located and tried as an index with
 *   dt_otable_features, then stored in li_obj_oit;
 * - the LFSCK_DIR directory is found or created and stored in
 *   li_lfsck_dir with an extra object reference;
 * - bookmark, FID, namespace and layout components are set up, the
 *   instance is added to the global list and orphan targets are attached;
 * - on exit the temporary obj/root references are dropped, and
 *   lfsck_instance_cleanup() is called on a path whose condition is not
 *   visible here (presumably on failure).
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * many statements (declaration of rc, most "if (rc != 0) GOTO(out, rc)"
 * style checks, the condition guarding the master-only part, labels and
 * the return) are not visible in this listing - confirm them against the
 * complete source.
 *
 * \param[in] env		execution environment
 * \param[in] key		bottom device the instance is bound to
 * \param[in] next		stored in lfsck::li_next
 * \param[in] obd		stored in lfsck::li_obd
 * \param[in] notify		outgoing notification callback
 * \param[in] notify_data	opaque data for \a notify
 * \param[in] master		NOTE(review): not referenced on the visible
 *				lines; presumably guards the li_master
 *				section - confirm
 */
3482 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3483 struct dt_device *next, struct obd_device *obd,
3484 lfsck_out_notify notify, void *notify_data, bool master)
3486 struct lfsck_instance *lfsck;
3487 struct dt_object *root = NULL;
3488 struct dt_object *obj = NULL;
3489 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid;
3493 lfsck = lfsck_instance_find(key, false, false);
3494 if (unlikely(lfsck != NULL))
3497 OBD_ALLOC_PTR(lfsck);
/* Basic in-memory initialization of the new instance. */
3501 mutex_init(&lfsck->li_mutex);
3502 spin_lock_init(&lfsck->li_lock);
3503 INIT_LIST_HEAD(&lfsck->li_link);
3504 INIT_LIST_HEAD(&lfsck->li_list_scan);
3505 INIT_LIST_HEAD(&lfsck->li_list_dir);
3506 INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3507 INIT_LIST_HEAD(&lfsck->li_list_idle);
3508 INIT_LIST_HEAD(&lfsck->li_list_lmv);
3509 atomic_set(&lfsck->li_ref, 1);
3510 atomic_set(&lfsck->li_double_scan_count, 0);
3511 init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3512 lfsck->li_out_notify = notify;
3513 lfsck->li_out_notify_data = notify_data;
3514 lfsck->li_next = next;
3515 lfsck->li_bottom = key;
3516 lfsck->li_obd = obd;
3518 rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3522 rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3526 fid->f_seq = FID_SEQ_LOCAL_NAME;
3529 rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
/* Locate the local root directory of the backing device. */
3533 rc = dt_root_get(env, key, fid);
3537 root = dt_locate(env, key, fid);
3539 GOTO(out, rc = PTR_ERR(root));
3541 if (unlikely(!dt_try_as_dir(env, root)))
3542 GOTO(out, rc = -ENOTDIR);
3544 lfsck->li_local_root_fid = *fid;
3546 lfsck->li_master = 1;
3547 if (lfsck_dev_idx(lfsck) == 0) {
3548 struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3549 const struct lu_name *cname;
3551 rc = dt_lookup(env, root,
3552 (struct dt_rec *)(&lfsck->li_global_root_fid),
3553 (const struct dt_key *)"ROOT");
3557 obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3559 GOTO(out, rc = PTR_ERR(obj));
3561 if (unlikely(!dt_try_as_dir(env, obj)))
3562 GOTO(out, rc = -ENOTDIR);
3564 rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3565 (const struct dt_key *)dotlustre);
3569 lfsck_object_put(env, obj);
3570 obj = dt_locate(env, key, fid);
3572 GOTO(out, rc = PTR_ERR(obj));
3574 cname = lfsck_name_get_const(env, dotlustre,
3576 rc = lfsck_verify_linkea(env, obj, cname,
3577 &lfsck->li_global_root_fid);
3581 if (unlikely(!dt_try_as_dir(env, obj)))
3582 GOTO(out, rc = -ENOTDIR);
3585 rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3586 (const struct dt_key *)lostfound);
3590 lfsck_object_put(env, obj);
3591 obj = dt_locate(env, key, fid);
3593 GOTO(out, rc = PTR_ERR(obj));
3595 cname = lfsck_name_get_const(env, lostfound,
3597 rc = lfsck_verify_linkea(env, obj, cname, pfid);
3601 lfsck_object_put(env, obj);
/* Object used for object-table based iteration. */
3606 fid->f_seq = FID_SEQ_LOCAL_FILE;
3607 fid->f_oid = OTABLE_IT_OID;
3609 obj = dt_locate(env, key, fid);
3611 GOTO(out, rc = PTR_ERR(obj));
3613 rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3617 lfsck->li_obj_oit = obj;
3618 obj = local_file_find_or_create(env, lfsck->li_los, root, LFSCK_DIR,
3619 S_IFDIR | S_IRUGO | S_IWUSR);
3621 GOTO(out, rc = PTR_ERR(obj));
/* li_lfsck_dir keeps its own reference; obj is released at the end. */
3623 lu_object_get(&obj->do_lu);
3624 lfsck->li_lfsck_dir = obj;
3625 rc = lfsck_bookmark_setup(env, lfsck);
3630 rc = lfsck_fid_init(lfsck);
3634 rc = lfsck_namespace_setup(env, lfsck);
3639 rc = lfsck_layout_setup(env, lfsck);
3643 /* XXX: more LFSCK components initialization to be added here. */
3645 rc = lfsck_instance_add(lfsck);
3647 rc = lfsck_add_target_from_orphan(env, lfsck);
3649 if (obj != NULL && !IS_ERR(obj))
3650 lfsck_object_put(env, obj);
3651 if (root != NULL && !IS_ERR(root))
3652 lfsck_object_put(env, root);
3654 lfsck_instance_cleanup(env, lfsck);
3657 EXPORT_SYMBOL(lfsck_register);
/**
 * Unregister the LFSCK instance bound to \a key.
 *
 * The instance is looked up with lfsck_instance_find(key, false, true)
 * and then released with lfsck_instance_put().
 *
 * NOTE(review): this is the only visible caller that passes true as the
 * third lookup argument; presumably that unlinks the instance from the
 * global list - confirm against lfsck_instance_find().  A line between the
 * lookup and the put (likely a NULL check) is not visible in this listing.
 *
 * \param[in] env	execution environment
 * \param[in] key	device whose LFSCK instance is unregistered
 */
3659 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3661 struct lfsck_instance *lfsck;
3663 lfsck = lfsck_instance_find(key, false, true);
3665 lfsck_instance_put(env, lfsck);
3667 EXPORT_SYMBOL(lfsck_degister);
/**
 * Make a new OST or MDT target known to LFSCK.
 *
 * A target descriptor is initialized (list heads, reference count 1,
 * index).  Under lfsck_instance_lock the instance for \a key is looked up:
 * - if none is registered yet, the descriptor is parked on the global
 *   lfsck_ost_orphan_list or lfsck_mdt_orphan_list;
 * - otherwise the lock is dropped and the descriptor is attached through
 *   __lfsck_add_target(), after which the instance reference is released.
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * some statements (the allocation of ltd, the assignment of its tgt/exp
 * fields, the for_ost test that selects the orphan list, error handling
 * and the return) are not visible in this listing - confirm them against
 * the complete source.
 *
 * \param[in] env	execution environment
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] tgt	the target device being added
 * \param[in] exp	export used to talk to the target
 * \param[in] index	target index
 * \param[in] for_ost	true for an OST, false for an MDT
 */
3669 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3670 struct dt_device *tgt, struct obd_export *exp,
3671 __u32 index, bool for_ost)
3673 struct lfsck_instance *lfsck;
3674 struct lfsck_tgt_desc *ltd;
3685 INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3686 INIT_LIST_HEAD(&ltd->ltd_layout_list);
3687 INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3688 INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3689 INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3690 atomic_set(&ltd->ltd_ref, 1);
3691 ltd->ltd_index = index;
3693 spin_lock(&lfsck_instance_lock);
3694 lfsck = __lfsck_instance_find(key, true, false);
/* No instance yet: keep the descriptor on the matching orphan list. */
3695 if (lfsck == NULL) {
3697 list_add_tail(&ltd->ltd_orphan_list,
3698 &lfsck_ost_orphan_list);
3700 list_add_tail(&ltd->ltd_orphan_list,
3701 &lfsck_mdt_orphan_list);
3702 spin_unlock(&lfsck_instance_lock);
3706 spin_unlock(&lfsck_instance_lock);
3708 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3712 lfsck_instance_put(env, lfsck);
3716 EXPORT_SYMBOL(lfsck_add_target);
/**
 * Remove an OST or MDT target from LFSCK.
 *
 * Outline of the visible code:
 * - under lfsck_instance_lock the global OST or MDT orphan list is
 *   searched for a descriptor whose ltd_tgt is \a tgt; a match is unlinked
 *   and the lock is dropped;
 * - otherwise the instance for \a key is looked up;
 * - with the selected descriptor table's ltd_rw_sem held for write, the
 *   descriptor at \a index is fetched (an index beyond the bitmap size or
 *   an empty slot is treated as unlikely), its bit is cleared and its slot
 *   is set to NULL; the instance-local orphan list is also searched for
 *   \a tgt;
 * - after the semaphore is released, ltd_lock is taken briefly and
 *   lfsck_stop_notify() is called for both the namespace and the layout
 *   component.
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * some statements (the for_ost tests, labels and gotos, the counter
 * update that follows the ltd_tgtnr assertion, what is done under
 * ltd_lock, and the release of the descriptor) are not visible in this
 * listing - confirm them against the complete source.
 *
 * \param[in] env	execution environment
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] tgt	the target device being removed
 * \param[in] index	target index
 * \param[in] for_ost	true for an OST, false for an MDT
 */
3718 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3719 struct dt_device *tgt, __u32 index, bool for_ost)
3721 struct lfsck_instance *lfsck;
3722 struct lfsck_tgt_descs *ltds;
3723 struct lfsck_tgt_desc *ltd;
3724 struct list_head *head;
3727 head = &lfsck_ost_orphan_list;
3729 head = &lfsck_mdt_orphan_list;
/* First try the global orphan list: the target may never have been attached. */
3731 spin_lock(&lfsck_instance_lock);
3732 list_for_each_entry(ltd, head, ltd_orphan_list) {
3733 if (ltd->ltd_tgt == tgt) {
3734 list_del_init(&ltd->ltd_orphan_list);
3735 spin_unlock(&lfsck_instance_lock);
3743 lfsck = __lfsck_instance_find(key, true, false);
3744 spin_unlock(&lfsck_instance_lock);
3745 if (unlikely(lfsck == NULL))
3749 ltds = &lfsck->li_ost_descs;
3751 ltds = &lfsck->li_mdt_descs;
/* Modify the descriptor table under the write side of its semaphore. */
3753 down_write(&ltds->ltd_rw_sem);
3754 LASSERT(ltds->ltd_tgts_bitmap != NULL);
3756 if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3759 ltd = lfsck_ltd2tgt(ltds, index);
3760 if (unlikely(ltd == NULL))
3763 LASSERT(ltds->ltd_tgtnr > 0);
3766 cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3767 lfsck_assign_tgt(ltds, NULL, index);
3772 head = &lfsck->li_ost_descs.ltd_orphan;
3774 head = &lfsck->li_mdt_descs.ltd_orphan;
3776 list_for_each_entry(ltd, head, ltd_orphan_list) {
3777 if (ltd->ltd_tgt == tgt) {
3778 list_del_init(&ltd->ltd_orphan_list);
3784 up_write(&ltds->ltd_rw_sem);
3786 spin_lock(&ltds->ltd_lock);
3788 spin_unlock(&ltds->ltd_lock);
/* Let both components know that this target is gone. */
3789 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3790 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3794 lfsck_instance_put(env, lfsck);
3796 EXPORT_SYMBOL(lfsck_del_target);
/**
 * Module entry point.
 *
 * Initializes the global instance list and the two orphan target lists,
 * prepares and registers the LFSCK lu_context key, and registers
 * lfsck_in_notify() and lfsck_query() as handlers with the target layer.
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * some statements (the declaration of rc, the check on the key
 * registration result and the return) are not visible in this listing -
 * confirm them against the complete source.
 */
3798 static int __init lfsck_init(void)
3802 INIT_LIST_HEAD(&lfsck_instance_list);
3803 INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3804 INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3805 lfsck_key_init_generic(&lfsck_thread_key, NULL);
3806 rc = lu_context_key_register(&lfsck_thread_key);
3808 tgt_register_lfsck_in_notify(lfsck_in_notify);
3809 tgt_register_lfsck_query(lfsck_query);
/**
 * Module exit point.
 *
 * Asserts that no LFSCK instance is still registered, unlinks every
 * descriptor left on the global OST and MDT orphan lists, and
 * deregisters the LFSCK lu_context key.
 *
 * NOTE(review): the original line numbers jump inside this function, so
 * some statements (the list member argument of both iterators and what is
 * done with each descriptor after it is unlinked - presumably it is
 * released) are not visible in this listing - confirm them against the
 * complete source.
 */
3815 static void __exit lfsck_exit(void)
3817 struct lfsck_tgt_desc *ltd;
3818 struct lfsck_tgt_desc *next;
3820 LASSERT(list_empty(&lfsck_instance_list));
/* Drain targets that were added but never claimed by an instance. */
3822 list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3824 list_del_init(&ltd->ltd_orphan_list);
3828 list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3830 list_del_init(&ltd->ltd_orphan_list);
3834 lu_context_key_degister(&lfsck_thread_key);
/* Kernel module metadata and the init/exit hooks of the LFSCK module. */
3837 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3838 MODULE_DESCRIPTION("Lustre File System Checker");
3839 MODULE_VERSION(LUSTRE_VERSION_STRING);
3840 MODULE_LICENSE("GPL");
3842 module_init(lfsck_init);
3843 module_exit(lfsck_exit);