4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (c) 2013, 2014, Intel Corporation.
26 * lustre/lfsck/lfsck_lib.c
28 * Author: Fan, Yong <fan.yong@intel.com>
31 #define DEBUG_SUBSYSTEM S_LFSCK
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
43 #include "lfsck_internal.h"
45 #define LFSCK_CHECKPOINT_SKIP 1
48 /* define lfsck thread key; the type named here, struct lfsck_thread_info,
 * is the structure that lfsck_env_info(env) returns elsewhere in this file */
49 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
/*
 * Finalizer for the lfsck thread key: releases the lu_buf buffers embedded
 * in the struct lfsck_thread_info handed in as @data.
 *
 * \param[in] ctx	the lu_context (not referenced in the visible lines)
 * \param[in] key	the context key (not referenced in the visible lines)
 * \param[in] data	the struct lfsck_thread_info to clean up
 *
 * NOTE(review): this listing drops some original lines (braces, and
 * possibly the release of @data itself), so the body below is partial.
 */
50 static void lfsck_key_fini(const struct lu_context *ctx,
51 struct lu_context_key *key, void *data)
53 struct lfsck_thread_info *info = data;
/* Free the two linkEA buffers and the big buffer owned by the info. */
55 lu_buf_free(&info->lti_linkea_buf);
56 lu_buf_free(&info->lti_linkea_buf2);
57 lu_buf_free(&info->lti_big_buf);
/* The key is defined with both the MD-thread and the DT-thread context tags. */
61 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
62 LU_KEY_INIT_GENERIC(lfsck);
/* File-scope lists. lfsck_add_target_from_orphan() walks the OST orphan
 * list while holding lfsck_instance_lock.
 * NOTE(review): no initialization of the three list heads is visible in this
 * part of the file - confirm they are initialized before first use. */
64 static struct list_head lfsck_instance_list;
65 static struct list_head lfsck_ost_orphan_list;
66 static struct list_head lfsck_mdt_orphan_list;
67 static DEFINE_SPINLOCK(lfsck_instance_lock);
/* Printable names indexed by the LS_* status values; lfsck_status2names()
 * returns entries of this table.
 * NOTE(review): at least one original line of the initializer (and the
 * closing brace) is missing from this listing. */
69 static const char *lfsck_status_names[] = {
71 [LS_SCANNING_PHASE1] = "scanning-phase1",
72 [LS_SCANNING_PHASE2] = "scanning-phase2",
73 [LS_COMPLETED] = "completed",
74 [LS_FAILED] = "failed",
75 [LS_STOPPED] = "stopped",
76 [LS_PAUSED] = "paused",
77 [LS_CRASHED] = "crashed",
78 [LS_PARTIAL] = "partial",
79 [LS_CO_FAILED] = "co-failed",
80 [LS_CO_STOPPED] = "co-stopped",
81 [LS_CO_PAUSED] = "co-paused"
/* NOTE(review): only the opening lines of the two name tables below are
 * present in this listing; their entries are not visible here. */
84 const char *lfsck_flags_names[] = {
93 const char *lfsck_param_names[] = {
/* Tells the lost+found verification code where the FID of the MDTxxxx
 * object under test was taken from: the bookmark file, or the MDTxxxx
 * name entry under .lustre/lost+found. */
105 enum lfsck_verify_lpf_types {
106 LVLT_BY_BOOKMARK = 0,
107 LVLT_BY_NAMEENTRY = 1,
/*
 * Map an LFSCK status value to its printable name.
 *
 * \param[in] status	the status value to translate
 *
 * \retval		the matching lfsck_status_names[] entry when
 *			0 <= status < LS_MAX
 *
 * NOTE(review): the statement executed for an out-of-range status is not
 * present in this listing.
 */
110 const char *lfsck_status2names(enum lfsck_status status)
/* Guard the table lookup against values outside [0, LS_MAX). */
112 if (unlikely(status < 0 || status >= LS_MAX))
115 return lfsck_status_names[status];
/*
 * Prepare an empty target descriptor table: initialize its spinlock,
 * rw-semaphore and orphan list head, then allocate the initial target
 * bitmap with BITS_PER_LONG bits.
 *
 * \param[in] ltds	the target descriptor table to initialize
 *
 * The "&ltds->" operands below were corrupted to "<ds->" in this listing
 * (the "&lt" prefix was consumed as an HTML entity); they are restored here.
 *
 * NOTE(review): the return statements (including the one taken when the
 * bitmap allocation fails) are missing from this listing.
 */
118 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
120 spin_lock_init(&ltds->ltd_lock);
121 init_rwsem(&ltds->ltd_rw_sem);
122 INIT_LIST_HEAD(&ltds->ltd_orphan);
123 ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
124 if (ltds->ltd_tgts_bitmap == NULL)
/*
 * Tear down a target descriptor table while holding ltd_rw_sem for write:
 * unlink every descriptor from the orphan list, clear every registered
 * target slot, free the index blocks and finally the bitmap.
 *
 * \param[in] ltds	the target descriptor table to finalize
 *
 * The "&ltds->" / "&ltd->" operands below were corrupted to "<ds->" /
 * "<d->" in this listing (the "&lt" prefix was consumed as an HTML
 * entity); they are restored here.
 *
 * NOTE(review): several original lines (braces, the declaration of idx,
 * and whatever drops the per-target references and the counter) are
 * missing from this listing.
 */
130 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
132 struct lfsck_tgt_desc *ltd;
133 struct lfsck_tgt_desc *next;
136 down_write(&ltds->ltd_rw_sem);
/* Detach everything still parked on the per-table orphan list. */
138 list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
140 list_del_init(&ltd->ltd_orphan_list);
/* Nothing more to release if the bitmap was never allocated. */
144 if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
145 up_write(&ltds->ltd_rw_sem);
/* Each registered target must already be off all layout/namespace lists. */
150 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
151 ltd = LTD_TGT(ltds, idx);
152 if (likely(ltd != NULL)) {
153 LASSERT(list_empty(&ltd->ltd_layout_list));
154 LASSERT(list_empty(&ltd->ltd_layout_phase_list));
155 LASSERT(list_empty(&ltd->ltd_namespace_list));
156 LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
159 cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
160 LTD_TGT(ltds, idx) = NULL;
165 LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
/* Release the TGT_PTRS index blocks that were allocated on demand. */
168 for (idx = 0; idx < TGT_PTRS; idx++) {
169 if (ltds->ltd_tgts_idx[idx] != NULL) {
170 OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
171 ltds->ltd_tgts_idx[idx] = NULL;
175 CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
176 ltds->ltd_tgts_bitmap = NULL;
177 up_write(&ltds->ltd_rw_sem);
/*
 * Register the target descriptor @ltd in the OST or MDT descriptor table of
 * @lfsck at slot ltd->ltd_index, growing the target bitmap and allocating
 * the index block on demand.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] lfsck	pointer to the lfsck instance
 * \param[in] ltd	the target descriptor to register
 * \param[in] for_ost	selects between li_ost_descs and li_mdt_descs
 * \param[in] locked	NOTE(review): presumably tells whether the caller
 *			already holds ltd_rw_sem; the conditionals using it
 *			are missing from this listing
 *
 * \retval 0		for success
 * \retval -ENOMEM	bitmap or index block allocation failed
 * \retval -EEXIST	the index is registered already
 *
 * The "&ltds->" operands below were corrupted to "<ds->" in this listing
 * (the "&lt" prefix was consumed as an HTML entity); they are restored here.
 */
180 static int __lfsck_add_target(const struct lu_env *env,
181 struct lfsck_instance *lfsck,
182 struct lfsck_tgt_desc *ltd,
183 bool for_ost, bool locked)
185 struct lfsck_tgt_descs *ltds;
186 __u32 index = ltd->ltd_index;
191 ltds = &lfsck->li_ost_descs;
193 ltds = &lfsck->li_mdt_descs;
196 down_write(&ltds->ltd_rw_sem);
198 LASSERT(ltds->ltd_tgts_bitmap != NULL);
/* The bitmap is too small for this index: build a larger one (starting
 * from at least BITS_PER_LONG bits), carry over the existing bits if any
 * target is registered, then swap it in and free the old one. */
200 if (index >= ltds->ltd_tgts_bitmap->size) {
201 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
202 (__u32)BITS_PER_LONG);
203 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
204 cfs_bitmap_t *new_bitmap;
206 while (newsize < index + 1)
209 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
210 if (new_bitmap == NULL)
211 GOTO(unlock, rc = -ENOMEM);
213 if (ltds->ltd_tgtnr > 0)
214 cfs_bitmap_copy(new_bitmap, old_bitmap);
215 ltds->ltd_tgts_bitmap = new_bitmap;
216 CFS_FREE_BITMAP(old_bitmap);
/* A set bit means this index is occupied already. */
219 if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
220 CERROR("%s: the device %s (%u) is registered already\n",
221 lfsck_lfsck2name(lfsck),
222 ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
223 GOTO(unlock, rc = -EEXIST);
/* Index blocks are allocated lazily, one per TGT_PTRS_PER_BLOCK slots. */
226 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
227 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
228 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
229 GOTO(unlock, rc = -ENOMEM);
232 LTD_TGT(ltds, index) = ltd;
233 cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
236 GOTO(unlock, rc = 0);
240 up_write(&ltds->ltd_rw_sem);
/*
 * Adopt the orphan target descriptors that belong to @lfsck.
 *
 * Descriptors on the global orphan list whose ltd_key equals
 * lfsck->li_bottom are moved, under lfsck_instance_lock, to another list
 * (the destination argument is missing from this listing; the loop that
 * follows drains ltds->ltd_orphan). Each drained descriptor is then
 * registered through __lfsck_add_target(). The visible lines start with
 * the OST table/list and later switch to the MDT table/list.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] lfsck	pointer to the lfsck instance
 *
 * The "&ltds->" / "&ltd->" operands below were corrupted to "<ds->" /
 * "<d->" in this listing (the "&lt" prefix was consumed as an HTML
 * entity); they are restored here.
 *
 * NOTE(review): declarations of rc/for_ost, the error handling after
 * __lfsck_add_target() and the control flow that repeats the pass for the
 * MDT list are missing from this listing.
 */
245 static int lfsck_add_target_from_orphan(const struct lu_env *env,
246 struct lfsck_instance *lfsck)
248 struct lfsck_tgt_descs *ltds = &lfsck->li_ost_descs;
249 struct lfsck_tgt_desc *ltd;
250 struct lfsck_tgt_desc *next;
251 struct list_head *head = &lfsck_ost_orphan_list;
256 spin_lock(&lfsck_instance_lock);
257 list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
258 if (ltd->ltd_key == lfsck->li_bottom)
259 list_move_tail(&ltd->ltd_orphan_list,
262 spin_unlock(&lfsck_instance_lock);
264 down_write(&ltds->ltd_rw_sem);
265 while (!list_empty(&ltds->ltd_orphan)) {
266 ltd = list_entry(ltds->ltd_orphan.next,
267 struct lfsck_tgt_desc,
269 list_del_init(&ltd->ltd_orphan_list);
/* "locked" is passed as true: ltd_rw_sem is held at this point. */
270 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
271 /* Do not hold the semaphore for too long time. */
272 up_write(&ltds->ltd_rw_sem);
276 down_write(&ltds->ltd_rw_sem);
278 up_write(&ltds->ltd_rw_sem);
/* Switch to the MDT descriptor table and the MDT orphan list. */
281 ltds = &lfsck->li_mdt_descs;
282 head = &lfsck_mdt_orphan_list;
/*
 * Scan @list (linked through lc_link) for the component whose lc_type
 * equals @type. lfsck_component_find() calls this with lfsck->li_lock held.
 *
 * \param[in] lfsck	the lfsck instance (not referenced in the visible lines)
 * \param[in] type	the component type to look for
 * \param[in] list	the component list to search
 *
 * NOTE(review): the return statements are missing from this listing.
 */
290 static inline struct lfsck_component *
291 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
292 struct list_head *list)
294 struct lfsck_component *com;
296 list_for_each_entry(com, list, lc_link) {
297 if (com->lc_type == type)
/*
 * Find the component of the given @type in @lfsck.
 *
 * Under li_lock the scan list, the double-scan list and the idle list are
 * searched in that order, and lfsck_component_get() is called on the
 * result before the lock is released.
 *
 * \param[in] lfsck	pointer to the lfsck instance
 * \param[in] type	the component type to look for
 *
 * NOTE(review): the conditionals between the three lookups, the guard
 * around lfsck_component_get() and the return statement are missing from
 * this listing.
 */
303 struct lfsck_component *
304 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
306 struct lfsck_component *com;
308 spin_lock(&lfsck->li_lock);
309 com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
313 com = __lfsck_component_find(lfsck, type,
314 &lfsck->li_list_double_scan);
318 com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
322 lfsck_component_get(com);
323 spin_unlock(&lfsck->li_lock);
/*
 * Detach @com from whatever lists it is on and drop one reference on it.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] com	the component to clean up
 */
327 void lfsck_component_cleanup(const struct lu_env *env,
328 struct lfsck_component *com)
/* Unlink from the lc_link list and the lc_link_dir list if linked. */
330 if (!list_empty(&com->lc_link))
331 list_del_init(&com->lc_link);
332 if (!list_empty(&com->lc_link_dir))
333 list_del_init(&com->lc_link_dir);
335 lfsck_component_put(env, com);
/*
 * Allocate a new FID from lfsck->li_seq, remember it in the RAM bookmark
 * (lb_last_fid) and store the bookmark.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] lfsck	pointer to the lfsck instance
 * \param[out] fid	buffer that receives the allocated FID
 * \param[in] locked	NOTE(review): presumably tells whether the caller
 *			already holds li_mutex; the conditionals around the
 *			mutex calls are missing from this listing
 *
 * NOTE(review): the check on the seq_client_alloc_fid() result and the
 * return statement are missing from this listing.
 */
338 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
339 struct lu_fid *fid, bool locked)
341 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
346 mutex_lock(&lfsck->li_mutex);
348 rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
350 bk->lb_last_fid = *fid;
351 /* We do not care about whether the subsequent sub-operations
352 * failed or not. The worst case is that one FID is lost, which
353 * is not a big issue for the LFSCK since it is relatively rare
354 * for the LFSCK to create objects. */
355 rc = lfsck_bookmark_store(env, lfsck);
359 mutex_unlock(&lfsck->li_mutex);
365 * Request the specified ibits lock for the given object.
367 * Before the LFSCK modifies a namespace-visible object,
368 * it needs to acquire the related ibits ldlm lock.
370 * \param[in] env pointer to the thread context
371 * \param[in] lfsck pointer to the lfsck instance
372 * \param[in] obj pointer to the dt_object to be locked
373 * \param[out] lh pointer to the lock handle
374 * \param[in] bits the bits for the ldlm lock to be acquired
375 * \param[in] mode the mode for the ldlm lock to be acquired
377 * \retval 0 for success
378 * \retval negative error number on failure
380 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
381 struct dt_object *obj, struct lustre_handle *lh,
382 __u64 bits, ldlm_mode_t mode)
/* The policy and resource id live in the per-thread lfsck_thread_info. */
384 struct lfsck_thread_info *info = lfsck_env_info(env);
385 ldlm_policy_data_t *policy = &info->lti_policy;
386 struct ldlm_res_id *resid = &info->lti_resid;
387 __u64 flags = LDLM_FL_ATOMIC_CB;
390 LASSERT(lfsck->li_namespace != NULL);
/* Build the inodebits policy and the resource name from the object FID. */
392 memset(policy, 0, sizeof(*policy));
393 policy->l_inodebits.bits = bits;
394 fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
/* Remote object: fill an enqueue info and take the lock through
 * dt_object_lock(). */
395 if (dt_object_remote(obj)) {
396 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
398 memset(einfo, 0, sizeof(*einfo));
399 einfo->ei_type = LDLM_IBITS;
400 einfo->ei_mode = mode;
401 einfo->ei_cb_bl = ldlm_blocking_ast;
402 einfo->ei_cb_cp = ldlm_completion_ast;
403 einfo->ei_res_id = resid;
405 rc = dt_object_lock(env, obj, lh, einfo, policy);
/* Local enqueue against lfsck->li_namespace.
 * NOTE(review): the "else" joining this to the branch above is missing
 * from this listing. */
407 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
408 LDLM_IBITS, policy, mode,
409 &flags, ldlm_blocking_ast,
410 ldlm_completion_ast, NULL, NULL,
411 0, LVB_T_NONE, NULL, lh);
/* NOTE(review): the lines between the ELDLM_OK test and the handle reset
 * are missing here; presumably the handle is zeroed only on failure. */
414 if (rc == ELDLM_OK) {
417 memset(lh, 0, sizeof(*lh));
425 * Release the specified ibits lock.
427 * If the lock has been acquired before, release it
428 * and cleanup the handle. Otherwise, do nothing.
430 * \param[in] lh pointer to the lock handle
431 * \param[in] mode the mode for the ldlm lock to be released
433 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
/* Only a handle that is in use is released; it is zeroed afterwards,
 * presumably so that a later call sees it as unused. */
435 if (lustre_handle_is_used(lh)) {
436 ldlm_lock_decref(lh, mode);
437 memset(lh, 0, sizeof(*lh));
/*
 * Look up which MDT serves the sequence of @fid.
 *
 * The per-thread lti_range is marked as an MDT range and passed to
 * fld_server_lookup() together with fid_seq(fid); range->lsr_index is then
 * loaded into the return code.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] lfsck	pointer to the lfsck instance
 * \param[in] fid	the FID whose sequence is looked up
 *
 * NOTE(review): the check on the lookup result and the return statement
 * are missing from this listing; presumably the index is returned only
 * when the lookup succeeded, and the lookup error otherwise.
 */
441 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
442 struct lfsck_instance *lfsck,
443 const struct lu_fid *fid)
445 struct seq_server_site *ss = lfsck_dev_site(lfsck);
446 struct lu_seq_range *range = &lfsck_env_info(env)->lti_range;
449 fld_range_set_mdt(range);
450 rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
452 rc = range->lsr_index;
/* Directory entry names used as lookup/insert keys in this file. "dot" and
 * "dotdot" are not static, so they have external linkage; the other two
 * are file-local. */
457 const char dot[] = ".";
458 const char dotdot[] = "..";
459 static const char dotlustre[] = ".lustre";
460 static const char lostfound[] = "lost+found";
463 * Remove the name entry from the .lustre/lost+found directory.
465 * No need to care about the object referenced by the name entry,
466 * either the name entry is invalid or redundant, or the referenced
467 * object has been processed or will be handled by others.
469 * \param[in] env pointer to the thread context
470 * \param[in] lfsck pointer to the lfsck instance
471 * \param[in] name the name for the name entry to be removed
473 * \retval 0 for success
474 * \retval negative error number on failure
476 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
477 struct lfsck_instance *lfsck,
/* The parent is always the .lustre/lost+found root held by the instance;
 * the transaction runs on the device of that object. */
480 struct dt_object *parent = lfsck->li_lpf_root_obj;
481 struct dt_device *dev = lfsck_obj2dev(parent);
483 struct lustre_handle lh = { 0 };
/* Take the UPDATE ibits lock on the parent in EX mode for the removal. */
487 rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
488 MDS_INODELOCK_UPDATE, LCK_EX);
492 th = dt_trans_create(env, dev);
494 GOTO(unlock, rc = PTR_ERR(th));
/* Declare both sub-operations: delete the name, drop one parent nlink. */
496 rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
500 rc = dt_declare_ref_del(env, parent, th);
504 rc = dt_trans_start_local(env, dev, th);
/* Execute: remove the name entry, then decrease the parent nlink under
 * the parent's write lock. */
508 rc = dt_delete(env, parent, (const struct dt_key *)name, th,
513 dt_write_lock(env, parent, 0);
514 rc = dt_ref_del(env, parent, th);
515 dt_write_unlock(env, parent);
520 dt_trans_stop(env, dev, th);
523 lfsck_ibits_unlock(&lh, LCK_EX);
525 CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
526 lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
/*
 * Create the lost+found child directory @child and link it under
 * lfsck->li_lpf_root_obj within ONE transaction on the child's device.
 *
 * The "Na." comments mark the declare phase and the matching "Nb."
 * comments the execute phase: create the child (with its "." and ".."
 * entries), add a child nlink, set the child's linkEA, insert the name
 * into the parent, add a parent nlink, and write the bookmark.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] lfsck	pointer to the lfsck instance
 * \param[in] child	the directory object to be created
 * \param[in] dof	the object format used for the creation
 *
 * NOTE(review): two parameter lines (the ones declaring "la" and "name",
 * both used below), the rc checks after each step and the labels are
 * missing from this listing.
 */
531 static int lfsck_create_lpf_local(const struct lu_env *env,
532 struct lfsck_instance *lfsck,
533 struct dt_object *child,
535 struct dt_object_format *dof,
538 struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec;
539 struct dt_object *parent = lfsck->li_lpf_root_obj;
540 struct dt_device *dev = lfsck_obj2dev(child);
541 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
542 struct dt_object *bk_obj = lfsck->li_bookmark_obj;
543 const struct lu_fid *cfid = lfsck_dto2fid(child);
544 struct thandle *th = NULL;
545 struct linkea_data ldata = { NULL };
546 struct lu_buf linkea_buf;
547 const struct lu_name *cname;
549 int len = sizeof(struct lfsck_bookmark);
/* Build the linkEA content (name + parent FID) in the per-thread
 * lti_linkea_buf2 before the transaction is created. */
553 rc = linkea_data_new(&ldata,
554 &lfsck_env_info(env)->lti_linkea_buf2);
558 cname = lfsck_name_get_const(env, name, strlen(name));
559 rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
563 th = dt_trans_create(env, dev);
567 /* 1a. create child */
568 rc = dt_declare_create(env, child, la, NULL, dof, th);
572 /* 2a. increase child nlink */
573 rc = dt_declare_ref_add(env, child, th);
577 /* 3a. insert linkEA for child */
578 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
579 ldata.ld_leh->leh_len);
580 rc = dt_declare_xattr_set(env, child, &linkea_buf,
581 XATTR_NAME_LINK, 0, th);
585 /* 4a. insert name into parent dir */
586 rec->rec_type = S_IFDIR;
588 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
589 (const struct dt_key *)name, th);
593 /* 5a. increase parent nlink */
594 rc = dt_declare_ref_add(env, parent, th);
598 /* 6a. update bookmark */
599 rc = dt_declare_record_write(env, bk_obj,
600 lfsck_buf_get(env, bk, len), 0, th);
604 rc = dt_trans_start_local(env, dev, th);
/* Steps 1b-3b run under the child's write lock. */
608 dt_write_lock(env, child, 0);
609 /* 1b.1. create child */
610 rc = dt_create(env, child, la, NULL, dof, th);
614 if (unlikely(!dt_try_as_dir(env, child)))
615 GOTO(unlock, rc = -ENOTDIR);
617 /* 1b.2. insert dot into child dir */
619 rc = dt_insert(env, child, (const struct dt_rec *)rec,
620 (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
624 /* 1b.3. insert dotdot into child dir */
625 rec->rec_fid = &LU_LPF_FID;
626 rc = dt_insert(env, child, (const struct dt_rec *)rec,
627 (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
631 /* 2b. increase child nlink */
632 rc = dt_ref_add(env, child, th);
636 /* 3b. insert linkEA for child. */
637 rc = dt_xattr_set(env, child, &linkea_buf,
638 XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
639 dt_write_unlock(env, child);
643 /* 4b. insert name into parent dir */
645 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
646 (const struct dt_key *)name, th, BYPASS_CAPA, 1);
650 dt_write_lock(env, parent, 0);
651 /* 5b. increase parent nlink */
652 rc = dt_ref_add(env, parent, th);
653 dt_write_unlock(env, parent);
/* Remember the new directory's FID in the RAM bookmark and refresh the
 * little-endian copy in li_bookmark_disk.
 * NOTE(review): the write below passes the RAM copy (bk), not the
 * just-converted li_bookmark_disk - confirm this is intended. */
657 bk->lb_lpf_fid = *cfid;
658 lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
660 /* 6b. update bookmark */
661 rc = dt_record_write(env, bk_obj,
662 lfsck_buf_get(env, bk, len), &pos, th);
/* Error exit for failures that happen while the child is write-locked. */
667 dt_write_unlock(env, child);
670 dt_trans_stop(env, dev, th);
/*
 * Create the lost+found child directory @child when the name insertion
 * has to go through a different device than the child creation.
 *
 * The work is split into two transactions (see the XXX comment below):
 * transaction I on the child's device creates the child, sets its linkEA
 * and writes the bookmark; transaction II on the parent's device inserts
 * the name and increases the parent nlink. If only transaction II fails,
 * a D_LFSCK message records the partial result.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] lfsck	pointer to the lfsck instance
 * \param[in] child	the directory object to be created
 * \param[in] dof	the object format used for the creation
 *
 * NOTE(review): two parameter lines (the ones declaring "la" and "name",
 * both used below), the rc checks after each step and the labels are
 * missing from this listing.
 */
675 static int lfsck_create_lpf_remote(const struct lu_env *env,
676 struct lfsck_instance *lfsck,
677 struct dt_object *child,
679 struct dt_object_format *dof,
682 struct dt_insert_rec *rec = &lfsck_env_info(env)->lti_dt_rec;
683 struct dt_object *parent = lfsck->li_lpf_root_obj;
684 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
685 struct dt_object *bk_obj = lfsck->li_bookmark_obj;
686 const struct lu_fid *cfid = lfsck_dto2fid(child);
687 struct thandle *th = NULL;
688 struct linkea_data ldata = { NULL };
689 struct lu_buf linkea_buf;
690 const struct lu_name *cname;
691 struct dt_device *dev;
693 int len = sizeof(struct lfsck_bookmark);
/* Build the linkEA content (name + parent FID) in the per-thread
 * lti_linkea_buf2 before any transaction is created. */
697 rc = linkea_data_new(&ldata,
698 &lfsck_env_info(env)->lti_linkea_buf2);
702 cname = lfsck_name_get_const(env, name, strlen(name));
703 rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
707 /* Create .lustre/lost+found/MDTxxxx. */
709 /* XXX: Currently, cross-MDT create operation needs to create the child
710 * object firstly, then insert name into the parent directory. For
711 * this case, the child object resides on current MDT (local), but
712 * the parent ".lustre/lost+found" may be on remote MDT. It is not
713 * easy to contain all the sub-modifications orderly within single
716 * To avoid more inconsistency, we split the create operation into
719 * 1) create the child and update the lfsck_bookmark::lb_lpf_fid
721 * 2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
724 * If 1) done, but 2) failed, then go ahead, the LFSCK will try to
725 * repair such inconsistency when LFSCK run next time. */
727 /* Transaction I: locally */
729 dev = lfsck_obj2dev(child);
730 th = dt_trans_create(env, dev);
734 /* 1a. create child */
735 rc = dt_declare_create(env, child, la, NULL, dof, th);
739 /* 2a. increase child nlink */
740 rc = dt_declare_ref_add(env, child, th);
744 /* 3a. insert linkEA for child */
745 lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
746 ldata.ld_leh->leh_len);
747 rc = dt_declare_xattr_set(env, child, &linkea_buf,
748 XATTR_NAME_LINK, 0, th);
752 /* 4a. update bookmark */
753 rc = dt_declare_record_write(env, bk_obj,
754 lfsck_buf_get(env, bk, len), 0, th);
758 rc = dt_trans_start_local(env, dev, th);
/* All of transaction I's execute phase runs under the child's write lock. */
762 dt_write_lock(env, child, 0);
763 /* 1b.1. create child */
764 rc = dt_create(env, child, la, NULL, dof, th);
768 if (unlikely(!dt_try_as_dir(env, child)))
769 GOTO(unlock, rc = -ENOTDIR);
771 /* 1b.2. insert dot into child dir */
772 rec->rec_type = S_IFDIR;
774 rc = dt_insert(env, child, (const struct dt_rec *)rec,
775 (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
779 /* 1b.3. insert dotdot into child dir */
780 rec->rec_fid = &LU_LPF_FID;
781 rc = dt_insert(env, child, (const struct dt_rec *)rec,
782 (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
786 /* 2b. increase child nlink */
787 rc = dt_ref_add(env, child, th);
791 /* 3b. insert linkEA for child */
792 rc = dt_xattr_set(env, child, &linkea_buf,
793 XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
/* Remember the new directory's FID in the RAM bookmark and refresh the
 * little-endian copy in li_bookmark_disk.
 * NOTE(review): the write below passes the RAM copy (bk), not the
 * just-converted li_bookmark_disk - confirm this is intended. */
797 bk->lb_lpf_fid = *cfid;
798 lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
800 /* 4b. update bookmark */
801 rc = dt_record_write(env, bk_obj,
802 lfsck_buf_get(env, bk, len), &pos, th);
/* Normal end of transaction I: unlock the child and stop the handle. */
804 dt_write_unlock(env, child);
805 dt_trans_stop(env, dev, th);
809 /* Transaction II: remotely */
/* From here on "dev" is the parent's device; the final message relies on
 * that to tell a transaction II failure from a transaction I failure. */
811 dev = lfsck_obj2dev(parent);
812 th = dt_trans_create(env, dev);
817 /* 5a. insert name into parent dir */
819 rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
820 (const struct dt_key *)name, th);
824 /* 6a. increase parent nlink */
825 rc = dt_declare_ref_add(env, parent, th);
829 rc = dt_trans_start_local(env, dev, th);
833 /* 5b. insert name into parent dir */
834 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
835 (const struct dt_key *)name, th, BYPASS_CAPA, 1);
839 dt_write_lock(env, parent, 0);
840 /* 6b. increase parent nlink */
841 rc = dt_ref_add(env, parent, th);
842 dt_write_unlock(env, parent);
/* Error exit for failures that happen while the child is write-locked. */
847 dt_write_unlock(env, child);
849 dt_trans_stop(env, dev, th);
851 if (rc != 0 && dev == lfsck_obj2dev(parent))
852 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
853 "for orphans, but failed to insert the name %s "
854 "to the .lustre/lost+found/. Such inconsistency "
855 "will be repaired when LFSCK run next time: rc = %d\n",
856 lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
862 * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
864 * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
865 * orphans and other uncertain inconsistent objects found during the
866 * LFSCK. Such directory will be created by the LFSCK engine on the
867 * local MDT before the LFSCK scanning.
869 * \param[in] env pointer to the thread context
870 * \param[in] lfsck pointer to the lfsck instance
872 * \retval 0 for success
873 * \retval negative error number on failure
875 static int lfsck_create_lpf(const struct lu_env *env,
876 struct lfsck_instance *lfsck)
878 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
879 struct lfsck_thread_info *info = lfsck_env_info(env);
880 struct lu_fid *cfid = &info->lti_fid2;
881 struct lu_attr *la = &info->lti_la;
882 struct dt_object_format *dof = &info->lti_dof;
883 struct dt_object *parent = lfsck->li_lpf_root_obj;
884 struct dt_object *child = NULL;
885 struct lustre_handle lh = { 0 };
887 int node = lfsck_dev_idx(lfsck);
/* Preconditions: master instance, lost+found root already known, and no
 * MDTxxxx object attached to the instance yet. */
891 LASSERT(lfsck->li_master);
892 LASSERT(parent != NULL);
893 LASSERT(lfsck->li_lpf_obj == NULL);
/* Hold the UPDATE ibits lock on the parent in EX mode for the whole
 * lookup/create sequence. */
895 rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
896 MDS_INODELOCK_UPDATE, LCK_EX);
/* "MDT" plus four hex digits plus the terminating NUL is exactly 8 bytes
 * (for device indexes up to 0xffff). */
900 snprintf(name, 8, "MDT%04x", node);
901 if (fid_is_zero(&bk->lb_lpf_fid)) {
902 /* There is a corner case: in a former LFSCK scanning we have
903 * created the .lustre/lost+found/MDTxxxx but failed to update
904 * the lfsck_bookmark::lb_lpf_fid successfully. So we need to
905 * look it up from MDT0 first. */
906 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
907 (const struct dt_key *)name, BYPASS_CAPA);
908 if (rc != 0 && rc != -ENOENT)
/* NOTE(review): the branching between "found by name" (store its FID in
 * the bookmark) and "not found" (allocate a new FID) is missing from this
 * listing. */
912 bk->lb_lpf_fid = *cfid;
913 rc = lfsck_bookmark_store(env, lfsck);
915 rc = lfsck_fid_alloc(env, lfsck, cfid, true);
/* The bookmark already records a FID: use it. */
920 *cfid = bk->lb_lpf_fid;
923 child = lfsck_object_find_bottom(env, lfsck, cfid);
925 GOTO(unlock, rc = PTR_ERR(child));
/* An existing object only needs to be usable as a directory. */
927 if (dt_object_exists(child) != 0) {
928 if (unlikely(!dt_try_as_dir(env, child)))
931 lfsck->li_lpf_obj = child;
/* Attributes for the new directory: current time for a/m/ctime, owner-only
 * rwx directory mode. */
936 memset(la, 0, sizeof(*la));
937 la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
938 la->la_mode = S_IFDIR | S_IRWXU;
939 la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
941 memset(dof, 0, sizeof(*dof));
942 dof->dof_type = dt_mode_to_dft(S_IFDIR);
/* NOTE(review): the condition choosing between the local and the remote
 * creation path is missing from this listing. */
945 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
947 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
949 lfsck->li_lpf_obj = child;
954 lfsck_ibits_unlock(&lh, LCK_EX);
/* On failure drop the reference on the child, if one was obtained. */
955 if (rc != 0 && child != NULL && !IS_ERR(child))
956 lfsck_object_put(env, child);
962 * Scan .lustre/lost+found for bad name entries and remove them.
964 * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
965 * index in the system. Any other formatted name is invalid and should be
968 * \param[in] env pointer to the thread context
969 * \param[in] lfsck pointer to the lfsck instance
971 * \retval 0 for success
972 * \retval negative error number on failure
974 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
975 struct lfsck_instance *lfsck)
977 struct dt_object *parent = lfsck->li_lpf_root_obj;
/* The per-thread lti_key buffer is reused as the directory entry record. */
978 struct lu_dirent *ent =
979 (struct lu_dirent *)lfsck_env_info(env)->lti_key;
980 const struct dt_it_ops *iops = &parent->do_index_ops->dio_it;
/* Iterate the parent directory from position 0. */
985 it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
989 rc = iops->load(env, it, 0);
991 rc = iops->next(env, it);
998 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
/* The name length is converted from little-endian in place. */
1002 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1003 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1006 /* name length must be strlen("MDTxxxx") */
1007 if (ent->lde_namelen != 7)
/* A valid name starts with "MDT" and continues with hex digits up to
 * offset 7.
 * NOTE(review): the initialization/increment of "off" and the jumps taken
 * by the checks above are missing from this listing. */
1010 if (memcmp(ent->lde_name, "MDT", off) != 0)
1013 while (off < 7 && isxdigit(ent->lde_name[off]))
1019 rc = lfsck_lpf_remove_name_entry(env, lfsck,
1026 rc = iops->next(env, it);
1030 iops->fini(env, it);
/* A positive rc (presumably the iterator's end-of-directory result) is
 * reported to the caller as success. */
1032 RETURN(rc > 0 ? 0 : rc);
/*
 * Make the other record agree with the one that was trusted, per @type:
 *
 * LVLT_BY_BOOKMARK:	update the name entry @name under @parent so that it
 *			references @child as a directory.
 * LVLT_BY_NAMEENTRY:	store the child's FID as lb_lpf_fid in the bookmark
 *			and log the result.
 *
 * \param[in] env	pointer to the thread context
 * \param[in] lfsck	pointer to the lfsck instance
 * \param[in] parent	the directory holding the name entry
 * \param[in] child	the lost+found sub-directory object
 * \param[in] type	which record the child's FID was taken from
 *
 * NOTE(review): the parameter line declaring "name" and the return
 * statement are missing from this listing.
 */
1035 static int lfsck_update_lpf_entry(const struct lu_env *env,
1036 struct lfsck_instance *lfsck,
1037 struct dt_object *parent,
1038 struct dt_object *child,
1040 enum lfsck_verify_lpf_types type)
1044 if (type == LVLT_BY_BOOKMARK) {
1045 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1046 lfsck_dto2fid(child), S_IFDIR);
1047 } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1048 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1049 rc = lfsck_bookmark_store(env, lfsck);
1051 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1052 " in the bookmark file: rc = %d\n",
1053 lfsck_lfsck2name(lfsck),
1054 PFID(lfsck_dto2fid(child)), rc);
1061 * Check whether the @child back references the @parent.
1064 * 1) The child's FID is stored in the bookmark file. If the child back
1065 * references the parent (LU_LPF_FID object) via its ".." entry, then
1066 * insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1067 * the child back references another parent2, then:
1068 * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1069 * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1070 * references the child. So keep them there. As the LFSCK proceeds,
1071 * the parent3 may be found; then when the LFSCK runs next time, the
1072 * inconsistency can be repaired.
1074 * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1075 * entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1076 * via its ".." entry, then update the bookmark file, otherwise, if the child
1077 * back references another parent2, then:
1078 * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1079 * from .lustre/lost+found/;
1080 * 2.2) Otherwise, if the parent2 does not recognize the child, trust the
1081 * sub-directory name entry and update the child;
1082 * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1083 * or not, then keep them there.
1085 * \param[in] env pointer to the thread context
1086 * \param[in] lfsck pointer to the lfsck instance
1087 * \param[in] child pointer to the lost+found sub-directory object
1088 * \param[in] name the name for lost+found sub-directory object
1089 * \param[out] fid pointer to the buffer to hold the FID of the object
1090 * (called it as parent2) that is referenced via the
1091 * child's dotdot entry; it also can be the FID that
1092 * is referenced by the name entry under the parent2.
1093 * \param[in] type to indicate where the child's FID is stored in
1095 * \retval positive number for uncertain inconsistency
1096 * \retval 0 for success
1097 * \retval negative error number on failure
1099 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1100 struct lfsck_instance *lfsck,
1101 struct dt_object *child, const char *name,
1103 enum lfsck_verify_lpf_types type)
1105 struct dt_object *parent = lfsck->li_lpf_root_obj;
1106 struct lfsck_thread_info *info = lfsck_env_info(env);
/* name2/fid2 receive the first linkEA record of the child; both are
 * backed by per-thread buffers. */
1107 char *name2 = info->lti_key;
1108 struct lu_fid *fid2 = &info->lti_fid3;
1109 struct dt_object *parent2 = NULL;
1110 struct lustre_handle lh = { 0 };
/* Step 1: read the child's ".." entry into @fid. */
1115 rc = dt_lookup(env, child, (struct dt_rec *)fid,
1116 (const struct dt_key *)dotdot, BYPASS_CAPA);
1120 if (!fid_is_sane(fid))
1121 GOTO(linkea, rc = -EINVAL);
/* ".." points at the lost+found root: adopt the child as li_lpf_obj (with
 * an extra object reference), verify its linkEA and repair the other
 * record through lfsck_update_lpf_entry(). */
1123 if (lu_fid_eq(fid, &LU_LPF_FID)) {
1124 const struct lu_name *cname;
1126 if (lfsck->li_lpf_obj == NULL) {
1127 lu_object_get(&child->do_lu);
1128 lfsck->li_lpf_obj = child;
1131 cname = lfsck_name_get_const(env, name, strlen(name));
1132 rc = lfsck_verify_linkea(env, child, cname, &LU_LPF_FID);
1134 rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
/* ".." points elsewhere: load that object as parent2. When it cannot be
 * found, does not exist or is not a directory, parent2 is left as an
 * ERR_PTR and processing continues at the linkEA check. */
1140 parent2 = lfsck_object_find_bottom(env, lfsck, fid);
1141 if (IS_ERR(parent2))
1142 GOTO(linkea, parent2);
1144 if (!dt_object_exists(parent2)) {
1145 lfsck_object_put(env, parent2);
1147 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1150 if (!dt_try_as_dir(env, parent2)) {
1151 lfsck_object_put(env, parent2);
1153 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1157 /* To prevent rename/unlink race */
1158 rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1159 MDS_INODELOCK_UPDATE, LCK_PR);
/* Step 2: read the first linkEA record of the child under its read lock. */
1163 dt_read_lock(env, child, 0);
1164 rc = lfsck_links_get_first(env, child, name2, fid2);
1166 dt_read_unlock(env, child);
1167 lfsck_ibits_unlock(&lh, LCK_PR);
1169 GOTO(out_put, rc = 1);
1172 /* It is almost impossible that the bookmark file (or the name entry)
1173 * and the linkEA hit the same data corruption. Trust the linkEA. */
1174 if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1175 dt_read_unlock(env, child);
1176 lfsck_ibits_unlock(&lh, LCK_PR);
1179 if (lfsck->li_lpf_obj == NULL) {
1180 lu_object_get(&child->do_lu);
1181 lfsck->li_lpf_obj = child;
1184 /* Update the child's dotdot entry */
1185 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1186 &LU_LPF_FID, S_IFDIR);
1188 rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
/* No usable parent2: nothing can be decided, report "uncertain" (1). */
1194 if (parent2 == NULL || IS_ERR(parent2)) {
1195 dt_read_unlock(env, child);
1196 lfsck_ibits_unlock(&lh, LCK_PR);
1198 GOTO(out_done, rc = 1);
/* Step 3: ask parent2 whether it has an entry name2; @fid is reused to
 * receive the FID that entry references. */
1201 rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1202 (const struct dt_key *)name2, BYPASS_CAPA);
1203 dt_read_unlock(env, child);
1204 lfsck_ibits_unlock(&lh, LCK_PR);
1205 if (rc != 0 && rc != -ENOENT)
/* parent2 has no such entry, or the entry references another object. */
1208 if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1209 if (type == LVLT_BY_BOOKMARK)
1210 GOTO(out_put, rc = 1);
1212 /* Trust the name entry, update the child's dotdot entry. */
1213 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1214 &LU_LPF_FID, S_IFDIR);
/* parent2 references the child: the record named by @type is the wrong
 * one. */
1219 if (type == LVLT_BY_BOOKMARK) {
1220 /* Invalid FID record in the bookmark file, reset it. */
1221 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1222 rc = lfsck_bookmark_store(env, lfsck);
1224 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1225 " in the bookmark file: rc = %d\n",
1226 lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1227 } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1228 /* The name entry is wrong, remove it. */
1229 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
/* Drop the parent2 reference only when it is a real object pointer. */
1235 if (parent2 != NULL && !IS_ERR(parent2))
1236 lfsck_object_put(env, parent2);
1243 * Verify the /ROOT/.lustre/lost+found/ directory.
1245 * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1246 * the LFSCK does not exactly know how to handle, such as orphans. So before
1247 * the LFSCK scans the system, the consistency of this directory needs to
1248 * be verified first to allow the users to use it during the LFSCK.
1250 * \param[in] env pointer to the thread context
1251 * \param[in] lfsck pointer to the lfsck instance
1253 * \retval positive number for uncertain inconsistency
1254 * \retval 0 for success
1255 * \retval negative error number on failure
/*
 * Verify the lost+found objects used by this LFSCK instance.
 *
 * The function asserts that the instance is a master, locates the parent
 * directory (cached afterwards in li_lpf_root_obj), scans it for bad
 * entries, and then cross-checks two candidates for the per-MDT directory:
 * child1, located through the FID stored in the bookmark (lb_lpf_fid), and
 * child2, located through the "MDT%04x" name entry looked up in the parent.
 * Invalid bookmark FIDs are reset and stored back; invalid name entries
 * are removed.
 */
1257 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1259 struct lfsck_thread_info *info = lfsck_env_info(env);
1260 struct lu_fid *pfid = &info->lti_fid;
1261 struct lu_fid *cfid = &info->lti_fid2;
1262 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1263 struct dt_object *parent;
1264 /* child1's FID is in the bookmark file. */
1265 struct dt_object *child1 = NULL;
1266 /* child2's FID is in the name entry MDTxxxx. */
1267 struct dt_object *child2 = NULL;
1268 const struct lu_name *cname;
1270 int node = lfsck_dev_idx(lfsck);
1274 LASSERT(lfsck->li_master);
1276 if (lfsck->li_lpf_root_obj != NULL)
/*
 * The parent is looked up either on the local bottom device or on the
 * target with index 0 from li_mdt_descs; the condition selecting between
 * the two is not visible here.
 */
1280 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
1283 struct lfsck_tgt_desc *ltd;
1285 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1286 if (unlikely(ltd == NULL))
1289 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1295 RETURN(PTR_ERR(parent));
1297 LASSERT(dt_object_exists(parent));
1299 if (unlikely(!dt_try_as_dir(env, parent))) {
1300 lfsck_object_put(env, parent);
1302 GOTO(put, rc = -ENOTDIR);
/* Cache the located directory as the lost+found root object. */
1305 lfsck->li_lpf_root_obj = parent;
1307 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1309 CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1310 "for bad sub-directories: rc = %d\n",
1311 lfsck_lfsck2name(lfsck), rc);
/* A non-zero bookmark FID is only trusted when it is a normal FID. */
1314 if (!fid_is_zero(&bk->lb_lpf_fid)) {
1315 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1316 struct lu_fid tfid = bk->lb_lpf_fid;
1318 /* Invalid FID record in the bookmark file, reset it. */
1319 fid_zero(&bk->lb_lpf_fid);
1320 rc = lfsck_bookmark_store(env, lfsck);
1322 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1323 " in the bookmark file: rc = %d\n",
1324 lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1329 child1 = lfsck_object_find_bottom(env, lfsck,
1331 if (IS_ERR(child1)) {
1336 if (unlikely(!dt_object_exists(child1) ||
1337 dt_object_remote(child1)) ||
1338 !S_ISDIR(lfsck_object_type(child1))) {
1339 /* Invalid FID record in the bookmark file,
1341 fid_zero(&bk->lb_lpf_fid);
1342 rc = lfsck_bookmark_store(env, lfsck);
1344 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1345 " in the bookmark file: rc = %d\n",
1346 lfsck_lfsck2name(lfsck),
1347 PFID(lfsck_dto2fid(child1)), rc);
1352 lfsck_object_put(env, child1);
1354 } else if (unlikely(!dt_try_as_dir(env, child1))) {
1355 GOTO(put, rc = -ENOTDIR);
1361 snprintf(name, 8, "MDT%04x", node);
1362 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1363 (const struct dt_key *)name, BYPASS_CAPA);
1364 if (rc == -ENOENT) {
1365 if (!fid_is_zero(&bk->lb_lpf_fid))
1374 /* Invalid FID in the name entry, remove the name entry. */
1375 if (!fid_is_norm(cfid)) {
1376 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1383 child2 = lfsck_object_find_bottom(env, lfsck, cfid);
1385 GOTO(put, rc = PTR_ERR(child2));
/*
 * The object behind the name entry must exist, be local and be a
 * directory; otherwise the name entry is removed.
 */
1387 if (unlikely(!dt_object_exists(child2) ||
1388 dt_object_remote(child2)) ||
1389 !S_ISDIR(lfsck_object_type(child2))) {
1390 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1397 if (unlikely(!dt_try_as_dir(env, child2)))
1398 GOTO(put, rc = -ENOTDIR);
/*
 * Without a bookmark object only the name-entry object is verified.
 * When both exist but the FIDs differ, the bookmark object is verified
 * first and, if its reported parent is not LU_LPF_FID, the name-entry
 * object is verified as well.
 */
1400 if (child1 == NULL) {
1401 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1402 pfid, LVLT_BY_NAMEENTRY);
1403 } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
1404 rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
1405 pfid, LVLT_BY_BOOKMARK);
1406 if (!lu_fid_eq(pfid, &LU_LPF_FID))
1407 rc = lfsck_verify_lpf_pairs(env, lfsck, child2,
/* Cache child2 as li_lpf_obj; the extra reference belongs to the cache. */
1411 if (lfsck->li_lpf_obj == NULL) {
1412 lu_object_get(&child2->do_lu);
1413 lfsck->li_lpf_obj = child2;
1416 cname = lfsck_name_get_const(env, name, strlen(name));
1417 rc = lfsck_verify_linkea(env, child2, cname, &LU_LPF_FID);
1424 rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
1425 pfid, LVLT_BY_BOOKMARK);
/*
 * A cached li_lpf_obj that cannot be used as a directory is dropped;
 * with no cached object and rc == 0 a new one is created.
 */
1430 if (lfsck->li_lpf_obj != NULL) {
1431 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
1432 lfsck_object_put(env, lfsck->li_lpf_obj);
1433 lfsck->li_lpf_obj = NULL;
1436 } else if (rc == 0) {
1437 rc = lfsck_create_lpf(env, lfsck);
/* Drop the local references; NULL and ERR_PTR values are skipped. */
1440 if (child2 != NULL && !IS_ERR(child2))
1441 lfsck_object_put(env, child2);
1442 if (child1 != NULL && !IS_ERR(child1))
1443 lfsck_object_put(env, child1);
/*
 * Set up the sequence client (li_seq) of an LFSCK instance.
 *
 * Allocates li_seq and a temporary "lfsck-<name>" prefix of
 * MAX_OBD_NAME + 7 bytes, initialises the client as LUSTRE_SEQ_METADATA,
 * frees the prefix, and seeds lcs_fid from the bookmark's lb_last_fid
 * when that FID is sane.
 */
1448 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1450 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
1451 struct seq_server_site *ss = lfsck_dev_site(lfsck);
1456 if (unlikely(ss == NULL))
1459 OBD_ALLOC_PTR(lfsck->li_seq);
1460 if (lfsck->li_seq == NULL)
1463 OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1465 GOTO(out, rc = -ENOMEM);
1467 snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1468 rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1470 OBD_FREE(prefix, MAX_OBD_NAME + 7);
1474 if (fid_is_sane(&bk->lb_last_fid))
1475 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
/*
 * Cleanup: release li_seq and clear the pointer.
 * NOTE(review): presumably the target of GOTO(out, ...) - confirm.
 */
1480 OBD_FREE_PTR(lfsck->li_seq);
1481 lfsck->li_seq = NULL;
/*
 * Tear down the sequence client of an LFSCK instance, if one exists:
 * finalise it, free it and clear li_seq.
 */
1486 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1488 if (lfsck->li_seq != NULL) {
1489 seq_client_fini(lfsck->li_seq);
1490 OBD_FREE_PTR(lfsck->li_seq);
1491 lfsck->li_seq = NULL;
/*
 * Release everything owned by an LFSCK instance and free the instance.
 *
 * The instance must already be unlinked (li_link empty), its thread must
 * be in the init or stopped state, and li_obj_dir / li_lmv must be NULL;
 * all of these are asserted.
 */
1495 void lfsck_instance_cleanup(const struct lu_env *env,
1496 struct lfsck_instance *lfsck)
1498 struct ptlrpc_thread *thread = &lfsck->li_thread;
1499 struct lfsck_component *com;
1500 struct lfsck_component *next;
1501 struct lfsck_lmv_unit *llu;
1502 struct lfsck_lmv_unit *llu_next;
1503 struct lfsck_lmv *llmv;
1506 LASSERT(list_empty(&lfsck->li_link));
1507 LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1509 if (lfsck->li_obj_oit != NULL) {
1510 lfsck_object_put(env, lfsck->li_obj_oit);
1511 lfsck->li_obj_oit = NULL;
1514 LASSERT(lfsck->li_obj_dir == NULL);
1515 LASSERT(lfsck->li_lmv == NULL);
/* Every remaining LMV unit must hold exactly one reference; drop it. */
1517 list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1518 llmv = &llu->llu_lmv;
1520 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1521 "still in using: %u\n",
1522 atomic_read(&llmv->ll_ref));
1524 lfsck_lmv_put(env, llmv);
/* Clean up the components on the scan, double-scan and idle lists. */
1527 list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1528 lfsck_component_cleanup(env, com);
1531 LASSERT(list_empty(&lfsck->li_list_dir));
1533 list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1535 lfsck_component_cleanup(env, com);
1538 list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1539 lfsck_component_cleanup(env, com);
1542 lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1543 lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
/* Drop the references on cached objects and clear the pointers. */
1545 if (lfsck->li_lfsck_dir != NULL) {
1546 lfsck_object_put(env, lfsck->li_lfsck_dir);
1547 lfsck->li_lfsck_dir = NULL;
1550 if (lfsck->li_bookmark_obj != NULL) {
1551 lfsck_object_put(env, lfsck->li_bookmark_obj);
1552 lfsck->li_bookmark_obj = NULL;
1555 if (lfsck->li_lpf_obj != NULL) {
1556 lfsck_object_put(env, lfsck->li_lpf_obj);
1557 lfsck->li_lpf_obj = NULL;
1560 if (lfsck->li_lpf_root_obj != NULL) {
1561 lfsck_object_put(env, lfsck->li_lpf_root_obj);
1562 lfsck->li_lpf_root_obj = NULL;
1565 if (lfsck->li_los != NULL) {
1566 local_oid_storage_fini(env, lfsck->li_los);
1567 lfsck->li_los = NULL;
1570 lfsck_fid_fini(lfsck);
1572 OBD_FREE_PTR(lfsck);
/*
 * Walk lfsck_instance_list for the instance whose li_bottom equals @key.
 * On a match the instance may get a reference (lfsck_instance_get) and be
 * unlinked from the list (list_del_init).
 * NOTE(review): presumably gated by @ref and @unlink respectively; the
 * guarding conditions are not visible here.
 * No lock is taken in the visible lines; lfsck_instance_find() calls this
 * with lfsck_instance_lock held.
 */
1575 static inline struct lfsck_instance *
1576 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1578 struct lfsck_instance *lfsck;
1580 list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1581 if (lfsck->li_bottom == key) {
1583 lfsck_instance_get(lfsck);
1585 list_del_init(&lfsck->li_link);
/*
 * Locked wrapper around __lfsck_instance_find(): the lookup for @key is
 * performed with lfsck_instance_lock held.
 */
1594 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1597 struct lfsck_instance *lfsck;
1599 spin_lock(&lfsck_instance_lock);
1600 lfsck = __lfsck_instance_find(key, ref, unlink);
1601 spin_unlock(&lfsck_instance_lock);
/*
 * Register @lfsck on lfsck_instance_list under lfsck_instance_lock.
 * The list is first scanned for an instance with the same li_bottom; on a
 * match the lock is released before the early exit (its return value is
 * not visible here). Otherwise the instance is appended to the tail.
 */
1606 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1608 struct lfsck_instance *tmp;
1610 spin_lock(&lfsck_instance_lock);
1611 list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1612 if (lfsck->li_bottom == tmp->li_bottom) {
1613 spin_unlock(&lfsck_instance_lock);
1618 list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1619 spin_unlock(&lfsck_instance_lock);
/*
 * Print "<prefix>:" followed by the names of the bits set in @bits.
 * With no bits set, the prefix is terminated with a newline at once;
 * otherwise a space follows and names from @names are emitted, each
 * followed by ',' or by '\n' depending on the newline flag. NULL entries
 * in @names are not printed.
 * NOTE(review): "1 << i" overflows a signed int once i reaches 31;
 * confirm @bits never has bit 31 set (the type of flag is not visible).
 */
1623 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
1628 bool newline = (bits != 0 ? false : true);
1630 seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1632 for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
1635 if (names[i] != NULL) {
1639 seq_printf(m, "%s%c", names[i],
1640 newline ? '\n' : ',');
1646 seq_printf(m, "\n");
/*
 * Print "<prefix>: <N> seconds", where N is the current time in seconds
 * minus @time, or "<prefix>: N/A" in the alternative branch (the
 * condition choosing between the two is not visible here).
 */
1650 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1653 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1654 cfs_time_current_sec() - time);
1656 seq_printf(m, "%s: N/A\n", prefix);
/*
 * Print an LFSCK position as "<prefix>: <oit cookie>, <parent FID>,
 * <dir cookie>". When lp_dir_parent is zero the last two fields print as
 * N/A, and the first one prints as N/A as well when lp_oit_cookie is 0.
 */
1660 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1663 if (fid_is_zero(&pos->lp_dir_parent)) {
1664 if (pos->lp_oit_cookie == 0)
1665 seq_printf(m, "%s: N/A, N/A, N/A\n",
1668 seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1669 prefix, pos->lp_oit_cookie);
1671 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1672 prefix, pos->lp_oit_cookie,
1673 PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
/*
 * Fill @pos with the current iteration position of @lfsck.
 *
 * Without an object-table iterator (li_di_oit == NULL) @pos is zeroed.
 * Otherwise lp_oit_cookie is taken from the iterator's store() method and
 * decremented by one when the current item has not been processed and
 * @init is false; the result is asserted to be positive. The directory
 * part comes from the li_obj_dir iterator when li_di_dir is set, and is
 * reset to a zero FID and cookie 0 when the cookie is at or beyond
 * MDS_DIR_END_OFF or when there is no directory iterator.
 *
 * NOTE(review): iops is computed from li_obj_oit before the li_di_oit
 * check; confirm li_obj_oit is always valid when this is called.
 */
1678 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1679 struct lfsck_position *pos, bool init)
1681 const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1683 if (unlikely(lfsck->li_di_oit == NULL)) {
1684 memset(pos, 0, sizeof(*pos));
1688 pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1689 if (!lfsck->li_current_oit_processed && !init)
1690 pos->lp_oit_cookie--;
1692 LASSERT(pos->lp_oit_cookie > 0);
1694 if (lfsck->li_di_dir != NULL) {
1695 struct dt_object *dto = lfsck->li_obj_dir;
1697 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1700 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1701 fid_zero(&pos->lp_dir_parent);
1702 pos->lp_dir_cookie = 0;
1704 pos->lp_dir_parent = *lfsck_dto2fid(dto);
1707 fid_zero(&pos->lp_dir_parent);
1708 pos->lp_dir_cookie = 0;
/*
 * Translate the speed @limit into the throttling pair li_sleep_rate /
 * li_sleep_jif and record the limit in the in-RAM bookmark.
 *
 * For a limit above the number of jiffies per second, li_sleep_rate is
 * limit divided by jiffies-per-second and the sleep is one jiffy;
 * otherwise li_sleep_rate is 1 and li_sleep_jif is derived from
 * jiffies-per-second (the divisor is not visible here). With
 * LFSCK_SPEED_NO_LIMIT both are cleared.
 *
 * NOTE(review): the return value presumably reports whether
 * lb_speed_limit changed (callers store the bookmark when it is true);
 * the return statements are not visible here.
 */
1712 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1716 if (limit != LFSCK_SPEED_NO_LIMIT) {
1717 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1718 lfsck->li_sleep_rate = limit /
1719 msecs_to_jiffies(MSEC_PER_SEC);
1720 lfsck->li_sleep_jif = 1;
1722 lfsck->li_sleep_rate = 1;
1723 lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
1727 lfsck->li_sleep_jif = 0;
1728 lfsck->li_sleep_rate = 0;
1731 if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1732 lfsck->li_bookmark_ram.lb_speed_limit = limit;
/*
 * Throttle the LFSCK main thread. When throttling is enabled
 * (li_sleep_jif > 0) and li_new_scanned has reached li_sleep_rate, wait
 * on the thread's control waitqueue for up to li_sleep_jif jiffies, or
 * until the thread is no longer running, then reset li_new_scanned.
 */
1739 void lfsck_control_speed(struct lfsck_instance *lfsck)
1741 struct ptlrpc_thread *thread = &lfsck->li_thread;
1742 struct l_wait_info lwi;
1744 if (lfsck->li_sleep_jif > 0 &&
1745 lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1746 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1747 LWI_ON_SIGNAL_NOOP, NULL);
1749 l_wait_event(thread->t_ctl_waitq,
1750 !thread_is_running(thread),
1752 lfsck->li_new_scanned = 0;
/*
 * Per-component variant of lfsck_control_speed(): the same wait is done
 * on the instance thread's waitqueue, but the counter compared against
 * li_sleep_rate and reset afterwards is the component's lc_new_scanned.
 */
1756 void lfsck_control_speed_by_self(struct lfsck_component *com)
1758 struct lfsck_instance *lfsck = com->lc_lfsck;
1759 struct ptlrpc_thread *thread = &lfsck->li_thread;
1760 struct l_wait_info lwi;
1762 if (lfsck->li_sleep_jif > 0 &&
1763 com->lc_new_scanned >= lfsck->li_sleep_rate) {
1764 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1765 LWI_ON_SIGNAL_NOOP, NULL);
1767 l_wait_event(thread->t_ctl_waitq,
1768 !thread_is_running(thread),
1770 com->lc_new_scanned = 0;
/*
 * Build the argument bundle handed to an LFSCK thread.
 *
 * Initialises the embedded environment lta_env with the
 * LCT_MD_THREAD | LCT_DT_THREAD tags and takes references on @lfsck and
 * on @com. Returns ERR_PTR(-ENOMEM) on the visible failure path.
 * NOTE(review): lfsck_start() passes com == NULL, so the component
 * reference is presumably guarded by a NULL check not visible here.
 */
1774 static struct lfsck_thread_args *
1775 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1776 struct lfsck_component *com,
1777 struct lfsck_start_param *lsp)
1779 struct lfsck_thread_args *lta;
1784 return ERR_PTR(-ENOMEM);
1786 rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1792 lta->lta_lfsck = lfsck_instance_get(lfsck);
1794 lta->lta_com = lfsck_component_get(com);
/*
 * Release a thread argument bundle: drop the component reference when
 * one is held, drop the instance reference, then finalise the embedded
 * environment (which is used for both puts, so it is finalised last).
 */
1801 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1803 if (lta->lta_com != NULL)
1804 lfsck_component_put(&lta->lta_env, lta->lta_com);
1805 lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1806 lu_env_fini(&lta->lta_env);
/*
 * Set up an lfsck_assistant_data: a bitmap of BITS_PER_LONG bits is
 * allocated (with a failure branch when that returns NULL), the request
 * list, the lock, the OST/MDT target lists with their phase1/phase2
 * variants and the assistant thread's control waitqueue are initialised,
 * and @name is recorded as lad_name. The allocation of the structure
 * itself and the use of @lao are not visible here.
 */
1810 struct lfsck_assistant_data *
1811 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1814 struct lfsck_assistant_data *lad;
1818 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1819 if (lad->lad_bitmap == NULL) {
1824 INIT_LIST_HEAD(&lad->lad_req_list);
1825 spin_lock_init(&lad->lad_lock);
1826 INIT_LIST_HEAD(&lad->lad_ost_list);
1827 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1828 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1829 INIT_LIST_HEAD(&lad->lad_mdt_list);
1830 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1831 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1832 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1834 lad->lad_name = name;
1841 * Generic LFSCK asynchronous communication interpretor function.
1842 * The LFSCK RPC reply for both the event notification and status
1843 * querying will be handled here.
1845 * \param[in] env pointer to the thread context
1846 * \param[in] req pointer to the LFSCK request
1847 * \param[in] args pointer to the lfsck_async_interpret_args
1848 * \param[in] rc the result for handling the LFSCK request
1850 * \retval 0 for success
1851 * \retval negative error number on failure
/*
 * Common reply handler for asynchronous LFSCK requests; dispatches on
 * lr->lr_event taken from the interpret args. Only valid on a master
 * instance (asserted).
 *
 * All target list manipulation is done with ltds->ltd_lock held.
 */
1853 int lfsck_async_interpret_common(const struct lu_env *env,
1854 struct ptlrpc_request *req,
1857 struct lfsck_async_interpret_args *laia = args;
1858 struct lfsck_component *com = laia->laia_com;
1859 struct lfsck_assistant_data *lad = com->lc_data;
1860 struct lfsck_tgt_descs *ltds = laia->laia_ltds;
1861 struct lfsck_tgt_desc *ltd = laia->laia_ltd;
1862 struct lfsck_request *lr = laia->laia_lr;
1864 LASSERT(com->lc_lfsck->li_master);
1866 switch (lr->lr_event) {
/*
 * Failed notification: log it and flag the component's in-RAM file as
 * LF_INCOMPLETE (for layout, OST failures are also recorded in the
 * assistant bitmap).
 */
1869 CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
1871 lfsck_lfsck2name(com->lc_lfsck),
1872 (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1873 ltd->ltd_index, lad->lad_name, rc);
1875 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1876 struct lfsck_layout *lo = com->lc_file_ram;
1878 if (lr->lr_flags & LEF_TO_OST)
1879 lfsck_lad_set_bitmap(env, com,
1882 lo->ll_flags |= LF_INCOMPLETE;
1884 struct lfsck_namespace *ns = com->lc_file_ram;
1886 /* If some MDT does not join the namespace
1887 * LFSCK, then we cannot know whether there
1888 * is some name entry on such MDT that with
1889 * the referenced MDT-object on this MDT or
1890 * not. So the namespace LFSCK on this MDT
1891 * cannot handle orphan MDT-objects properly.
1892 * So we mark the LFSCK as LF_INCOMPLETE and
1893 * skip orphan MDT-objects handling. */
1894 ns->ln_flags |= LF_INCOMPLETE;
/*
 * Link the target into the assistant's target list and phase1 list
 * unless it is dead, already done, or already linked.
 */
1899 spin_lock(&ltds->ltd_lock);
1900 if (ltd->ltd_dead) {
1901 spin_unlock(&ltds->ltd_lock);
1905 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1906 struct list_head *list;
1907 struct list_head *phase_list;
1909 if (ltd->ltd_layout_done) {
1910 spin_unlock(&ltds->ltd_lock);
1914 if (lr->lr_flags & LEF_TO_OST) {
1915 list = &lad->lad_ost_list;
1916 phase_list = &lad->lad_ost_phase1_list;
1918 list = &lad->lad_mdt_list;
1919 phase_list = &lad->lad_mdt_phase1_list;
1922 if (list_empty(&ltd->ltd_layout_list))
1923 list_add_tail(&ltd->ltd_layout_list, list);
1924 if (list_empty(&ltd->ltd_layout_phase_list))
1925 list_add_tail(&ltd->ltd_layout_phase_list,
1928 if (ltd->ltd_namespace_done) {
1929 spin_unlock(&ltds->ltd_lock);
1933 if (list_empty(&ltd->ltd_namespace_list))
1934 list_add_tail(&ltd->ltd_namespace_list,
1935 &lad->lad_mdt_list);
1936 if (list_empty(&ltd->ltd_namespace_phase_list))
1937 list_add_tail(&ltd->ltd_namespace_phase_list,
1938 &lad->lad_mdt_phase1_list);
1940 spin_unlock(&ltds->ltd_lock);
/* Phase-done notifications: only failures other than -EALREADY are logged. */
1943 case LE_PHASE1_DONE:
1944 case LE_PHASE2_DONE:
1946 if (rc != 0 && rc != -EALREADY)
1947 CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
1948 "event = %d, rc = %d\n",
1949 lfsck_lfsck2name(com->lc_lfsck),
1950 (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1951 ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
/*
 * Status reply handling: "list" and "phase_list" are the target's own
 * link fields for this component type. The target is unlinked from both
 * lists in the visible error paths (including a missing reply buffer).
 */
1954 struct lfsck_reply *reply;
1955 struct list_head *list;
1956 struct list_head *phase_list;
1958 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1959 list = &ltd->ltd_layout_list;
1960 phase_list = &ltd->ltd_layout_phase_list;
1962 list = &ltd->ltd_namespace_list;
1963 phase_list = &ltd->ltd_namespace_phase_list;
1967 spin_lock(&ltds->ltd_lock);
1968 list_del_init(phase_list);
1969 list_del_init(list);
1970 spin_unlock(&ltds->ltd_lock);
1974 reply = req_capsule_server_get(&req->rq_pill,
1976 if (reply == NULL) {
1978 CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
1979 "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1981 spin_lock(&ltds->ltd_lock);
1982 list_del_init(phase_list);
1983 list_del_init(list);
1984 spin_unlock(&ltds->ltd_lock);
1988 switch (reply->lr_status) {
1989 case LS_SCANNING_PHASE1:
/*
 * Peer is in phase 2: move the target from its current phase list to the
 * matching phase2 list, unless it is dead or already done.
 */
1991 case LS_SCANNING_PHASE2:
1992 spin_lock(&ltds->ltd_lock);
1993 list_del_init(phase_list);
1994 if (ltd->ltd_dead) {
1995 spin_unlock(&ltds->ltd_lock);
1999 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2000 if (ltd->ltd_layout_done) {
2001 spin_unlock(&ltds->ltd_lock);
2005 if (lr->lr_flags & LEF_TO_OST)
2006 list_add_tail(phase_list,
2007 &lad->lad_ost_phase2_list);
2009 list_add_tail(phase_list,
2010 &lad->lad_mdt_phase2_list);
2012 if (ltd->ltd_namespace_done) {
2013 spin_unlock(&ltds->ltd_lock);
2017 list_add_tail(phase_list,
2018 &lad->lad_mdt_phase2_list);
2020 spin_unlock(&ltds->ltd_lock);
/* Any other status: the target is unlinked from both lists. */
2023 spin_lock(&ltds->ltd_lock);
2024 list_del_init(phase_list);
2025 list_del_init(list);
2026 spin_unlock(&ltds->ltd_lock);
/*
 * NOTE(review): the message labels the value "rc" but the argument passed
 * is lr->lr_event.
 */
2032 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
2033 lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
/* Non-shared args carry their own component reference; drop it here. */
2037 if (!laia->laia_shared) {
2039 lfsck_component_put(env, com);
/*
 * Fan a reply out to every active component. Requires shared interpret
 * args with no component set (both asserted). With li_lock held, each
 * component on the scan list and then on the double-scan list is
 * installed as laia_com and passed to lfsck_async_interpret_common()
 * together with @req and @result.
 */
2045 static void lfsck_interpret(const struct lu_env *env,
2046 struct lfsck_instance *lfsck,
2047 struct ptlrpc_request *req, void *args, int result)
2049 struct lfsck_async_interpret_args *laia = args;
2050 struct lfsck_component *com;
2052 LASSERT(laia->laia_com == NULL);
2053 LASSERT(laia->laia_shared);
2055 spin_lock(&lfsck->li_lock);
2056 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2057 laia->laia_com = com;
2058 lfsck_async_interpret_common(env, req, laia, result);
2061 list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2062 laia->laia_com = com;
2063 lfsck_async_interpret_common(env, req, laia, result);
2065 spin_unlock(&lfsck->li_lock);
/*
 * Notify one target (@ltd from @ltds) that this master instance is
 * leaving the LFSCK of the given @type.
 *
 * The component of @type is looked up on the scan list and then on the
 * double-scan list under li_lock, and a reference is taken. If the target
 * is still linked for that component, it is unlinked under ltd_lock and an
 * LE_PEER_EXIT request with status LS_CO_PAUSED is sent as LFSCK_NOTIFY;
 * the request set is then waited for and destroyed, and the component
 * reference is dropped.
 */
2068 static int lfsck_stop_notify(const struct lu_env *env,
2069 struct lfsck_instance *lfsck,
2070 struct lfsck_tgt_descs *ltds,
2071 struct lfsck_tgt_desc *ltd, __u16 type)
2073 struct lfsck_component *com;
2077 LASSERT(lfsck->li_master);
2079 spin_lock(&lfsck->li_lock);
2080 com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2082 com = __lfsck_component_find(lfsck, type,
2083 &lfsck->li_list_double_scan);
2085 lfsck_component_get(com);
2086 spin_unlock(&lfsck->li_lock);
2089 struct lfsck_thread_info *info = lfsck_env_info(env);
2090 struct lfsck_async_interpret_args *laia = &info->lti_laia;
2091 struct lfsck_request *lr = &info->lti_lr;
2092 struct lfsck_assistant_data *lad = com->lc_data;
2093 struct list_head *list;
2094 struct list_head *phase_list;
2095 struct ptlrpc_request_set *set;
2097 set = ptlrpc_prep_set();
2099 lfsck_component_put(env, com);
/* Select the target's link fields that belong to this component type. */
2104 if (type == LFSCK_TYPE_LAYOUT) {
2105 list = &ltd->ltd_layout_list;
2106 phase_list = &ltd->ltd_layout_phase_list;
2108 list = &ltd->ltd_namespace_list;
2109 phase_list = &ltd->ltd_namespace_phase_list;
/* A target that is not linked needs no notification. */
2112 spin_lock(&ltds->ltd_lock);
2113 if (list_empty(list)) {
2114 LASSERT(list_empty(phase_list));
2115 spin_unlock(&ltds->ltd_lock);
2116 ptlrpc_set_destroy(set);
2121 list_del_init(phase_list);
2122 list_del_init(list);
2123 spin_unlock(&ltds->ltd_lock);
2125 memset(lr, 0, sizeof(*lr));
2126 lr->lr_index = lfsck_dev_idx(lfsck);
2127 lr->lr_event = LE_PEER_EXIT;
2128 lr->lr_active = type;
2129 lr->lr_status = LS_CO_PAUSED;
2130 if (ltds == &lfsck->li_ost_descs)
2131 lr->lr_flags = LEF_TO_OST;
/* The interpret args hold their own reference on the target. */
2133 laia->laia_com = com;
2134 laia->laia_ltds = ltds;
2135 atomic_inc(&ltd->ltd_ref);
2136 laia->laia_ltd = ltd;
2138 laia->laia_shared = 0;
2140 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2141 lfsck_async_interpret_common,
2142 laia, LFSCK_NOTIFY);
2144 CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2145 "co-stop for %s: rc = %d\n",
2146 lfsck_lfsck2name(lfsck),
2147 (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2148 ltd->ltd_index, lad->lad_name, rc);
2151 rc = ptlrpc_set_wait(set);
2154 ptlrpc_set_destroy(set);
2155 lfsck_component_put(env, com);
/*
 * Reply callback for broadcast (shared-args) requests. The owning
 * instance is recovered from laia_ltds with container_of0(), the reply is
 * fanned out through lfsck_interpret(), and the target reference held by
 * the args is dropped. A non-zero @rc is recorded in laia_result unless
 * laia_result is already -EALREADY.
 */
2161 static int lfsck_async_interpret(const struct lu_env *env,
2162 struct ptlrpc_request *req,
2165 struct lfsck_async_interpret_args *laia = args;
2166 struct lfsck_instance *lfsck;
2168 lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2170 lfsck_interpret(env, lfsck, req, laia, rc);
2171 lfsck_tgt_put(laia->laia_ltd);
2172 if (rc != 0 && laia->laia_result != -EALREADY)
2173 laia->laia_result = rc;
/*
 * Build an asynchronous LFSCK request and queue it on @set.
 *
 * The request format is chosen from @request (RQF_LFSCK_NOTIFY or
 * RQF_LFSCK_QUERY are the visible choices; anything else is logged with
 * -EINVAL). The request is allocated on the import of @exp and packed;
 * packing failure frees it. @args is copied by value into the request's
 * async-args area, a component reference is taken when the copy names a
 * component, @interpreter is installed as the reply callback, and the
 * request is added to @set.
 * tmp points at the RMF_LFSCK_REQUEST client buffer; the copy of @lr into
 * it is not visible here.
 */
2178 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2179 struct lfsck_request *lr,
2180 struct ptlrpc_request_set *set,
2181 ptlrpc_interpterer_t interpreter,
2182 void *args, int request)
2184 struct lfsck_async_interpret_args *laia;
2185 struct ptlrpc_request *req;
2186 struct lfsck_request *tmp;
2187 struct req_format *format;
2192 format = &RQF_LFSCK_NOTIFY;
2195 format = &RQF_LFSCK_QUERY;
2198 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2199 exp->exp_obd->obd_name, request, -EINVAL);
2203 req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2207 rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2209 ptlrpc_request_free(req);
2214 tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2216 ptlrpc_request_set_replen(req);
2218 laia = ptlrpc_req_async_args(req);
2219 *laia = *(struct lfsck_async_interpret_args *)args;
2220 if (laia->laia_com != NULL)
2221 lfsck_component_get(laia->laia_com);
2222 req->rq_interpret_reply = interpreter;
2223 ptlrpc_set_add_req(set, req);
/*
 * Start the assistant thread of a component.
 *
 * The assistant's status and post / double-scan flags are reset and its
 * thread flags cleared, a thread argument bundle is built, and
 * lfsck_assistant_engine is started as a kthread named lad_name. If the
 * thread cannot be started, the error is logged and the bundle released.
 * Otherwise the caller waits on the main thread's waitqueue until the
 * assistant is running or stopped; if it is not running, the result is
 * lad_assistant_status.
 */
2228 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2229 struct lfsck_start_param *lsp)
2231 struct lfsck_instance *lfsck = com->lc_lfsck;
2232 struct lfsck_assistant_data *lad = com->lc_data;
2233 struct ptlrpc_thread *mthread = &lfsck->li_thread;
2234 struct ptlrpc_thread *athread = &lad->lad_thread;
2235 struct lfsck_thread_args *lta;
2236 struct task_struct *task;
2240 lad->lad_assistant_status = 0;
2241 lad->lad_post_result = 0;
2242 lad->lad_to_post = 0;
2243 lad->lad_to_double_scan = 0;
2244 lad->lad_in_double_scan = 0;
2246 thread_set_flags(athread, 0);
2248 lta = lfsck_thread_args_init(lfsck, com, lsp);
2250 RETURN(PTR_ERR(lta));
2252 task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2255 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2256 "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2257 lfsck_thread_args_fini(lta);
2259 struct l_wait_info lwi = { 0 };
2261 l_wait_event(mthread->t_ctl_waitq,
2262 thread_is_running(athread) ||
2263 thread_is_stopped(athread),
2265 if (unlikely(!thread_is_running(athread)))
2266 rc = lad->lad_assistant_status;
/*
 * Common pre-checkpoint synchronisation with the assistant thread.
 *
 * Returns LFSCK_CHECKPOINT_SKIP when nothing new was checked
 * (lc_new_checked == 0). Otherwise it waits until the assistant's
 * request list is empty, the main thread is no longer running, or the
 * assistant has stopped, and returns LFSCK_CHECKPOINT_SKIP in the latter
 * two cases. The return value of the remaining case is not visible here.
 */
2274 int lfsck_checkpoint_generic(const struct lu_env *env,
2275 struct lfsck_component *com)
2277 struct lfsck_assistant_data *lad = com->lc_data;
2278 struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2279 struct ptlrpc_thread *athread = &lad->lad_thread;
2280 struct l_wait_info lwi = { 0 };
2282 if (com->lc_new_checked == 0)
2283 return LFSCK_CHECKPOINT_SKIP;
2285 l_wait_event(mthread->t_ctl_waitq,
2286 list_empty(&lad->lad_req_list) ||
2287 !thread_is_running(mthread) ||
2288 thread_is_stopped(athread),
2291 if (!thread_is_running(mthread) || thread_is_stopped(athread))
2292 return LFSCK_CHECKPOINT_SKIP;
/*
 * Common post-scan hand-off to the assistant thread. *result is recorded
 * in lad_post_result, lad_to_post is raised and the assistant is woken.
 * The caller then waits until either *result is positive and the request
 * list has drained, or the assistant has stopped. A negative assistant
 * status overrides *result.
 */
2297 void lfsck_post_generic(const struct lu_env *env,
2298 struct lfsck_component *com, int *result)
2300 struct lfsck_assistant_data *lad = com->lc_data;
2301 struct ptlrpc_thread *athread = &lad->lad_thread;
2302 struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2303 struct l_wait_info lwi = { 0 };
2305 lad->lad_post_result = *result;
2308 lad->lad_to_post = 1;
2310 wake_up_all(&athread->t_ctl_waitq);
2311 l_wait_event(mthread->t_ctl_waitq,
2312 (*result > 0 && list_empty(&lad->lad_req_list)) ||
2313 thread_is_stopped(athread),
2316 if (lad->lad_assistant_status < 0)
2317 *result = lad->lad_assistant_status;
/*
 * Common trigger for the assistant's double scan. A @status other than
 * LS_SCANNING_PHASE2 takes a separate branch whose body is not visible
 * here. Otherwise lad_to_double_scan is raised, the assistant is woken,
 * and the caller waits until the assistant reports lad_in_double_scan or
 * has stopped. A negative assistant status is returned as the result.
 */
2320 int lfsck_double_scan_generic(const struct lu_env *env,
2321 struct lfsck_component *com, int status)
2323 struct lfsck_assistant_data *lad = com->lc_data;
2324 struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2325 struct ptlrpc_thread *athread = &lad->lad_thread;
2326 struct l_wait_info lwi = { 0 };
2328 if (status != LS_SCANNING_PHASE2)
2331 lad->lad_to_double_scan = 1;
2333 wake_up_all(&athread->t_ctl_waitq);
2334 l_wait_event(mthread->t_ctl_waitq,
2335 lad->lad_in_double_scan ||
2336 thread_is_stopped(athread),
2339 if (lad->lad_assistant_status < 0)
2340 return lad->lad_assistant_status;
/*
 * Common quit path: wake the assistant thread and wait on the main
 * thread's waitqueue until the assistant is in the init or stopped state.
 */
2345 void lfsck_quit_generic(const struct lu_env *env,
2346 struct lfsck_component *com)
2348 struct lfsck_assistant_data *lad = com->lc_data;
2349 struct ptlrpc_thread *mthread = &com->lc_lfsck->li_thread;
2350 struct ptlrpc_thread *athread = &lad->lad_thread;
2351 struct l_wait_info lwi = { 0 };
2354 wake_up_all(&athread->t_ctl_waitq);
2355 l_wait_event(mthread->t_ctl_waitq,
2356 thread_is_init(athread) ||
2357 thread_is_stopped(athread),
2361 /* external interfaces */
/*
 * Print the speed limit recorded in the in-RAM bookmark of the instance
 * bound to @key. A temporary environment is initialised, and the
 * reference taken by the lookup is dropped after printing.
 */
2363 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2366 struct lfsck_instance *lfsck;
2370 rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2374 lfsck = lfsck_instance_find(key, true, false);
2375 if (likely(lfsck != NULL)) {
2376 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2377 lfsck_instance_put(&env, lfsck);
2386 EXPORT_SYMBOL(lfsck_get_speed);
/*
 * Set the speed limit of the instance bound to @key. With li_mutex held,
 * the limit is applied through __lfsck_set_speed() and the bookmark is
 * stored when that call returns true. The reference taken by the lookup
 * is dropped afterwards.
 */
2388 int lfsck_set_speed(struct dt_device *key, int val)
2391 struct lfsck_instance *lfsck;
2395 rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2399 lfsck = lfsck_instance_find(key, true, false);
2400 if (likely(lfsck != NULL)) {
2401 mutex_lock(&lfsck->li_mutex);
2402 if (__lfsck_set_speed(lfsck, val))
2403 rc = lfsck_bookmark_store(&env, lfsck);
2404 mutex_unlock(&lfsck->li_mutex);
2405 lfsck_instance_put(&env, lfsck);
2414 EXPORT_SYMBOL(lfsck_set_speed);
/*
 * Print the async window size recorded in the in-RAM bookmark of the
 * instance bound to @key. The reference taken by the lookup is dropped
 * after printing.
 */
2416 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2419 struct lfsck_instance *lfsck;
2423 rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2427 lfsck = lfsck_instance_find(key, true, false);
2428 if (likely(lfsck != NULL)) {
2429 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2430 lfsck_instance_put(&env, lfsck);
2439 EXPORT_SYMBOL(lfsck_get_windows);
/*
 * Set the async window size of the instance bound to @key. Values above
 * LFSCK_ASYNC_WIN_MAX are rejected with a warning. A value that differs
 * from the current one is written to the in-RAM bookmark and stored,
 * with li_mutex held for the update.
 *
 * NOTE(review): the comparison against the current value happens before
 * li_mutex is taken, and only the upper bound of @val (an int) is checked
 * in the visible code; confirm negative values are handled by the caller.
 */
2441 int lfsck_set_windows(struct dt_device *key, int val)
2444 struct lfsck_instance *lfsck;
2448 rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2452 lfsck = lfsck_instance_find(key, true, false);
2453 if (likely(lfsck != NULL)) {
2454 if (val > LFSCK_ASYNC_WIN_MAX) {
2455 CWARN("%s: Too large async window size, which "
2456 "may cause memory issues. The valid range "
2457 "is [0 - %u]. If you do not want to restrict "
2458 "the window size for async requests pipeline, "
2459 "just set it as 0.\n",
2460 lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2462 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2463 mutex_lock(&lfsck->li_mutex);
2464 lfsck->li_bookmark_ram.lb_async_windows = val;
2465 rc = lfsck_bookmark_store(&env, lfsck);
2466 mutex_unlock(&lfsck->li_mutex);
2468 lfsck_instance_put(&env, lfsck);
2477 EXPORT_SYMBOL(lfsck_set_windows);
/*
 * Dump the state of the component of @type belonging to the instance
 * bound to @key. The component's lfsck_dump operation is called with a
 * temporary environment; the component and instance references taken by
 * the lookups are dropped afterwards.
 */
2479 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2482 struct lfsck_instance *lfsck;
2483 struct lfsck_component *com;
2487 rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2491 lfsck = lfsck_instance_find(key, true, false);
2492 if (likely(lfsck != NULL)) {
2493 com = lfsck_component_find(lfsck, type);
2494 if (likely(com != NULL)) {
2495 rc = com->lc_ops->lfsck_dump(&env, com, m);
2496 lfsck_component_put(&env, com);
2501 lfsck_instance_put(&env, lfsck);
2510 EXPORT_SYMBOL(lfsck_dump);
/*
 * Broadcast an LE_STOP request to every MDT set in the target bitmap of
 * li_mdt_descs. Requires LPF_BROADCAST in stop->ls_flags (asserted).
 *
 * One shared set of interpret args is used for all requests. The target
 * table is walked with ltd_rw_sem held for reading; afterwards the
 * request set is waited for and destroyed, and the aggregated
 * laia_result is consulted. The final result prefers rc over rc1.
 */
2512 static int lfsck_stop_all(const struct lu_env *env,
2513 struct lfsck_instance *lfsck,
2514 struct lfsck_stop *stop)
2516 struct lfsck_thread_info *info = lfsck_env_info(env);
2517 struct lfsck_request *lr = &info->lti_lr;
2518 struct lfsck_async_interpret_args *laia = &info->lti_laia;
2519 struct ptlrpc_request_set *set;
2520 struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2521 struct lfsck_tgt_desc *ltd;
2522 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2528 LASSERT(stop->ls_flags & LPF_BROADCAST);
2530 set = ptlrpc_prep_set();
2531 if (unlikely(set == NULL))
2534 memset(lr, 0, sizeof(*lr));
2535 lr->lr_event = LE_STOP;
2536 lr->lr_index = lfsck_dev_idx(lfsck);
2537 lr->lr_status = stop->ls_status;
2538 lr->lr_version = bk->lb_version;
2539 lr->lr_active = LFSCK_TYPES_ALL;
2540 lr->lr_param = stop->ls_flags;
2542 laia->laia_com = NULL;
2543 laia->laia_ltds = ltds;
2545 laia->laia_result = 0;
2546 laia->laia_shared = 1;
2548 down_read(&ltds->ltd_rw_sem);
2549 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2550 ltd = lfsck_tgt_get(ltds, idx);
2551 LASSERT(ltd != NULL);
2553 laia->laia_ltd = ltd;
2554 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2555 lfsck_async_interpret, laia,
/*
 * NOTE(review): the interpret call and the error message below are
 * presumably the send-failure path; the guarding condition is not
 * visible here.
 */
2558 lfsck_interpret(env, lfsck, NULL, laia, rc);
2560 CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2561 "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2565 up_read(&ltds->ltd_rw_sem);
2567 rc = ptlrpc_set_wait(set);
2568 ptlrpc_set_destroy(set);
2571 rc = laia->laia_result;
2573 if (rc == -EALREADY)
2577 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2578 lfsck_lfsck2name(lfsck), rc);
2580 RETURN(rc != 0 ? rc : rc1);
/*
 * Broadcast an LE_START request to every MDT set in the target bitmap of
 * li_mdt_descs. Requires LPF_BROADCAST in start->ls_flags (asserted).
 *
 * The request carries the speed limit, version and async window size from
 * the in-RAM bookmark, plus the active types and flags from @start. Each
 * target's layout/namespace done flags and synced-failure flag are
 * cleared before its request is queued. The target table is walked with
 * ltd_rw_sem held for reading. If starting fails with anything other
 * than -EALREADY, a stop with status LS_FAILED is broadcast to all
 * targets.
 */
2583 static int lfsck_start_all(const struct lu_env *env,
2584 struct lfsck_instance *lfsck,
2585 struct lfsck_start *start)
2587 struct lfsck_thread_info *info = lfsck_env_info(env);
2588 struct lfsck_request *lr = &info->lti_lr;
2589 struct lfsck_async_interpret_args *laia = &info->lti_laia;
2590 struct ptlrpc_request_set *set;
2591 struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2592 struct lfsck_tgt_desc *ltd;
2593 struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2598 LASSERT(start->ls_flags & LPF_BROADCAST);
2600 set = ptlrpc_prep_set();
2601 if (unlikely(set == NULL))
2604 memset(lr, 0, sizeof(*lr));
2605 lr->lr_event = LE_START;
2606 lr->lr_index = lfsck_dev_idx(lfsck);
2607 lr->lr_speed = bk->lb_speed_limit;
2608 lr->lr_version = bk->lb_version;
2609 lr->lr_active = start->ls_active;
2610 lr->lr_param = start->ls_flags;
2611 lr->lr_async_windows = bk->lb_async_windows;
2612 lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2613 LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2616 laia->laia_com = NULL;
2617 laia->laia_ltds = ltds;
2619 laia->laia_result = 0;
2620 laia->laia_shared = 1;
2622 down_read(&ltds->ltd_rw_sem);
2623 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2624 ltd = lfsck_tgt_get(ltds, idx);
2625 LASSERT(ltd != NULL);
2627 laia->laia_ltd = ltd;
/* Reset the per-target progress state before (re)starting. */
2628 ltd->ltd_layout_done = 0;
2629 ltd->ltd_namespace_done = 0;
2630 ltd->ltd_synced_failures = 0;
2631 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2632 lfsck_async_interpret, laia,
/*
 * NOTE(review): the interpret call and the error message below are
 * presumably the send-failure path; the guarding condition is not
 * visible here.
 */
2635 lfsck_interpret(env, lfsck, NULL, laia, rc);
2637 CERROR("%s: cannot notify MDT %x for LFSCK "
2638 "start, failout: rc = %d\n",
2639 lfsck_lfsck2name(lfsck), idx, rc);
2643 up_read(&ltds->ltd_rw_sem);
2646 ptlrpc_set_destroy(set);
2651 rc = ptlrpc_set_wait(set);
2652 ptlrpc_set_destroy(set);
2655 rc = laia->laia_result;
2658 struct lfsck_stop *stop = &info->lti_stop;
2660 CERROR("%s: cannot start LFSCK on some MDTs, "
2661 "stop all: rc = %d\n",
2662 lfsck_lfsck2name(lfsck), rc);
/* Roll back: stop every target, unless the LFSCK was already running. */
2663 if (rc != -EALREADY) {
2664 stop->ls_status = LS_FAILED;
2665 stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2666 lfsck_stop_all(env, lfsck, stop);
2673 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2674 struct lfsck_start_param *lsp)
2676 struct lfsck_start *start = lsp->lsp_start;
2677 struct lfsck_instance *lfsck;
2678 struct lfsck_bookmark *bk;
2679 struct ptlrpc_thread *thread;
2680 struct lfsck_component *com;
2681 struct l_wait_info lwi = { 0 };
2682 struct lfsck_thread_args *lta;
2683 struct task_struct *task;
2690 lfsck = lfsck_instance_find(key, true, false);
2691 if (unlikely(lfsck == NULL))
2694 /* System is not ready, try again later. */
2695 if (unlikely(lfsck->li_namespace == NULL))
2696 GOTO(put, rc = -EAGAIN);
2698 /* start == NULL means auto trigger paused LFSCK. */
2699 if ((start == NULL) &&
2700 (list_empty(&lfsck->li_list_scan) ||
2701 OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2704 bk = &lfsck->li_bookmark_ram;
2705 thread = &lfsck->li_thread;
2706 mutex_lock(&lfsck->li_mutex);
2707 spin_lock(&lfsck->li_lock);
2708 if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2710 if (unlikely(start == NULL)) {
2711 spin_unlock(&lfsck->li_lock);
2715 while (start->ls_active != 0) {
2716 if (!(type & start->ls_active)) {
2721 com = __lfsck_component_find(lfsck, type,
2722 &lfsck->li_list_scan);
2724 com = __lfsck_component_find(lfsck, type,
2725 &lfsck->li_list_double_scan);
2731 if (com->lc_ops->lfsck_join != NULL) {
2732 rc = com->lc_ops->lfsck_join( env, com, lsp);
2733 if (rc != 0 && rc != -EALREADY)
2736 start->ls_active &= ~type;
2739 spin_unlock(&lfsck->li_lock);
2742 spin_unlock(&lfsck->li_lock);
2744 lfsck->li_status = 0;
2745 lfsck->li_oit_over = 0;
2746 lfsck->li_start_unplug = 0;
2747 lfsck->li_drop_dryrun = 0;
2748 lfsck->li_new_scanned = 0;
2750 /* For auto trigger. */
2754 if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2755 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2756 lfsck_lfsck2name(lfsck));
2758 GOTO(out, rc = -EPERM);
2761 start->ls_version = bk->lb_version;
2763 if (start->ls_active != 0) {
2764 struct lfsck_component *next;
2766 if (start->ls_active == LFSCK_TYPES_ALL)
2767 start->ls_active = LFSCK_TYPES_SUPPORTED;
2769 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2770 start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2771 GOTO(out, rc = -ENOTSUPP);
2774 list_for_each_entry_safe(com, next,
2775 &lfsck->li_list_scan, lc_link) {
2776 if (!(com->lc_type & start->ls_active)) {
2777 rc = com->lc_ops->lfsck_post(env, com, 0,
2784 while (start->ls_active != 0) {
2785 if (type & start->ls_active) {
2786 com = __lfsck_component_find(lfsck, type,
2787 &lfsck->li_list_idle);
2789 /* The component status will be updated
2790 * when its prep() is called later by
2791 * the LFSCK main engine. */
2792 list_move_tail(&com->lc_link,
2793 &lfsck->li_list_scan);
2794 start->ls_active &= ~type;
2800 if (list_empty(&lfsck->li_list_scan)) {
2801 /* The speed limit will be used to control both the LFSCK and
2802 * low layer scrub (if applied), need to be handled firstly. */
2803 if (start->ls_valid & LSV_SPEED_LIMIT) {
2804 if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2805 rc = lfsck_bookmark_store(env, lfsck);
2814 if (start->ls_flags & LPF_RESET)
2815 flags |= DOIF_RESET;
2817 rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2821 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2822 start->ls_active |= com->lc_type;
2823 if (flags & DOIF_RESET) {
2824 rc = com->lc_ops->lfsck_reset(env, com, false);
2831 lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2832 if (bk->lb_param & LPF_DRYRUN)
2833 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2835 if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2836 valid |= DOIV_ERROR_HANDLE;
2837 if (start->ls_flags & LPF_FAILOUT)
2838 flags |= DOIF_FAILOUT;
2841 if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2842 valid |= DOIV_DRYRUN;
2843 if (start->ls_flags & LPF_DRYRUN)
2844 flags |= DOIF_DRYRUN;
2847 if (!list_empty(&lfsck->li_list_scan))
2848 flags |= DOIF_OUTUSED;
2850 lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2851 thread_set_flags(thread, 0);
2852 lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2854 GOTO(out, rc = PTR_ERR(lta));
2856 __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2857 task = kthread_run(lfsck_master_engine, lta, "lfsck");
2860 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2861 lfsck_lfsck2name(lfsck), rc);
2862 lfsck_thread_args_fini(lta);
2867 l_wait_event(thread->t_ctl_waitq,
2868 thread_is_running(thread) ||
2869 thread_is_stopped(thread),
2871 if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2872 lfsck->li_start_unplug = 1;
2873 wake_up_all(&thread->t_ctl_waitq);
2878 /* release lfsck::li_mutex to avoid deadlock. */
2879 mutex_unlock(&lfsck->li_mutex);
2880 rc = lfsck_start_all(env, lfsck, start);
2882 spin_lock(&lfsck->li_lock);
2883 if (thread_is_stopped(thread)) {
2884 spin_unlock(&lfsck->li_lock);
2886 lfsck->li_status = LS_FAILED;
2887 lfsck->li_flags = 0;
2888 thread_set_flags(thread, SVC_STOPPING);
2889 spin_unlock(&lfsck->li_lock);
2891 lfsck->li_start_unplug = 1;
2892 wake_up_all(&thread->t_ctl_waitq);
2893 l_wait_event(thread->t_ctl_waitq,
2894 thread_is_stopped(thread),
2898 lfsck->li_start_unplug = 1;
2899 wake_up_all(&thread->t_ctl_waitq);
2905 mutex_unlock(&lfsck->li_mutex);
2908 lfsck_instance_put(env, lfsck);
2910 return rc < 0 ? rc : 0;
2912 EXPORT_SYMBOL(lfsck_start);
/*
 * Stop the LFSCK engine thread of the instance registered for @key.
 *
 * env:  execution environment passed through to the helpers called here.
 * key:  device used to look up the LFSCK instance.
 * stop: optional stop request (it is checked against NULL before its
 *       flags are read). When it carries LPF_BROADCAST the request is
 *       also handed to lfsck_stop_all(), which is refused with -EPERM
 *       unless the instance is a master (li_master).
 *
 * Returns rc when it is non-zero, otherwise rc1, the result of
 * lfsck_stop_all().
 */
2914 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2915 struct lfsck_stop *stop)
2917 struct lfsck_instance *lfsck;
2918 struct ptlrpc_thread *thread;
2919 struct l_wait_info lwi = { 0 };
2924 lfsck = lfsck_instance_find(key, true, false);
2925 if (unlikely(lfsck == NULL))
2928 thread = &lfsck->li_thread;
2929 /* release lfsck::li_mutex to avoid deadlock. */
/* Note: the broadcast below is issued before li_mutex is taken. */
2930 if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2931 if (!lfsck->li_master) {
2932 CERROR("%s: only allow to specify '-A' via MDS\n",
2933 lfsck_lfsck2name(lfsck));
2935 GOTO(out, rc = -EPERM);
2938 rc1 = lfsck_stop_all(env, lfsck, stop);
/* li_mutex is taken first, then li_lock protects the thread state. */
2941 mutex_lock(&lfsck->li_mutex);
2942 spin_lock(&lfsck->li_lock);
2943 /* no error if LFSCK is already stopped, or was never started */
2944 if (thread_is_init(thread) || thread_is_stopped(thread)) {
2945 spin_unlock(&lfsck->li_lock);
/*
 * Record the final status and flags.
 * NOTE(review): the condition selecting between the two pairs of
 * assignments is not visible in this excerpt; presumably the values come
 * from @stop when it is non-NULL and default to LS_STOPPED / 0 otherwise.
 */
2950 lfsck->li_status = stop->ls_status;
2951 lfsck->li_flags = stop->ls_flags;
2953 lfsck->li_status = LS_STOPPED;
2954 lfsck->li_flags = 0;
/*
 * Mark the thread as stopping, wake anything sleeping on its control
 * wait queue and wait until the thread reports itself stopped.
 */
2957 thread_set_flags(thread, SVC_STOPPING);
2958 spin_unlock(&lfsck->li_lock);
2960 wake_up_all(&thread->t_ctl_waitq);
2961 l_wait_event(thread->t_ctl_waitq,
2962 thread_is_stopped(thread),
2968 mutex_unlock(&lfsck->li_mutex);
2969 lfsck_instance_put(env, lfsck);
/* A local failure takes precedence over the broadcast result. */
2971 return rc != 0 ? rc : rc1;
2973 EXPORT_SYMBOL(lfsck_stop);
/*
 * Handle an incoming LFSCK request by dispatching on lr->lr_event.
 *
 * env: execution environment; its per-thread info supplies the scratch
 *      lfsck_start / lfsck_stop buffers used below.
 * key: device used to look up the LFSCK instance.
 * lr:  the request; its lr_event selects the action.
 * th:  transaction handle passed through to the component handler.
 *
 * rc starts as -EOPNOTSUPP and is replaced by the result of lfsck_start(),
 * lfsck_stop() or the component's lfsck_in_notify() on the paths shown.
 * NOTE(review): several case labels and the final return are not visible
 * in this excerpt.
 */
2975 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2976 struct lfsck_request *lr, struct thandle *th)
2978 int rc = -EOPNOTSUPP;
2981 switch (lr->lr_event) {
/*
 * Start request: build a zeroed lfsck_start from the request fields,
 * masking LPF_BROADCAST out of the flags, and start with the request's
 * index marked valid.
 */
2983 struct lfsck_start *start = &lfsck_env_info(env)->lti_start;
2984 struct lfsck_start_param lsp;
2986 memset(start, 0, sizeof(*start));
2987 start->ls_valid = lr->lr_valid;
2988 start->ls_speed_limit = lr->lr_speed;
2989 start->ls_version = lr->lr_version;
2990 start->ls_active = lr->lr_active;
2991 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2992 start->ls_async_windows = lr->lr_async_windows;
2994 lsp.lsp_start = start;
2995 lsp.lsp_index = lr->lr_index;
2996 lsp.lsp_index_valid = 1;
2997 rc = lfsck_start(env, key, &lsp);
/*
 * Stop request: build a zeroed lfsck_stop carrying the request's status
 * and its flags with LPF_BROADCAST masked out.
 */
3001 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3003 memset(stop, 0, sizeof(*stop));
3004 stop->ls_status = lr->lr_status;
3005 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3006 rc = lfsck_stop(env, key, stop);
/*
 * Component events: forward the request to the lfsck_in_notify() method
 * of the component selected by lr->lr_active.
 */
3009 case LE_PHASE1_DONE:
3010 case LE_PHASE2_DONE:
3011 case LE_FID_ACCESSED:
3013 case LE_CONDITIONAL_DESTROY:
3014 case LE_SKIP_NLINK_DECLARE:
3016 case LE_SET_LMV_MASTER:
3017 case LE_SET_LMV_SLAVE:
3018 case LE_PAIRS_VERIFY: {
3019 struct lfsck_instance *lfsck;
3020 struct lfsck_component *com;
3022 lfsck = lfsck_instance_find(key, true, false);
3023 if (unlikely(lfsck == NULL))
3026 com = lfsck_component_find(lfsck, lr->lr_active);
3027 if (likely(com != NULL)) {
3028 rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
3029 lfsck_component_put(env, com);
3032 lfsck_instance_put(env, lfsck);
3041 EXPORT_SYMBOL(lfsck_in_notify);
/*
 * Query the LFSCK component selected by lr->lr_active.
 *
 * env: execution environment passed to the component method.
 * key: device used to look up the LFSCK instance.
 * lr:  request; only lr_active is read in the lines shown here.
 *
 * When the component is found, rc is the result of its lfsck_query()
 * method. NOTE(review): the values used when the instance or component is
 * missing, and the return statement, are not visible in this excerpt.
 */
3043 int lfsck_query(const struct lu_env *env, struct dt_device *key,
3044 struct lfsck_request *lr)
3046 struct lfsck_instance *lfsck;
3047 struct lfsck_component *com;
3051 lfsck = lfsck_instance_find(key, true, false);
3052 if (unlikely(lfsck == NULL))
3055 com = lfsck_component_find(lfsck, lr->lr_active);
3056 if (likely(com != NULL)) {
3057 rc = com->lc_ops->lfsck_query(env, com);
/* Drop the component reference once the query has completed. */
3058 lfsck_component_put(env, com);
3063 lfsck_instance_put(env, lfsck);
/*
 * Record the LDLM namespace @ns in the LFSCK instance registered for @key.
 *
 * env: execution environment, used when dropping the instance reference.
 * key: device used to look up the LFSCK instance.
 * ns:  namespace stored in lfsck->li_namespace.
 *
 * Nothing is stored when no instance is found.
 * NOTE(review): the return values are not visible in this excerpt.
 */
3068 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3069 struct ldlm_namespace *ns)
3071 struct lfsck_instance *lfsck;
3074 lfsck = lfsck_instance_find(key, true, false);
3075 if (likely(lfsck != NULL)) {
3076 lfsck->li_namespace = ns;
3077 lfsck_instance_put(env, lfsck);
3083 EXPORT_SYMBOL(lfsck_register_namespace);
/*
 * Create and register a new LFSCK instance for the device @key.
 *
 * env:         execution environment; its per-thread info supplies the
 *              scratch FIDs used below.
 * key:         bottom device; stored as li_bottom and used for every
 *              object lookup in this function.
 * next:        stored as li_next.
 * obd:         stored as li_obd.
 * notify:      callback stored as li_out_notify.
 * notify_data: opaque argument stored as li_out_notify_data.
 * master:      NOTE(review): its use is not visible in this excerpt;
 *              presumably it guards the li_master assignment below.
 *
 * Failures seen here jump to the "out" label with rc set (for example
 * PTR_ERR() of a failed dt_locate(), or -ENOTDIR).
 * NOTE(review): the checks after most "rc = ..." calls and the final
 * return are not visible in this excerpt.
 */
3085 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3086 struct dt_device *next, struct obd_device *obd,
3087 lfsck_out_notify notify, void *notify_data, bool master)
3089 struct lfsck_instance *lfsck;
3090 struct dt_object *root = NULL;
3091 struct dt_object *obj = NULL;
3092 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid;
/* An instance already existing for @key is the unexpected case. */
3096 lfsck = lfsck_instance_find(key, false, false);
3097 if (unlikely(lfsck != NULL))
3100 OBD_ALLOC_PTR(lfsck);
/*
 * Initialize the locks, lists, counters (reference count starts at 1) and
 * the thread's control wait queue, then record the caller-supplied
 * callback, devices and obd.
 */
3104 mutex_init(&lfsck->li_mutex);
3105 spin_lock_init(&lfsck->li_lock);
3106 INIT_LIST_HEAD(&lfsck->li_link);
3107 INIT_LIST_HEAD(&lfsck->li_list_scan);
3108 INIT_LIST_HEAD(&lfsck->li_list_dir);
3109 INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3110 INIT_LIST_HEAD(&lfsck->li_list_idle);
3111 INIT_LIST_HEAD(&lfsck->li_list_lmv);
3112 atomic_set(&lfsck->li_ref, 1);
3113 atomic_set(&lfsck->li_double_scan_count, 0);
3114 init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3115 lfsck->li_out_notify = notify;
3116 lfsck->li_out_notify_data = notify_data;
3117 lfsck->li_next = next;
3118 lfsck->li_bottom = key;
3119 lfsck->li_obd = obd;
/* Set up the OST and MDT target descriptor tables. */
3121 rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3125 rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
/*
 * Initialize local OID storage, then fetch and locate the device root,
 * which must be usable as a directory; its FID is kept as
 * li_local_root_fid.
 */
3129 fid->f_seq = FID_SEQ_LOCAL_NAME;
3132 rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3136 rc = dt_root_get(env, key, fid);
3140 root = dt_locate(env, key, fid);
3142 GOTO(out, rc = PTR_ERR(root));
3144 if (unlikely(!dt_try_as_dir(env, root)))
3145 GOTO(out, rc = -ENOTDIR);
3147 lfsck->li_local_root_fid = *fid;
/* NOTE(review): the guard on this assignment is not visible here. */
3149 lfsck->li_master = 1;
/*
 * On device index 0: look up "ROOT" under the local root (result stored
 * in li_global_root_fid), then dotlustre under it, then lostfound under
 * dotlustre, calling lfsck_verify_linkea() on the last two objects.
 */
3150 if (lfsck_dev_idx(lfsck) == 0) {
3151 struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3152 const struct lu_name *cname;
3154 rc = dt_lookup(env, root,
3155 (struct dt_rec *)(&lfsck->li_global_root_fid),
3156 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3160 obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3162 GOTO(out, rc = PTR_ERR(obj));
3164 if (unlikely(!dt_try_as_dir(env, obj)))
3165 GOTO(out, rc = -ENOTDIR);
3167 rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3168 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3172 lfsck_object_put(env, obj);
3173 obj = dt_locate(env, key, fid);
3175 GOTO(out, rc = PTR_ERR(obj));
3177 cname = lfsck_name_get_const(env, dotlustre,
3179 rc = lfsck_verify_linkea(env, obj, cname,
3180 &lfsck->li_global_root_fid);
3184 if (unlikely(!dt_try_as_dir(env, obj)))
3185 GOTO(out, rc = -ENOTDIR);
3188 rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3189 (const struct dt_key *)lostfound,
3194 lfsck_object_put(env, obj);
3195 obj = dt_locate(env, key, fid);
3197 GOTO(out, rc = PTR_ERR(obj));
3199 cname = lfsck_name_get_const(env, lostfound,
3201 rc = lfsck_verify_linkea(env, obj, cname, pfid);
3205 lfsck_object_put(env, obj);
/*
 * Locate the object identified by FID_SEQ_LOCAL_FILE / OTABLE_IT_OID,
 * call its do_index_try() with dt_otable_features and keep it as
 * li_obj_oit.
 */
3210 fid->f_seq = FID_SEQ_LOCAL_FILE;
3211 fid->f_oid = OTABLE_IT_OID;
3213 obj = dt_locate(env, key, fid);
3215 GOTO(out, rc = PTR_ERR(obj));
3217 rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3221 lfsck->li_obj_oit = obj;
/*
 * Find or create the LFSCK_DIR directory under the root. An extra
 * reference is taken before storing it in li_lfsck_dir, presumably so the
 * put on obj at the end of the function leaves that reference in place.
 */
3222 obj = local_file_find_or_create(env, lfsck->li_los, root, LFSCK_DIR,
3223 S_IFDIR | S_IRUGO | S_IWUSR);
3225 GOTO(out, rc = PTR_ERR(obj));
3227 lu_object_get(&obj->do_lu);
3228 lfsck->li_lfsck_dir = obj;
/* Bookmark, FID state, then the namespace and layout components. */
3229 rc = lfsck_bookmark_setup(env, lfsck);
3234 rc = lfsck_fid_init(lfsck);
3238 rc = lfsck_namespace_setup(env, lfsck);
3243 rc = lfsck_layout_setup(env, lfsck);
3247 /* XXX: more LFSCK components initialization to be added here. */
3249 rc = lfsck_instance_add(lfsck);
3251 rc = lfsck_add_target_from_orphan(env, lfsck);
/*
 * Exit path: drop the local object references that are valid pointers.
 * NOTE(review): the condition guarding lfsck_instance_cleanup() is not
 * visible here; presumably it runs only on failure.
 */
3253 if (obj != NULL && !IS_ERR(obj))
3254 lfsck_object_put(env, obj);
3255 if (root != NULL && !IS_ERR(root))
3256 lfsck_object_put(env, root);
3258 lfsck_instance_cleanup(env, lfsck);
3261 EXPORT_SYMBOL(lfsck_register);
/*
 * Counterpart of lfsck_register(): look up the LFSCK instance for @key and
 * drop a reference on it.
 *
 * env: execution environment, used when dropping the reference.
 * key: device used to look up the LFSCK instance.
 *
 * NOTE(review): this lookup passes (false, true) where the other callers
 * in this file pass (true, false); the meaning of those flags, and the
 * NULL check on the result, are not visible in this excerpt.
 */
3263 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3265 struct lfsck_instance *lfsck;
3267 lfsck = lfsck_instance_find(key, false, true);
3269 lfsck_instance_put(env, lfsck);
3271 EXPORT_SYMBOL(lfsck_degister);
/*
 * Add a target descriptor with the given @index to the LFSCK instance
 * registered for @key, or queue it as an orphan when no instance exists.
 *
 * env:     execution environment passed to the helpers called here.
 * key:     device used to look up the LFSCK instance.
 * tgt:     target device. NOTE(review): where it is stored is not visible
 *          in this excerpt.
 * exp:     export for the target. NOTE(review): its use is not visible in
 *          this excerpt.
 * index:   target index, stored in ltd->ltd_index.
 * for_ost: passed to __lfsck_add_target(); presumably it also selects the
 *          OST or MDT global orphan list below.
 *
 * NOTE(review): the allocation of the descriptor and the return
 * statements are not visible in this excerpt.
 */
3273 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3274 struct dt_device *tgt, struct obd_export *exp,
3275 __u32 index, bool for_ost)
3277 struct lfsck_instance *lfsck;
3278 struct lfsck_tgt_desc *ltd;
/* Initialize the descriptor's list heads; its reference count starts at 1. */
3289 INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3290 INIT_LIST_HEAD(&ltd->ltd_layout_list);
3291 INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3292 INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3293 INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3294 atomic_set(&ltd->ltd_ref, 1);
3295 ltd->ltd_index = index;
/*
 * The lookup and the orphan queueing happen under lfsck_instance_lock.
 * With no instance for @key, the descriptor is parked on one of the two
 * global orphan lists.
 */
3297 spin_lock(&lfsck_instance_lock);
3298 lfsck = __lfsck_instance_find(key, true, false);
3299 if (lfsck == NULL) {
3301 list_add_tail(&ltd->ltd_orphan_list,
3302 &lfsck_ost_orphan_list);
3304 list_add_tail(&ltd->ltd_orphan_list,
3305 &lfsck_mdt_orphan_list);
3306 spin_unlock(&lfsck_instance_lock);
3310 spin_unlock(&lfsck_instance_lock);
/* An instance exists: attach the descriptor to it outside the spinlock. */
3312 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3316 lfsck_instance_put(env, lfsck);
3320 EXPORT_SYMBOL(lfsck_add_target);
/*
 * Remove the target @tgt / @index from the LFSCK instance registered for
 * @key, or from the orphan lists if it was only queued there.
 *
 * env:     execution environment passed to the helpers called here.
 * key:     device used to look up the LFSCK instance.
 * tgt:     target device, matched against ltd->ltd_tgt on the orphan lists.
 * index:   slot of the target in the instance's descriptor table.
 * for_ost: NOTE(review): its use is not visible in this excerpt;
 *          presumably it selects the OST or MDT lists and tables.
 */
3322 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3323 struct dt_device *tgt, __u32 index, bool for_ost)
3325 struct lfsck_instance *lfsck;
3326 struct lfsck_tgt_descs *ltds;
3327 struct lfsck_tgt_desc *ltd;
3328 struct list_head *head;
/*
 * First search the chosen global orphan list under lfsck_instance_lock
 * and unlink a descriptor whose ltd_tgt matches @tgt.
 */
3331 head = &lfsck_ost_orphan_list;
3333 head = &lfsck_mdt_orphan_list;
3335 spin_lock(&lfsck_instance_lock);
3336 list_for_each_entry(ltd, head, ltd_orphan_list) {
3337 if (ltd->ltd_tgt == tgt) {
3338 list_del_init(&ltd->ltd_orphan_list);
3339 spin_unlock(&lfsck_instance_lock);
/* No global orphan handled above: look up the instance under the same lock. */
3347 lfsck = __lfsck_instance_find(key, true, false);
3348 spin_unlock(&lfsck_instance_lock);
3349 if (unlikely(lfsck == NULL))
3353 ltds = &lfsck->li_ost_descs;
3355 ltds = &lfsck->li_mdt_descs;
/*
 * With the table's rw-semaphore held for writing, check @index against
 * the bitmap size, then clear the bitmap bit and the table slot.
 */
3357 down_write(&ltds->ltd_rw_sem);
3358 LASSERT(ltds->ltd_tgts_bitmap != NULL);
3360 if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3363 ltd = LTD_TGT(ltds, index);
3364 if (unlikely(ltd == NULL))
3367 LASSERT(ltds->ltd_tgtnr > 0);
3370 cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3371 LTD_TGT(ltds, index) = NULL;
/*
 * Search the instance's own orphan list for @tgt.
 * NOTE(review): the control flow leading here is not visible; presumably
 * this is the fallback for the failed index and slot checks above.
 */
3376 head = &lfsck->li_ost_descs.ltd_orphan;
3378 head = &lfsck->li_mdt_descs.ltd_orphan;
3380 list_for_each_entry(ltd, head, ltd_orphan_list) {
3381 if (ltd->ltd_tgt == tgt) {
3382 list_del_init(&ltd->ltd_orphan_list);
3388 up_write(&ltds->ltd_rw_sem);
/*
 * After releasing the semaphore, take ltd_lock briefly (the statement
 * inside is not visible here), then send stop notifications for the
 * namespace and layout components.
 */
3390 spin_lock(&ltds->ltd_lock);
3392 spin_unlock(&ltds->ltd_lock);
3393 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3394 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3398 lfsck_instance_put(env, lfsck);
3400 EXPORT_SYMBOL(lfsck_del_target);
/*
 * Module initialization: set up the global instance and orphan lists,
 * register the LFSCK thread context key, and register lfsck_in_notify()
 * and lfsck_query() with the tgt layer.
 *
 * NOTE(review): the check on rc after the key registration and the return
 * statement are not visible in this excerpt; presumably the handlers are
 * registered only when the key registration succeeds.
 */
3402 static int __init lfsck_init(void)
3406 INIT_LIST_HEAD(&lfsck_instance_list);
3407 INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3408 INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3409 lfsck_key_init_generic(&lfsck_thread_key, NULL);
3410 rc = lu_context_key_register(&lfsck_thread_key);
3412 tgt_register_lfsck_in_notify(lfsck_in_notify);
3413 tgt_register_lfsck_query(lfsck_query);
/*
 * Module exit: every LFSCK instance must already be gone (asserted).
 * Unlink the descriptors still queued on the global OST and MDT orphan
 * lists, then unregister the LFSCK thread context key.
 *
 * NOTE(review): what is done with each descriptor after it is unlinked is
 * not visible in this excerpt.
 */
3419 static void __exit lfsck_exit(void)
3421 struct lfsck_tgt_desc *ltd;
3422 struct lfsck_tgt_desc *next;
3424 LASSERT(list_empty(&lfsck_instance_list));
/* The _safe iterator is needed because entries are unlinked while walking. */
3426 list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3428 list_del_init(&ltd->ltd_orphan_list);
3432 list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3434 list_del_init(&ltd->ltd_orphan_list);
3438 lu_context_key_degister(&lfsck_thread_key);
/*
 * Module metadata, followed by the cfs_module() declaration that names
 * lfsck_init() and lfsck_exit() as the module's init and exit routines.
 */
3441 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
3442 MODULE_DESCRIPTION("LFSCK");
3443 MODULE_LICENSE("GPL");
3445 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);