4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2012, 2013, Intel Corporation.
26 * lustre/lfsck/lfsck_engine.c
28 * Author: Fan, Yong <fan.yong@intel.com>
31 #define DEBUG_SUBSYSTEM S_LFSCK
33 #include <lu_object.h>
34 #include <dt_object.h>
35 #include <lustre_net.h>
36 #include <lustre_fid.h>
37 #include <obd_support.h>
38 #include <lustre_lib.h>
40 #include "lfsck_internal.h"
42 static int lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie, __u16 *type)
45 int align = sizeof(*lt) - 1;
48 fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
49 *cookie = le64_to_cpu(ent->lde_hash);
50 ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
51 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
52 ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
54 if (unlikely(!(ent->lde_attrs & LUDA_TYPE)))
57 len = (ent->lde_namelen + align) & ~align;
58 lt = (struct luda_type *)(ent->lde_name + len);
59 *type = le16_to_cpu(lt->lt_type);
61 /* Make sure the name is terminated with '\0'. The data (object type)
62 * after ent::lde_name maybe broken, but we have stored such data in
63 * the output parameter @type as above. */
64 ent->lde_name[ent->lde_namelen] = '\0';
69 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
71 const struct dt_it_ops *iops;
74 spin_lock(&lfsck->li_lock);
75 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
76 di = lfsck->li_di_oit;
77 lfsck->li_di_oit = NULL;
78 spin_unlock(&lfsck->li_lock);
82 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
84 const struct dt_it_ops *iops;
87 spin_lock(&lfsck->li_lock);
88 iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
89 di = lfsck->li_di_dir;
90 lfsck->li_di_dir = NULL;
91 lfsck->li_cookie_dir = 0;
92 spin_unlock(&lfsck->li_lock);
96 static void lfsck_close_dir(const struct lu_env *env,
97 struct lfsck_instance *lfsck)
99 struct dt_object *dir_obj = lfsck->li_obj_dir;
100 const struct dt_it_ops *dir_iops = &dir_obj->do_index_ops->dio_it;
101 struct dt_it *dir_di = lfsck->li_di_dir;
103 lfsck_di_dir_put(env, lfsck);
104 dir_iops->fini(env, dir_di);
105 lfsck->li_obj_dir = NULL;
106 lfsck_object_put(env, dir_obj);
109 static int lfsck_update_lma(const struct lu_env *env,
110 struct lfsck_instance *lfsck, struct dt_object *obj)
112 struct lfsck_thread_info *info = lfsck_env_info(env);
113 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
114 struct dt_device *dt = lfsck->li_bottom;
115 struct lustre_mdt_attrs *lma = &info->lti_lma;
122 if (bk->lb_param & LPF_DRYRUN)
125 buf = lfsck_buf_get(env, info->lti_lma_old, LMA_OLD_SIZE);
126 rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LMA, BYPASS_CAPA);
131 fl = LU_XATTR_CREATE;
132 lustre_lma_init(lma, lfsck_dto2fid(obj), LMAC_FID_ON_OST, 0);
134 if (rc != LMA_OLD_SIZE && rc != sizeof(struct lustre_mdt_attrs))
137 fl = LU_XATTR_REPLACE;
138 lustre_lma_swab(lma);
139 lustre_lma_init(lma, lfsck_dto2fid(obj),
140 lma->lma_compat | LMAC_FID_ON_OST,
143 lustre_lma_swab(lma);
145 th = dt_trans_create(env, dt);
149 buf = lfsck_buf_get(env, lma, sizeof(*lma));
150 rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th);
154 rc = dt_trans_start(env, dt, th);
158 rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th, BYPASS_CAPA);
163 dt_trans_stop(env, dt, th);
167 static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
170 if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
171 !dt_try_as_dir(env, obj)))
174 return dt_lookup(env, obj, (struct dt_rec *)fid,
175 (const struct dt_key *)"..", BYPASS_CAPA);
178 static int lfsck_needs_scan_dir(const struct lu_env *env,
179 struct lfsck_instance *lfsck,
180 struct dt_object *obj)
182 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid;
186 if (list_empty(&lfsck->li_list_dir) || !S_ISDIR(lfsck_object_type(obj)))
190 /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
191 * which is the agent directory to manage the objects
192 * which name entries reside on remote MDTs. Related
193 * consistency verification will be processed in LFSCK
195 if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
197 lfsck_object_put(env, obj);
201 /* No need to check .lustre and its children. */
202 if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj)))) {
204 lfsck_object_put(env, obj);
208 dt_read_lock(env, obj, MOR_TGT_CHILD);
209 if (unlikely(lfsck_is_dead_obj(obj))) {
210 dt_read_unlock(env, obj);
212 lfsck_object_put(env, obj);
216 rc = dt_xattr_get(env, obj,
217 lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
219 dt_read_unlock(env, obj);
222 lfsck_object_put(env, obj);
226 if (rc < 0 && rc != -ENODATA) {
228 lfsck_object_put(env, obj);
232 rc = lfsck_parent_fid(env, obj, fid);
234 lfsck_object_put(env, obj);
238 if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
241 obj = lfsck_object_find(env, lfsck, fid);
245 if (!dt_object_exists(obj)) {
246 lfsck_object_put(env, obj);
250 if (dt_object_remote(obj)) {
251 /* .lustre/lost+found/MDTxxx can be remote directory. */
252 if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj))))
255 /* Other remote directory should be client
256 * visible and need to be checked. */
258 lfsck_object_put(env, obj);
267 /* LFSCK wrap functions */
269 static void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
272 struct lfsck_component *com;
274 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
275 com->lc_ops->lfsck_fail(env, com, new_checked);
279 static int lfsck_checkpoint(const struct lu_env *env,
280 struct lfsck_instance *lfsck)
282 struct lfsck_component *com;
286 if (likely(cfs_time_beforeq(cfs_time_current(),
287 lfsck->li_time_next_checkpoint)))
290 lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
291 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
292 rc = com->lc_ops->lfsck_checkpoint(env, com, false);
297 lfsck->li_time_last_checkpoint = cfs_time_current();
298 lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
299 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
300 return rc1 != 0 ? rc1 : rc;
303 static int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
304 struct lfsck_start_param *lsp)
306 struct dt_object *obj = NULL;
307 struct lfsck_component *com;
308 struct lfsck_component *next;
309 struct lfsck_position *pos = NULL;
310 const struct dt_it_ops *iops =
311 &lfsck->li_obj_oit->do_index_ops->dio_it;
316 LASSERT(lfsck->li_obj_dir == NULL);
317 LASSERT(lfsck->li_di_dir == NULL);
319 lfsck->li_current_oit_processed = 0;
320 list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
321 com->lc_new_checked = 0;
322 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
325 rc = com->lc_ops->lfsck_prep(env, com, lsp);
330 (!lfsck_pos_is_zero(&com->lc_pos_start) &&
331 lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
332 pos = &com->lc_pos_start;
335 /* Init otable-based iterator. */
337 rc = iops->load(env, lfsck->li_di_oit, 0);
339 lfsck->li_oit_over = 1;
346 rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
350 lfsck->li_oit_over = 1;
352 if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
355 /* Find the directory for namespace-based traverse. */
356 obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
358 RETURN(PTR_ERR(obj));
360 /* XXX: Currently, skip remote object, the consistency for
361 * remote object will be processed in LFSCK phase III. */
362 if (!dt_object_exists(obj) || dt_object_remote(obj) ||
363 unlikely(!S_ISDIR(lfsck_object_type(obj))))
366 if (unlikely(!dt_try_as_dir(env, obj)))
367 GOTO(out, rc = -ENOTDIR);
369 /* Init the namespace-based directory traverse. */
370 iops = &obj->do_index_ops->dio_it;
371 di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
373 GOTO(out, rc = PTR_ERR(di));
375 LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
377 rc = iops->load(env, di, pos->lp_dir_cookie);
378 if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
379 rc = iops->next(env, di);
389 lfsck->li_obj_dir = lfsck_object_get(obj);
390 lfsck->li_cookie_dir = iops->store(env, di);
391 spin_lock(&lfsck->li_lock);
392 lfsck->li_di_dir = di;
393 spin_unlock(&lfsck->li_lock);
399 lfsck_object_put(env, obj);
402 list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
404 com->lc_ops->lfsck_post(env, com, rc, true);
410 lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, true);
411 lfsck->li_pos_current = lfsck->li_pos_checkpoint;
412 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
413 rc = com->lc_ops->lfsck_checkpoint(env, com, true);
418 lfsck->li_time_last_checkpoint = cfs_time_current();
419 lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
420 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
424 static int lfsck_exec_oit(const struct lu_env *env,
425 struct lfsck_instance *lfsck, struct dt_object *obj)
427 struct lfsck_component *com;
428 const struct dt_it_ops *iops;
433 LASSERT(lfsck->li_obj_dir == NULL);
435 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
436 rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
441 rc = lfsck_needs_scan_dir(env, lfsck, obj);
445 if (unlikely(!dt_try_as_dir(env, obj)))
446 GOTO(out, rc = -ENOTDIR);
448 iops = &obj->do_index_ops->dio_it;
449 di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
451 GOTO(out, rc = PTR_ERR(di));
453 rc = iops->load(env, di, 0);
455 rc = iops->next(env, di);
465 lfsck->li_obj_dir = lfsck_object_get(obj);
466 lfsck->li_cookie_dir = iops->store(env, di);
467 spin_lock(&lfsck->li_lock);
468 lfsck->li_di_dir = di;
469 spin_unlock(&lfsck->li_lock);
475 lfsck_fail(env, lfsck, false);
476 return (rc > 0 ? 0 : rc);
479 static int lfsck_exec_dir(const struct lu_env *env,
480 struct lfsck_instance *lfsck,
481 struct lu_dirent *ent, __u16 type)
483 struct lfsck_component *com;
486 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
487 rc = com->lc_ops->lfsck_exec_dir(env, com, ent, type);
494 static int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
497 struct lfsck_component *com;
498 struct lfsck_component *next;
502 lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
503 list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
504 rc = com->lc_ops->lfsck_post(env, com, result, false);
509 lfsck->li_time_last_checkpoint = cfs_time_current();
510 lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
511 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
513 /* Ignore some component post failure to make other can go ahead. */
517 static int lfsck_double_scan(const struct lu_env *env,
518 struct lfsck_instance *lfsck)
520 struct lfsck_component *com;
521 struct lfsck_component *next;
522 struct l_wait_info lwi = { 0 };
526 list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
527 if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
530 rc = com->lc_ops->lfsck_double_scan(env, com);
535 l_wait_event(lfsck->li_thread.t_ctl_waitq,
536 atomic_read(&lfsck->li_double_scan_count) == 0,
539 if (lfsck->li_status != LS_PAUSED &&
540 lfsck->li_status != LS_CO_PAUSED) {
541 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
543 spin_lock(&lfsck->li_lock);
544 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
545 spin_unlock(&lfsck->li_lock);
549 return rc1 != 0 ? rc1 : rc;
552 static void lfsck_quit(const struct lu_env *env, struct lfsck_instance *lfsck)
554 struct lfsck_component *com;
555 struct lfsck_component *next;
557 list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
559 if (com->lc_ops->lfsck_quit != NULL)
560 com->lc_ops->lfsck_quit(env, com);
562 spin_lock(&lfsck->li_lock);
563 list_del_init(&com->lc_link_dir);
564 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
565 spin_unlock(&lfsck->li_lock);
568 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
570 if (com->lc_ops->lfsck_quit != NULL)
571 com->lc_ops->lfsck_quit(env, com);
573 spin_lock(&lfsck->li_lock);
574 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
575 spin_unlock(&lfsck->li_lock);
581 static int lfsck_master_dir_engine(const struct lu_env *env,
582 struct lfsck_instance *lfsck)
584 struct lfsck_thread_info *info = lfsck_env_info(env);
585 struct dt_object *dir = lfsck->li_obj_dir;
586 const struct dt_it_ops *iops = &dir->do_index_ops->dio_it;
587 struct dt_it *di = lfsck->li_di_dir;
588 struct lu_dirent *ent =
589 (struct lu_dirent *)info->lti_key;
590 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
591 struct ptlrpc_thread *thread = &lfsck->li_thread;
597 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
599 struct l_wait_info lwi;
601 lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
603 l_wait_event(thread->t_ctl_waitq,
604 !thread_is_running(thread),
608 lfsck->li_new_scanned++;
609 rc = iops->rec(env, di, (struct dt_rec *)ent,
612 rc = lfsck_unpack_ent(ent, &lfsck->li_cookie_dir,
616 CDEBUG(D_LFSCK, "%s: scan dir failed at rec(), "
617 "parent "DFID", cookie "LPX64": rc = %d\n",
618 lfsck_lfsck2name(lfsck),
619 PFID(lfsck_dto2fid(dir)),
620 lfsck->li_cookie_dir, rc);
621 lfsck_fail(env, lfsck, true);
622 if (bk->lb_param & LPF_FAILOUT)
628 if (ent->lde_attrs & LUDA_IGNORE)
631 /* The type in the @ent structure may has been overwritten,
632 * so we need to pass the @type parameter independently. */
633 rc = lfsck_exec_dir(env, lfsck, ent, type);
634 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
638 rc = lfsck_checkpoint(env, lfsck);
639 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
643 lfsck_control_speed(lfsck);
644 if (unlikely(!thread_is_running(thread))) {
645 CDEBUG(D_LFSCK, "%s: scan dir exit for engine stop, "
646 "parent "DFID", cookie "LPX64"\n",
647 lfsck_lfsck2name(lfsck),
648 PFID(lfsck_dto2fid(dir)),
649 lfsck->li_cookie_dir);
653 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
654 spin_lock(&lfsck->li_lock);
655 thread_set_flags(thread, SVC_STOPPING);
656 spin_unlock(&lfsck->li_lock);
660 rc = iops->next(env, di);
663 if (rc > 0 && !lfsck->li_oit_over)
664 lfsck_close_dir(env, lfsck);
669 static int lfsck_master_oit_engine(const struct lu_env *env,
670 struct lfsck_instance *lfsck)
672 struct lfsck_thread_info *info = lfsck_env_info(env);
673 const struct dt_it_ops *iops =
674 &lfsck->li_obj_oit->do_index_ops->dio_it;
675 struct dt_it *di = lfsck->li_di_oit;
676 struct lu_fid *fid = &info->lti_fid;
677 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
678 struct ptlrpc_thread *thread = &lfsck->li_thread;
680 lfsck_dev_idx(lfsck->li_bottom);
685 struct dt_object *target;
686 bool update_lma = false;
688 if (lfsck->li_di_dir != NULL) {
689 rc = lfsck_master_dir_engine(env, lfsck);
694 if (unlikely(lfsck->li_oit_over))
697 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
699 struct l_wait_info lwi;
701 lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
703 l_wait_event(thread->t_ctl_waitq,
704 !thread_is_running(thread),
708 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
711 lfsck->li_current_oit_processed = 1;
712 lfsck->li_new_scanned++;
713 lfsck->li_pos_current.lp_oit_cookie = iops->store(env, di);
714 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
716 CDEBUG(D_LFSCK, "%s: OIT scan failed at rec(): "
717 "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
718 lfsck_fail(env, lfsck, true);
719 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
725 if (fid_is_idif(fid)) {
726 __u32 idx1 = fid_idif_ost_idx(fid);
728 LASSERT(!lfsck->li_master);
730 /* It is an old format device, update the LMA. */
732 struct ost_id *oi = &info->lti_oi;
734 fid_to_ostid(fid, oi);
735 ostid_to_fid(fid, oi, idx);
738 } else if (!fid_is_norm(fid) && !fid_is_igif(fid) &&
739 !fid_is_last_id(fid) && !fid_is_root(fid) &&
740 !fid_seq_is_dot(fid_seq(fid))) {
741 /* If the FID/object is only used locally and invisible
742 * to external nodes, then LFSCK will not handle it. */
746 target = lfsck_object_find(env, lfsck, fid);
747 if (IS_ERR(target)) {
748 CDEBUG(D_LFSCK, "%s: OIT scan failed at find target "
749 DFID", cookie "LPU64": rc = %d\n",
750 lfsck_lfsck2name(lfsck), PFID(fid),
751 iops->store(env, di), rc);
752 lfsck_fail(env, lfsck, true);
753 if (bk->lb_param & LPF_FAILOUT)
754 RETURN(PTR_ERR(target));
759 /* XXX: Currently, skip remote object, the consistency for
760 * remote object will be processed in LFSCK phase III. */
761 if (dt_object_exists(target) && !dt_object_remote(target)) {
763 rc = lfsck_update_lma(env, lfsck, target);
765 CDEBUG(D_LFSCK, "%s: fail to update "
766 "LMA for "DFID": rc = %d\n",
767 lfsck_lfsck2name(lfsck),
768 PFID(lfsck_dto2fid(target)), rc);
771 rc = lfsck_exec_oit(env, lfsck, target);
773 lfsck_object_put(env, target);
774 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
778 rc = lfsck_checkpoint(env, lfsck);
779 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
783 lfsck_control_speed(lfsck);
785 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
786 spin_lock(&lfsck->li_lock);
787 thread_set_flags(thread, SVC_STOPPING);
788 spin_unlock(&lfsck->li_lock);
792 rc = iops->next(env, di);
793 if (unlikely(rc > 0))
794 lfsck->li_oit_over = 1;
795 else if (likely(rc == 0))
796 lfsck->li_current_oit_processed = 0;
798 if (unlikely(!thread_is_running(thread))) {
799 CDEBUG(D_LFSCK, "%s: OIT scan exit for engine stop, "
800 "cookie "LPU64"\n", lfsck_lfsck2name(lfsck),
801 iops->store(env, di));
804 } while (rc == 0 || lfsck->li_di_dir != NULL);
809 int lfsck_master_engine(void *args)
811 struct lfsck_thread_args *lta = args;
812 struct lu_env *env = <a->lta_env;
813 struct lfsck_instance *lfsck = lta->lta_lfsck;
814 struct ptlrpc_thread *thread = &lfsck->li_thread;
815 struct dt_object *oit_obj = lfsck->li_obj_oit;
816 const struct dt_it_ops *oit_iops = &oit_obj->do_index_ops->dio_it;
817 struct dt_it *oit_di;
818 struct l_wait_info lwi = { 0 };
822 if (lfsck->li_master &&
823 (!list_empty(&lfsck->li_list_scan) ||
824 !list_empty(&lfsck->li_list_double_scan))) {
825 rc = lfsck_verify_lpf(env, lfsck);
826 /* Fail to verify the .lustre/lost+found/MDTxxxx/ may be not
827 * fatal, because the .lustre/lost+found/ maybe not accessed
828 * by the LFSCK if it does not add orphans or others to such
829 * directory. So go ahead until hit failure when really uses
832 CDEBUG(D_LFSCK, "%s: master engine fail to verify the "
833 ".lustre/lost+found/, go ahead: rc = %d\n",
834 lfsck_lfsck2name(lfsck), rc);
837 oit_di = oit_iops->init(env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
838 if (IS_ERR(oit_di)) {
839 rc = PTR_ERR(oit_di);
840 CDEBUG(D_LFSCK, "%s: master engine fail to init iteration: "
841 "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
846 spin_lock(&lfsck->li_lock);
847 lfsck->li_di_oit = oit_di;
848 spin_unlock(&lfsck->li_lock);
849 rc = lfsck_prep(env, lfsck, lta->lta_lsp);
853 CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
854 "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
855 ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
856 lfsck->li_pos_checkpoint.lp_oit_cookie,
857 lfsck->li_pos_checkpoint.lp_dir_cookie,
858 PFID(&lfsck->li_pos_checkpoint.lp_dir_parent),
861 spin_lock(&lfsck->li_lock);
862 thread_set_flags(thread, SVC_RUNNING);
863 spin_unlock(&lfsck->li_lock);
864 wake_up_all(&thread->t_ctl_waitq);
866 l_wait_event(thread->t_ctl_waitq,
867 lfsck->li_start_unplug ||
868 !thread_is_running(thread),
870 if (!thread_is_running(thread))
871 GOTO(fini_oit, rc = 0);
873 if (!list_empty(&lfsck->li_list_scan) ||
874 list_empty(&lfsck->li_list_double_scan))
875 rc = lfsck_master_oit_engine(env, lfsck);
879 CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
880 "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
881 ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
882 lfsck->li_pos_checkpoint.lp_oit_cookie,
883 lfsck->li_pos_checkpoint.lp_dir_cookie,
884 PFID(&lfsck->li_pos_checkpoint.lp_dir_parent),
887 if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
888 rc = lfsck_post(env, lfsck, rc);
890 if (lfsck->li_di_dir != NULL)
891 lfsck_close_dir(env, lfsck);
894 lfsck_di_oit_put(env, lfsck);
895 oit_iops->fini(env, oit_di);
897 if (!list_empty(&lfsck->li_list_double_scan))
898 rc = lfsck_double_scan(env, lfsck);
902 lfsck_quit(env, lfsck);
905 /* XXX: Purge the pinned objects in the future. */
908 spin_lock(&lfsck->li_lock);
909 thread_set_flags(thread, SVC_STOPPED);
910 spin_unlock(&lfsck->li_lock);
911 wake_up_all(&thread->t_ctl_waitq);
912 lfsck_thread_args_fini(lta);
916 static inline bool lfsck_assistant_req_empty(struct lfsck_assistant_data *lad)
920 spin_lock(&lad->lad_lock);
921 if (list_empty(&lad->lad_req_list))
923 spin_unlock(&lad->lad_lock);
929 * Query the LFSCK status from the instances on remote servers.
931 * The LFSCK assistant thread queries the LFSCK instances on other
932 * servers (MDT/OST) about their status, such as whether they have
933 * finished the phase1/phase2 scanning or not, and so on.
935 * \param[in] env pointer to the thread context
936 * \param[in] com pointer to the lfsck component
938 * \retval 0 for success
939 * \retval negative error number on failure
941 static int lfsck_assistant_query_others(const struct lu_env *env,
942 struct lfsck_component *com)
944 struct lfsck_thread_info *info = lfsck_env_info(env);
945 struct lfsck_request *lr = &info->lti_lr;
946 struct lfsck_async_interpret_args *laia = &info->lti_laia;
947 struct lfsck_instance *lfsck = com->lc_lfsck;
948 struct lfsck_assistant_data *lad = com->lc_data;
949 struct ptlrpc_request_set *set;
950 struct lfsck_tgt_descs *ltds;
951 struct lfsck_tgt_desc *ltd;
952 struct list_head *phase_head;
957 set = ptlrpc_prep_set();
961 lad->lad_touch_gen++;
962 memset(lr, 0, sizeof(*lr));
963 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
964 lr->lr_event = LE_QUERY;
965 lr->lr_active = com->lc_type;
966 laia->laia_com = com;
968 laia->laia_shared = 0;
970 if (!list_empty(&lad->lad_mdt_phase1_list)) {
971 ltds = &lfsck->li_mdt_descs;
973 phase_head = &lad->lad_mdt_phase1_list;
974 } else if (com->lc_type != LFSCK_TYPE_LAYOUT) {
979 ltds = &lfsck->li_ost_descs;
980 lr->lr_flags = LEF_TO_OST;
981 phase_head = &lad->lad_ost_phase1_list;
984 laia->laia_ltds = ltds;
985 spin_lock(<ds->ltd_lock);
986 while (!list_empty(phase_head)) {
987 struct list_head *phase_list;
990 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
991 ltd = list_entry(phase_head->next,
992 struct lfsck_tgt_desc,
993 ltd_layout_phase_list);
994 phase_list = <d->ltd_layout_phase_list;
995 gen = <d->ltd_layout_gen;
997 ltd = list_entry(phase_head->next,
998 struct lfsck_tgt_desc,
999 ltd_namespace_phase_list);
1000 phase_list = <d->ltd_namespace_phase_list;
1001 gen = <d->ltd_namespace_gen;
1004 if (*gen == lad->lad_touch_gen)
1007 *gen = lad->lad_touch_gen;
1008 list_move_tail(phase_list, phase_head);
1009 atomic_inc(<d->ltd_ref);
1010 laia->laia_ltd = ltd;
1011 spin_unlock(<ds->ltd_lock);
1012 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1013 lfsck_async_interpret_common,
1016 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to query "
1017 "%s %x for %s: rc = %d\n",
1018 lfsck_lfsck2name(lfsck),
1019 (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1020 ltd->ltd_index, lad->lad_name, rc);
1024 spin_lock(<ds->ltd_lock);
1026 spin_unlock(<ds->ltd_lock);
1028 rc = ptlrpc_set_wait(set);
1030 ptlrpc_set_destroy(set);
1034 if (com->lc_type == LFSCK_TYPE_LAYOUT && !(lr->lr_flags & LEF_TO_OST) &&
1035 list_empty(&lad->lad_mdt_phase1_list))
1039 ptlrpc_set_destroy(set);
1041 RETURN(rc1 != 0 ? rc1 : rc);
1045 * Notify the LFSCK event to the instances on remote servers.
1047 * The LFSCK assistant thread notifies the LFSCK instances on other
1048 * servers (MDT/OST) about some events, such as start new scanning,
1049 * stop the scanning, this LFSCK instance will exit, and so on.
1051 * \param[in] env pointer to the thread context
1052 * \param[in] com pointer to the lfsck component
1053 * \param[in] lr pointer to the LFSCK event request
1055 * \retval 0 for success
1056 * \retval negative error number on failure
1058 static int lfsck_assistant_notify_others(const struct lu_env *env,
1059 struct lfsck_component *com,
1060 struct lfsck_request *lr)
1062 struct lfsck_thread_info *info = lfsck_env_info(env);
1063 struct lfsck_async_interpret_args *laia = &info->lti_laia;
1064 struct lfsck_instance *lfsck = com->lc_lfsck;
1065 struct lfsck_assistant_data *lad = com->lc_data;
1066 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1067 struct ptlrpc_request_set *set;
1068 struct lfsck_tgt_descs *ltds;
1069 struct lfsck_tgt_desc *ltd;
1070 struct lfsck_tgt_desc *next;
1076 set = ptlrpc_prep_set();
1080 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1081 lr->lr_active = com->lc_type;
1082 laia->laia_com = com;
1084 laia->laia_shared = 0;
1086 switch (lr->lr_event) {
1088 if (com->lc_type != LFSCK_TYPE_LAYOUT)
1091 lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1092 LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ;
1093 lr->lr_speed = bk->lb_speed_limit;
1094 lr->lr_version = bk->lb_version;
1095 lr->lr_param |= bk->lb_param;
1096 lr->lr_async_windows = bk->lb_async_windows;
1097 lr->lr_flags = LEF_TO_OST;
1099 /* Notify OSTs firstly, then handle other MDTs if needed. */
1100 ltds = &lfsck->li_ost_descs;
1101 laia->laia_ltds = ltds;
1102 down_read(<ds->ltd_rw_sem);
1103 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1104 ltd = lfsck_tgt_get(ltds, idx);
1105 LASSERT(ltd != NULL);
1107 laia->laia_ltd = ltd;
1108 ltd->ltd_layout_done = 0;
1109 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1110 lfsck_async_interpret_common,
1111 laia, LFSCK_NOTIFY);
1113 lfsck_lad_set_bitmap(env, com, idx);
1114 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1115 "notify OST %x for %s start: rc = %d\n",
1116 lfsck_lfsck2name(lfsck), idx,
1121 up_read(<ds->ltd_rw_sem);
1124 rc = ptlrpc_set_wait(set);
1126 ptlrpc_set_destroy(set);
1131 if (!(bk->lb_param & LPF_ALL_TGT))
1134 /* link other MDT targets locallly. */
1135 ltds = &lfsck->li_mdt_descs;
1136 spin_lock(<ds->ltd_lock);
1137 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1138 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1139 ltd = LTD_TGT(ltds, idx);
1140 LASSERT(ltd != NULL);
1142 if (!list_empty(<d->ltd_layout_list))
1145 list_add_tail(<d->ltd_layout_list,
1146 &lad->lad_mdt_list);
1147 list_add_tail(<d->ltd_layout_phase_list,
1148 &lad->lad_mdt_phase1_list);
1151 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1152 ltd = LTD_TGT(ltds, idx);
1153 LASSERT(ltd != NULL);
1155 if (!list_empty(<d->ltd_namespace_list))
1158 list_add_tail(<d->ltd_namespace_list,
1159 &lad->lad_mdt_list);
1160 list_add_tail(<d->ltd_namespace_phase_list,
1161 &lad->lad_mdt_phase1_list);
1164 spin_unlock(<ds->ltd_lock);
1167 case LE_PHASE2_DONE:
1168 case LE_PEER_EXIT: {
1169 struct list_head *phase_head;
1171 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1172 if (bk->lb_param & LPF_ALL_TGT) {
1173 phase_head = &lad->lad_mdt_list;
1174 ltds = &lfsck->li_mdt_descs;
1175 if (lr->lr_event == LE_STOP) {
1176 /* unlink other MDT targets locallly. */
1177 spin_lock(<ds->ltd_lock);
1178 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1179 list_for_each_entry_safe(ltd, next,
1180 phase_head, ltd_layout_list) {
1182 <d->ltd_layout_phase_list);
1184 <d->ltd_layout_list);
1187 list_for_each_entry_safe(ltd, next,
1189 ltd_namespace_list) {
1191 <d->ltd_namespace_phase_list);
1193 <d->ltd_namespace_list);
1196 spin_unlock(<ds->ltd_lock);
1198 if (com->lc_type != LFSCK_TYPE_LAYOUT)
1201 lr->lr_flags |= LEF_TO_OST;
1202 phase_head = &lad->lad_ost_list;
1203 ltds = &lfsck->li_ost_descs;
1205 lr->lr_flags &= ~LEF_TO_OST;
1207 } else if (com->lc_type != LFSCK_TYPE_LAYOUT) {
1210 lr->lr_flags |= LEF_TO_OST;
1211 phase_head = &lad->lad_ost_list;
1212 ltds = &lfsck->li_ost_descs;
1216 laia->laia_ltds = ltds;
1217 spin_lock(<ds->ltd_lock);
1218 while (!list_empty(phase_head)) {
1219 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1220 ltd = list_entry(phase_head->next,
1221 struct lfsck_tgt_desc,
1223 if (!list_empty(<d->ltd_layout_phase_list))
1225 <d->ltd_layout_phase_list);
1226 list_del_init(<d->ltd_layout_list);
1228 ltd = list_entry(phase_head->next,
1229 struct lfsck_tgt_desc,
1230 ltd_namespace_list);
1231 if (!list_empty(<d->ltd_namespace_phase_list))
1233 <d->ltd_namespace_phase_list);
1234 list_del_init(<d->ltd_namespace_list);
1236 atomic_inc(<d->ltd_ref);
1237 laia->laia_ltd = ltd;
1238 spin_unlock(<ds->ltd_lock);
1239 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1240 lfsck_async_interpret_common,
1241 laia, LFSCK_NOTIFY);
1243 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1244 "notify %s %x for %s stop/phase2_done/"
1245 "peer_exit: rc = %d\n",
1246 lfsck_lfsck2name(lfsck),
1247 (lr->lr_flags & LEF_TO_OST) ?
1248 "OST" : "MDT", ltd->ltd_index,
1252 spin_lock(<ds->ltd_lock);
1254 spin_unlock(<ds->ltd_lock);
1256 rc = ptlrpc_set_wait(set);
1258 ptlrpc_set_destroy(set);
1262 if (com->lc_type == LFSCK_TYPE_LAYOUT &&
1263 !(lr->lr_flags & LEF_TO_OST)) {
1264 lr->lr_flags |= LEF_TO_OST;
1265 phase_head = &lad->lad_ost_list;
1266 ltds = &lfsck->li_ost_descs;
1271 case LE_PHASE1_DONE:
1272 lad->lad_ops->la_sync_failures(env, com, lr);
1273 lad->lad_touch_gen++;
1274 ltds = &lfsck->li_mdt_descs;
1275 laia->laia_ltds = ltds;
1276 spin_lock(<ds->ltd_lock);
1277 while (!list_empty(&lad->lad_mdt_list)) {
1278 struct list_head *list;
1281 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1282 ltd = list_entry(lad->lad_mdt_list.next,
1283 struct lfsck_tgt_desc,
1285 list = <d->ltd_layout_list;
1286 gen = <d->ltd_layout_gen;
1288 ltd = list_entry(lad->lad_mdt_list.next,
1289 struct lfsck_tgt_desc,
1290 ltd_namespace_list);
1291 list = <d->ltd_namespace_list;
1292 gen = <d->ltd_namespace_gen;
1295 if (*gen == lad->lad_touch_gen)
1298 *gen = lad->lad_touch_gen;
1299 list_move_tail(list, &lad->lad_mdt_list);
1300 atomic_inc(<d->ltd_ref);
1301 laia->laia_ltd = ltd;
1302 spin_unlock(<ds->ltd_lock);
1303 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1304 lfsck_async_interpret_common,
1305 laia, LFSCK_NOTIFY);
1307 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
1308 "notify MDT %x for %s phase1 done: "
1309 "rc = %d\n", lfsck_lfsck2name(lfsck),
1310 ltd->ltd_index, lad->lad_name, rc);
1313 spin_lock(<ds->ltd_lock);
1315 spin_unlock(<ds->ltd_lock);
1318 CDEBUG(D_LFSCK, "%s: LFSCK assistant unexpected LFSCK event: "
1319 "rc = %d\n", lfsck_lfsck2name(lfsck), lr->lr_event);
1324 rc1 = ptlrpc_set_wait(set);
1325 ptlrpc_set_destroy(set);
1327 RETURN(rc != 0 ? rc : rc1);
1331 * The LFSCK assistant thread is triggered by the LFSCK main engine.
1332 * They co-work together as an asynchronous pipeline: the LFSCK main
1333 * engine scans the system and pre-fetches the objects, attributes,
1334 * or name entries, etc, and pushes them into the pipeline as input
1335 * requests for the LFSCK assistant thread; on the other end of the
1336 * pipeline, the LFSCK assistant thread performs the real check and
1337 * repair for every request from the main engine.
1339 * Generally, the assistant engine may be blocked when check/repair
1340 * something, so the LFSCK main engine will run some faster. On the
1341 * other hand, the LFSCK main engine will drive multiple assistant
1342 * threads in parallel, means for each LFSCK component on the master
1343 * (such as layout LFSCK, namespace LFSCK), there is an independent
1344 * LFSCK assistant thread. So under such 1:N multiple asynchronous
1345 * pipelines mode, the whole LFSCK performance will be much better
1346 * than check/repair everything by the LFSCK main engine itself.
1348 int lfsck_assistant_engine(void *args)
1350 struct lfsck_thread_args *lta = args;
1351 struct lu_env *env = <a->lta_env;
1352 struct lfsck_component *com = lta->lta_com;
1353 struct lfsck_instance *lfsck = lta->lta_lfsck;
1354 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1355 struct lfsck_position *pos = &com->lc_pos_start;
1356 struct lfsck_thread_info *info = lfsck_env_info(env);
1357 struct lfsck_request *lr = &info->lti_lr;
1358 struct lfsck_assistant_data *lad = com->lc_data;
1359 struct ptlrpc_thread *mthread = &lfsck->li_thread;
1360 struct ptlrpc_thread *athread = &lad->lad_thread;
1361 struct lfsck_assistant_operations *lao = lad->lad_ops;
1362 struct lfsck_assistant_req *lar;
1363 struct l_wait_info lwi = { 0 };
1368 CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread start\n",
1369 lfsck_lfsck2name(lfsck), lad->lad_name);
1371 memset(lr, 0, sizeof(*lr));
1372 lr->lr_event = LE_START;
1373 if (pos->lp_oit_cookie <= 1)
1374 lr->lr_param = LPF_RESET;
1375 rc = lfsck_assistant_notify_others(env, com, lr);
1377 CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to notify others "
1378 "to start %s: rc = %d\n",
1379 lfsck_lfsck2name(lfsck), lad->lad_name, rc);
1383 spin_lock(&lad->lad_lock);
1384 thread_set_flags(athread, SVC_RUNNING);
1385 spin_unlock(&lad->lad_lock);
1386 wake_up_all(&mthread->t_ctl_waitq);
1389 while (!list_empty(&lad->lad_req_list)) {
1390 bool wakeup = false;
1392 if (unlikely(lad->lad_exit ||
1393 !thread_is_running(mthread)))
1394 GOTO(cleanup1, rc = lad->lad_post_result);
1396 lar = list_entry(lad->lad_req_list.next,
1397 struct lfsck_assistant_req,
1399 /* Only the lfsck_assistant_engine thread itself can
1400 * remove the "lar" from the head of the list, LFSCK
1401 * engine thread only inserts other new "lar" at the
1402 * end of the list. So it is safe to handle current
1403 * "lar" without the spin_lock. */
1404 rc = lao->la_handler_p1(env, com, lar);
1405 spin_lock(&lad->lad_lock);
1406 list_del_init(&lar->lar_list);
1407 lad->lad_prefetched--;
1408 /* Wake up the main engine thread only when the list
1409 * is empty or half of the prefetched items have been
1410 * handled to avoid too frequent thread schedule. */
1411 if (lad->lad_prefetched == 0 ||
1412 (bk->lb_async_windows != 0 &&
1413 bk->lb_async_windows / 2 ==
1414 lad->lad_prefetched))
1416 spin_unlock(&lad->lad_lock);
1418 wake_up_all(&mthread->t_ctl_waitq);
1420 lao->la_req_fini(env, lar);
1421 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1425 l_wait_event(athread->t_ctl_waitq,
1426 !lfsck_assistant_req_empty(lad) ||
1429 lad->lad_to_double_scan,
1432 if (unlikely(lad->lad_exit))
1433 GOTO(cleanup1, rc = lad->lad_post_result);
1435 if (!list_empty(&lad->lad_req_list))
1438 if (lad->lad_to_post) {
1439 CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread post\n",
1440 lfsck_lfsck2name(lfsck), lad->lad_name);
1442 if (unlikely(lad->lad_exit))
1443 GOTO(cleanup1, rc = lad->lad_post_result);
1445 lad->lad_to_post = 0;
1446 LASSERT(lad->lad_post_result > 0);
1448 memset(lr, 0, sizeof(*lr));
1449 lr->lr_event = LE_PHASE1_DONE;
1450 lr->lr_status = lad->lad_post_result;
1451 rc = lfsck_assistant_notify_others(env, com, lr);
1453 CDEBUG(D_LFSCK, "%s: LFSCK assistant failed to "
1454 "notify others for %s post: rc = %d\n",
1455 lfsck_lfsck2name(lfsck),
1458 /* Wakeup the master engine to go ahead. */
1459 wake_up_all(&mthread->t_ctl_waitq);
1462 if (lad->lad_to_double_scan) {
1463 lad->lad_to_double_scan = 0;
1464 atomic_inc(&lfsck->li_double_scan_count);
1465 lad->lad_in_double_scan = 1;
1466 wake_up_all(&mthread->t_ctl_waitq);
1468 com->lc_new_checked = 0;
1469 com->lc_new_scanned = 0;
1470 com->lc_time_last_checkpoint = cfs_time_current();
1471 com->lc_time_next_checkpoint =
1472 com->lc_time_last_checkpoint +
1473 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1475 /* Flush async updates before handling orphan. */
1476 dt_sync(env, lfsck->li_next);
1478 CDEBUG(D_LFSCK, "%s: LFSCK assistant phase2 "
1479 "scan start\n", lfsck_lfsck2name(lfsck));
1481 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
1482 GOTO(cleanup2, rc = 0);
1484 while (lad->lad_in_double_scan) {
1485 rc = lfsck_assistant_query_others(env, com);
1486 if (lfsck_phase2_next_ready(lad))
1492 /* Pull LFSCK status on related targets once
1493 * per 30 seconds if we are not notified. */
1494 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
1495 cfs_time_seconds(1),
1497 rc = l_wait_event(athread->t_ctl_waitq,
1498 lfsck_phase2_next_ready(lad) ||
1500 !thread_is_running(mthread),
1503 if (unlikely(lad->lad_exit ||
1504 !thread_is_running(mthread)))
1505 GOTO(cleanup2, rc = 0);
1507 if (rc == -ETIMEDOUT)
1514 rc = lao->la_handler_p2(env, com);
1518 if (unlikely(lad->lad_exit ||
1519 !thread_is_running(mthread)))
1520 GOTO(cleanup2, rc = 0);
1526 /* Cleanup the unfinished requests. */
1527 spin_lock(&lad->lad_lock);
1529 lad->lad_assistant_status = rc;
1531 if (lad->lad_exit && lad->lad_post_result <= 0)
1532 lao->la_fill_pos(env, com, &lfsck->li_pos_checkpoint);
1534 while (!list_empty(&lad->lad_req_list)) {
1535 lar = list_entry(lad->lad_req_list.next,
1536 struct lfsck_assistant_req,
1538 list_del_init(&lar->lar_list);
1539 lad->lad_prefetched--;
1540 spin_unlock(&lad->lad_lock);
1541 lao->la_req_fini(env, lar);
1542 spin_lock(&lad->lad_lock);
1544 spin_unlock(&lad->lad_lock);
1546 LASSERTF(lad->lad_prefetched == 0, "unmatched prefeteched objs %d\n",
1547 lad->lad_prefetched);
1550 memset(lr, 0, sizeof(*lr));
1552 lr->lr_event = LE_PHASE2_DONE;
1554 } else if (rc == 0) {
1555 if (lfsck->li_flags & LPF_ALL_TGT) {
1556 lr->lr_event = LE_STOP;
1557 lr->lr_status = LS_STOPPED;
1559 lr->lr_event = LE_PEER_EXIT;
1560 switch (lfsck->li_status) {
1563 lr->lr_status = LS_CO_PAUSED;
1567 lr->lr_status = LS_CO_STOPPED;
1570 CDEBUG(D_LFSCK, "%s: LFSCK assistant unknown "
1571 "status: rc = %d\n",
1572 lfsck_lfsck2name(lfsck),
1574 lr->lr_status = LS_CO_FAILED;
1579 if (lfsck->li_flags & LPF_ALL_TGT) {
1580 lr->lr_event = LE_STOP;
1581 lr->lr_status = LS_FAILED;
1583 lr->lr_event = LE_PEER_EXIT;
1584 lr->lr_status = LS_CO_FAILED;
1588 rc1 = lfsck_assistant_notify_others(env, com, lr);
1590 CDEBUG(D_LFSCK, "%s: LFSCK assistant failed to notify "
1591 "others for %s quit: rc = %d\n",
1592 lfsck_lfsck2name(lfsck), lad->lad_name, rc1);
1596 /* Flush async updates before exit. */
1597 dt_sync(env, lfsck->li_next);
1599 /* Under force exit case, some requests may be just freed without
1600 * verification, those objects should be re-handled when next run.
1601 * So not update the on-disk tracing file under such case. */
1602 if (lad->lad_in_double_scan) {
1604 rc1 = lao->la_double_scan_result(env, com, rc);
1606 CDEBUG(D_LFSCK, "%s: LFSCK assistant phase2 scan "
1607 "finished: rc = %d\n",
1608 lfsck_lfsck2name(lfsck), rc1 != 0 ? rc1 : rc);
1612 if (lad->lad_in_double_scan)
1613 atomic_dec(&lfsck->li_double_scan_count);
1615 spin_lock(&lad->lad_lock);
1616 lad->lad_assistant_status = (rc1 != 0 ? rc1 : rc);
1617 thread_set_flags(athread, SVC_STOPPED);
1618 wake_up_all(&mthread->t_ctl_waitq);
1619 spin_unlock(&lad->lad_lock);
1621 CDEBUG(D_LFSCK, "%s: %s LFSCK assistant thread exit: rc = %d\n",
1622 lfsck_lfsck2name(lfsck), lad->lad_name,
1623 lad->lad_assistant_status);
1625 lfsck_thread_args_fini(lta);