Whamcloud - gitweb
LU-5820 lfsck: use multiple namespace LFSCK trace files 09/12809/19
authorFan Yong <fan.yong@intel.com>
Thu, 6 Nov 2014 11:59:27 +0000 (19:59 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 3 Feb 2015 18:42:55 +0000 (18:42 +0000)
The namespace LFSCK uses trace file to record the FID of the object
that has multiple hard links, or has remote name entry, or contains
some uncertain inconsistency, and so on. Only single namespace LFSCK
trace file may be not efficient, especially when there are millions
of FIDs to be recorded. So use multiple namespace LFSCK trace files
and per trace file based semaphore to control the concurrent access
of the trace file.

For Lustre-2.x (x <= 6), the LFSCK used LFSCK_NAMESPACE_MAGIC_V1 as
the namespace trace file magic. When downgrade to such old release,
the old LFSCK will not recognize the new LFSCK_NAMESPACE_MAGIC_V2 in
the new trace file, then it will reset the whole LFSCK, and will not
cause start failure. The similar case will happen when upgrade from
such old release.

This patch also drops some repeated FID recording in the namespace
LFSCK trace file. Related FID should have been recorded in the trace
file via lfsck_namespace_exec_oit(), it is unnecessary to do that
again when scanning the directory.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Iec27c52b21789dbde1e4c1153f61162f028ceac3
Reviewed-on: http://review.whamcloud.com/12809
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
lustre/include/lustre_disk.h
lustre/lfsck/lfsck_bookmark.c
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c
lustre/osd-ldiskfs/osd_scrub.c
lustre/tests/sanity-lfsck.sh

index 2e19595..0c1f665 100644 (file)
 #define QMT_DIR                        "quota_master"
 #define QSD_DIR                        "quota_slave"
 #define HSM_ACTIONS            "hsm_actions"
 #define QMT_DIR                        "quota_master"
 #define QSD_DIR                        "quota_slave"
 #define HSM_ACTIONS            "hsm_actions"
+#define LFSCK_DIR              "LFSCK"
+#define LFSCK_BOOKMARK         "lfsck_bookmark"
+#define LFSCK_LAYOUT           "lfsck_layout"
+#define LFSCK_NAMESPACE                "lfsck_namespace"
 
 /****************** persistent mount data *********************/
 
 
 /****************** persistent mount data *********************/
 
index 89cbeaa..0d1f6d0 100644 (file)
@@ -39,8 +39,6 @@
 
 #define LFSCK_BOOKMARK_MAGIC   0x20130C1D
 
 
 #define LFSCK_BOOKMARK_MAGIC   0x20130C1D
 
-static const char lfsck_bookmark_name[] = "lfsck_bookmark";
-
 static void lfsck_bookmark_le_to_cpu(struct lfsck_bookmark *des,
                                     struct lfsck_bookmark *src)
 {
 static void lfsck_bookmark_le_to_cpu(struct lfsck_bookmark *des,
                                     struct lfsck_bookmark *src)
 {
@@ -175,7 +173,7 @@ int lfsck_bookmark_setup(const struct lu_env *env,
        }
 
        obj = local_file_find_or_create(env, lfsck->li_los, root,
        }
 
        obj = local_file_find_or_create(env, lfsck->li_los, root,
-                                       lfsck_bookmark_name,
+                                       LFSCK_BOOKMARK,
                                        S_IFREG | S_IRUGO | S_IWUSR);
        lu_object_put(env, &root->do_lu);
        if (IS_ERR(obj))
                                        S_IFREG | S_IRUGO | S_IWUSR);
        lu_object_put(env, &root->do_lu);
        if (IS_ERR(obj))
index 3484301..ecbbf74 100644 (file)
@@ -480,6 +480,15 @@ struct lfsck_tgt_descs {
 #define OST_TGT(lfsck, index)   LTD_TGT(&lfsck->li_ost_descs, index)
 #define MDT_TGT(lfsck, index)   LTD_TGT(&lfsck->li_mdt_descs, index)
 
 #define OST_TGT(lfsck, index)   LTD_TGT(&lfsck->li_ost_descs, index)
 #define MDT_TGT(lfsck, index)   LTD_TGT(&lfsck->li_mdt_descs, index)
 
+#define LFSCK_STF_BITS 4
+/* If want to adjust the LFSCK_STF_COUNT, please change LFSCK_STF_BITS. */
+#define LFSCK_STF_COUNT        (1 << LFSCK_STF_BITS)
+
+struct lfsck_sub_trace_obj {
+       struct dt_object        *lsto_obj;
+       struct mutex             lsto_mutex;
+};
+
 struct lfsck_component {
        /* into lfsck_instance::li_list_(scan,double_scan,idle} */
        struct list_head         lc_link;
 struct lfsck_component {
        /* into lfsck_instance::li_list_(scan,double_scan,idle} */
        struct list_head         lc_link;
@@ -493,6 +502,7 @@ struct lfsck_component {
        struct lfsck_position    lc_pos_start;
        struct lfsck_instance   *lc_lfsck;
        struct dt_object        *lc_obj;
        struct lfsck_position    lc_pos_start;
        struct lfsck_instance   *lc_lfsck;
        struct dt_object        *lc_obj;
+       struct lfsck_sub_trace_obj lc_sub_trace_objs[LFSCK_STF_COUNT];
        struct lfsck_operations *lc_ops;
        void                    *lc_file_ram;
        void                    *lc_file_disk;
        struct lfsck_operations *lc_ops;
        void                    *lc_file_ram;
        void                    *lc_file_disk;
@@ -628,6 +638,7 @@ struct lfsck_instance {
        struct local_oid_storage *li_los;
        struct lu_fid             li_local_root_fid;  /* backend root "/" */
        struct lu_fid             li_global_root_fid; /* /ROOT */
        struct local_oid_storage *li_los;
        struct lu_fid             li_local_root_fid;  /* backend root "/" */
        struct lu_fid             li_global_root_fid; /* /ROOT */
+       struct dt_object         *li_lfsck_dir;
        struct dt_object         *li_bookmark_obj;
        struct dt_object         *li_lpf_obj;
        struct dt_object         *li_lpf_root_obj;
        struct dt_object         *li_bookmark_obj;
        struct dt_object         *li_lpf_obj;
        struct dt_object         *li_lpf_root_obj;
@@ -1261,6 +1272,15 @@ static inline void lfsck_component_put(const struct lu_env *env,
                                       struct lfsck_component *com)
 {
        if (atomic_dec_and_test(&com->lc_ref)) {
                                       struct lfsck_component *com)
 {
        if (atomic_dec_and_test(&com->lc_ref)) {
+               struct lfsck_sub_trace_obj *lsto;
+               int                         i;
+
+               for (i = 0, lsto = &com->lc_sub_trace_objs[0];
+                    i < LFSCK_STF_COUNT; i++, lsto++) {
+                       if (lsto->lsto_obj != NULL)
+                               lu_object_put(env, &lsto->lsto_obj->do_lu);
+               }
+
                if (com->lc_obj != NULL)
                        lu_object_put_nocache(env, &com->lc_obj->do_lu);
                if (com->lc_file_ram != NULL)
                if (com->lc_obj != NULL)
                        lu_object_put_nocache(env, &com->lc_obj->do_lu);
                if (com->lc_file_ram != NULL)
@@ -1349,6 +1369,11 @@ static inline struct lfsck_lmv *lfsck_lmv_get(struct lfsck_lmv *llmv)
        return llmv;
 }
 
        return llmv;
 }
 
+static inline int lfsck_sub_trace_file_fid2idx(const struct lu_fid *fid)
+{
+       return fid->f_oid & (LFSCK_STF_COUNT - 1);
+}
+
 static inline void lfsck_lmv_header_le_to_cpu(struct lmv_mds_md_v1 *dst,
                                              const struct lmv_mds_md_v1 *src)
 {
 static inline void lfsck_lmv_header_le_to_cpu(struct lmv_mds_md_v1 *dst,
                                              const struct lmv_mds_md_v1 *src)
 {
index a16396a..02dc24a 100644 (file)
@@ -53,8 +53,6 @@
 
 #define LFSCK_LAYOUT_MAGIC             LFSCK_LAYOUT_MAGIC_V2
 
 
 #define LFSCK_LAYOUT_MAGIC             LFSCK_LAYOUT_MAGIC_V2
 
-static const char lfsck_layout_name[] = "lfsck_layout";
-
 struct lfsck_layout_seq {
        struct list_head         lls_list;
        __u64                    lls_seq;
 struct lfsck_layout_seq {
        struct list_head         lls_list;
        __u64                    lls_seq;
@@ -5456,7 +5454,7 @@ int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
                com->lc_ops = &lfsck_layout_master_ops;
                com->lc_data = lfsck_assistant_data_init(
                                &lfsck_layout_assistant_ops,
                com->lc_ops = &lfsck_layout_master_ops;
                com->lc_data = lfsck_assistant_data_init(
                                &lfsck_layout_assistant_ops,
-                               "lfsck_layout");
+                               LFSCK_LAYOUT);
                if (com->lc_data == NULL)
                        GOTO(out, rc = -ENOMEM);
        } else {
                if (com->lc_data == NULL)
                        GOTO(out, rc = -ENOMEM);
        } else {
@@ -5491,7 +5489,7 @@ int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
                GOTO(out, rc = -ENOTDIR);
 
        obj = local_file_find_or_create(env, lfsck->li_los, root,
                GOTO(out, rc = -ENOTDIR);
 
        obj = local_file_find_or_create(env, lfsck->li_los, root,
-                                       lfsck_layout_name,
+                                       LFSCK_LAYOUT,
                                        S_IFREG | S_IRUGO | S_IWUSR);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
                                        S_IFREG | S_IRUGO | S_IWUSR);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
index 2882700..3cf9703 100644 (file)
@@ -1544,6 +1544,11 @@ void lfsck_instance_cleanup(const struct lu_env *env,
        lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
        lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
 
        lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
        lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
 
+       if (lfsck->li_lfsck_dir != NULL) {
+               lu_object_put_nocache(env, &lfsck->li_lfsck_dir->do_lu);
+               lfsck->li_lfsck_dir = NULL;
+       }
+
        if (lfsck->li_bookmark_obj != NULL) {
                lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
                lfsck->li_bookmark_obj = NULL;
        if (lfsck->li_bookmark_obj != NULL) {
                lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
                lfsck->li_bookmark_obj = NULL;
@@ -3210,12 +3215,18 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
-       lu_object_get(&obj->do_lu);
-       lfsck->li_obj_oit = obj;
        rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
        if (rc != 0)
                GOTO(out, rc);
 
        rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
        if (rc != 0)
                GOTO(out, rc);
 
+       lfsck->li_obj_oit = obj;
+       obj = local_file_find_or_create(env, lfsck->li_los, root, LFSCK_DIR,
+                                       S_IFDIR | S_IRUGO | S_IWUSR);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
+
+       lu_object_get(&obj->do_lu);
+       lfsck->li_lfsck_dir = obj;
        rc = lfsck_bookmark_setup(env, lfsck);
        if (rc != 0)
                GOTO(out, rc);
        rc = lfsck_bookmark_setup(env, lfsck);
        if (rc != 0)
                GOTO(out, rc);
index f254e1d..91e0355 100644 (file)
 
 #include "lfsck_internal.h"
 
 
 #include "lfsck_internal.h"
 
-#define LFSCK_NAMESPACE_MAGIC  0xA0629D03
+#define LFSCK_NAMESPACE_MAGIC_V1       0xA0629D03
+#define LFSCK_NAMESPACE_MAGIC_V2       0xA0621A0B
+
+/* For Lustre-2.x (x <= 6), the namespace LFSCK used LFSCK_NAMESPACE_MAGIC_V1
+ * as the trace file magic. When downgrade to such old release, the old LFSCK
+ * will not recognize the new LFSCK_NAMESPACE_MAGIC_V2 in the new trace file,
+ * then it will reset the whole LFSCK, and will not cause start failure. The
+ * similar case will happen when upgrade from such old release. */
+#define LFSCK_NAMESPACE_MAGIC          LFSCK_NAMESPACE_MAGIC_V2
 
 enum lfsck_nameentry_check {
        LFSCK_NAMEENTRY_DEAD            = 1, /* The object has been unlinked. */
 
 enum lfsck_nameentry_check {
        LFSCK_NAMEENTRY_DEAD            = 1, /* The object has been unlinked. */
@@ -49,8 +57,6 @@ enum lfsck_nameentry_check {
        LFSCK_NAMEENTRY_RECREATED       = 3, /* The entry has been recreated. */
 };
 
        LFSCK_NAMEENTRY_RECREATED       = 3, /* The entry has been recreated. */
 };
 
-static const char lfsck_namespace_name[] = "lfsck_namespace";
-
 static struct lfsck_namespace_req *
 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
                                   struct lu_dirent *ent, __u16 type)
 static struct lfsck_namespace_req *
 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
                                   struct lu_dirent *ent, __u16 type)
@@ -466,6 +472,70 @@ log:
        return rc;
 }
 
        return rc;
 }
 
+static struct dt_object *
+lfsck_namespace_load_one_trace_file(const struct lu_env *env,
+                                   struct lfsck_component *com,
+                                   struct dt_object *parent,
+                                   const char *name,
+                                   const struct dt_index_features *ft,
+                                   bool reset)
+{
+       struct lfsck_instance   *lfsck = com->lc_lfsck;
+       struct dt_object        *obj;
+       int                      rc;
+
+       if (reset) {
+               rc = local_object_unlink(env, lfsck->li_bottom, parent, name);
+               if (rc != 0 && rc != -ENOENT)
+                       return ERR_PTR(rc);
+       }
+
+       if (ft != NULL)
+               obj = local_index_find_or_create(env, lfsck->li_los, parent,
+                                       name, S_IFREG | S_IRUGO | S_IWUSR, ft);
+       else
+               obj = local_file_find_or_create(env, lfsck->li_los, parent,
+                                       name, S_IFREG | S_IRUGO | S_IWUSR);
+
+       return obj;
+}
+
+static int lfsck_namespace_load_sub_trace_files(const struct lu_env *env,
+                                               struct lfsck_component *com,
+                                               bool reset)
+{
+       char                            *name = lfsck_env_info(env)->lti_key;
+       struct lfsck_sub_trace_obj      *lsto;
+       struct dt_object                *obj;
+       int                              rc;
+       int                              i;
+
+       for (i = 0, lsto = &com->lc_sub_trace_objs[0];
+            i < LFSCK_STF_COUNT; i++, lsto++) {
+               snprintf(name, NAME_MAX, "%s_%02d", LFSCK_NAMESPACE, i);
+               if (lsto->lsto_obj != NULL) {
+                       if (!reset)
+                               continue;
+
+                       lu_object_put(env, &lsto->lsto_obj->do_lu);
+                       lsto->lsto_obj = NULL;
+               }
+
+               obj = lfsck_namespace_load_one_trace_file(env, com,
+                                       com->lc_lfsck->li_lfsck_dir,
+                                       name, &dt_lfsck_features, reset);
+               if (IS_ERR(obj))
+                       return PTR_ERR(obj);
+
+               lsto->lsto_obj = obj;
+               rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
+               if (rc != 0)
+                       return rc;
+       }
+
+       return 0;
+}
+
 static int lfsck_namespace_init(const struct lu_env *env,
                                struct lfsck_component *com)
 {
 static int lfsck_namespace_init(const struct lu_env *env,
                                struct lfsck_component *com)
 {
@@ -478,6 +548,9 @@ static int lfsck_namespace_init(const struct lu_env *env,
        down_write(&com->lc_sem);
        rc = lfsck_namespace_store(env, com, true);
        up_write(&com->lc_sem);
        down_write(&com->lc_sem);
        rc = lfsck_namespace_store(env, com, true);
        up_write(&com->lc_sem);
+       if (rc == 0)
+               rc = lfsck_namespace_load_sub_trace_files(env, com, true);
+
        return rc;
 }
 
        return rc;
 }
 
@@ -499,10 +572,11 @@ int lfsck_namespace_trace_update(const struct lu_env *env,
                                 const __u8 flags, bool add)
 {
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
                                 const __u8 flags, bool add)
 {
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
-       struct dt_object        *obj    = com->lc_obj;
+       struct dt_object        *obj;
        struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
        struct dt_device        *dev    = lfsck->li_bottom;
        struct thandle          *th     = NULL;
        struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
        struct dt_device        *dev    = lfsck->li_bottom;
        struct thandle          *th     = NULL;
+       int                      idx;
        int                      rc     = 0;
        __u8                     old    = 0;
        __u8                     new    = 0;
        int                      rc     = 0;
        __u8                     old    = 0;
        __u8                     new    = 0;
@@ -510,7 +584,9 @@ int lfsck_namespace_trace_update(const struct lu_env *env,
 
        LASSERT(flags != 0);
 
 
        LASSERT(flags != 0);
 
-       down_write(&com->lc_sem);
+       idx = lfsck_sub_trace_file_fid2idx(fid);
+       obj = com->lc_sub_trace_objs[idx].lsto_obj;
+       mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
        fid_cpu_to_be(key, fid);
        rc = dt_lookup(env, obj, (struct dt_rec *)&old,
                       (const struct dt_key *)key, BYPASS_CAPA);
        fid_cpu_to_be(key, fid);
        rc = dt_lookup(env, obj, (struct dt_rec *)&old,
                       (const struct dt_key *)key, BYPASS_CAPA);
@@ -585,7 +661,7 @@ log:
               (__u32)flags, (__u32)old, (__u32)new, rc);
 
 unlock:
               (__u32)flags, (__u32)old, (__u32)new, rc);
 
 unlock:
-       up_write(&com->lc_sem);
+       mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
 
        return rc;
 }
 
        return rc;
 }
@@ -2783,6 +2859,7 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env,
        struct linkea_data               ldata  = { NULL };
        struct lustre_handle             lh     = { 0 };
        __u32                            old    = la->la_nlink;
        struct linkea_data               ldata  = { NULL };
        struct lustre_handle             lh     = { 0 };
        __u32                            old    = la->la_nlink;
+       int                              idx;
        int                              rc     = 0;
        __u8                             flags;
        ENTRY;
        int                              rc     = 0;
        __u8                             flags;
        ENTRY;
@@ -2825,8 +2902,10 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env,
                GOTO(unlock, rc = 0);
 
        fid_cpu_to_be(tfid, cfid);
                GOTO(unlock, rc = 0);
 
        fid_cpu_to_be(tfid, cfid);
-       rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)&flags,
-                      (const struct dt_key *)tfid, BYPASS_CAPA);
+       idx = lfsck_sub_trace_file_fid2idx(cfid);
+       rc = dt_lookup(env, com->lc_sub_trace_objs[idx].lsto_obj,
+                      (struct dt_rec *)&flags, (const struct dt_key *)tfid,
+                      BYPASS_CAPA);
        if (rc != 0)
                GOTO(unlock, rc);
 
        if (rc != 0)
                GOTO(unlock, rc);
 
@@ -3592,6 +3671,33 @@ static void lfsck_namespace_release_lmv(const struct lu_env *env,
        }
 }
 
        }
 }
 
+static int lfsck_namespace_check_for_double_scan(const struct lu_env *env,
+                                                struct lfsck_component *com,
+                                                struct dt_object *obj)
+{
+       struct lu_attr *la = &lfsck_env_info(env)->lti_la;
+       int             rc;
+
+       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
+       if (rc != 0)
+               return rc;
+
+       /* zero-linkEA object may be orphan, but it also maybe because
+        * of upgrading. Currently, we cannot record it for double scan.
+        * Because it may cause the LFSCK trace file to be too large. */
+
+       /* "la_ctime" == 1 means that it has ever been removed from
+        * backend /lost+found directory but not been added back to
+        * the normal namespace yet. */
+
+       if ((S_ISREG(lfsck_object_type(obj)) && la->la_nlink > 1) ||
+           unlikely(la->la_ctime == 1))
+               rc = lfsck_namespace_trace_update(env, com, lfsck_dto2fid(obj),
+                                                 LNTF_CHECK_LINKEA, true);
+
+       return rc;
+}
+
 /* namespace APIs */
 
 static int lfsck_namespace_reset(const struct lu_env *env,
 /* namespace APIs */
 
 static int lfsck_namespace_reset(const struct lu_env *env,
@@ -3626,22 +3732,15 @@ static int lfsck_namespace_reset(const struct lu_env *env,
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
 
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
 
-       rc = local_object_unlink(env, lfsck->li_bottom, root,
-                                lfsck_namespace_name);
-       if (rc != 0)
-               GOTO(out, rc);
-
        lfsck_object_put(env, com->lc_obj);
        com->lc_obj = NULL;
        lfsck_object_put(env, com->lc_obj);
        com->lc_obj = NULL;
-       dto = local_index_find_or_create(env, lfsck->li_los, root,
-                                        lfsck_namespace_name,
-                                        S_IFREG | S_IRUGO | S_IWUSR,
-                                        &dt_lfsck_features);
+       dto = lfsck_namespace_load_one_trace_file(env, com, root,
+                               LFSCK_NAMESPACE, NULL, true);
        if (IS_ERR(dto))
                GOTO(out, rc = PTR_ERR(dto));
 
        com->lc_obj = dto;
        if (IS_ERR(dto))
                GOTO(out, rc = PTR_ERR(dto));
 
        com->lc_obj = dto;
-       rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
+       rc = lfsck_namespace_load_sub_trace_files(env, com, true);
        if (rc != 0)
                GOTO(out, rc);
 
        if (rc != 0)
                GOTO(out, rc);
 
@@ -3907,7 +4006,6 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env,
        struct lfsck_namespace   *ns    = com->lc_file_ram;
        struct lfsck_instance    *lfsck = com->lc_lfsck;
        const struct lu_fid      *fid   = lfsck_dto2fid(obj);
        struct lfsck_namespace   *ns    = com->lc_file_ram;
        struct lfsck_instance    *lfsck = com->lc_lfsck;
        const struct lu_fid      *fid   = lfsck_dto2fid(obj);
-       struct lu_attr           *la    = &info->lti_la;
        struct lu_fid            *pfid  = &info->lti_fid2;
        struct lu_name           *cname = &info->lti_name;
        struct lu_seq_range      *range = &info->lti_range;
        struct lu_fid            *pfid  = &info->lti_fid2;
        struct lu_name           *cname = &info->lti_name;
        struct lu_seq_range      *range = &info->lti_range;
@@ -3943,23 +4041,8 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env,
                GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
        }
 
                GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
        }
 
-       /* zero-linkEA object may be orphan, but it also maybe because
-        * of upgrading. Currently, we cannot record it for double scan.
-        * Because it may cause the LFSCK trace file to be too large. */
        if (rc == -ENODATA) {
        if (rc == -ENODATA) {
-               if (S_ISDIR(lfsck_object_type(obj)))
-                       GOTO(out, rc = 0);
-
-               rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               /* "la_ctime" == 1 means that it has ever been removed from
-                * backend /lost+found directory but not been added back to
-                * the normal namespace yet. */
-               if (la->la_nlink > 1 || unlikely(la->la_ctime == 1))
-                       rc = lfsck_namespace_trace_update(env, com, fid,
-                                               LNTF_CHECK_LINKEA, true);
+               rc = lfsck_namespace_check_for_double_scan(env, com, obj);
 
                GOTO(out, rc);
        }
 
                GOTO(out, rc);
        }
@@ -3985,24 +4068,12 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env,
                rc = fld_local_lookup(env, ss->ss_server_fld,
                                      fid_seq(pfid), range);
                if ((rc == -ENOENT) ||
                rc = fld_local_lookup(env, ss->ss_server_fld,
                                      fid_seq(pfid), range);
                if ((rc == -ENOENT) ||
-                   (rc == 0 && range->lsr_index != idx)) {
+                   (rc == 0 && range->lsr_index != idx))
                        rc = lfsck_namespace_trace_update(env, com, fid,
                                                LNTF_CHECK_LINKEA, true);
                        rc = lfsck_namespace_trace_update(env, com, fid,
                                                LNTF_CHECK_LINKEA, true);
-               } else {
-                       if (S_ISDIR(lfsck_object_type(obj)))
-                               GOTO(out, rc = 0);
-
-                       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
-                       if (rc != 0)
-                               GOTO(out, rc);
-
-                       /* "la_ctime" == 1 means that it has ever been
-                        * removed from backend /lost+found directory but
-                        * not been added back to the normal namespace yet. */
-                       if (la->la_nlink > 1 || unlikely(la->la_ctime == 1))
-                               rc = lfsck_namespace_trace_update(env, com,
-                                               fid, LNTF_CHECK_LINKEA, true);
-               }
+               else
+                       rc = lfsck_namespace_check_for_double_scan(env, com,
+                                                                  obj);
        }
 
        GOTO(out, rc);
        }
 
        GOTO(out, rc);
@@ -4410,36 +4481,50 @@ static int lfsck_namespace_in_notify(const struct lu_env *env,
 
        switch (lr->lr_event) {
        case LE_SKIP_NLINK_DECLARE: {
 
        switch (lr->lr_event) {
        case LE_SKIP_NLINK_DECLARE: {
-               struct dt_object        *obj   = com->lc_obj;
+               struct dt_object        *obj;
                struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
                struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
+               int                      idx;
                __u8                     flags = 0;
 
                LASSERT(th != NULL);
 
                __u8                     flags = 0;
 
                LASSERT(th != NULL);
 
+               idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid);
+               obj = com->lc_sub_trace_objs[idx].lsto_obj;
+               fid_cpu_to_be(key, &lr->lr_fid);
+               mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
                rc = dt_declare_delete(env, obj,
                                       (const struct dt_key *)key, th);
                if (rc == 0)
                        rc = dt_declare_insert(env, obj,
                                               (const struct dt_rec *)&flags,
                                               (const struct dt_key *)key, th);
                rc = dt_declare_delete(env, obj,
                                       (const struct dt_key *)key, th);
                if (rc == 0)
                        rc = dt_declare_insert(env, obj,
                                               (const struct dt_rec *)&flags,
                                               (const struct dt_key *)key, th);
+               mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
 
                RETURN(rc);
        }
        case LE_SKIP_NLINK: {
 
                RETURN(rc);
        }
        case LE_SKIP_NLINK: {
-               struct dt_object        *obj   = com->lc_obj;
+               struct dt_object        *obj;
                struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
                struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
+               int                      idx;
                __u8                     flags = 0;
                bool                     exist = false;
                ENTRY;
 
                LASSERT(th != NULL);
 
                __u8                     flags = 0;
                bool                     exist = false;
                ENTRY;
 
                LASSERT(th != NULL);
 
+               idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid);
+               obj = com->lc_sub_trace_objs[idx].lsto_obj;
                fid_cpu_to_be(key, &lr->lr_fid);
                fid_cpu_to_be(key, &lr->lr_fid);
+               mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
                rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
                               (const struct dt_key *)key, BYPASS_CAPA);
                if (rc == 0) {
                rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
                               (const struct dt_key *)key, BYPASS_CAPA);
                if (rc == 0) {
-                       if (flags & LNTF_SKIP_NLINK)
+                       if (flags & LNTF_SKIP_NLINK) {
+                               mutex_unlock(
+                               &com->lc_sub_trace_objs[idx].lsto_mutex);
+
                                RETURN(0);
                                RETURN(0);
+                       }
 
                        exist = true;
                } else if (rc != -ENOENT) {
 
                        exist = true;
                } else if (rc != -ENOENT) {
@@ -4460,6 +4545,7 @@ static int lfsck_namespace_in_notify(const struct lu_env *env,
                GOTO(log, rc);
 
 log:
                GOTO(log, rc);
 
 log:
+               mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
                CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID
                       " to be skipped for namespace double scan: rc = %d\n",
                       lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc);
                CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID
                       " to be skipped for namespace double scan: rc = %d\n",
                       lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc);
@@ -4865,6 +4951,7 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
        enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
        ENTRY;
 
        enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
        ENTRY;
 
+       la->la_nlink = 0;
        if (lnr->lnr_attr & LUDA_UPGRADE) {
                ns->ln_flags |= LF_UPGRADE;
                ns->ln_dirent_repaired++;
        if (lnr->lnr_attr & LUDA_UPGRADE) {
                ns->ln_flags |= LF_UPGRADE;
                ns->ln_dirent_repaired++;
@@ -5018,7 +5105,7 @@ again:
                                type = LNIT_BAD_TYPE;
                        }
 
                                type = LNIT_BAD_TYPE;
                        }
 
-                       goto record;
+                       goto stop;
                }
 
                ns->ln_flags |= LF_INCONSISTENT;
                }
 
                ns->ln_flags |= LF_INCONSISTENT;
@@ -5080,7 +5167,7 @@ nodata:
                        ns->ln_linkea_repaired++;
                        repaired = true;
                        log = true;
                        ns->ln_linkea_repaired++;
                        repaired = true;
                        log = true;
-                       goto record;
+                       goto stop;
                }
 
                if (!lustre_handle_is_used(&lh))
                }
 
                if (!lustre_handle_is_used(&lh))
@@ -5148,35 +5235,6 @@ nodata:
                GOTO(stop, rc);
        }
 
                GOTO(stop, rc);
        }
 
-record:
-       LASSERT(count > 0);
-
-       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       if ((count == 1 && la->la_nlink == 1) ||
-           S_ISDIR(lfsck_object_type(obj)))
-               /* Usually, it is for single linked object or dir, do nothing.*/
-               GOTO(stop, rc);
-
-       /* Following modification will be in another transaction.  */
-       if (handle != NULL) {
-               dt_write_unlock(env, obj);
-               dtlocked = false;
-
-               dt_trans_stop(env, dev, handle);
-               handle = NULL;
-
-               lfsck_ibits_unlock(&lh, LCK_EX);
-       }
-
-       ns->ln_mul_linked_checked++;
-       rc = lfsck_namespace_trace_update(env, com, &lnr->lnr_fid,
-                                         LNTF_CHECK_LINKEA, true);
-
-       GOTO(out, rc);
-
 stop:
        if (dtlocked)
                dt_write_unlock(env, obj);
 stop:
        if (dtlocked)
                dt_write_unlock(env, obj);
@@ -5225,6 +5283,9 @@ out:
                default:
                        break;
                }
                default:
                        break;
                }
+
+               if (count == 1 && S_ISREG(lfsck_object_type(obj)))
+                       dt_attr_get(env, obj, la, BYPASS_CAPA);
        }
 
        down_write(&com->lc_sem);
        }
 
        down_write(&com->lc_sem);
@@ -5291,9 +5352,12 @@ out:
                                               false);
                }
 
                                               false);
                }
 
-
                rc = 0;
        }
                rc = 0;
        }
+
+       if (count > 1 || la->la_nlink > 1)
+               ns->ln_mul_linked_checked++;
+
        up_write(&com->lc_sem);
 
        if (obj != NULL && !IS_ERR(obj))
        up_write(&com->lc_sem);
 
        if (obj != NULL && !IS_ERR(obj))
@@ -5338,10 +5402,11 @@ static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env,
        struct lu_fid                   *key    = &info->lti_fid;
        struct lu_attr                  *la     = &info->lti_la;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct lu_fid                   *key    = &info->lti_fid;
        struct lu_attr                  *la     = &info->lti_la;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
-       struct dt_object                *obj    = com->lc_obj;
+       struct dt_object                *obj;
        struct dt_device                *dev    = lfsck->li_bottom;
        struct dt_object                *child  = NULL;
        struct thandle                  *th     = NULL;
        struct dt_device                *dev    = lfsck->li_bottom;
        struct dt_object                *child  = NULL;
        struct thandle                  *th     = NULL;
+       int                              idx;
        int                              rc     = 0;
        __u8                             flags  = 0;
        bool                             exist  = false;
        int                              rc     = 0;
        __u8                             flags  = 0;
        bool                             exist  = false;
@@ -5354,6 +5419,8 @@ static int lfsck_namespace_scan_local_lpf_one(const struct lu_env *env,
        LASSERT(dt_object_exists(child));
        LASSERT(!dt_object_remote(child));
 
        LASSERT(dt_object_exists(child));
        LASSERT(!dt_object_remote(child));
 
+       idx = lfsck_sub_trace_file_fid2idx(&ent->lde_fid);
+       obj = com->lc_sub_trace_objs[idx].lsto_obj;
        fid_cpu_to_be(key, &ent->lde_fid);
        rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
                       (const struct dt_key *)key, BYPASS_CAPA);
        fid_cpu_to_be(key, &ent->lde_fid);
        rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
                       (const struct dt_key *)key, BYPASS_CAPA);
@@ -5728,14 +5795,15 @@ out:
        RETURN(rc == 0 ? 1 : rc);
 }
 
        RETURN(rc == 0 ? 1 : rc);
 }
 
-static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
-                                               struct lfsck_component *com)
+static int
+lfsck_namespace_double_scan_one_trace_file(const struct lu_env *env,
+                                          struct lfsck_component *com,
+                                          struct dt_object *obj, bool first)
 {
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
        struct ptlrpc_thread    *thread = &lfsck->li_thread;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct lfsck_namespace  *ns     = com->lc_file_ram;
 {
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
        struct ptlrpc_thread    *thread = &lfsck->li_thread;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct lfsck_namespace  *ns     = com->lc_file_ram;
-       struct dt_object        *obj    = com->lc_obj;
        const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
        struct dt_object        *target;
        struct dt_it            *di;
        const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
        struct dt_object        *target;
        struct dt_it            *di;
@@ -5745,44 +5813,25 @@ static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
        __u8                     flags  = 0;
        ENTRY;
 
        __u8                     flags  = 0;
        ENTRY;
 
-       while (!list_empty(&lfsck->li_list_lmv)) {
-               struct lfsck_lmv_unit *llu;
-
-               spin_lock(&lfsck->li_lock);
-               llu = list_entry(lfsck->li_list_lmv.next,
-                                struct lfsck_lmv_unit, llu_link);
-               list_del_init(&llu->llu_link);
-               spin_unlock(&lfsck->li_lock);
-
-               rc = lfsck_namespace_rescan_striped_dir(env, com, llu);
-               if (rc <= 0)
-                       RETURN(rc);
-       }
-
-       CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
-              lfsck_lfsck2name(lfsck));
-
-       lfsck_namespace_scan_local_lpf(env, com);
-
-       com->lc_new_checked = 0;
-       com->lc_new_scanned = 0;
-       com->lc_time_last_checkpoint = cfs_time_current();
-       com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
-                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
-
        di = iops->init(env, obj, 0, BYPASS_CAPA);
        if (IS_ERR(di))
                RETURN(PTR_ERR(di));
 
        di = iops->init(env, obj, 0, BYPASS_CAPA);
        if (IS_ERR(di))
                RETURN(PTR_ERR(di));
 
-       fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
+       if (first)
+               fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
+       else
+               fid_zero(&fid);
        rc = iops->get(env, di, (const struct dt_key *)&fid);
        if (rc < 0)
                GOTO(fini, rc);
 
        rc = iops->get(env, di, (const struct dt_key *)&fid);
        if (rc < 0)
                GOTO(fini, rc);
 
-       /* Skip the start one, which either has been processed or non-exist. */
-       rc = iops->next(env, di);
-       if (rc != 0)
-               GOTO(put, rc);
+       if (first) {
+               /* The start one either has been processed or does not exist,
+                * skip it. */
+               rc = iops->next(env, di);
+               if (rc != 0)
+                       GOTO(put, rc);
+       }
 
        do {
                if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val) &&
 
        do {
                if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val) &&
@@ -5790,6 +5839,14 @@ static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
                        GOTO(put, rc = 0);
 
                key = iops->key(env, di);
                        GOTO(put, rc = 0);
 
                key = iops->key(env, di);
+               if (IS_ERR(key)) {
+                       rc = PTR_ERR(key);
+                       if (rc == -ENOENT)
+                               GOTO(put, rc = 1);
+
+                       goto checkpoint;
+               }
+
                fid_be_to_cpu(&fid, (const struct lu_fid *)key);
                if (!fid_is_sane(&fid)) {
                        rc = 0;
                fid_be_to_cpu(&fid, (const struct lu_fid *)key);
                if (!fid_is_sane(&fid)) {
                        rc = 0;
@@ -5818,7 +5875,8 @@ checkpoint:
                down_write(&com->lc_sem);
                com->lc_new_checked++;
                com->lc_new_scanned++;
                down_write(&com->lc_sem);
                com->lc_new_checked++;
                com->lc_new_scanned++;
-               ns->ln_fid_latest_scanned_phase2 = fid;
+               if (rc >= 0 && fid_is_sane(&fid))
+                       ns->ln_fid_latest_scanned_phase2 = fid;
                if (rc > 0)
                        ns->ln_objs_repaired_phase2++;
                else if (rc < 0)
                if (rc > 0)
                        ns->ln_objs_repaired_phase2++;
                else if (rc < 0)
@@ -5864,12 +5922,56 @@ put:
 fini:
        iops->fini(env, di);
 
 fini:
        iops->fini(env, di);
 
-       CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop: rc = %d\n",
-              lfsck_lfsck2name(lfsck), rc);
-
        return rc;
 }
 
        return rc;
 }
 
+static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
+                                               struct lfsck_component *com)
+{
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct lfsck_namespace  *ns     = com->lc_file_ram;
+       int                      rc;
+       int                      i;
+       ENTRY;
+
+       while (!list_empty(&lfsck->li_list_lmv)) {
+               struct lfsck_lmv_unit *llu;
+
+               spin_lock(&lfsck->li_lock);
+               llu = list_entry(lfsck->li_list_lmv.next,
+                                struct lfsck_lmv_unit, llu_link);
+               list_del_init(&llu->llu_link);
+               spin_unlock(&lfsck->li_lock);
+
+               rc = lfsck_namespace_rescan_striped_dir(env, com, llu);
+               if (rc <= 0)
+                       RETURN(rc);
+       }
+
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
+              lfsck_lfsck2name(lfsck));
+
+       lfsck_namespace_scan_local_lpf(env, com);
+
+       com->lc_new_checked = 0;
+       com->lc_new_scanned = 0;
+       com->lc_time_last_checkpoint = cfs_time_current();
+       com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+
+       i = lfsck_sub_trace_file_fid2idx(&ns->ln_fid_latest_scanned_phase2);
+       rc = lfsck_namespace_double_scan_one_trace_file(env, com,
+                               com->lc_sub_trace_objs[i].lsto_obj, true);
+       while (rc > 0 && ++i < LFSCK_STF_COUNT)
+               rc = lfsck_namespace_double_scan_one_trace_file(env, com,
+                               com->lc_sub_trace_objs[i].lsto_obj, false);
+
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop at the No. %d "
+              "trace file: rc = %d\n", lfsck_lfsck2name(lfsck), i, rc);
+
+       RETURN(rc);
+}
+
 static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
                                               struct lfsck_component *com,
                                               struct lfsck_position *pos)
 static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
                                               struct lfsck_component *com,
                                               struct lfsck_position *pos)
@@ -6244,6 +6346,7 @@ int lfsck_namespace_setup(const struct lu_env *env,
        struct lfsck_namespace  *ns;
        struct dt_object        *root = NULL;
        struct dt_object        *obj;
        struct lfsck_namespace  *ns;
        struct dt_object        *root = NULL;
        struct dt_object        *obj;
+       int                      i;
        int                      rc;
        ENTRY;
 
        int                      rc;
        ENTRY;
 
@@ -6262,7 +6365,7 @@ int lfsck_namespace_setup(const struct lu_env *env,
        com->lc_ops = &lfsck_namespace_ops;
        com->lc_data = lfsck_assistant_data_init(
                        &lfsck_namespace_assistant_ops,
        com->lc_ops = &lfsck_namespace_ops;
        com->lc_data = lfsck_assistant_data_init(
                        &lfsck_namespace_assistant_ops,
-                       "lfsck_namespace");
+                       LFSCK_NAMESPACE);
        if (com->lc_data == NULL)
                GOTO(out, rc = -ENOMEM);
 
        if (com->lc_data == NULL)
                GOTO(out, rc = -ENOMEM);
 
@@ -6275,6 +6378,9 @@ int lfsck_namespace_setup(const struct lu_env *env,
        if (com->lc_file_disk == NULL)
                GOTO(out, rc = -ENOMEM);
 
        if (com->lc_file_disk == NULL)
                GOTO(out, rc = -ENOMEM);
 
+       for (i = 0; i < LFSCK_STF_COUNT; i++)
+               mutex_init(&com->lc_sub_trace_objs[i].lsto_mutex);
+
        root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
        if (IS_ERR(root))
                GOTO(out, rc = PTR_ERR(root));
        root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
        if (IS_ERR(root))
                GOTO(out, rc = PTR_ERR(root));
@@ -6282,23 +6388,20 @@ int lfsck_namespace_setup(const struct lu_env *env,
        if (unlikely(!dt_try_as_dir(env, root)))
                GOTO(out, rc = -ENOTDIR);
 
        if (unlikely(!dt_try_as_dir(env, root)))
                GOTO(out, rc = -ENOTDIR);
 
-       obj = local_index_find_or_create(env, lfsck->li_los, root,
-                                        lfsck_namespace_name,
-                                        S_IFREG | S_IRUGO | S_IWUSR,
-                                        &dt_lfsck_features);
+       obj = local_file_find_or_create(env, lfsck->li_los, root,
+                                       LFSCK_NAMESPACE,
+                                       S_IFREG | S_IRUGO | S_IWUSR);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
        com->lc_obj = obj;
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
        com->lc_obj = obj;
-       rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
-       if (rc != 0)
-               GOTO(out, rc);
-
        rc = lfsck_namespace_load(env, com);
        if (rc == -ENODATA)
                rc = lfsck_namespace_init(env, com);
        else if (rc < 0)
                rc = lfsck_namespace_reset(env, com, true);
        rc = lfsck_namespace_load(env, com);
        if (rc == -ENODATA)
                rc = lfsck_namespace_init(env, com);
        else if (rc < 0)
                rc = lfsck_namespace_reset(env, com, true);
+       else
+               rc = lfsck_namespace_load_sub_trace_files(env, com, false);
        if (rc != 0)
                GOTO(out, rc);
 
        if (rc != 0)
                GOTO(out, rc);
 
index a9a191d..411f69b 100644 (file)
@@ -1534,6 +1534,7 @@ struct osd_lf_map {
        char            *olm_name;
        struct lu_fid    olm_fid;
        __u16            olm_flags;
        char            *olm_name;
        struct lu_fid    olm_fid;
        __u16            olm_flags;
+       __u16            olm_namelen;
        scandir_t        olm_scandir;
        filldir_t        olm_filldir;
 };
        scandir_t        olm_scandir;
        filldir_t        olm_filldir;
 };
@@ -1542,107 +1543,125 @@ struct osd_lf_map {
 static const struct osd_lf_map osd_lf_maps[] = {
        /* CATALOGS */
        { CATLIST, { FID_SEQ_LOCAL_FILE, LLOG_CATALOGS_OID, 0 }, OLF_SHOW_NAME,
 static const struct osd_lf_map osd_lf_maps[] = {
        /* CATALOGS */
        { CATLIST, { FID_SEQ_LOCAL_FILE, LLOG_CATALOGS_OID, 0 }, OLF_SHOW_NAME,
-               NULL, NULL },
+               sizeof(CATLIST) - 1, NULL, NULL },
 
        /* CONFIGS */
        { MOUNT_CONFIGS_DIR, { FID_SEQ_LOCAL_FILE, MGS_CONFIGS_OID, 0 },
 
        /* CONFIGS */
        { MOUNT_CONFIGS_DIR, { FID_SEQ_LOCAL_FILE, MGS_CONFIGS_OID, 0 },
-               OLF_SCAN_SUBITEMS, osd_ios_general_scan,
-               osd_ios_varfid_fill },
+               OLF_SCAN_SUBITEMS, sizeof(MOUNT_CONFIGS_DIR) - 1,
+               osd_ios_general_scan, osd_ios_varfid_fill },
 
        /* NIDTBL_VERSIONS */
        { MGS_NIDTBL_DIR, { 0, 0, 0 }, OLF_SCAN_SUBITEMS,
 
        /* NIDTBL_VERSIONS */
        { MGS_NIDTBL_DIR, { 0, 0, 0 }, OLF_SCAN_SUBITEMS,
-               osd_ios_general_scan, osd_ios_varfid_fill },
+               sizeof(MGS_NIDTBL_DIR) - 1, osd_ios_general_scan,
+               osd_ios_varfid_fill },
 
        /* PENDING */
 
        /* PENDING */
-       { "PENDING", { 0, 0, 0 }, 0, NULL, NULL },
+       { "PENDING", { 0, 0, 0 }, 0, sizeof("PENDING") - 1, NULL, NULL },
 
        /* ROOT */
        { "ROOT", { FID_SEQ_ROOT, FID_OID_ROOT, 0 },
 
        /* ROOT */
        { "ROOT", { FID_SEQ_ROOT, FID_OID_ROOT, 0 },
-               OLF_SCAN_SUBITEMS | OLF_HIDE_FID, osd_ios_ROOT_scan, NULL },
+               OLF_SCAN_SUBITEMS | OLF_HIDE_FID, sizeof("ROOT") - 1,
+               osd_ios_ROOT_scan, NULL },
 
        /* changelog_catalog */
 
        /* changelog_catalog */
-       { CHANGELOG_CATALOG, { 0, 0, 0 }, 0, NULL, NULL },
+       { CHANGELOG_CATALOG, { 0, 0, 0 }, 0, sizeof(CHANGELOG_CATALOG) - 1,
+               NULL, NULL },
 
        /* changelog_users */
 
        /* changelog_users */
-       { CHANGELOG_USERS, { 0, 0, 0 }, 0, NULL, NULL },
+       { CHANGELOG_USERS, { 0, 0, 0 }, 0, sizeof(CHANGELOG_USERS) - 1,
+               NULL, NULL },
 
        /* fld */
        { "fld", { FID_SEQ_LOCAL_FILE, FLD_INDEX_OID, 0 }, OLF_SHOW_NAME,
 
        /* fld */
        { "fld", { FID_SEQ_LOCAL_FILE, FLD_INDEX_OID, 0 }, OLF_SHOW_NAME,
-               NULL, NULL },
+               sizeof("fld") - 1, NULL, NULL },
 
        /* last_rcvd */
        { LAST_RCVD, { FID_SEQ_LOCAL_FILE, LAST_RECV_OID, 0 }, OLF_SHOW_NAME,
 
        /* last_rcvd */
        { LAST_RCVD, { FID_SEQ_LOCAL_FILE, LAST_RECV_OID, 0 }, OLF_SHOW_NAME,
-               NULL, NULL },
-
-       /* lfsck_bookmark */
-       { "lfsck_bookmark", { 0, 0, 0 }, 0, NULL, NULL },
+               sizeof(LAST_RCVD) - 1, NULL, NULL },
 
        /* lov_objid */
        { LOV_OBJID, { FID_SEQ_LOCAL_FILE, MDD_LOV_OBJ_OID, 0 }, OLF_SHOW_NAME,
 
        /* lov_objid */
        { LOV_OBJID, { FID_SEQ_LOCAL_FILE, MDD_LOV_OBJ_OID, 0 }, OLF_SHOW_NAME,
-               NULL, NULL },
+               sizeof(LOV_OBJID) - 1, NULL, NULL },
 
        /* lov_objseq */
        { LOV_OBJSEQ, { FID_SEQ_LOCAL_FILE, MDD_LOV_OBJ_OSEQ, 0 },
 
        /* lov_objseq */
        { LOV_OBJSEQ, { FID_SEQ_LOCAL_FILE, MDD_LOV_OBJ_OSEQ, 0 },
-               OLF_SHOW_NAME, NULL, NULL },
+               OLF_SHOW_NAME, sizeof(LOV_OBJSEQ) - 1, NULL, NULL },
 
        /* quota_master */
 
        /* quota_master */
-       { QMT_DIR, { 0, 0, 0 }, OLF_SCAN_SUBITEMS,
+       { QMT_DIR, { 0, 0, 0 }, OLF_SCAN_SUBITEMS, sizeof(QMT_DIR) - 1,
                osd_ios_general_scan, osd_ios_varfid_fill },
 
        /* quota_slave */
                osd_ios_general_scan, osd_ios_varfid_fill },
 
        /* quota_slave */
-       { QSD_DIR, { 0, 0, 0 }, OLF_SCAN_SUBITEMS,
+       { QSD_DIR, { 0, 0, 0 }, OLF_SCAN_SUBITEMS, sizeof(QSD_DIR) - 1,
                osd_ios_general_scan, osd_ios_varfid_fill },
 
        /* seq_ctl */
        { "seq_ctl", { FID_SEQ_LOCAL_FILE, FID_SEQ_CTL_OID, 0 },
                osd_ios_general_scan, osd_ios_varfid_fill },
 
        /* seq_ctl */
        { "seq_ctl", { FID_SEQ_LOCAL_FILE, FID_SEQ_CTL_OID, 0 },
-               OLF_SHOW_NAME, NULL, NULL },
+               OLF_SHOW_NAME, sizeof("seq_ctl") - 1, NULL, NULL },
 
        /* seq_srv */
        { "seq_srv", { FID_SEQ_LOCAL_FILE, FID_SEQ_SRV_OID, 0 },
 
        /* seq_srv */
        { "seq_srv", { FID_SEQ_LOCAL_FILE, FID_SEQ_SRV_OID, 0 },
-               OLF_SHOW_NAME, NULL, NULL },
+               OLF_SHOW_NAME, sizeof("seq_srv") - 1, NULL, NULL },
 
        /* health_check */
        { HEALTH_CHECK, { FID_SEQ_LOCAL_FILE, OFD_HEALTH_CHECK_OID, 0 },
 
        /* health_check */
        { HEALTH_CHECK, { FID_SEQ_LOCAL_FILE, OFD_HEALTH_CHECK_OID, 0 },
-               OLF_SHOW_NAME, NULL, NULL },
+               OLF_SHOW_NAME, sizeof(HEALTH_CHECK) - 1, NULL, NULL },
+
+       /* LFSCK */
+       { LFSCK_DIR, { 0, 0, 0 }, 0, sizeof(LFSCK_DIR) - 1,
+               osd_ios_general_scan, osd_ios_varfid_fill },
+
+       /* lfsck_bookmark */
+       { LFSCK_BOOKMARK, { 0, 0, 0 }, 0, sizeof(LFSCK_BOOKMARK) - 1,
+               NULL, NULL },
+
+       /* lfsck_layout */
+       { LFSCK_LAYOUT, { 0, 0, 0 }, 0, sizeof(LFSCK_LAYOUT) - 1,
+               NULL, NULL },
 
        /* lfsck_namespace */
 
        /* lfsck_namespace */
-       { "lfsck_namespace", { 0, 0, 0 }, 0, NULL, NULL },
+       { LFSCK_NAMESPACE, { 0, 0, 0 }, 0, sizeof(LFSCK_NAMESPACE) - 1,
+               NULL, NULL },
 
        /* OBJECTS, upgrade from old device */
 
        /* OBJECTS, upgrade from old device */
-       { OBJECTS, { 0, 0, 0 }, OLF_SCAN_SUBITEMS, osd_ios_OBJECTS_scan, NULL },
+       { OBJECTS, { 0, 0, 0 }, OLF_SCAN_SUBITEMS, sizeof(OBJECTS) - 1,
+               osd_ios_OBJECTS_scan, NULL },
 
        /* lquota_v2.user, upgrade from old device */
 
        /* lquota_v2.user, upgrade from old device */
-       { "lquota_v2.user", { 0, 0, 0 }, 0, NULL, NULL },
+       { "lquota_v2.user", { 0, 0, 0 }, 0, sizeof("lquota_v2.user") - 1,
+               NULL, NULL },
 
        /* lquota_v2.group, upgrade from old device */
 
        /* lquota_v2.group, upgrade from old device */
-       { "lquota_v2.group", { 0, 0, 0 }, 0, NULL, NULL },
+       { "lquota_v2.group", { 0, 0, 0 }, 0, sizeof("lquota_v2.group") - 1,
+               NULL, NULL },
 
        /* LAST_GROUP, upgrade from old device */
        { "LAST_GROUP", { FID_SEQ_LOCAL_FILE, OFD_LAST_GROUP_OID, 0 },
 
        /* LAST_GROUP, upgrade from old device */
        { "LAST_GROUP", { FID_SEQ_LOCAL_FILE, OFD_LAST_GROUP_OID, 0 },
-               OLF_SHOW_NAME, NULL, NULL },
+               OLF_SHOW_NAME, sizeof("LAST_GROUP") - 1, NULL, NULL },
 
        /* SLAVE_LOG, llog for destroy slave stripes of striped dir */
        { "SLAVE_LOG", { FID_SEQ_LOCAL_FILE, SLAVE_LLOG_CATALOGS_OID, 0 },
 
        /* SLAVE_LOG, llog for destroy slave stripes of striped dir */
        { "SLAVE_LOG", { FID_SEQ_LOCAL_FILE, SLAVE_LLOG_CATALOGS_OID, 0 },
-              OLF_SHOW_NAME, NULL, NULL },
+              OLF_SHOW_NAME, sizeof("SLAVE_LOG") - 1, NULL, NULL },
 
        /* lost+found */
        { "lost+found", { FID_SEQ_LOCAL_FILE, OSD_LPF_OID, 0 },
 
        /* lost+found */
        { "lost+found", { FID_SEQ_LOCAL_FILE, OSD_LPF_OID, 0 },
-               OLF_SCAN_SUBITEMS, osd_ios_general_scan, osd_ios_lf_fill },
+               OLF_SCAN_SUBITEMS, sizeof("lost+found") - 1,
+               osd_ios_general_scan, osd_ios_lf_fill },
 
 
-       { NULL, { 0, 0, 0 }, 0, NULL, NULL }
+       { NULL, { 0, 0, 0 }, 0, 0, NULL, NULL }
 };
 
 /* Add the new introduced files under .lustre/ in the list in the future. */
 static const struct osd_lf_map osd_dl_maps[] = {
        /* .lustre/fid */
        { "fid", { FID_SEQ_DOT_LUSTRE, FID_OID_DOT_LUSTRE_OBF, 0 }, 0,
 };
 
 /* Add the new introduced files under .lustre/ in the list in the future. */
 static const struct osd_lf_map osd_dl_maps[] = {
        /* .lustre/fid */
        { "fid", { FID_SEQ_DOT_LUSTRE, FID_OID_DOT_LUSTRE_OBF, 0 }, 0,
-               NULL, NULL },
+               sizeof("fid") - 1, NULL, NULL },
 
        /* .lustre/lost+found */
        { "lost+found", { FID_SEQ_DOT_LUSTRE, FID_OID_DOT_LUSTRE_LPF, 0 }, 0,
 
        /* .lustre/lost+found */
        { "lost+found", { FID_SEQ_DOT_LUSTRE, FID_OID_DOT_LUSTRE_LPF, 0 }, 0,
-               NULL, NULL },
+               sizeof("lost+found") - 1, NULL, NULL },
 
 
-       { NULL, { 0, 0, 0 }, 0, NULL, NULL }
+       { NULL, { 0, 0, 0 }, 0, 0, NULL, NULL }
 };
 
 struct osd_ios_item {
 };
 
 struct osd_ios_item {
@@ -1891,7 +1910,7 @@ static int osd_ios_dl_fill(void *buf, const char *name, int namelen,
                RETURN(0);
 
        for (map = osd_dl_maps; map->olm_name != NULL; map++) {
                RETURN(0);
 
        for (map = osd_dl_maps; map->olm_name != NULL; map++) {
-               if (strlen(map->olm_name) != namelen)
+               if (map->olm_namelen != namelen)
                        continue;
 
                if (strncmp(map->olm_name, name, namelen) == 0)
                        continue;
 
                if (strncmp(map->olm_name, name, namelen) == 0)
@@ -1927,7 +1946,7 @@ static int osd_ios_root_fill(void *buf, const char *name, int namelen,
                RETURN(0);
 
        for (map = osd_lf_maps; map->olm_name != NULL; map++) {
                RETURN(0);
 
        for (map = osd_lf_maps; map->olm_name != NULL; map++) {
-               if (strlen(map->olm_name) != namelen)
+               if (map->olm_namelen != namelen)
                        continue;
 
                if (strncmp(map->olm_name, name, namelen) == 0)
                        continue;
 
                if (strncmp(map->olm_name, name, namelen) == 0)
@@ -2169,7 +2188,7 @@ static int osd_initial_OI_scrub(struct osd_thread_info *info,
 
                child = osd_ios_lookup_one_len(map->olm_name,
                                               osd_sb(dev)->s_root,
 
                child = osd_ios_lookup_one_len(map->olm_name,
                                               osd_sb(dev)->s_root,
-                                              strlen(map->olm_name));
+                                              map->olm_namelen);
                if (!IS_ERR(child))
                        dput(child);
                else if (PTR_ERR(child) == -ENOENT)
                if (!IS_ERR(child))
                        dput(child);
                else if (PTR_ERR(child) == -ENOENT)
index b3b062b..4ad1b23 100644 (file)
@@ -720,11 +720,11 @@ test_7a()
        echo "stop $SINGLEMDS"
        stop $SINGLEMDS > /dev/null || error "(4) Fail to stop MDS!"
 
        echo "stop $SINGLEMDS"
        stop $SINGLEMDS > /dev/null || error "(4) Fail to stop MDS!"
 
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0 fail_val=0
        echo "start $SINGLEMDS"
        start $SINGLEMDS $MDT_DEVNAME $MOUNT_OPTS_SCRUB > /dev/null ||
                error "(5) Fail to start MDS!"
 
        echo "start $SINGLEMDS"
        start $SINGLEMDS $MDT_DEVNAME $MOUNT_OPTS_SCRUB > /dev/null ||
                error "(5) Fail to start MDS!"
 
-       do_facet $SINGLEMDS $LCTL set_param fail_loc=0 fail_val=0
        wait_update_facet $SINGLEMDS "$LCTL get_param -n \
                mdd.${MDT_DEV}.lfsck_namespace |
                awk '/^status/ { print \\\$2 }'" "completed" 30 || {
        wait_update_facet $SINGLEMDS "$LCTL get_param -n \
                mdd.${MDT_DEV}.lfsck_namespace |
                awk '/^status/ { print \\\$2 }'" "completed" 30 || {
@@ -754,14 +754,15 @@ test_7b()
                error "(4) unexpected status"
        }
 
                error "(4) unexpected status"
        }
 
+       umount_client $MOUNT
        echo "stop $SINGLEMDS"
        stop $SINGLEMDS > /dev/null || error "(5) Fail to stop MDS!"
 
        echo "stop $SINGLEMDS"
        stop $SINGLEMDS > /dev/null || error "(5) Fail to stop MDS!"
 
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0 fail_val=0
        echo "start $SINGLEMDS"
        start $SINGLEMDS $MDT_DEVNAME $MOUNT_OPTS_SCRUB > /dev/null ||
                error "(6) Fail to start MDS!"
 
        echo "start $SINGLEMDS"
        start $SINGLEMDS $MDT_DEVNAME $MOUNT_OPTS_SCRUB > /dev/null ||
                error "(6) Fail to start MDS!"
 
-       do_facet $SINGLEMDS $LCTL set_param fail_loc=0 fail_val=0
        wait_update_facet $SINGLEMDS "$LCTL get_param -n \
                mdd.${MDT_DEV}.lfsck_namespace |
                awk '/^status/ { print \\\$2 }'" "completed" 30 || {
        wait_update_facet $SINGLEMDS "$LCTL get_param -n \
                mdd.${MDT_DEV}.lfsck_namespace |
                awk '/^status/ { print \\\$2 }'" "completed" 30 || {