Whamcloud - gitweb
LU-1267 lfsck: framework (1) for MDT-OST consistency
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
index 1ebb714..d1ba149 100644 (file)
  * Author: Fan, Yong <fan.yong@intel.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_LFSCK
 
 #include <lustre/lustre_idl.h>
 #include <lu_object.h>
 #include <dt_object.h>
+#include <md_object.h>
 #include <lustre_linkea.h>
 #include <lustre_fid.h>
 #include <lustre_lib.h>
@@ -136,16 +134,15 @@ static int lfsck_namespace_load(const struct lu_env *env,
                lfsck_namespace_le_to_cpu(ns,
                                (struct lfsck_namespace *)com->lc_file_disk);
                if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
-                       CWARN("%.16s: invalid lfsck_namespace magic "
-                             "0x%x != 0x%x\n",
-                             lfsck_lfsck2name(com->lc_lfsck),
-                             ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
+                       CWARN("%s: invalid lfsck_namespace magic %#x != %#x\n",
+                             lfsck_lfsck2name(com->lc_lfsck), ns->ln_magic,
+                             LFSCK_NAMESPACE_MAGIC);
                        rc = 1;
                } else {
                        rc = 0;
                }
        } else if (rc != -ENODATA) {
-               CERROR("%.16s: fail to load lfsck_namespace, expected = %d, "
+               CERROR("%s: fail to load lfsck_namespace: expected = %d, "
                       "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck), len, rc);
                if (rc >= 0)
                        rc = 1;
@@ -168,8 +165,8 @@ static int lfsck_namespace_store(const struct lu_env *env,
        handle = dt_trans_create(env, lfsck->li_bottom);
        if (IS_ERR(handle)) {
                rc = PTR_ERR(handle);
-               CERROR("%.16s: fail to create trans for storing "
-                      "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               CERROR("%s: fail to create trans for storing lfsck_namespace: "
+                      "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
                RETURN(rc);
        }
 
@@ -177,15 +174,15 @@ static int lfsck_namespace_store(const struct lu_env *env,
                                  lfsck_buf_get(env, com->lc_file_disk, len),
                                  XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
        if (rc != 0) {
-               CERROR("%.16s: fail to declare trans for storing "
-                      "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               CERROR("%s: fail to declare trans for storing lfsck_namespace: "
+                      "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
                GOTO(out, rc);
        }
 
        rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
        if (rc != 0) {
-               CERROR("%.16s: fail to start trans for storing "
-                      "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               CERROR("%s: fail to start trans for storing lfsck_namespace: "
+                      "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
                GOTO(out, rc);
        }
 
@@ -195,7 +192,7 @@ static int lfsck_namespace_store(const struct lu_env *env,
                          init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
                          handle, BYPASS_CAPA);
        if (rc != 0)
-               CERROR("%.16s: fail to store lfsck_namespace, len = %d, "
+               CERROR("%s: fail to store lfsck_namespace: len = %d, "
                       "rc = %d\n", lfsck_lfsck2name(lfsck), len, rc);
 
        GOTO(out, rc);
@@ -208,7 +205,7 @@ out:
 static int lfsck_namespace_init(const struct lu_env *env,
                                struct lfsck_component *com)
 {
-       struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace *ns = com->lc_file_ram;
        int rc;
 
        memset(ns, 0, sizeof(*ns));
@@ -318,7 +315,7 @@ static int lfsck_namespace_update(const struct lu_env *env,
                rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
                               BYPASS_CAPA);
                if (rc != 0) {
-                       CERROR("%s: fail to insert "DFID", rc = %d\n",
+                       CERROR("%s: fail to insert "DFID": rc = %d\n",
                               lfsck_lfsck2name(com->lc_lfsck), PFID(fid), rc);
                        GOTO(out, rc);
                }
@@ -385,7 +382,7 @@ static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
 
        ldata->ld_buf =
                lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
-                                      CFS_PAGE_SIZE);
+                                      PAGE_CACHE_SIZE);
        if (ldata->ld_buf->lb_buf == NULL)
                return -ENOMEM;
 
@@ -428,6 +425,38 @@ static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
 }
 
 /**
+ * \retval ve: removed entries
+ */
+static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
+                                    struct linkea_data *ldata,
+                                    struct lu_name *cname,
+                                    struct lu_fid *pfid)
+{
+       struct link_ea_entry    *oldlee;
+       int                      oldlen;
+       int                      removed = 0;
+
+       linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
+       oldlee = ldata->ld_lee;
+       oldlen = ldata->ld_reclen;
+       linkea_next_entry(ldata);
+       while (ldata->ld_lee != NULL) {
+               ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
+                                  ldata->ld_lee->lee_reclen[1];
+               if (unlikely(ldata->ld_reclen == oldlen &&
+                            memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
+                       linkea_del_buf(ldata, cname);
+                       removed++;
+               } else {
+                       linkea_next_entry(ldata);
+               }
+       }
+       ldata->ld_lee = oldlee;
+       ldata->ld_reclen = oldlen;
+       return removed;
+}
+
+/**
  * \retval +ve repaired
  * \retval 0   no need to repair
  * \retval -ve error cases
@@ -443,13 +472,11 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env,
        struct lu_fid            *cfid    = &info->lti_fid2;
        struct lfsck_instance   *lfsck    = com->lc_lfsck;
        struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace  *ns       =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns       = com->lc_file_ram;
        struct linkea_data       ldata    = { 0 };
        struct thandle          *handle   = NULL;
        bool                     locked   = false;
        bool                     update   = false;
-       int                      count;
        int                      rc;
        ENTRY;
 
@@ -458,6 +485,7 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env,
 again:
                LASSERT(!locked);
 
+               update = false;
                com->lc_journal = 1;
                handle = dt_trans_create(env, lfsck->li_next);
                if (IS_ERR(handle))
@@ -491,13 +519,14 @@ again:
                GOTO(stop, rc);
        }
 
-       ldata.ld_lee = LINKEA_FIRST_ENTRY(ldata);
-       count = ldata.ld_leh->leh_reccount;
-       while (count-- > 0) {
+       linkea_first_entry(&ldata);
+       while (ldata.ld_lee != NULL) {
                struct dt_object *parent = NULL;
 
-               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname,
-                                   pfid);
+               rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
+               if (rc > 0)
+                       update = true;
+
                if (!fid_is_sane(pfid))
                        goto shrink;
 
@@ -514,7 +543,7 @@ again:
                 *      remote object will be processed in LFSCK phase III. */
                if (dt_object_remote(parent)) {
                        lfsck_object_put(env, parent);
-                       ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
+                       linkea_next_entry(&ldata);
                        continue;
                }
 
@@ -536,25 +565,29 @@ again:
                if (rc == 0) {
                        if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
                                lfsck_object_put(env, parent);
-                               ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
+                               linkea_next_entry(&ldata);
                                continue;
                        }
 
                        goto shrink;
                }
 
+               /* If there is no name entry in the parent dir and the object
+                * link count is less than the linkea entries count, then the
+                * linkea entry should be removed. */
                if (ldata.ld_leh->leh_reccount > la->la_nlink)
                        goto shrink;
 
-               /* XXX: For the case of there is linkea entry, but without name
-                *      entry pointing to the object, and the object link count
-                *      isn't less than the count of name entries, then add the
-                *      name entry back to namespace.
-                *
-                *      It is out of LFSCK 1.5 scope, will implement it in the
-                *      future. Keep the linkEA entry. */
+               /* XXX: For the case of there is a linkea entry, but without
+                *      name entry pointing to the object and its hard links
+                *      count is not less than the object name entries count,
+                *      then seems we should add the 'missed' name entry back
+                *      to namespace, but before LFSCK phase III finished, we
+                *      do not know whether the object has some inconsistency
+                *      on other MDTs. So now, do NOT add the name entry back
+                *      to the namespace, but keep the linkEA entry. LU-2914 */
                lfsck_object_put(env, parent);
-               ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
+               linkea_next_entry(&ldata);
                continue;
 
 shrink:
@@ -582,8 +615,21 @@ shrink:
        GOTO(stop, rc);
 
 stop:
-       if (locked)
+       if (locked) {
+       /* XXX: For the case linkea entries count does not match the object hard
+        *      links count, we cannot update the later one simply. Before LFSCK
+        *      phase III finished, we cannot know whether there are some remote
+        *      name entries to be repaired or not. LU-2914 */
+               if (rc == 0 && !lfsck_is_dead_obj(child) &&
+                   ldata.ld_leh != NULL &&
+                   ldata.ld_leh->leh_reccount != la->la_nlink)
+                       CWARN("%s: the object "DFID" linkEA entry count %u "
+                             "may not match its hardlink count %u\n",
+                             lfsck_lfsck2name(lfsck), PFID(cfid),
+                             ldata.ld_leh->leh_reccount, la->la_nlink);
+
                dt_write_unlock(env, child);
+       }
 
        if (handle != NULL)
                dt_trans_stop(env, lfsck->li_next, handle);
@@ -592,6 +638,7 @@ stop:
                ns->ln_objs_nlink_repaired++;
                rc = 1;
        }
+
        return rc;
 }
 
@@ -601,12 +648,21 @@ static int lfsck_namespace_reset(const struct lu_env *env,
                                 struct lfsck_component *com, bool init)
 {
        struct lfsck_instance   *lfsck = com->lc_lfsck;
-       struct lfsck_namespace  *ns    =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns    = com->lc_file_ram;
+       struct dt_object        *root;
        struct dt_object        *dto;
        int                      rc;
        ENTRY;
 
+       root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
+       if (IS_ERR(root))
+               RETURN(PTR_ERR(root));
+
+       if (unlikely(!dt_try_as_dir(env, root))) {
+               lu_object_put(env, &root->do_lu);
+               RETURN(-ENOTDIR);
+       }
+
        down_write(&com->lc_sem);
        if (init) {
                memset(ns, 0, sizeof(*ns));
@@ -621,29 +677,32 @@ static int lfsck_namespace_reset(const struct lu_env *env,
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
 
-       rc = local_object_unlink(env, lfsck->li_bottom, lfsck->li_local_root,
+       rc = local_object_unlink(env, lfsck->li_bottom, root,
                                 lfsck_namespace_name);
        if (rc != 0)
                GOTO(out, rc);
 
-       dto = local_index_find_or_create(env, lfsck->li_los, lfsck->li_local_root,
+       lfsck_object_put(env, com->lc_obj);
+       com->lc_obj = NULL;
+       dto = local_index_find_or_create(env, lfsck->li_los, root,
                                         lfsck_namespace_name,
                                         S_IFREG | S_IRUGO | S_IWUSR,
                                         &dt_lfsck_features);
        if (IS_ERR(dto))
                GOTO(out, rc = PTR_ERR(dto));
 
+       com->lc_obj = dto;
        rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
        if (rc != 0)
                GOTO(out, rc);
 
-       com->lc_obj = dto;
        rc = lfsck_namespace_store(env, com, true);
 
        GOTO(out, rc);
 
 out:
        up_write(&com->lc_sem);
+       lu_object_put(env, &root->do_lu);
        return rc;
 }
 
@@ -651,7 +710,7 @@ static void
 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
                     bool new_checked)
 {
-       struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace *ns = com->lc_file_ram;
 
        down_write(&com->lc_sem);
        if (new_checked)
@@ -667,8 +726,7 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env,
                                      struct lfsck_component *com, bool init)
 {
        struct lfsck_instance   *lfsck = com->lc_lfsck;
-       struct lfsck_namespace  *ns    =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      rc;
 
        if (com->lc_new_checked == 0 && !init)
@@ -697,8 +755,7 @@ static int lfsck_namespace_prep(const struct lu_env *env,
                                struct lfsck_component *com)
 {
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
-       struct lfsck_namespace  *ns     =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns     = com->lc_file_ram;
        struct lfsck_position   *pos    = &com->lc_pos_start;
 
        if (ns->ln_status == LS_COMPLETED) {
@@ -784,16 +841,16 @@ static int lfsck_namespace_exec_dir(const struct lu_env *env,
        struct lu_attr             *la       = &info->lti_la;
        struct lfsck_instance      *lfsck    = com->lc_lfsck;
        struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace     *ns       =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace     *ns       = com->lc_file_ram;
        struct linkea_data          ldata    = { 0 };
-       const struct lu_fid        *pfid     =
-                               lu_object_fid(&lfsck->li_obj_dir->do_lu);
+       const struct lu_fid        *pfid     = lfsck_dto2fid(lfsck->li_obj_dir);
        const struct lu_fid        *cfid     = lfsck_dto2fid(obj);
        const struct lu_name       *cname;
        struct thandle             *handle   = NULL;
        bool                        repaired = false;
        bool                        locked   = false;
+       bool                        remove;
+       bool                        newdata;
        int                         count    = 0;
        int                         rc;
        ENTRY;
@@ -847,75 +904,61 @@ again:
        if (rc == 0) {
                count = ldata.ld_leh->leh_reccount;
                rc = linkea_links_find(&ldata, cname, pfid);
-               if (rc == 0) {
-                       /* For dir, if there are more than one linkea entries,
-                        * then remove all the other redundant linkea entries.*/
-                       if (unlikely(count > 1 &&
-                                    S_ISDIR(lfsck_object_type(obj))))
-                               goto unmatch;
-
+               if ((rc == 0) &&
+                   (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
                        goto record;
-               } else {
 
-unmatch:
-                       ns->ln_flags |= LF_INCONSISTENT;
-                       if (bk->lb_param & LPF_DRYRUN) {
-                               repaired = true;
-                               goto record;
-                       }
-
-                       /*For dir, remove the unmatched linkea entry directly.*/
-                       if (S_ISDIR(lfsck_object_type(obj))) {
-                               if (!com->lc_journal)
-                                       goto again;
-
-                               rc = dt_xattr_del(env, obj, XATTR_NAME_LINK,
-                                                 handle, BYPASS_CAPA);
-                               if (rc != 0)
-                                       GOTO(stop, rc);
-
-                               goto nodata;
-                       } else {
-                               goto add;
-                       }
+               ns->ln_flags |= LF_INCONSISTENT;
+               /* For dir, if there are more than one linkea entries, or the
+                * linkea entry does not match the name entry, then remove all
+                * and add the correct one. */
+               if (S_ISDIR(lfsck_object_type(obj))) {
+                       remove = true;
+                       newdata = true;
+               } else {
+                       remove = false;
+                       newdata = false;
                }
+               goto nodata;
        } else if (unlikely(rc == -EINVAL)) {
+               count = 1;
                ns->ln_flags |= LF_INCONSISTENT;
-               if (bk->lb_param & LPF_DRYRUN) {
-                       count = 1;
-                       repaired = true;
-                       goto record;
-               }
-
-               if (!com->lc_journal)
-                       goto again;
-
                /* The magic crashed, we are not sure whether there are more
                 * corrupt data in the linkea, so remove all linkea entries. */
-               rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
-                                 BYPASS_CAPA);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
+               remove = true;
+               newdata = true;
                goto nodata;
        } else if (rc == -ENODATA) {
+               count = 1;
                ns->ln_flags |= LF_UPGRADE;
+               remove = false;
+               newdata = true;
+
+nodata:
                if (bk->lb_param & LPF_DRYRUN) {
-                       count = 1;
                        repaired = true;
                        goto record;
                }
 
-nodata:
-               rc = linkea_data_new(&ldata,
-                                    &lfsck_env_info(env)->lti_linkea_buf);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-add:
                if (!com->lc_journal)
                        goto again;
 
+               if (remove) {
+                       LASSERT(newdata);
+
+                       rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
+                                         BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(stop, rc);
+               }
+
+               if (newdata) {
+                       rc = linkea_data_new(&ldata,
+                                       &lfsck_env_info(env)->lti_linkea_buf);
+                       if (rc != 0)
+                               GOTO(stop, rc);
+               }
+
                rc = linkea_add_buf(&ldata, cname, pfid);
                if (rc != 0)
                        GOTO(stop, rc);
@@ -975,10 +1018,16 @@ out:
                if (!(bk->lb_param & LPF_FAILOUT))
                        rc = 0;
        } else {
-               if (repaired)
+               if (repaired) {
                        ns->ln_items_repaired++;
-               else
+                       if (bk->lb_param & LPF_DRYRUN &&
+                           lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
+                               lfsck_pos_fill(env, lfsck,
+                                              &ns->ln_pos_first_inconsistent,
+                                              false);
+               } else {
                        com->lc_journal = 0;
+               }
                rc = 0;
        }
        up_write(&com->lc_sem);
@@ -990,8 +1039,7 @@ static int lfsck_namespace_post(const struct lu_env *env,
                                int result, bool init)
 {
        struct lfsck_instance   *lfsck = com->lc_lfsck;
-       struct lfsck_namespace  *ns    =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      rc;
 
        down_write(&com->lc_sem);
@@ -1043,8 +1091,7 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
 {
        struct lfsck_instance   *lfsck = com->lc_lfsck;
        struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace  *ns    =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns    = com->lc_file_ram;
        int                      save  = len;
        int                      ret   = -ENOSPC;
        int                      rc;
@@ -1052,12 +1099,12 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
        down_read(&com->lc_sem);
        rc = snprintf(buf, len,
                      "name: lfsck_namespace\n"
-                     "magic: 0x%x\n"
+                     "magic: %#x\n"
                      "version: %d\n"
                      "status: %s\n",
                      ns->ln_magic,
                      bk->lb_version,
-                     lfsck_status_names[ns->ln_status]);
+                     lfsck_status2names(ns->ln_status));
        if (rc <= 0)
                goto out;
 
@@ -1105,11 +1152,12 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
 
        if (ns->ln_status == LS_SCANNING_PHASE1) {
                struct lfsck_position pos;
+               const struct dt_it_ops *iops;
                cfs_duration_t duration = cfs_time_current() -
                                          lfsck->li_time_last_checkpoint;
                __u64 checked = ns->ln_items_checked + com->lc_new_checked;
                __u64 speed = checked;
-               __u64 new_checked = com->lc_new_checked * CFS_HZ;
+               __u64 new_checked = com->lc_new_checked * HZ;
                __u32 rtime = ns->ln_run_time_phase1 +
                              cfs_duration_sec(duration + HALF_SEC);
 
@@ -1155,7 +1203,34 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
 
                buf += rc;
                len -= rc;
-               lfsck_pos_fill(env, lfsck, &pos, false);
+
+               LASSERT(lfsck->li_di_oit != NULL);
+
+               iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
+
+               /* The low layer otable-based iteration position may NOT
+                * exactly match the namespace-based directory traversal
+                * cookie. Generally, it is not a serious issue. But the
+                * caller should NOT make assumption on that. */
+               pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
+               if (!lfsck->li_current_oit_processed)
+                       pos.lp_oit_cookie--;
+
+               spin_lock(&lfsck->li_lock);
+               if (lfsck->li_di_dir != NULL) {
+                       pos.lp_dir_cookie = lfsck->li_cookie_dir;
+                       if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
+                               fid_zero(&pos.lp_dir_parent);
+                               pos.lp_dir_cookie = 0;
+                       } else {
+                               pos.lp_dir_parent =
+                                       *lfsck_dto2fid(lfsck->li_obj_dir);
+                       }
+               } else {
+                       fid_zero(&pos.lp_dir_parent);
+                       pos.lp_dir_cookie = 0;
+               }
+               spin_unlock(&lfsck->li_lock);
                rc = lfsck_pos_dump(&buf, &len, &pos, "current_position");
                if (rc <= 0)
                        goto out;
@@ -1166,7 +1241,7 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
                                com->lc_new_checked;
                __u64 speed1 = ns->ln_items_checked;
                __u64 speed2 = checked;
-               __u64 new_checked = com->lc_new_checked * CFS_HZ;
+               __u64 new_checked = com->lc_new_checked * HZ;
                __u32 rtime = ns->ln_run_time_phase2 +
                              cfs_duration_sec(duration + HALF_SEC);
 
@@ -1278,8 +1353,7 @@ static int lfsck_namespace_double_scan(const struct lu_env *env,
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
        struct ptlrpc_thread    *thread = &lfsck->li_thread;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace  *ns     =
-                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_namespace  *ns     = com->lc_file_ram;
        struct dt_object        *obj    = com->lc_obj;
        const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
        struct dt_object        *target;
@@ -1287,12 +1361,13 @@ static int lfsck_namespace_double_scan(const struct lu_env *env,
        struct dt_key           *key;
        struct lu_fid            fid;
        int                      rc;
-       __u8                     flags;
+       __u8                     flags = 0;
        ENTRY;
 
-       lfsck->li_new_scanned = 0;
-       lfsck->li_time_last_checkpoint = cfs_time_current();
-       lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
+       com->lc_new_checked = 0;
+       com->lc_new_scanned = 0;
+       com->lc_time_last_checkpoint = cfs_time_current();
+       com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
                                cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
 
        di = iops->init(env, obj, 0, BYPASS_CAPA);
@@ -1338,20 +1413,18 @@ static int lfsck_namespace_double_scan(const struct lu_env *env,
 
                /* XXX: Currently, skip remote object, the consistency for
                 *      remote object will be processed in LFSCK phase III. */
-               if (!dt_object_exists(target) || dt_object_remote(target))
-                       goto obj_put;
-
-               rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
-               if (rc == 0)
-                       rc = lfsck_namespace_double_scan_one(env, com,
-                                                            target, flags);
+               if (dt_object_exists(target) && !dt_object_remote(target)) {
+                       rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
+                       if (rc == 0)
+                               rc = lfsck_namespace_double_scan_one(env, com,
+                                                               target, flags);
+               }
 
-obj_put:
                lfsck_object_put(env, target);
 
 checkpoint:
-               lfsck->li_new_scanned++;
                com->lc_new_checked++;
+               com->lc_new_scanned++;
                ns->ln_fid_latest_scanned_phase2 = fid;
                if (rc > 0)
                        ns->ln_objs_repaired_phase2++;
@@ -1369,28 +1442,28 @@ checkpoint:
                if (rc < 0 && bk->lb_param & LPF_FAILOUT)
                        GOTO(put, rc);
 
-               if (likely(cfs_time_beforeq(cfs_time_current(),
-                                           lfsck->li_time_next_checkpoint)) ||
-                   com->lc_new_checked == 0)
-                       goto speed;
-
-               down_write(&com->lc_sem);
-               ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
-                               HALF_SEC - lfsck->li_time_last_checkpoint);
-               ns->ln_time_last_checkpoint = cfs_time_current_sec();
-               ns->ln_objs_checked_phase2 += com->lc_new_checked;
-               com->lc_new_checked = 0;
-               rc = lfsck_namespace_store(env, com, false);
-               up_write(&com->lc_sem);
-               if (rc != 0)
-                       GOTO(put, rc);
-
-               lfsck->li_time_last_checkpoint = cfs_time_current();
-               lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
+               if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
+                                             cfs_time_current())) &&
+                   com->lc_new_checked != 0) {
+                       down_write(&com->lc_sem);
+                       ns->ln_run_time_phase2 +=
+                               cfs_duration_sec(cfs_time_current() +
+                               HALF_SEC - com->lc_time_last_checkpoint);
+                       ns->ln_time_last_checkpoint = cfs_time_current_sec();
+                       ns->ln_objs_checked_phase2 += com->lc_new_checked;
+                       com->lc_new_checked = 0;
+                       rc = lfsck_namespace_store(env, com, false);
+                       up_write(&com->lc_sem);
+                       if (rc != 0)
+                               GOTO(put, rc);
+
+                       com->lc_time_last_checkpoint = cfs_time_current();
+                       com->lc_time_next_checkpoint =
+                               com->lc_time_last_checkpoint +
                                cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+               }
 
-speed:
-               lfsck_control_speed(lfsck);
+               lfsck_control_speed_by_self(com);
                if (unlikely(!thread_is_running(thread)))
                        GOTO(put, rc = 0);
 
@@ -1416,8 +1489,7 @@ fini:
                com->lc_journal = 0;
                ns->ln_status = LS_COMPLETED;
                if (!(bk->lb_param & LPF_DRYRUN))
-                       ns->ln_flags &=
-                       ~(LF_SCANNED_ONCE | LF_INCONSISTENT | LF_UPGRADE);
+                       ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
                ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
                ns->ln_success_count++;
        } else if (rc == 0) {
@@ -1459,6 +1531,7 @@ int lfsck_namespace_setup(const struct lu_env *env,
 {
        struct lfsck_component  *com;
        struct lfsck_namespace  *ns;
+       struct dt_object        *root = NULL;
        struct dt_object        *obj;
        int                      rc;
        ENTRY;
@@ -1485,8 +1558,14 @@ int lfsck_namespace_setup(const struct lu_env *env,
        if (com->lc_file_disk == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       obj = local_index_find_or_create(env, lfsck->li_los,
-                                        lfsck->li_local_root,
+       root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
+       if (IS_ERR(root))
+               GOTO(out, rc = PTR_ERR(root));
+
+       if (unlikely(!dt_try_as_dir(env, root)))
+               GOTO(out, rc = -ENOTDIR);
+
+       obj = local_index_find_or_create(env, lfsck->li_los, root,
                                         lfsck_namespace_name,
                                         S_IFREG | S_IRUGO | S_IWUSR,
                                         &dt_lfsck_features);
@@ -1506,16 +1585,18 @@ int lfsck_namespace_setup(const struct lu_env *env,
        if (rc != 0)
                GOTO(out, rc);
 
-       ns = (struct lfsck_namespace *)com->lc_file_ram;
+       ns = com->lc_file_ram;
        switch (ns->ln_status) {
        case LS_INIT:
        case LS_COMPLETED:
        case LS_FAILED:
        case LS_STOPPED:
+               spin_lock(&lfsck->li_lock);
                cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
+               spin_unlock(&lfsck->li_lock);
                break;
        default:
-               CERROR("%s: unknown status: %u\n",
+               CERROR("%s: unknown lfsck_namespace status: rc = %u\n",
                       lfsck_lfsck2name(lfsck), ns->ln_status);
                /* fall through */
        case LS_SCANNING_PHASE1:
@@ -1527,14 +1608,18 @@ int lfsck_namespace_setup(const struct lu_env *env,
                /* fall through */
        case LS_PAUSED:
        case LS_CRASHED:
+               spin_lock(&lfsck->li_lock);
                cfs_list_add_tail(&com->lc_link, &lfsck->li_list_scan);
                cfs_list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
+               spin_unlock(&lfsck->li_lock);
                break;
        }
 
        GOTO(out, rc = 0);
 
 out:
+       if (root != NULL && !IS_ERR(root))
+               lu_object_put(env, &root->do_lu);
        if (rc != 0)
                lfsck_component_cleanup(env, com);
        return rc;