4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2013, 2016, Intel Corporation.
26 * lustre/lfsck/lfsck_engine.c
28 * Author: Fan, Yong <fan.yong@intel.com>
31 #define DEBUG_SUBSYSTEM S_LFSCK
33 #include <lu_object.h>
34 #include <dt_object.h>
35 #include <lustre_net.h>
36 #include <lustre_fid.h>
37 #include <obd_support.h>
38 #include <lustre_lib.h>
40 #include "lfsck_internal.h"
/* Convert an on-disk (little-endian) directory entry to CPU byte order in
 * place, return the entry hash via @cookie and the object type via @type,
 * and NUL-terminate the entry name.
 * NOTE(review): this view is incomplete -- the declarations of @lt and
 * @len, the braces, and the return paths are outside the visible chunk. */
42 int lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie, __u16 *type)
/* Alignment mask used to locate the luda_type record after the name. */
45 int align = sizeof(*lt) - 1;
/* Swap fixed-size on-disk fields from little-endian to host order. */
48 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
49 *cookie = le64_to_cpu(ent->lde_hash);
50 ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
51 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
52 ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
/* Without LUDA_TYPE there is no type record appended after the name. */
54 if (unlikely(!(ent->lde_attrs & LUDA_TYPE)))
/* The luda_type record is stored aligned right after the padded name. */
57 len = (ent->lde_namelen + align) & ~align;
58 lt = (struct luda_type *)(ent->lde_name + len);
59 *type = le16_to_cpu(lt->lt_type);
61 /* Make sure the name is terminated with '\0'. The data (object type)
62 * after ent::lde_name may be broken, but we have already stored such
63 * data in the output parameter @type as above. */
64 ent->lde_name[ent->lde_namelen] = '\0';
/* Detach the otable-based (OIT) iterator from the lfsck instance.
 * The pointer swap is done under li_lock so concurrent readers of
 * li_di_oit never see a stale iterator; the iterator itself is
 * presumably released via iops after the lock is dropped (the tail of
 * this function is outside the visible chunk -- confirm). */
69 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
71 const struct dt_it_ops *iops;
74 spin_lock(&lfsck->li_lock);
75 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
/* Take ownership of the iterator and clear the published pointer. */
76 di = lfsck->li_di_oit;
77 lfsck->li_di_oit = NULL;
78 spin_unlock(&lfsck->li_lock);
/* Detach the namespace (directory) iterator from the lfsck instance and
 * reset the saved directory cookie, all under li_lock so concurrent
 * readers never observe a half-detached state. As with the OIT variant,
 * the actual iterator release appears to happen after the visible lines. */
82 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
84 const struct dt_it_ops *iops;
87 spin_lock(&lfsck->li_lock);
88 iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
/* Take ownership of the iterator, clear the published pointer and the
 * resume cookie together so they stay consistent. */
89 di = lfsck->li_di_dir;
90 lfsck->li_di_dir = NULL;
91 lfsck->li_cookie_dir = 0;
92 spin_unlock(&lfsck->li_lock);
97 * Check whether needs to scan the directory or not.
99 * 1) If we are not doing namespace LFSCK, or the given @obj is not directory,
100 * then needs not to scan the @obj. Otherwise,
101 * 2) Global /ROOT needs to be scanned, backend root needs not to be scanned.
102 * 3) If the @obj is neither IGIF nor normal FID (including .lustre and its
103 * sub-directories that have been scanned when the LFSCK engine start),
104 * then needs not to be scanned.
105 * 4) If it is a remote object, then scanning the object will be done on the
106 * MDT on which the object really resides.
107 * 5) If the local object has normal FID, then needs to be scanned. Otherwise,
108 * 6) If the object has linkEA, then needs to be scanned. Otherwise,
109 * 7) If none of the previous conditions are true, we need to check the parent
110 * directories whether this subdirectory is in a tree that should be scanned.
111 * Set the parent as current @obj, repeat 2)-7).
113 * \param[in] env pointer to the thread context
114 * \param[in] lfsck pointer to the lfsck instance
115 * \param[in] obj pointer to the object to be checked
117 * \retval positive number if the directory needs to be scanned
118 * \retval 0 if the directory needs NOT to be scanned
119 * \retval negative error number on failure
121 static int lfsck_needs_scan_dir(const struct lu_env *env,
122 struct lfsck_instance *lfsck,
123 struct dt_object *obj)
125 struct lfsck_thread_info *info = lfsck_env_info(env);
126 struct lu_fid *fid = &info->lti_fid;
127 struct lu_seq_range *range = &info->lti_range;
128 struct lu_attr *la = &info->lti_la;
129 struct seq_server_site *ss = lfsck_dev_site(lfsck);
130 __u32 idx = lfsck_dev_idx(lfsck);
/* Rule 1): no namespace component registered, or not a directory ->
 * nothing to scan. */
134 if (list_empty(&lfsck->li_list_dir) || !S_ISDIR(lfsck_object_type(obj)))
137 *fid = *lfsck_dto2fid(obj);
138 rc = dt_attr_get(env, obj, la);
/* Treat attr-get failure and orphan-flagged directories alike: skip. */
139 if (unlikely(rc || (la->la_valid & LA_FLAGS &&
140 la->la_flags & LUSTRE_ORPHAN_FL))) {
141 /* Orphan directory is empty, does not need scan. */
143 "%s: skip orphan dir "DFID", %llx/%x: rc = %d\n",
144 lfsck_lfsck2name(lfsck), PFID(fid),
145 la->la_valid, la->la_flags, rc);
/* Rule 2): global /ROOT is client-visible and must be scanned... */
153 /* Global /ROOT is visible. */
154 if (unlikely(lu_fid_eq(fid, &lfsck->li_global_root_fid)))
/* ...while the backend root is internal only. */
157 /* Backend root is invisible. */
158 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
/* Rule 3): neither normal FID nor IGIF -> internal object, skip. */
161 if (!fid_is_norm(fid) && !fid_is_igif(fid))
/* Rule 4): ask the FLD whether the FID's sequence lives on this MDT. */
164 fld_range_set_mdt(range);
165 rc = fld_local_lookup(env, ss->ss_server_fld,
166 fid_seq(fid), range);
167 if (rc != 0 || range->lsr_index != idx)
168 /* Current FID should NOT be for the input parameter
169 * @obj, because the lfsck_master_oit_engine() has
170 * filtered out agent object. So current FID is for
171 * the ancestor of the original input parameter @obj.
172 * So the ancestor is a remote directory. The input
173 * parameter @obj is local directory, and should be
174 * scanned under such case. */
/* Rule 5): a normal local FID is client-visible -> scan. */
177 /* normal FID on this target (locally) must be for the
178 * client-side visible object. */
179 if (fid_is_norm(fid))
/* Walk up the ".." chain (rules 6-7): locate the current ancestor. */
182 /* Only true after "obj = NULL" set below */
184 obj = lfsck_object_find_bottom(env, lfsck, fid)
189 if (!dt_object_exists(obj))
/* Hold the read lock while probing liveness and linkEA together. */
193 dt_read_lock(env, obj, DT_TGT_CHILD);
194 if (unlikely(lfsck_is_dead_obj(obj))) {
195 dt_read_unlock(env, obj);
/* Rule 6): a zero-length xattr probe just tests linkEA presence. */
200 rc = dt_xattr_get(env, obj,
201 lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK);
202 dt_read_unlock(env, obj);
206 if (rc < 0 && rc != -ENODATA)
/* Rule 7): no linkEA -- climb to the parent via ".." and re-check. */
209 rc = dt_lookup_dir(env, obj, dotdot, fid);
211 lfsck_object_put(env, obj);
217 if (!fid_is_sane(fid))
/* Release the last ancestor reference taken inside the walk loop;
 * the original @obj (depth == 0) is owned by the caller. */
222 if (depth > 0 && obj != NULL)
223 lfsck_object_put(env, obj);
/* Load the striped-directory layout (LMV EA) of @obj, if any, and attach
 * the matching lfsck_lmv bookkeeping structure plus a reference on @obj
 * to the lfsck instance. -ENODATA from the LMV read means a plain
 * (non-striped) directory: only li_obj_dir is set in that case. */
228 static int lfsck_load_stripe_lmv(const struct lu_env *env,
229 struct lfsck_instance *lfsck,
230 struct dt_object *obj)
232 struct lmv_mds_md_v1 *lmv = &lfsck_env_info(env)->lti_lmv;
233 struct lfsck_lmv *llmv;
/* Caller must not already have a directory / LMV attached. */
237 LASSERT(lfsck->li_obj_dir == NULL);
238 LASSERT(lfsck->li_lmv == NULL);
240 rc = lfsck_read_stripe_lmv(env, lfsck, obj, lmv);
241 if (rc == -ENODATA) {
/* No LMV EA: treat as an ordinary directory, keep a reference. */
242 lfsck->li_obj_dir = lfsck_object_get(obj);
/* LMV_MAGIC marks the master object of a striped directory; allocate
 * the per-stripe record array, clamped to a sane stripe count. */
254 if (lmv->lmv_magic == LMV_MAGIC) {
255 struct lfsck_slave_lmv_rec *lslr;
258 llmv->ll_lmv_master = 1;
259 if (lmv->lmv_stripe_count < 1)
260 stripes = LFSCK_LMV_DEF_STRIPES;
261 else if (lmv->lmv_stripe_count > LFSCK_LMV_MAX_STRIPES)
262 stripes = LFSCK_LMV_MAX_STRIPES;
264 stripes = lmv->lmv_stripe_count;
266 OBD_ALLOC_PTR_ARRAY_LARGE(lslr, stripes);
273 llmv->ll_stripes_allocated = stripes;
/* Hash type is verified/derived later during the scan. */
274 llmv->ll_hash_type = LMV_HASH_TYPE_UNKNOWN;
275 llmv->ll_lslr = lslr;
/* Any other magic: this is a slave stripe of a striped directory. */
277 llmv->ll_lmv_slave = 1;
280 lfsck->li_obj_dir = lfsck_object_get(obj);
/* Publish the new lfsck_lmv with an initial reference. */
282 atomic_set(&llmv->ll_ref, 1);
283 lfsck->li_lmv = llmv;
288 /* LFSCK wrap functions */
/* Propagate a scan failure to every active LFSCK component so each can
 * update its own statistics; @new_checked tells the component whether the
 * current object should still be counted as checked. */
290 static void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
293 struct lfsck_component *com;
295 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
296 com->lc_ops->lfsck_fail(env, com, new_checked);
/* Tear down the current namespace-scan state in order: record @result in
 * and release the lfsck_lmv, notify dir-scanning components, detach and
 * finalize the directory iterator, and finally drop the directory object
 * reference. Safe to call when no directory is open (all branches are
 * guarded by NULL checks). */
300 void lfsck_close_dir(const struct lu_env *env,
301 struct lfsck_instance *lfsck, int result)
303 struct lfsck_component *com;
306 if (lfsck->li_lmv != NULL) {
307 lfsck->li_lmv->ll_exit_value = result;
/* Only notify components if a directory scan was actually open. */
308 if (lfsck->li_obj_dir != NULL) {
309 list_for_each_entry(com, &lfsck->li_list_dir,
311 com->lc_ops->lfsck_close_dir(env, com);
315 lfsck_lmv_put(env, lfsck->li_lmv);
316 lfsck->li_lmv = NULL;
319 if (lfsck->li_di_dir != NULL) {
320 const struct dt_it_ops *dir_iops;
321 struct dt_it *dir_di = lfsck->li_di_dir;
323 LASSERT(lfsck->li_obj_dir != NULL);
/* Capture iops before lfsck_di_dir_put() clears li_di_dir, then
 * finalize the iterator outside li_lock. */
325 dir_iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
326 lfsck_di_dir_put(env, lfsck);
327 dir_iops->fini(env, dir_di);
330 if (lfsck->li_obj_dir != NULL) {
331 struct dt_object *dir_obj = lfsck->li_obj_dir;
/* Clear the pointer before dropping the reference taken at open. */
333 lfsck->li_obj_dir = NULL;
334 lfsck_object_put(env, dir_obj);
/* Start a namespace scan of lfsck->li_obj_dir: notify the dir-scanning
 * components, create a directory iterator, and position it at @cookie
 * (resuming past the already-processed entry when @cookie > 0). On any
 * failure the partially-opened state is rolled back via lfsck_close_dir().
 * Publishes the iterator in li_di_dir under li_lock. */
340 int lfsck_open_dir(const struct lu_env *env,
341 struct lfsck_instance *lfsck, __u64 cookie)
343 struct dt_object *obj = lfsck->li_obj_dir;
344 struct dt_it *di = lfsck->li_di_dir;
345 struct lfsck_component *com;
346 const struct dt_it_ops *iops;
350 LASSERT(obj != NULL);
/* Ensure index operations are available on the directory object. */
353 if (unlikely(!dt_try_as_dir(env, obj)))
354 GOTO(out, rc = -ENOTDIR);
356 list_for_each_entry(com, &lfsck->li_list_dir, lc_link_dir) {
357 rc = com->lc_ops->lfsck_open_dir(env, com);
362 iops = &obj->do_index_ops->dio_it;
363 di = iops->init(env, obj, lfsck->li_args_dir);
365 GOTO(out, rc = PTR_ERR(di));
/* load() positions at @cookie; a resumed scan (cookie > 0) must step
 * past the entry that was already handled before the interruption. */
367 rc = iops->load(env, di, cookie);
370 else if (rc == 0 || (rc > 0 && cookie > 0))
371 rc = iops->next(env, di);
/* Remember the resume position and publish the iterator atomically
 * with respect to li_lock readers. */
379 lfsck->li_cookie_dir = iops->store(env, di);
380 spin_lock(&lfsck->li_lock);
381 lfsck->li_di_dir = di;
382 spin_unlock(&lfsck->li_lock);
/* Error path: undo whatever was set up above. */
389 lfsck_close_dir(env, lfsck, rc);
/* Periodically persist LFSCK progress: a no-op until the next scheduled
 * checkpoint time, then fills the current position and asks every active
 * component to write its checkpoint. Returns the first component failure
 * (rc1) if any, otherwise the last rc. */
394 static int lfsck_checkpoint(const struct lu_env *env,
395 struct lfsck_instance *lfsck)
397 struct lfsck_component *com;
/* Rate-limit: nothing to do before the next checkpoint deadline. */
401 if (likely(ktime_get_seconds() <= lfsck->li_time_next_checkpoint))
404 lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
405 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
406 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
/* Schedule the next checkpoint a full interval from now. */
411 lfsck->li_time_last_checkpoint = ktime_get_seconds();
412 lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
413 LFSCK_CHECKPOINT_INTERVAL;
414 return rc1 != 0 ? rc1 : rc;
/* Prepare a new LFSCK run: let each registered component pick its start
 * position, choose the earliest of those positions, load the OIT iterator
 * there, and -- on an MDT master resuming inside a directory -- reopen
 * the namespace iterator at the saved parent/cookie. On failure all
 * components are notified via their lfsck_post() method. Finishes by
 * writing an initial checkpoint and arming the checkpoint timer. */
417 static int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
418 struct lfsck_start_param *lsp)
420 struct dt_object *obj = NULL;
421 struct lfsck_component *com;
422 struct lfsck_component *next;
423 struct lfsck_position *pos = NULL;
424 const struct dt_it_ops *iops =
425 &lfsck->li_obj_oit->do_index_ops->dio_it;
429 LASSERT(lfsck->li_obj_dir == NULL);
430 LASSERT(lfsck->li_di_dir == NULL);
432 lfsck->li_current_oit_processed = 0;
/* Ask every component for its start position; keep the smallest
 * non-zero one so no component misses objects. */
433 list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
434 com->lc_new_checked = 0;
435 rc = com->lc_ops->lfsck_prep(env, com, lsp);
440 (!lfsck_pos_is_zero(&com->lc_pos_start) &&
441 lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
442 pos = &com->lc_pos_start;
445 /* Init otable-based iterator. */
/* No saved position: start the OIT from the beginning. */
447 rc = iops->load(env, lfsck->li_di_oit, 0);
448 if (rc > 0 || unlikely(rc == -ENODATA)) {
449 lfsck->li_oit_over = 1;
/* Saved position: resume the OIT at the stored cookie; rc > 0 or
 * -ENODATA means the object table is already exhausted. */
456 rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
457 if (rc > 0 || unlikely(rc == -ENODATA))
458 lfsck->li_oit_over = 1;
/* Only an MDT master with a saved directory parent resumes a
 * namespace scan. */
462 if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
465 /* Find the directory for namespace-based traverse. */
466 obj = lfsck_object_find_bottom(env, lfsck, &pos->lp_dir_parent);
468 RETURN(PTR_ERR(obj));
470 /* Remote directory will be scanned by the LFSCK instance
471 * on the MDT where the remote object really resides on. */
472 if (!dt_object_exists(obj) || dt_object_remote(obj) ||
473 unlikely(!S_ISDIR(lfsck_object_type(obj))))
476 rc = lfsck_load_stripe_lmv(env, lfsck, obj);
478 /* For the master MDT-object of a striped directory,
479 * reset the iteration from the directory beginning. */
480 if (lfsck->li_lmv != NULL && lfsck->li_lmv->ll_lmv_master)
481 pos->lp_dir_cookie = 0;
483 rc = lfsck_open_dir(env, lfsck, pos->lp_dir_cookie);
485 /* The end of the directory. */
/* Drop the reference from lfsck_object_find_bottom(). */
493 lfsck_object_put(env, obj);
/* Error path: close any open directory and post-notify components. */
496 lfsck_close_dir(env, lfsck, rc);
497 list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
499 com->lc_ops->lfsck_post(env, com, rc, true);
/* Success: record the initial position and write the first checkpoint. */
506 lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, true);
507 lfsck->li_pos_current = lfsck->li_pos_checkpoint;
508 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
509 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
514 lfsck->li_time_last_checkpoint = ktime_get_seconds();
515 lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
516 LFSCK_CHECKPOINT_INTERVAL;
/* Process one object delivered by the OIT scan: run every component's
 * exec_oit hook on it, then, if the object is a directory that needs a
 * namespace scan, load its LMV layout and open the directory iterator.
 * A failure marks the scan failed and closes any half-opened directory.
 * Positive rc values are internal signals and are mapped to 0. */
520 static int lfsck_exec_oit(const struct lu_env *env,
521 struct lfsck_instance *lfsck, struct dt_object *obj)
523 struct lfsck_component *com;
527 LASSERT(lfsck->li_obj_dir == NULL);
529 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
530 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
/* Decide whether @obj is a directory worth a namespace scan. */
535 rc = lfsck_needs_scan_dir(env, lfsck, obj);
539 rc = lfsck_load_stripe_lmv(env, lfsck, obj);
541 rc = lfsck_open_dir(env, lfsck, 0);
/* Failure path: record the failure, then undo the directory open. */
547 lfsck_fail(env, lfsck, false);
550 lfsck_close_dir(env, lfsck, rc);
552 return rc > 0 ? 0 : rc;
/* Hand one directory entry (@ent, with its object @type) to every active
 * component's exec_dir hook; @lso describes the parent for the assistant
 * thread. The first visible rc is what the caller reacts to. */
555 static int lfsck_exec_dir(const struct lu_env *env,
556 struct lfsck_instance *lfsck,
557 struct lfsck_assistant_object *lso,
558 struct lu_dirent *ent, __u16 type)
560 struct lfsck_component *com;
563 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
564 rc = com->lc_ops->lfsck_exec_dir(env, com, lso, ent, type);
571 static int lfsck_master_dir_engine(const struct lu_env *env,
572 struct lfsck_instance *lfsck);
/* Wind down the scanning phase with overall status @result: close the
 * current directory, drain any queued striped-directory (LMV) units by
 * scanning each in turn, then run every component's post hook. Component
 * post failures are logged but deliberately not fatal, so the remaining
 * components still get their post callback. */
574 static int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
577 struct lfsck_component *com;
578 struct lfsck_component *next;
581 lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
582 lfsck_close_dir(env, lfsck, result);
/* Finish deferred striped directories queued on li_list_lmv while the
 * engine thread is still allowed to run. */
584 while (thread_is_running(&lfsck->li_thread) && rc > 0 &&
585 !list_empty(&lfsck->li_list_lmv)) {
586 struct lfsck_lmv_unit *llu;
/* Dequeue one unit under li_lock. */
588 spin_lock(&lfsck->li_lock);
589 llu = list_entry(lfsck->li_list_lmv.next,
590 struct lfsck_lmv_unit, llu_link);
591 list_del_init(&llu->llu_link);
592 spin_unlock(&lfsck->li_lock);
594 lfsck->li_lmv = &llu->llu_lmv;
595 lfsck->li_obj_dir = lfsck_object_get(llu->llu_obj);
596 rc = lfsck_open_dir(env, lfsck, 0);
598 rc = lfsck_master_dir_engine(env, lfsck);
599 lfsck_close_dir(env, lfsck, result);
605 list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
606 rc = com->lc_ops->lfsck_post(env, com, result, false);
608 CDEBUG(D_LFSCK, "%s: lfsck_post at the component %u: "
609 "rc = %d\n", lfsck_lfsck2name(lfsck),
610 (__u32)com->lc_type, rc);
613 lfsck->li_time_last_checkpoint = ktime_get_seconds();
614 lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
615 LFSCK_CHECKPOINT_INTERVAL;
617 /* Ignore some component post failure to make other can go ahead. */
/* Run the second-pass ("double") scan of every component that registered
 * for it, wait until all outstanding double-scan work has drained, and --
 * unless the LFSCK was paused -- move the components back to the idle
 * list. Returns the first failure (rc1) if any, otherwise the last rc. */
621 static int lfsck_double_scan(const struct lu_env *env,
622 struct lfsck_instance *lfsck)
624 struct lfsck_component *com;
625 struct lfsck_component *next;
629 list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
630 rc = com->lc_ops->lfsck_double_scan(env, com);
/* Block until every component's async double-scan work completes. */
635 wait_event_idle(lfsck->li_thread.t_ctl_waitq,
636 atomic_read(&lfsck->li_double_scan_count) == 0);
/* A paused LFSCK keeps its components on the double-scan list so a
 * later resume can pick up where it left off. */
638 if (lfsck->li_status != LS_PAUSED &&
639 lfsck->li_status != LS_CO_PAUSED) {
640 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
642 spin_lock(&lfsck->li_lock);
643 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
644 spin_unlock(&lfsck->li_lock);
648 return rc1 != 0 ? rc1 : rc;
/* Abort the LFSCK: give every component on the scan and double-scan
 * lists a chance to clean up via its (optional) lfsck_quit hook, then
 * move it back to the idle list under li_lock. Scan-list components are
 * additionally unlinked from the per-directory list. */
651 static void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
653 struct lfsck_component *com;
654 struct lfsck_component *next;
656 list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
658 if (com->lc_ops->lfsck_quit != NULL)
659 com->lc_ops->lfsck_quit(env, com);
661 spin_lock(&lfsck->li_lock);
662 list_del_init(&com->lc_link_dir);
663 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
664 spin_unlock(&lfsck->li_lock);
667 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
669 if (com->lc_ops->lfsck_quit != NULL)
670 com->lc_ops->lfsck_quit(env, com);
672 spin_lock(&lfsck->li_lock);
673 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
674 spin_unlock(&lfsck->li_lock);
/* Namespace-scan engine: iterate the entries of the currently-open
 * directory (lfsck->li_obj_dir / li_di_dir), unpack each entry, skip
 * ignorable and "." entries, and feed the rest to the components via
 * lfsck_exec_dir(). Honors checkpointing, speed control, fault-injection
 * hooks and engine-stop requests between entries. */
680 static int lfsck_master_dir_engine(const struct lu_env *env,
681 struct lfsck_instance *lfsck)
683 struct lfsck_thread_info *info = lfsck_env_info(env);
684 struct dt_object *dir = lfsck->li_obj_dir;
685 const struct dt_it_ops *iops = &dir->do_index_ops->dio_it;
686 struct dt_it *di = lfsck->li_di_dir;
687 struct lu_dirent *ent =
688 (struct lu_dirent *)info->lti_key;
689 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
690 struct ptlrpc_thread *thread = &lfsck->li_thread;
691 struct lfsck_assistant_object *lso = NULL;
/* Fault-injection delay; bail out if the engine was stopped meanwhile. */
697 if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY2, cfs_fail_val) &&
698 unlikely(!thread_is_running(thread))) {
699 CDEBUG(D_LFSCK, "%s: scan dir exit for engine stop, "
700 "parent "DFID", cookie %#llx\n",
701 lfsck_lfsck2name(lfsck),
702 PFID(lfsck_dto2fid(dir)), lfsck->li_cookie_dir);
/* Fetch and unpack the current entry; on failure, record it and only
 * abort when the failout policy is set. */
707 lfsck->li_new_scanned++;
708 rc = iops->rec(env, di, (struct dt_rec *)ent,
711 rc = lfsck_unpack_ent(ent, &lfsck->li_cookie_dir,
715 CDEBUG(D_LFSCK, "%s: scan dir failed at rec(), "
716 "parent "DFID", cookie %#llx: rc = %d\n",
717 lfsck_lfsck2name(lfsck),
718 PFID(lfsck_dto2fid(dir)),
719 lfsck->li_cookie_dir, rc);
720 lfsck_fail(env, lfsck, true);
721 if (bk->lb_param & LPF_FAILOUT)
727 if (ent->lde_attrs & LUDA_IGNORE)
730 /* skip dot entry. */
731 if (ent->lde_namelen == 1 && ent->lde_name[0] == '.')
/* Build (lazily, first use) the assistant object for the parent dir. */
735 lso = lfsck_assistant_object_init(env,
736 lfsck_dto2fid(dir), NULL,
737 lfsck->li_pos_current.lp_oit_cookie, true);
739 if (bk->lb_param & LPF_FAILOUT)
740 RETURN(PTR_ERR(lso));
747 /* The type in the @ent structure may have been overwritten,
748 * so we need to pass the @type parameter independently. */
749 rc = lfsck_exec_dir(env, lfsck, lso, ent, type);
750 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
/* Periodic checkpoint and speed throttling between entries. */
754 rc = lfsck_checkpoint(env, lfsck);
755 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
759 lfsck_control_speed(lfsck);
760 if (unlikely(!thread_is_running(thread))) {
761 CDEBUG(D_LFSCK, "%s: scan dir exit for engine stop, "
762 "parent "DFID", cookie %#llx\n",
763 lfsck_lfsck2name(lfsck),
764 PFID(lfsck_dto2fid(dir)),
765 lfsck->li_cookie_dir);
/* Fault injection: simulate a fatal engine error. */
769 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
770 spin_lock(&lfsck->li_lock);
771 thread_set_flags(thread, SVC_STOPPING);
772 spin_unlock(&lfsck->li_lock);
773 GOTO(out, rc = -EINVAL);
776 rc = iops->next(env, di);
778 CDEBUG(D_LFSCK, "%s dir engine fail to locate next "
779 "for the directory "DFID": rc = %d\n",
780 lfsck_lfsck2name(lfsck),
781 PFID(&lfsck->li_pos_current.lp_dir_parent), rc);
/* rc > 0 from next() means end of directory: close it unless the OIT
 * scan has also finished. */
784 if (rc > 0 && !lfsck->li_oit_over)
785 lfsck_close_dir(env, lfsck, rc);
/* Drop the assistant-object reference taken above, if any. */
791 lfsck_assistant_object_put(env, lso);
797 * Object-table based iteration engine.
799 * Object-table based iteration is the basic linear engine to scan all the
800 * objects on current device in turn. For each object, it calls all the
801 * registered LFSCK component(s)' API to perform related consistency
804 * It flushes related LFSCK trace files to disk via making checkpoint
805 * periodically. Then if the server crashed or the LFSCK is paused, the
806 * LFSCK can resume from the latest checkpoint.
808 * It also controls the whole LFSCK speed via lfsck_control_speed() to
809 * avoid the server to become overload.
811 * \param[in] env pointer to the thread context
812 * \param[in] lfsck pointer to the lfsck instance
814 * \retval positive number if all objects have been scanned
815 * \retval 0 if the iteration is stopped or paused
816 * \retval negative error number on failure
818 static int lfsck_master_oit_engine(const struct lu_env *env,
819 struct lfsck_instance *lfsck)
821 struct lfsck_thread_info *info = lfsck_env_info(env);
822 const struct dt_it_ops *iops =
823 &lfsck->li_obj_oit->do_index_ops->dio_it;
824 struct dt_it *di = lfsck->li_di_oit;
825 struct lu_fid *fid = &info->lti_fid;
826 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
827 struct ptlrpc_thread *thread = &lfsck->li_thread;
828 struct seq_server_site *ss = lfsck_dev_site(lfsck);
829 __u32 idx = lfsck_dev_idx(lfsck);
/* The FLD lookups below need a sequence server site. */
833 if (unlikely(ss == NULL))
837 struct dt_object *target;
/* A directory scan in progress takes priority over OIT stepping. */
839 if (lfsck->li_di_dir != NULL) {
840 rc = lfsck_master_dir_engine(env, lfsck);
845 if (unlikely(lfsck->li_oit_over))
/* Fault-injection delay; exit if the engine was stopped meanwhile. */
848 if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY1, cfs_fail_val) &&
849 unlikely(!thread_is_running(thread))) {
850 CDEBUG(D_LFSCK, "%s: OIT scan exit for engine stop, "
852 lfsck_lfsck2name(lfsck), iops->store(env, di));
857 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
860 lfsck->li_current_oit_processed = 1;
/* Drain queued striped-directory (LMV) units before advancing. */
862 if (!list_empty(&lfsck->li_list_lmv)) {
863 struct lfsck_lmv_unit *llu;
865 spin_lock(&lfsck->li_lock);
866 llu = list_entry(lfsck->li_list_lmv.next,
867 struct lfsck_lmv_unit, llu_link);
868 list_del_init(&llu->llu_link);
869 spin_unlock(&lfsck->li_lock);
871 lfsck->li_lmv = &llu->llu_lmv;
872 lfsck->li_obj_dir = lfsck_object_get(llu->llu_obj);
873 rc = lfsck_open_dir(env, lfsck, 0);
875 rc = lfsck_master_dir_engine(env, lfsck);
/* Record position, then read the FID of the current OIT slot. */
881 lfsck->li_new_scanned++;
882 lfsck->li_pos_current.lp_oit_cookie = iops->store(env, di);
883 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
885 CDEBUG(D_LFSCK, "%s: OIT scan failed at rec(): "
886 "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
887 lfsck_fail(env, lfsck, true);
888 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
894 if (unlikely(!fid_is_sane(fid))) {
895 CDEBUG(D_LFSCK, "%s: OIT scan find invalid FID "DFID
897 lfsck_lfsck2name(lfsck), PFID(fid));
/* IDIF FIDs only occur on OSTs (hence !li_master); verify the
 * embedded OST index and normalize it to this device's index. */
901 if (fid_is_idif(fid)) {
902 __u32 idx1 = fid_idif_ost_idx(fid);
904 LASSERT(!lfsck->li_master);
907 struct ost_id *oi = &info->lti_oi;
909 if (unlikely(idx1 != 0)) {
910 CDEBUG(D_LFSCK, "%s: invalid IDIF "DFID
911 ", not match device index %u\n",
912 lfsck_lfsck2name(lfsck),
918 /* rebuild the IDIF with index to
919 * avoid double instances for the
921 fid_to_ostid(fid, oi);
922 ostid_to_fid(fid, oi, idx);
924 } else if (!fid_is_norm(fid) && !fid_is_igif(fid) &&
925 !fid_is_last_id(fid) &&
926 !lu_fid_eq(fid, &lfsck->li_global_root_fid)) {
928 /* If the FID/object is only used locally and invisible
929 * to external nodes, then LFSCK will not handle it.
931 * dot_lustre sequence has been handled specially. */
/* Normal/IGIF FID: consult the FLD to skip objects whose sequence
 * actually belongs to a different target. */
934 struct lu_seq_range *range = &info->lti_range;
936 if (lfsck->li_master)
937 fld_range_set_mdt(range);
939 fld_range_set_ost(range);
940 rc = fld_local_lookup(env, ss->ss_server_fld,
941 fid_seq(fid), range);
942 if (rc != 0 || range->lsr_index != idx) {
943 /* Remote object will be handled by the LFSCK
944 * instance on the MDT where the remote object
945 * really resides on. */
951 target = lfsck_object_find_bottom(env, lfsck, fid);
952 if (IS_ERR(target)) {
953 CDEBUG(D_LFSCK, "%s: OIT scan failed at find target "
954 DFID", cookie %llu: rc = %d\n",
955 lfsck_lfsck2name(lfsck), PFID(fid),
956 iops->store(env, di), rc);
957 lfsck_fail(env, lfsck, true);
958 if (bk->lb_param & LPF_FAILOUT)
959 RETURN(PTR_ERR(target));
964 if (dt_object_exists(target))
965 rc = lfsck_exec_oit(env, lfsck, target);
966 lfsck_object_put(env, target);
968 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
/* Periodic checkpoint and speed throttling between objects. */
972 rc = lfsck_checkpoint(env, lfsck);
973 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
977 lfsck_control_speed(lfsck);
/* Fault injection: simulate a fatal engine error. */
979 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
980 spin_lock(&lfsck->li_lock);
981 thread_set_flags(thread, SVC_STOPPING);
982 spin_unlock(&lfsck->li_lock);
/* Advance: rc > 0 means the object table is exhausted. */
986 rc = iops->next(env, di);
987 if (unlikely(rc > 0))
988 lfsck->li_oit_over = 1;
989 else if (likely(rc == 0))
990 lfsck->li_current_oit_processed = 0;
992 CDEBUG(D_LFSCK, "%s oit engine fail to locate next at "
993 "%llu: rc = %d\n", lfsck_lfsck2name(lfsck),
994 iops->store(env, di), rc);
996 if (unlikely(!thread_is_running(thread))) {
997 CDEBUG(D_LFSCK, "%s: OIT scan exit for engine stop, "
998 "cookie %llu\n", lfsck_lfsck2name(lfsck),
999 iops->store(env, di));
/* Keep looping while objects remain or a directory scan is open. */
1002 } while (rc == 0 || lfsck->li_di_dir != NULL);
/* Main LFSCK kernel-thread entry point. Sets up the OIT iterator (which
 * also triggers low-layer OI scrub), optionally verifies
 * .lustre/lost+found, runs lfsck_prep(), waits for the start unplug,
 * drives the OIT engine, then performs post-processing, the double scan,
 * and full teardown. @args is a struct lfsck_thread_args, freed here. */
1007 int lfsck_master_engine(void *args)
1009 struct lfsck_thread_args *lta = args;
1010 struct lu_env *env = &lta->lta_env;
1011 struct lfsck_instance *lfsck = lta->lta_lfsck;
1012 struct ptlrpc_thread *thread = &lfsck->li_thread;
1013 struct dt_object *oit_obj = lfsck->li_obj_oit;
1014 const struct dt_it_ops *oit_iops = &oit_obj->do_index_ops->dio_it;
1015 struct dt_it *oit_di;
/* Publish the task so lfsck_stop() and friends can find this thread. */
1019 spin_lock(&lfsck->li_lock);
1020 lfsck->li_task = current;
1021 spin_unlock(&lfsck->li_lock);
1023 /* There will be some objects verification during the LFSCK start,
1024 * such as the subsequent lfsck_verify_lpf(). Trigger low layer OI
1025 * scrub before that to handle the potential inconsistency. */
1026 oit_di = oit_iops->init(env, oit_obj, lfsck->li_args_oit);
1027 if (IS_ERR(oit_di)) {
1028 rc = PTR_ERR(oit_di);
1029 CDEBUG(D_LFSCK, "%s: master engine fail to init iteration: "
1030 "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
1032 GOTO(fini_args, rc);
/* On an MDT master with work to do, verify .lustre/lost+found first. */
1035 if (lfsck->li_master &&
1036 (!list_empty(&lfsck->li_list_scan) ||
1037 !list_empty(&lfsck->li_list_double_scan))) {
1038 rc = lfsck_verify_lpf(env, lfsck);
1039 /* FIXME: once OI files are missing, this will fail, it should
1040 * return error, but to satisfy sanity-lfsck test 4 & 5, leave
1041 * it uninitialized here, and any code dereference it need to
1045 CERROR("%s: master engine fail to verify the "
1046 ".lustre/lost+found/, go ahead: rc = %d\n",
1047 lfsck_lfsck2name(lfsck), rc);
/* Publish the OIT iterator, then prepare all components. */
1050 spin_lock(&lfsck->li_lock);
1051 lfsck->li_di_oit = oit_di;
1052 spin_unlock(&lfsck->li_lock);
1053 rc = lfsck_prep(env, lfsck, lta->lta_lsp);
1057 CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
1058 "oit_cookie = %llu, dir_cookie = %#llx, parent = "DFID
1059 ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
1060 lfsck->li_pos_checkpoint.lp_oit_cookie,
1061 lfsck->li_pos_checkpoint.lp_dir_cookie,
1062 PFID(&lfsck->li_pos_checkpoint.lp_dir_parent),
/* Transition starting -> running; a concurrent stop aborts here. */
1065 spin_lock(&lfsck->li_lock);
1066 if (unlikely(!thread_is_starting(thread))) {
1067 spin_unlock(&lfsck->li_lock);
1068 GOTO(fini_oit, rc = 0);
1071 thread_set_flags(thread, SVC_RUNNING);
1072 spin_unlock(&lfsck->li_lock);
1073 wake_up(&thread->t_ctl_waitq);
/* Block until the starter unplugs the engine or requests a stop. */
1075 wait_event_idle(thread->t_ctl_waitq,
1076 lfsck->li_start_unplug ||
1077 !thread_is_running(thread));
1078 if (!thread_is_running(thread))
1079 GOTO(fini_oit, rc = 0);
1081 if (!list_empty(&lfsck->li_list_scan) ||
1082 list_empty(&lfsck->li_list_double_scan))
1083 rc = lfsck_master_oit_engine(env, lfsck);
1087 lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
1088 CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
1089 "oit_cookie = %llu, dir_cookie = %#llx, parent = "DFID
1090 ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
1091 lfsck->li_pos_checkpoint.lp_oit_cookie,
1092 lfsck->li_pos_checkpoint.lp_dir_cookie,
1093 PFID(&lfsck->li_pos_checkpoint.lp_dir_parent),
/* Skip post-processing when the crash fault-injection point fired. */
1096 if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
1097 rc = lfsck_post(env, lfsck, rc);
1099 lfsck_close_dir(env, lfsck, rc);
/* Teardown: detach and finalize the OIT iterator, run double scan. */
1102 lfsck_di_oit_put(env, lfsck);
1103 oit_iops->fini(env, oit_di);
1105 if (!list_empty(&lfsck->li_list_double_scan))
1106 rc = lfsck_double_scan(env, lfsck);
1110 lfsck_quit(env, lfsck);
1113 /* XXX: Purge the pinned objects in the future. */
/* Mark the thread stopped and wake any waiter before freeing args. */
1116 spin_lock(&lfsck->li_lock);
1117 thread_set_flags(thread, SVC_STOPPED);
1118 lfsck->li_task = NULL;
1119 spin_unlock(&lfsck->li_lock);
1120 wake_up(&thread->t_ctl_waitq);
1121 lfsck_thread_args_fini(lta);
/* Test under lad_lock whether the assistant request queue is empty.
 * NOTE(review): the branch that sets the boolean result and the final
 * return are outside the visible chunk. */
1125 static inline bool lfsck_assistant_req_empty(struct lfsck_assistant_data *lad)
1129 spin_lock(&lad->lad_lock);
1130 if (list_empty(&lad->lad_req_list))
1132 spin_unlock(&lad->lad_lock);
1138 * Query the LFSCK status from the instances on remote servers.
1140 * The LFSCK assistant thread queries the LFSCK instances on other
1141 * servers (MDT/OST) about their status, such as whether they have
1142 * finished the phase1/phase2 scanning or not, and so on.
1144 * \param[in] env pointer to the thread context
1145 * \param[in] com pointer to the lfsck component
1147 * \retval 0 for success
1148 * \retval negative error number on failure
1150 static int lfsck_assistant_query_others(const struct lu_env *env,
1151 struct lfsck_component *com)
1153 struct lfsck_thread_info *info = lfsck_env_info(env);
1154 struct lfsck_request *lr = &info->lti_lr;
1155 struct lfsck_async_interpret_args *laia = &info->lti_laia;
1156 struct lfsck_instance *lfsck = com->lc_lfsck;
1157 struct lfsck_assistant_data *lad = com->lc_data;
1158 struct ptlrpc_request_set *set;
1159 struct lfsck_tgt_descs *ltds;
1160 struct lfsck_tgt_desc *ltd;
1161 struct list_head *phase_head;
1166 set = ptlrpc_prep_set();
/* Bump the touch generation so each target is queried at most once
 * per pass (see the *gen checks below). */
1170 lad->lad_touch_gen++;
1171 memset(lr, 0, sizeof(*lr));
1172 lr->lr_event = LE_QUERY;
1173 lr->lr_active = com->lc_type;
1175 memset(laia, 0, sizeof(*laia));
1176 laia->laia_com = com;
/* Query MDTs still in phase 1 first; layout LFSCK also queries OSTs. */
1179 if (!list_empty(&lad->lad_mdt_phase1_list)) {
1180 ltds = &lfsck->li_mdt_descs;
1182 phase_head = &lad->lad_mdt_phase1_list;
1183 } else if (com->lc_type != LFSCK_TYPE_LAYOUT) {
1188 ltds = &lfsck->li_ost_descs;
1189 lr->lr_flags = LEF_TO_OST;
1190 phase_head = &lad->lad_ost_phase1_list;
1193 laia->laia_ltds = ltds;
1194 spin_lock(&ltds->ltd_lock);
1195 while (!list_empty(phase_head)) {
1196 struct list_head *phase_list;
/* The phase list/generation fields differ between the layout and
 * namespace components; pick the matching pair. */
1199 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1200 ltd = list_entry(phase_head->next,
1201 struct lfsck_tgt_desc,
1202 ltd_layout_phase_list);
1203 phase_list = &ltd->ltd_layout_phase_list;
1204 gen = &ltd->ltd_layout_gen;
1206 ltd = list_entry(phase_head->next,
1207 struct lfsck_tgt_desc,
1208 ltd_namespace_phase_list);
1209 phase_list = &ltd->ltd_namespace_phase_list;
1210 gen = &ltd->ltd_namespace_gen;
/* Generation already current: every target has been visited. */
1213 if (*gen == lad->lad_touch_gen)
1216 *gen = lad->lad_touch_gen;
1217 list_move_tail(phase_list, phase_head);
/* Hold a target reference across the async RPC; the lock is
 * dropped around the (potentially sleeping) request send. */
1218 atomic_inc(&ltd->ltd_ref);
1219 laia->laia_ltd = ltd;
1220 spin_unlock(&ltds->ltd_lock);
1221 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1222 lfsck_async_interpret_common,
1225 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to query "
1226 "%s %x for %s: rc = %d\n",
1227 lfsck_lfsck2name(lfsck),
1228 (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1229 ltd->ltd_index, lad->lad_name, rc);
1233 spin_lock(&ltds->ltd_lock);
1235 spin_unlock(&ltds->ltd_lock);
/* Wait for all queued queries, then release the request set. */
1237 rc = ptlrpc_set_wait(env, set);
1239 ptlrpc_set_destroy(set);
/* Layout LFSCK: after the MDT pass drains, loop again for the OSTs. */
1243 if (com->lc_type == LFSCK_TYPE_LAYOUT && !(lr->lr_flags & LEF_TO_OST) &&
1244 list_empty(&lad->lad_mdt_phase1_list))
1248 ptlrpc_set_destroy(set);
1250 RETURN(rc1 != 0 ? rc1 : rc);
1254 * Notify the LFSCK event to the instances on remote servers.
1256 * The LFSCK assistant thread notifies the LFSCK instances on other
1257 * servers (MDT/OST) about some events, such as start new scanning,
1258 * stop the scanning, this LFSCK instance will exit, and so on.
1260 * \param[in] env pointer to the thread context
1261 * \param[in] com pointer to the lfsck component
1262 * \param[in] lr pointer to the LFSCK event request
1264 * \retval 0 for success
1265 * \retval negative error number on failure
1267 static int lfsck_assistant_notify_others(const struct lu_env *env,
1268 struct lfsck_component *com,
1269 struct lfsck_request *lr)
1271 struct lfsck_thread_info *info = lfsck_env_info(env);
1272 struct lfsck_async_interpret_args *laia = &info->lti_laia;
1273 struct lfsck_instance *lfsck = com->lc_lfsck;
1274 struct lfsck_assistant_data *lad = com->lc_data;
1275 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1276 struct ptlrpc_request_set *set;
1277 struct lfsck_tgt_descs *ltds;
1278 struct lfsck_tgt_desc *ltd;
1279 struct lfsck_tgt_desc *next;
/* Async RPC set used to broadcast the event to all involved targets. */
1285 set = ptlrpc_prep_set();
1289 lr->lr_index = lfsck_dev_idx(lfsck);
1290 lr->lr_active = com->lc_type;
1292 memset(laia, 0, sizeof(*laia));
1293 laia->laia_com = com;
1296 switch (lr->lr_event) {
/* Start event: only the layout LFSCK needs to broadcast to OSTs. */
1298 if (com->lc_type != LFSCK_TYPE_LAYOUT)
/* Pack the current bookmark parameters into the start request. */
1301 lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN;
1302 lr->lr_speed = bk->lb_speed_limit;
1303 lr->lr_version = bk->lb_version;
1304 lr->lr_param |= bk->lb_param;
1305 lr->lr_async_windows = bk->lb_async_windows;
1306 lr->lr_flags = LEF_TO_OST;
1308 /* Notify OSTs firstly, then handle other MDTs if needed. */
1309 ltds = &lfsck->li_ost_descs;
1310 laia->laia_ltds = ltds;
1311 down_read(&ltds->ltd_rw_sem);
1312 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1313 ltd = lfsck_tgt_get(ltds, idx);
1314 LASSERT(ltd != NULL);
1316 laia->laia_ltd = ltd;
1317 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1318 lfsck_async_interpret_common,
1319 laia, LFSCK_NOTIFY);
/* On failure, record the target in the bitmap so the later phases
 * know this OST was not notified. */
1321 lfsck_lad_set_bitmap(env, com, idx);
1322 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1323 "notify OST %x for %s start: rc = %d\n",
1324 lfsck_lfsck2name(lfsck), idx,
1329 up_read(&ltds->ltd_rw_sem);
1332 rc = ptlrpc_set_wait(env, set);
1334 ptlrpc_set_destroy(set);
1339 if (!(bk->lb_param & LPF_ALL_TGT))
1342 /* link other MDT targets locally. */
1343 ltds = &lfsck->li_mdt_descs;
1344 spin_lock(&ltds->ltd_lock);
1345 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1346 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1347 ltd = lfsck_ltd2tgt(ltds, idx);
1348 LASSERT(ltd != NULL);
/* Skip targets that are already linked. */
1350 if (!list_empty(&ltd->ltd_layout_list))
1353 list_add_tail(&ltd->ltd_layout_list,
1354 &lad->lad_mdt_list);
1355 list_add_tail(&ltd->ltd_layout_phase_list,
1356 &lad->lad_mdt_phase1_list);
1359 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1360 ltd = lfsck_ltd2tgt(ltds, idx);
1361 LASSERT(ltd != NULL);
1363 if (!list_empty(&ltd->ltd_namespace_list))
1366 list_add_tail(&ltd->ltd_namespace_list,
1367 &lad->lad_mdt_list);
1368 list_add_tail(&ltd->ltd_namespace_phase_list,
1369 &lad->lad_mdt_phase1_list);
1372 spin_unlock(&ltds->ltd_lock);
1375 case LE_PHASE2_DONE:
1376 case LE_PEER_EXIT: {
1377 struct list_head *phase_head;
1379 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1380 if (bk->lb_param & LPF_ALL_TGT) {
1381 phase_head = &lad->lad_mdt_list;
1382 ltds = &lfsck->li_mdt_descs;
1383 if (lr->lr_event == LE_STOP) {
1384 /* unlink other MDT targets locally. */
1385 spin_lock(&ltds->ltd_lock);
1386 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1387 list_for_each_entry_safe(ltd, next,
1388 phase_head, ltd_layout_list) {
1390 &ltd->ltd_layout_phase_list);
1392 &ltd->ltd_layout_list);
1395 list_for_each_entry_safe(ltd, next,
1397 ltd_namespace_list) {
1399 &ltd->ltd_namespace_phase_list);
1401 &ltd->ltd_namespace_list);
1404 spin_unlock(&ltds->ltd_lock);
1406 if (com->lc_type != LFSCK_TYPE_LAYOUT)
1409 lr->lr_flags |= LEF_TO_OST;
1410 phase_head = &lad->lad_ost_list;
1411 ltds = &lfsck->li_ost_descs;
1413 lr->lr_flags &= ~LEF_TO_OST;
1415 } else if (com->lc_type != LFSCK_TYPE_LAYOUT) {
1418 lr->lr_flags |= LEF_TO_OST;
1419 phase_head = &lad->lad_ost_list;
1420 ltds = &lfsck->li_ost_descs;
1424 laia->laia_ltds = ltds;
1425 spin_lock(&ltds->ltd_lock);
1426 while (!list_empty(phase_head)) {
1427 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1428 ltd = list_entry(phase_head->next,
1429 struct lfsck_tgt_desc,
1431 if (!list_empty(&ltd->ltd_layout_phase_list))
1433 &ltd->ltd_layout_phase_list);
1434 list_del_init(&ltd->ltd_layout_list);
1436 ltd = list_entry(phase_head->next,
1437 struct lfsck_tgt_desc,
1438 ltd_namespace_list);
1439 if (!list_empty(&ltd->ltd_namespace_phase_list))
1441 &ltd->ltd_namespace_phase_list);
1442 list_del_init(&ltd->ltd_namespace_list);
/* Hold a reference across the unlocked async-request window. */
1444 atomic_inc(&ltd->ltd_ref);
1445 laia->laia_ltd = ltd;
1446 spin_unlock(&ltds->ltd_lock);
1447 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1448 lfsck_async_interpret_common,
1449 laia, LFSCK_NOTIFY);
1451 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1452 "notify %s %x for %s stop/phase2_done/"
1453 "peer_exit: rc = %d\n",
1454 lfsck_lfsck2name(lfsck),
1455 (lr->lr_flags & LEF_TO_OST) ?
1456 "OST" : "MDT", ltd->ltd_index,
1460 spin_lock(&ltds->ltd_lock);
1462 spin_unlock(&ltds->ltd_lock);
1464 rc = ptlrpc_set_wait(env, set);
1466 ptlrpc_set_destroy(set);
/* Layout LFSCK: after the MDT pass, repeat the loop for the OSTs. */
1470 if (com->lc_type == LFSCK_TYPE_LAYOUT &&
1471 !(lr->lr_flags & LEF_TO_OST)) {
1472 lr->lr_flags |= LEF_TO_OST;
1473 phase_head = &lad->lad_ost_list;
1474 ltds = &lfsck->li_ost_descs;
1479 case LE_PHASE1_DONE:
1480 lad->lad_ops->la_sync_failures(env, com, lr);
/* Bump the generation so each MDT is visited exactly once below. */
1481 lad->lad_touch_gen++;
1482 ltds = &lfsck->li_mdt_descs;
1483 laia->laia_ltds = ltds;
1484 spin_lock(&ltds->ltd_lock);
1485 while (!list_empty(&lad->lad_mdt_list)) {
1486 struct list_head *list;
1489 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1490 ltd = list_entry(lad->lad_mdt_list.next,
1491 struct lfsck_tgt_desc,
1493 list = &ltd->ltd_layout_list;
1494 gen = &ltd->ltd_layout_gen;
1496 struct lfsck_namespace *ns = com->lc_file_ram;
1498 ltd = list_entry(lad->lad_mdt_list.next,
1499 struct lfsck_tgt_desc,
1500 ltd_namespace_list);
1501 list = &ltd->ltd_namespace_list;
1502 gen = &ltd->ltd_namespace_gen;
1503 lr->lr_flags2 = ns->ln_flags & ~LF_INCOMPLETE;
/* Already visited in this round of notification. */
1506 if (*gen == lad->lad_touch_gen)
1509 *gen = lad->lad_touch_gen;
1510 list_move_tail(list, &lad->lad_mdt_list);
1511 if (ltd->ltd_synced_failures)
/* Hold a reference across the unlocked async-request window. */
1514 atomic_inc(&ltd->ltd_ref);
1515 laia->laia_ltd = ltd;
1516 spin_unlock(&ltds->ltd_lock);
1517 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1518 lfsck_async_interpret_common,
1519 laia, LFSCK_NOTIFY);
1521 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1522 "notify MDT %x for %s phase1 done: "
1523 "rc = %d\n", lfsck_lfsck2name(lfsck),
1524 ltd->ltd_index, lad->lad_name, rc);
1527 spin_lock(&ltds->ltd_lock);
1529 spin_unlock(&ltds->ltd_lock);
1532 CDEBUG(D_LFSCK, "%s: LFSCK assistant unexpected LFSCK event: "
1533 "rc = %d\n", lfsck_lfsck2name(lfsck), lr->lr_event);
/* Wait for all queued notifications; the first error wins. */
1538 rc1 = ptlrpc_set_wait(env, set);
1539 ptlrpc_set_destroy(set);
1541 RETURN(rc != 0 ? rc : rc1);
1545 * The LFSCK assistant thread is triggered by the LFSCK main engine.
1546 * They co-work together as an asynchronous pipeline: the LFSCK main
1547 * engine scans the system and pre-fetches the objects, attributes,
1548 * or name entries, etc, and pushes them into the pipeline as input
1549 * requests for the LFSCK assistant thread; on the other end of the
1550 * pipeline, the LFSCK assistant thread performs the real check and
1551 * repair for every request from the main engine.
1553 * Generally, the assistant engine may be blocked when checking or
1554 * repairing something, so the LFSCK main engine will run somewhat
1555 * faster. On the other hand, the LFSCK main engine drives multiple
1556 * assistant threads in parallel: for each LFSCK component on the
1557 * master (such as layout LFSCK, namespace LFSCK), there is an
1558 * independent LFSCK assistant thread. Under such a 1:N asynchronous
1559 * pipeline mode, the whole LFSCK performance will be much better
1560 * than having the LFSCK main engine check/repair everything itself.
1562 int lfsck_assistant_engine(void *args)
1564 struct lfsck_thread_args *lta = args;
1565 struct lu_env *env = &lta->lta_env;
1566 struct lfsck_component *com = lta->lta_com;
1567 struct lfsck_instance *lfsck = lta->lta_lfsck;
1568 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1569 struct lfsck_position *pos = &com->lc_pos_start;
1570 struct lfsck_thread_info *info = lfsck_env_info(env);
1571 struct lfsck_request *lr = &info->lti_lr;
1572 struct lfsck_assistant_data *lad = com->lc_data;
1573 struct ptlrpc_thread *mthread = &lfsck->li_thread;
1574 struct ptlrpc_thread *athread = &lad->lad_thread;
1575 const struct lfsck_assistant_operations *lao = lad->lad_ops;
1576 struct lfsck_assistant_req *lar;
1582 CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread start\n",
1583 lfsck_lfsck2name(lfsck), lad->lad_name);
/* Tell all involved targets a new scan is starting; request a reset
 * when the scan begins from the very beginning (cookie <= 1). */
1585 memset(lr, 0, sizeof(*lr));
1586 lr->lr_event = LE_START;
1587 if (pos->lp_oit_cookie <= 1)
1588 lr->lr_param = LPF_RESET;
1589 rc = lfsck_assistant_notify_others(env, com, lr);
1591 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to notify others "
1592 "to start %s: rc = %d\n",
1593 lfsck_lfsck2name(lfsck), lad->lad_name, rc);
/* Publish the running state, then wake the waiting main engine. */
1597 spin_lock(&lad->lad_lock);
1598 lad->lad_task = current;
1599 thread_set_flags(athread, SVC_RUNNING);
1600 spin_unlock(&lad->lad_lock);
1601 wake_up(&mthread->t_ctl_waitq);
/* Phase1: consume the requests pre-fetched by the main engine. */
1604 while (!list_empty(&lad->lad_req_list)) {
1605 bool wakeup = false;
1607 if (unlikely(test_bit(LAD_EXIT, &lad->lad_flags) ||
1608 !thread_is_running(mthread)))
1609 GOTO(cleanup, rc = lad->lad_post_result);
1611 lar = list_entry(lad->lad_req_list.next,
1612 struct lfsck_assistant_req,
1614 /* Only the lfsck_assistant_engine thread itself can
1615 * remove the "lar" from the head of the list, LFSCK
1616 * engine thread only inserts other new "lar" at the
1617 * end of the list. So it is safe to handle current
1618 * "lar" without the spin_lock. */
1619 rc = lao->la_handler_p1(env, com, lar);
1620 spin_lock(&lad->lad_lock);
1621 list_del_init(&lar->lar_list);
1622 lad->lad_prefetched--;
1623 /* Wake up the main engine thread only when the list
1624 * is empty or half of the prefetched items have been
1625 * handled to avoid too frequent thread schedule. */
1626 if (lad->lad_prefetched <= (bk->lb_async_windows / 2))
1628 spin_unlock(&lad->lad_lock);
1630 wake_up(&mthread->t_ctl_waitq);
1632 lao->la_req_fini(env, lar);
1633 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
/* Sleep until there is new work or a state change is requested. */
1637 wait_event_idle(athread->t_ctl_waitq,
1638 !lfsck_assistant_req_empty(lad) ||
1639 test_bit(LAD_EXIT, &lad->lad_flags) ||
1640 test_bit(LAD_TO_POST, &lad->lad_flags) ||
1641 test_bit(LAD_TO_DOUBLE_SCAN, &lad->lad_flags));
1643 if (unlikely(test_bit(LAD_EXIT, &lad->lad_flags)))
1644 GOTO(cleanup, rc = lad->lad_post_result);
1646 if (!list_empty(&lad->lad_req_list))
1649 if (test_bit(LAD_TO_POST, &lad->lad_flags)) {
1650 CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread post\n",
1651 lfsck_lfsck2name(lfsck), lad->lad_name);
1653 if (unlikely(test_bit(LAD_EXIT, &lad->lad_flags)))
1654 GOTO(cleanup, rc = lad->lad_post_result);
1656 clear_bit(LAD_TO_POST, &lad->lad_flags);
1657 LASSERT(lad->lad_post_result > 0);
1659 /* Wakeup the master engine to go ahead. */
1660 wake_up(&mthread->t_ctl_waitq);
/* Tell the other targets that phase1 has finished here. */
1662 memset(lr, 0, sizeof(*lr));
1663 lr->lr_event = LE_PHASE1_DONE;
1664 lr->lr_status = lad->lad_post_result;
1665 rc = lfsck_assistant_notify_others(env, com, lr);
1667 CDEBUG(D_LFSCK, "%s: LFSCK assistant notified "
1668 "others for %s post: rc = %d\n",
1669 lfsck_lfsck2name(lfsck),
1673 if (test_bit(LAD_TO_DOUBLE_SCAN, &lad->lad_flags)) {
1674 clear_bit(LAD_TO_DOUBLE_SCAN, &lad->lad_flags);
1675 atomic_inc(&lfsck->li_double_scan_count);
1676 set_bit(LAD_IN_DOUBLE_SCAN, &lad->lad_flags);
1677 wake_up(&mthread->t_ctl_waitq);
/* Reset the statistics and checkpoint clock for the second stage. */
1679 com->lc_new_checked = 0;
1680 com->lc_new_scanned = 0;
1681 com->lc_time_last_checkpoint = ktime_get_seconds();
1682 com->lc_time_next_checkpoint =
1683 com->lc_time_last_checkpoint +
1684 LFSCK_CHECKPOINT_INTERVAL;
1686 CDEBUG(D_LFSCK, "%s: LFSCK assistant sync before "
1687 "the second-stage scaning\n",
1688 lfsck_lfsck2name(lfsck));
1690 /* Flush async updates before handling orphan. */
1691 rc2 = dt_sync(env, lfsck->li_next);
1693 CDEBUG(D_LFSCK, "%s: LFSCK assistant phase2 "
1694 "scan start, synced: rc = %d\n",
1695 lfsck_lfsck2name(lfsck), rc2);
1697 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
1698 GOTO(cleanup, rc = 0);
1700 while (test_bit(LAD_IN_DOUBLE_SCAN, &lad->lad_flags)) {
1703 rc = lfsck_assistant_query_others(env, com);
1704 if (lfsck_phase2_next_ready(lad))
1710 /* Pull LFSCK status on related targets once
1711 * per 30 seconds if we are not notified. */
1712 while (seconds > 0 &&
1713 wait_event_idle_timeout(
1714 athread->t_ctl_waitq,
1715 lfsck_phase2_next_ready(lad) ||
1718 !thread_is_running(mthread),
1719 cfs_time_seconds(1)) == 0)
1723 test_bit(LAD_EXIT, &lad->lad_flags) ||
1724 !thread_is_running(mthread)))
1725 GOTO(cleanup, rc = 0);
1731 rc = lao->la_handler_p2(env, com);
1736 test_bit(LAD_EXIT, &lad->lad_flags) ||
1737 !thread_is_running(mthread)))
1738 GOTO(cleanup, rc = 0);
1744 /* Cleanup the unfinished requests. */
1745 spin_lock(&lad->lad_lock);
1747 lad->lad_assistant_status = rc;
/* On a forced exit before post ran, remember the checkpoint position. */
1749 if (test_bit(LAD_EXIT, &lad->lad_flags) && lad->lad_post_result <= 0)
1750 lao->la_fill_pos(env, com, &lfsck->li_pos_checkpoint);
1752 thread_set_flags(athread, SVC_STOPPING);
1753 while (!list_empty(&lad->lad_req_list)) {
1754 lar = list_entry(lad->lad_req_list.next,
1755 struct lfsck_assistant_req,
1757 list_del_init(&lar->lar_list);
1758 lad->lad_prefetched--;
/* Drop the spinlock while freeing: la_req_fini() may block. */
1759 spin_unlock(&lad->lad_lock);
1760 lao->la_req_fini(env, lar);
1761 spin_lock(&lad->lad_lock);
1763 spin_unlock(&lad->lad_lock);
/* Build the final event that tells peers why this assistant quits. */
1765 memset(lr, 0, sizeof(*lr));
1767 lr->lr_event = LE_PHASE2_DONE;
1769 } else if (rc == 0) {
1770 if (lfsck->li_flags & LPF_ALL_TGT) {
1771 lr->lr_event = LE_STOP;
1772 lr->lr_status = LS_STOPPED;
1774 lr->lr_event = LE_PEER_EXIT;
1775 switch (lfsck->li_status) {
1778 lr->lr_status = LS_CO_PAUSED;
1782 lr->lr_status = LS_CO_STOPPED;
1785 CDEBUG(D_LFSCK, "%s: LFSCK assistant unknown "
1786 "status: rc = %d\n",
1787 lfsck_lfsck2name(lfsck),
1789 lr->lr_status = LS_CO_FAILED;
1794 if (lfsck->li_flags & LPF_ALL_TGT) {
1795 lr->lr_event = LE_STOP;
1796 lr->lr_status = LS_FAILED;
1798 lr->lr_event = LE_PEER_EXIT;
1799 lr->lr_status = LS_CO_FAILED;
1803 rc1 = lfsck_assistant_notify_others(env, com, lr);
1805 CDEBUG(D_LFSCK, "%s: LFSCK assistant failed to notify "
1806 "others for %s quit: rc = %d\n",
1807 lfsck_lfsck2name(lfsck), lad->lad_name, rc1);
1811 CDEBUG(D_LFSCK, "%s: LFSCK assistant sync before exit\n",
1812 lfsck_lfsck2name(lfsck));
1814 /* Flush async updates before exit. */
1815 rc2 = dt_sync(env, lfsck->li_next);
1817 CDEBUG(D_LFSCK, "%s: LFSCK assistant synced before exit: rc = %d\n",
1818 lfsck_lfsck2name(lfsck), rc2);
1820 /* Under force exit case, some requests may be just freed without
1821 * verification, those objects should be re-handled when next run.
1822 * So not update the on-disk trace file under such case. */
1823 if (test_bit(LAD_IN_DOUBLE_SCAN, &lad->lad_flags)) {
1824 if (!test_bit(LAD_EXIT, &lad->lad_flags))
1825 rc1 = lao->la_double_scan_result(env, com, rc);
1827 CDEBUG(D_LFSCK, "%s: LFSCK assistant phase2 scan "
1828 "finished: rc = %d\n",
1829 lfsck_lfsck2name(lfsck), rc1 != 0 ? rc1 : rc);
1833 if (test_bit(LAD_IN_DOUBLE_SCAN, &lad->lad_flags))
1834 atomic_dec(&lfsck->li_double_scan_count);
/* Publish the final status and the stopped state, then wake waiters. */
1836 spin_lock(&lad->lad_lock);
1837 lad->lad_assistant_status = (rc1 != 0 ? rc1 : rc);
1838 thread_set_flags(athread, SVC_STOPPED);
1839 lad->lad_task = NULL;
1840 spin_unlock(&lad->lad_lock);
1842 CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread exit: rc = %d\n",
1843 lfsck_lfsck2name(lfsck), lad->lad_name,
1844 lad->lad_assistant_status);
1846 lfsck_thread_args_fini(lta);
1847 wake_up(&mthread->t_ctl_waitq);