Whamcloud - gitweb
LU-8929 lfsck: dumper gets current position properly
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
index 75c939e..88a9ab2 100644 (file)
@@ -164,6 +164,9 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
        dst->ln_local_lpf_skipped = le64_to_cpu(src->ln_local_lpf_skipped);
        dst->ln_local_lpf_failed = le64_to_cpu(src->ln_local_lpf_failed);
        dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size);
+       dst->ln_time_latest_reset = le32_to_cpu(src->ln_time_latest_reset);
+       dst->ln_linkea_overflow_cleared =
+                               le64_to_cpu(src->ln_linkea_overflow_cleared);
 }
 
 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
@@ -233,6 +236,9 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
        dst->ln_local_lpf_skipped = cpu_to_le64(src->ln_local_lpf_skipped);
        dst->ln_local_lpf_failed = cpu_to_le64(src->ln_local_lpf_failed);
        dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size);
+       dst->ln_time_latest_reset = cpu_to_le32(src->ln_time_latest_reset);
+       dst->ln_linkea_overflow_cleared =
+                               cpu_to_le64(src->ln_linkea_overflow_cleared);
 }
 
 static void lfsck_namespace_record_failure(const struct lu_env *env,
@@ -434,71 +440,6 @@ log:
        return rc;
 }
 
-static struct dt_object *
-lfsck_namespace_load_one_trace_file(const struct lu_env *env,
-                                   struct lfsck_component *com,
-                                   struct dt_object *parent,
-                                   const char *name, bool reset)
-{
-       struct lfsck_instance   *lfsck = com->lc_lfsck;
-       struct dt_object        *obj;
-       int                      rc;
-
-       if (reset) {
-               rc = local_object_unlink(env, lfsck->li_bottom, parent, name);
-               if (rc != 0 && rc != -ENOENT)
-                       return ERR_PTR(rc);
-       }
-
-       obj = local_index_find_or_create(env, lfsck->li_los, parent, name,
-                                        S_IFREG | S_IRUGO | S_IWUSR,
-                                        &dt_lfsck_features);
-
-       return obj;
-}
-
-static int lfsck_namespace_load_sub_trace_files(const struct lu_env *env,
-                                               struct lfsck_component *com,
-                                               bool reset)
-{
-       char                            *name = lfsck_env_info(env)->lti_key;
-       struct lfsck_sub_trace_obj      *lsto;
-       struct dt_object                *obj;
-       int                              rc;
-       int                              i;
-
-       for (i = 0, lsto = &com->lc_sub_trace_objs[0];
-            i < LFSCK_STF_COUNT; i++, lsto++) {
-               snprintf(name, NAME_MAX, "%s_%02d", LFSCK_NAMESPACE, i);
-               mutex_lock(&lsto->lsto_mutex);
-               if (lsto->lsto_obj != NULL) {
-                       if (!reset) {
-                               mutex_unlock(&lsto->lsto_mutex);
-                               continue;
-                       }
-
-                       lfsck_object_put(env, lsto->lsto_obj);
-                       lsto->lsto_obj = NULL;
-               }
-
-               obj = lfsck_namespace_load_one_trace_file(env, com,
-                               com->lc_lfsck->li_lfsck_dir, name, reset);
-               LASSERT(obj != NULL);
-               if (IS_ERR(obj)) {
-                       rc = PTR_ERR(obj);
-               } else {
-                       lsto->lsto_obj = obj;
-                       rc = obj->do_ops->do_index_try(env, obj,
-                                                      &dt_lfsck_features);
-               }
-               mutex_unlock(&lsto->lsto_mutex);
-               if (rc != 0)
-                       return rc;
-       }
-
-       return 0;
-}
-
 static int lfsck_namespace_init(const struct lu_env *env,
                                struct lfsck_component *com)
 {
@@ -508,11 +449,13 @@ static int lfsck_namespace_init(const struct lu_env *env,
        memset(ns, 0, sizeof(*ns));
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
+       ns->ln_time_latest_reset = cfs_time_current_sec();
        down_write(&com->lc_sem);
        rc = lfsck_namespace_store(env, com);
-       up_write(&com->lc_sem);
        if (rc == 0)
-               rc = lfsck_namespace_load_sub_trace_files(env, com, true);
+               rc = lfsck_load_sub_trace_files(env, com,
+                       &dt_lfsck_namespace_features, LFSCK_NAMESPACE, true);
+       up_write(&com->lc_sem);
 
        return rc;
 }
@@ -670,6 +613,11 @@ static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
 {
        int rc;
 
+       /* For remote updating LINKEA, there may be further LFSCK action
+        * on remote MDT after the updating, so update the LINKEA ASAP. */
+       if (dt_object_remote(obj))
+               handle->th_sync = 1;
+
        /* For destroying all invalid linkEA entries. */
        rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
        if (rc == 0)
@@ -681,7 +629,7 @@ static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
 }
 
 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
-                      struct linkea_data *ldata)
+                      struct linkea_data *ldata, bool with_rec)
 {
        int rc;
 
@@ -711,8 +659,12 @@ int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
        if (unlikely(rc == 0))
                return -ENODATA;
 
-       if (rc > 0)
-               rc = linkea_init(ldata);
+       if (rc > 0) {
+               if (with_rec)
+                       rc = linkea_init_with_rec(ldata);
+               else
+                       rc = linkea_init(ldata);
+       }
 
        return rc;
 }
@@ -787,11 +739,22 @@ log:
 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
                             struct linkea_data *ldata, struct thandle *handle)
 {
-       const struct lu_buf *buf = lfsck_buf_get_const(env,
-                                                      ldata->ld_buf->lb_buf,
-                                                      ldata->ld_leh->leh_len);
+       struct lu_buf buf;
+       int rc;
 
-       return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle);
+       lfsck_buf_init(&buf, ldata->ld_buf->lb_buf, ldata->ld_leh->leh_len);
+
+again:
+       rc = dt_xattr_set(env, obj, &buf, XATTR_NAME_LINK, 0, handle);
+       if (unlikely(rc == -ENOSPC)) {
+               rc = linkea_overflow_shrink(ldata);
+               if (likely(rc > 0)) {
+                       buf.lb_len = rc;
+                       goto again;
+               }
+       }
+
+       return rc;
 }
 
 static int lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
@@ -991,11 +954,8 @@ again:
 
        cname->ln_name = info->lti_key;
        cname->ln_namelen = namelen;
-       rc = linkea_data_new(&ldata2, &info->lti_linkea_buf2);
-       if (rc != 0)
-               GOTO(log, rc);
-
-       rc = linkea_add_buf(&ldata2, cname, pfid);
+       rc = linkea_links_new(&ldata2, &info->lti_linkea_buf2,
+                             cname, pfid);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -1058,10 +1018,8 @@ again:
                GOTO(stop, rc);
 
        dt_write_lock(env, orphan, 0);
-       rc = lfsck_links_read2(env, orphan, &ldata2);
-       if (likely((rc == -ENODATA) || (rc == -EINVAL) ||
-                  (rc == 0 && ldata2.ld_leh != NULL &&
-                   ldata2.ld_leh->leh_reccount == 0))) {
+       rc = lfsck_links_read2_with_rec(env, orphan, &ldata2);
+       if (likely(rc == -ENODATA || rc == -EINVAL)) {
                if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
                        GOTO(unlock, rc = 1);
 
@@ -1395,11 +1353,8 @@ again:
        memset(dof, 0, sizeof(*dof));
        dof->dof_type = dt_mode_to_dft(S_IFDIR);
 
-       rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
-       if (rc != 0)
-               GOTO(unlock1, rc);
-
-       rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
+       rc = linkea_links_new(&ldata, &info->lti_linkea_buf2,
+                             cname, lfsck_dto2fid(parent));
        if (rc != 0)
                GOTO(unlock1, rc);
 
@@ -1587,7 +1542,8 @@ static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
        else
                lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
                                                    true);
-       if (ldata->ld_leh->leh_reccount > 0) {
+       if (ldata->ld_leh->leh_reccount > 0 ||
+           unlikely(ldata->ld_leh->leh_overflow_time)) {
                lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
                               ldata->ld_leh->leh_len);
                buflen = linkea_buf.lb_len;
@@ -1614,10 +1570,9 @@ again:
        if (unlikely(lfsck_is_dead_obj(obj)))
                GOTO(unlock2, rc = -ENOENT);
 
-       rc = lfsck_links_read2(env, obj, &ldata_new);
-       if (rc != 0)
-               GOTO(unlock2,
-                    rc = (rc == -ENODATA ? 0 : rc));
+       rc = lfsck_links_read2_with_rec(env, obj, &ldata_new);
+       if (rc)
+               GOTO(unlock2, rc = (rc == -ENODATA ? 0 : rc));
 
        /* The specified linkEA entry has been removed by race. */
        rc = linkea_links_find(&ldata_new, cname, pfid);
@@ -1641,14 +1596,11 @@ again:
                goto again;
        }
 
-       if (ldata_new.ld_leh->leh_reccount > 0) {
-               lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
-                              ldata_new.ld_leh->leh_len);
-               rc = dt_xattr_set(env, obj, &linkea_buf,
-                                 XATTR_NAME_LINK, 0, th);
-       } else {
+       if (ldata_new.ld_leh->leh_reccount > 0 ||
+           unlikely(ldata->ld_leh->leh_overflow_time))
+               rc = lfsck_links_write(env, obj, &ldata_new, th);
+       else
                rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th);
-       }
 
        GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
 
@@ -1890,7 +1842,7 @@ static int lfsck_namespace_replace_cond(const struct lu_env *env,
 
 replace:
        dt_read_lock(env, child, 0);
-       rc = lfsck_links_read2(env, child, &ldata);
+       rc = lfsck_links_read2_with_rec(env, child, &ldata);
        dt_read_unlock(env, child);
 
        /* Someone changed the child, no need to replace. */
@@ -2238,11 +2190,7 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
        LASSERT(!dt_object_remote(obj));
        LASSERT(S_ISDIR(lfsck_object_type(obj)));
 
-       rc = linkea_data_new(&ldata, &info->lti_big_buf);
-       if (rc != 0)
-               GOTO(log, rc);
-
-       rc = linkea_add_buf(&ldata, cname, pfid);
+       rc = linkea_links_new(&ldata, &info->lti_big_buf, cname, pfid);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -2288,8 +2236,7 @@ static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
        if (rc != 0)
                GOTO(unlock, rc);
 
-       rc = dt_xattr_set(env, obj, &linkea_buf,
-                         XATTR_NAME_LINK, 0, th);
+       rc = lfsck_links_write(env, obj, &ldata, th);
 
        GOTO(unlock, rc = (rc == 0 ? 1 : rc));
 
@@ -2741,7 +2688,8 @@ again:
                 * When the LFSCK runs again, if the dangling name is still
                 * there, the LFSCK should move the orphan directory object
                 * back to the normal namespace. */
-               if (!lpf && !lu_fid_eq(pfid, &tfid) && once) {
+               if (!lpf && !fid_is_zero(pfid) &&
+                   !lu_fid_eq(pfid, &tfid) && once) {
                        linkea_next_entry(ldata);
                        continue;
                }
@@ -2792,7 +2740,7 @@ again:
                         * directory contains the specified child, but such
                         * parent does not match the dotdot name entry, then
                         * trust the linkEA. */
-                       if (!lu_fid_eq(pfid, pfid2)) {
+                       if (!fid_is_zero(pfid) && !lu_fid_eq(pfid, pfid2)) {
                                *type = LNIT_UNMATCHED_PAIRS;
                                rc = lfsck_namespace_repair_unmatched_pairs(env,
                                                com, child, pfid2, cname);
@@ -2804,11 +2752,8 @@ rebuild:
                        /* It is the most common case that we find the
                         * name entry corresponding to the linkEA entry
                         * that matches the ".." name entry. */
-                       rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
-                       if (rc != 0)
-                               RETURN(rc);
-
-                       rc = linkea_add_buf(&ldata_new, cname, pfid2);
+                       rc = linkea_links_new(&ldata_new, &info->lti_big_buf,
+                                             cname, pfid2);
                        if (rc != 0)
                                RETURN(rc);
 
@@ -2882,6 +2827,10 @@ next:
                lfsck_linkea_del_buf(ldata, cname);
        } /* while (ldata->ld_lee != NULL) */
 
+       /* If there is still linkEA overflow, return. */
+       if (unlikely(ldata->ld_leh->leh_overflow_time))
+               RETURN(0);
+
        linkea_first_entry(ldata);
        if (ldata->ld_leh->leh_reccount == 1) {
                rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
@@ -2923,7 +2872,7 @@ next:
  *
  * If all the known name entries have been verified, then the object's hard
  * link attribute should match the object's linkEA entries count unless the
- * object's has too much hard link to be recorded in the linkEA. Such cases
+ * object's has too many hard link to be recorded in the linkEA. Such cases
  * should have been marked in the LFSCK trace file. Otherwise, trust the
  * linkEA to update the object's nlink attribute.
  *
@@ -2942,8 +2891,6 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env,
                                        struct dt_object *obj,
                                        struct lu_attr *la)
 {
-       struct lfsck_thread_info        *info   = lfsck_env_info(env);
-       struct lu_fid                   *tfid   = &info->lti_fid3;
        struct lfsck_namespace          *ns     = com->lc_file_ram;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct dt_device                *dev    = lfsck_obj2dev(obj);
@@ -2952,9 +2899,7 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env,
        struct linkea_data               ldata  = { NULL };
        struct lustre_handle             lh     = { 0 };
        __u32                            old    = la->la_nlink;
-       int                              idx;
        int                              rc     = 0;
-       __u8                             flags;
        ENTRY;
 
        LASSERT(!dt_object_remote(obj));
@@ -2988,26 +2933,20 @@ static int lfsck_namespace_repair_nlink(const struct lu_env *env,
        if (ns->ln_flags & LF_INCOMPLETE)
                GOTO(unlock, rc = 0);
 
-       fid_cpu_to_be(tfid, cfid);
-       idx = lfsck_sub_trace_file_fid2idx(cfid);
-       rc = dt_lookup(env, com->lc_sub_trace_objs[idx].lsto_obj,
-                      (struct dt_rec *)&flags, (const struct dt_key *)tfid);
-       if (rc != 0)
-               GOTO(unlock, rc);
-
-       if (flags & LNTF_SKIP_NLINK)
-               GOTO(unlock, rc = 0);
-
        rc = dt_attr_get(env, obj, la);
        if (rc != 0)
                GOTO(unlock, rc = (rc == -ENOENT ? 0 : rc));
 
-       rc = lfsck_links_read2(env, obj, &ldata);
-       if (rc != 0)
+       rc = lfsck_links_read2_with_rec(env, obj, &ldata);
+       if (rc)
                GOTO(unlock, rc = (rc == -ENODATA ? 0 : rc));
 
-       if (la->la_nlink == ldata.ld_leh->leh_reccount ||
-           unlikely(la->la_nlink == 0))
+       /* XXX: Currently, we only update the nlink attribute if the known
+        *      linkEA entries is larger than the nlink attribute. That is
+        *      safe action. */
+       if (la->la_nlink >= ldata.ld_leh->leh_reccount ||
+           unlikely(la->la_nlink == 0 ||
+                    ldata.ld_leh->leh_overflow_time))
                GOTO(unlock, rc = 0);
 
        la->la_nlink = ldata.ld_leh->leh_reccount;
@@ -3284,6 +3223,117 @@ out:
        return rc;
 }
 
+#define lfsck_time_before(a, b)                \
+       (typecheck(__u32, a) &&         \
+        typecheck(__u32, b) &&         \
+        ((int)(a) - (int)(b) < 0))
+
+static inline bool
+lfsck_namespace_linkea_stale_overflow(struct linkea_data *ldata,
+                                     struct lfsck_namespace *ns)
+{
+       /* Both the leh_overflow_time and ln_time_latest_reset are
+        * local time based, so need NOT to care about clock drift
+        * among the servers. */
+       return ldata->ld_leh->leh_overflow_time &&
+              lfsck_time_before(ldata->ld_leh->leh_overflow_time,
+                                ns->ln_time_latest_reset);
+}
+
+/**
+ * Clear the object's linkEA overflow timestamp.
+ *
+ * If the MDT-object has too many hard links as to the linkEA cannot hold
+ * all of them, then overflow timestamp will be set in the linkEA header.
+ * If some hard links are removed after that, then it is possible to hold
+ * other missed linkEA entries. If the namespace LFSCK have added all the
+ * related linkEA entries, then it will remove the overflow timestamp.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] ldata    pointer to the linkEA data for the given @obj
+ * \param[in] obj      pointer to the dt_object to be handled
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_linkea_clear_overflow(const struct lu_env *env,
+                                                struct lfsck_component *com,
+                                                struct linkea_data *ldata,
+                                                struct dt_object *obj)
+{
+       struct lfsck_namespace *ns = com->lc_file_ram;
+       struct lfsck_instance *lfsck = com->lc_lfsck;
+       struct dt_device *dev = lfsck_obj2dev(obj);
+       struct thandle *th = NULL;
+       struct lustre_handle lh = { 0 };
+       struct lu_buf linkea_buf;
+       int rc = 0;
+       ENTRY;
+
+       LASSERT(!dt_object_remote(obj));
+
+       rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
+                             MDS_INODELOCK_UPDATE, LCK_PW);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
+
+       rc = dt_declare_xattr_set(env, obj,
+                       lfsck_buf_get_const(env, NULL, MAX_LINKEA_SIZE),
+                       XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, obj, 0);
+       rc = lfsck_links_read(env, obj, ldata);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       if (unlikely(!lfsck_namespace_linkea_stale_overflow(ldata, ns)))
+               GOTO(unlock, rc = 0);
+
+       ldata->ld_leh->leh_overflow_time = 0;
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+               GOTO(unlock, rc = 1);
+
+       /* If all known entries are in the linkEA, then the 'leh_reccount'
+        * should NOT be zero. */
+       LASSERT(ldata->ld_leh->leh_reccount > 0);
+
+       lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
+                      ldata->ld_leh->leh_len);
+       rc = dt_xattr_set(env, obj, &linkea_buf, XATTR_NAME_LINK, 0, th);
+       if (unlikely(rc == -ENOSPC))
+               rc = 0;
+       else if (!rc)
+               rc = 1;
+
+       GOTO(unlock, rc);
+
+unlock:
+       dt_write_unlock(env, obj);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+log:
+       lfsck_ibits_unlock(&lh, LCK_PW);
+       CDEBUG(D_LFSCK, "%s: clear linkea overflow timestamp for the object "
+              DFID": rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
+
+       return rc;
+}
+
 /**
  * Double scan the MDT-object for namespace LFSCK.
  *
@@ -3349,12 +3399,23 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env,
                        lfsck_ibits_unlock(&lh, LCK_EX);
                }
 
-               GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
+               GOTO(out, rc);
        }
 
        if (rc != 0)
                GOTO(out, rc);
 
+       if (!(ns->ln_flags & LF_INCOMPLETE) &&
+           unlikely(lfsck_namespace_linkea_stale_overflow(&ldata, ns))) {
+               rc = lfsck_namespace_linkea_clear_overflow(env, com, &ldata,
+                                                          child);
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               if (rc > 0)
+                       ns->ln_linkea_overflow_cleared++;
+       }
+
        linkea_first_entry(&ldata);
        while (ldata.ld_lee != NULL) {
                rc = lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
@@ -3527,7 +3588,7 @@ lost_parent:
                        GOTO(out, rc);
 
                /* If there is no name entry in the parent dir and the object
-                * link count is less than the linkea entries count, then the
+                * link count is fewer than the linkea entries count, then the
                 * linkea entry should be removed. */
                if (ldata.ld_leh->leh_reccount > la->la_nlink) {
                        rc = lfsck_namespace_shrink_linkea_cond(env, com,
@@ -3628,7 +3689,9 @@ out:
                 * other MDT that references this object with another name,
                 * so we cannot know whether this linkEA is valid or not.
                 * So keep it there and maybe resolved when next LFSCK run. */
-               if (!(ns->ln_flags & LF_INCOMPLETE)) {
+               if (!(ns->ln_flags & LF_INCOMPLETE) &&
+                   (ldata.ld_leh == NULL ||
+                    !ldata.ld_leh->leh_overflow_time)) {
                        /* If the child becomes orphan, then insert it into
                         * the global .lustre/lost+found/MDTxxxx directory. */
                        rc = lfsck_namespace_insert_orphan(env, com, child,
@@ -3657,7 +3720,8 @@ out:
                                       PFID(lfsck_dto2fid(child)),
                                       la->la_nlink, count,
                                       lfsck_object_type(child));
-                       } else {
+                       } else if (la->la_nlink < count &&
+                                  likely(!ldata.ld_leh->leh_overflow_time)) {
                                rc = lfsck_namespace_repair_nlink(env, com,
                                                                  child, la);
                                if (rc > 0) {
@@ -3718,6 +3782,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
                   "striped_shards_failed: %llu\n"
                   "striped_shards_skipped: %llu\n"
                   "name_hash_repaired: %llu\n"
+                  "linkea_overflow_cleared: %llu\n"
                   "success_count: %u\n"
                   "run_time_phase1: %u seconds\n"
                   "run_time_phase2: %u seconds\n",
@@ -3753,6 +3818,7 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
                   ns->ln_striped_shards_failed,
                   ns->ln_striped_shards_skipped,
                   ns->ln_name_hash_repaired,
+                  ns->ln_linkea_overflow_cleared,
                   ns->ln_success_count,
                   time_phase1,
                   time_phase2);
@@ -3817,7 +3883,6 @@ static int lfsck_namespace_reset(const struct lu_env *env,
        struct lfsck_namespace          *ns     = com->lc_file_ram;
        struct lfsck_assistant_data     *lad    = com->lc_data;
        struct dt_object                *root;
-       struct dt_object                *dto;
        int                              rc;
        ENTRY;
 
@@ -3841,16 +3906,16 @@ static int lfsck_namespace_reset(const struct lu_env *env,
        }
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
+       ns->ln_time_latest_reset = cfs_time_current_sec();
 
-       lfsck_object_put(env, com->lc_obj);
-       com->lc_obj = NULL;
-       dto = lfsck_namespace_load_one_trace_file(env, com, root,
-                                                 LFSCK_NAMESPACE, true);
-       if (IS_ERR(dto))
-               GOTO(out, rc = PTR_ERR(dto));
+       rc = lfsck_load_one_trace_file(env, com, root, &com->lc_obj,
+                                      &dt_lfsck_namespace_features,
+                                      LFSCK_NAMESPACE, true);
+       if (rc)
+               GOTO(out, rc);
 
-       com->lc_obj = dto;
-       rc = lfsck_namespace_load_sub_trace_files(env, com, true);
+       rc = lfsck_load_sub_trace_files(env, com, &dt_lfsck_namespace_features,
+                                       LFSCK_NAMESPACE, true);
        if (rc != 0)
                GOTO(out, rc);
 
@@ -4157,7 +4222,7 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env,
                GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
        }
 
-       if (rc == -ENODATA) {
+       if (rc == -ENODATA || unlikely(!ldata.ld_leh->leh_reccount)) {
                rc = lfsck_namespace_check_for_double_scan(env, com, obj);
 
                GOTO(out, rc);
@@ -4356,7 +4421,6 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
 
        if (ns->ln_status == LS_SCANNING_PHASE1) {
                struct lfsck_position pos;
-               const struct dt_it_ops *iops;
                cfs_duration_t duration = cfs_time_current() -
                                          lfsck->li_time_last_checkpoint;
                __u64 checked = ns->ln_items_checked + com->lc_new_checked;
@@ -4382,33 +4446,37 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
                           speed,
                           new_checked);
 
-               LASSERT(lfsck->li_di_oit != NULL);
-
-               iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
-
-               /* The low layer otable-based iteration position may NOT
-                * exactly match the namespace-based directory traversal
-                * cookie. Generally, it is not a serious issue. But the
-                * caller should NOT make assumption on that. */
-               pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
-               if (!lfsck->li_current_oit_processed)
-                       pos.lp_oit_cookie--;
-
-               spin_lock(&lfsck->li_lock);
-               if (lfsck->li_di_dir != NULL) {
-                       pos.lp_dir_cookie = lfsck->li_cookie_dir;
-                       if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
+               if (likely(lfsck->li_di_oit)) {
+                       const struct dt_it_ops *iops =
+                               &lfsck->li_obj_oit->do_index_ops->dio_it;
+
+                       /* The low layer otable-based iteration position may NOT
+                        * exactly match the namespace-based directory traversal
+                        * cookie. Generally, it is not a serious issue. But the
+                        * caller should NOT make assumption on that. */
+                       pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
+                       if (!lfsck->li_current_oit_processed)
+                               pos.lp_oit_cookie--;
+
+                       spin_lock(&lfsck->li_lock);
+                       if (lfsck->li_di_dir) {
+                               pos.lp_dir_cookie = lfsck->li_cookie_dir;
+                               if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
+                                       fid_zero(&pos.lp_dir_parent);
+                                       pos.lp_dir_cookie = 0;
+                               } else {
+                                       pos.lp_dir_parent =
+                                       *lfsck_dto2fid(lfsck->li_obj_dir);
+                               }
+                       } else {
                                fid_zero(&pos.lp_dir_parent);
                                pos.lp_dir_cookie = 0;
-                       } else {
-                               pos.lp_dir_parent =
-                                       *lfsck_dto2fid(lfsck->li_obj_dir);
                        }
+                       spin_unlock(&lfsck->li_lock);
                } else {
-                       fid_zero(&pos.lp_dir_parent);
-                       pos.lp_dir_cookie = 0;
+                       pos = ns->ln_pos_last_checkpoint;
                }
-               spin_unlock(&lfsck->li_lock);
+
                lfsck_pos_dump(m, &pos, "current_position");
        } else if (ns->ln_status == LS_SCANNING_PHASE2) {
                cfs_duration_t duration = cfs_time_current() -
@@ -4599,100 +4667,6 @@ static int lfsck_namespace_in_notify(const struct lu_env *env,
        ENTRY;
 
        switch (lr->lr_event) {
-       case LE_SKIP_NLINK_DECLARE: {
-               struct dt_object        *obj;
-               struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
-               int                      idx;
-               __u8                     flags = 0;
-
-               LASSERT(th != NULL);
-
-               idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid);
-               mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-               obj = com->lc_sub_trace_objs[idx].lsto_obj;
-               if (unlikely(obj == NULL)) {
-                       mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-                       RETURN(0);
-               }
-
-               lfsck_object_get(obj);
-               fid_cpu_to_be(key, &lr->lr_fid);
-               rc = dt_declare_delete(env, obj,
-                                      (const struct dt_key *)key, th);
-               if (rc == 0)
-                       rc = dt_declare_insert(env, obj,
-                                              (const struct dt_rec *)&flags,
-                                              (const struct dt_key *)key, th);
-               mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-               lfsck_object_put(env, obj);
-
-               RETURN(rc);
-       }
-       case LE_SKIP_NLINK: {
-               struct dt_object        *obj;
-               struct lu_fid           *key   = &lfsck_env_info(env)->lti_fid3;
-               int                      idx;
-               __u8                     flags = 0;
-               bool                     exist = false;
-               ENTRY;
-
-               LASSERT(th != NULL);
-
-               idx = lfsck_sub_trace_file_fid2idx(&lr->lr_fid);
-               mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-               obj = com->lc_sub_trace_objs[idx].lsto_obj;
-               if (unlikely(obj == NULL)) {
-                       mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-                       RETURN(0);
-               }
-
-               lfsck_object_get(obj);
-               fid_cpu_to_be(key, &lr->lr_fid);
-               rc = dt_lookup(env, obj, (struct dt_rec *)&flags,
-                              (const struct dt_key *)key);
-               if (rc == 0) {
-                       if (flags & LNTF_SKIP_NLINK) {
-                               mutex_unlock(
-                               &com->lc_sub_trace_objs[idx].lsto_mutex);
-                               lfsck_object_put(env, obj);
-
-                               RETURN(0);
-                       }
-
-                       exist = true;
-               } else if (rc != -ENOENT) {
-                       GOTO(log, rc);
-               }
-
-               flags |= LNTF_SKIP_NLINK;
-               if (exist) {
-                       rc = dt_delete(env, obj, (const struct dt_key *)key,
-                                      th);
-                       if (rc != 0)
-                               GOTO(log, rc);
-               }
-
-               rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
-                              (const struct dt_key *)key, th, 1);
-
-               GOTO(log, rc);
-
-log:
-               mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
-               lfsck_object_put(env, obj);
-               CDEBUG(D_LFSCK, "%s: RPC service thread mark the "DFID
-                      " to be skipped for namespace double scan: rc = %d\n",
-                      lfsck_lfsck2name(com->lc_lfsck), PFID(&lr->lr_fid), rc);
-
-               if (rc != 0)
-                       /* If we cannot record this object in the LFSCK tracing,
-                        * we have to mark the LFSC as LF_INCOMPLETE, then the
-                        * LFSCK will skip nlink attribute verification for
-                        * all objects. */
-                       ns->ln_flags |= LF_INCOMPLETE;
-
-               return 0;
-       }
        case LE_SET_LMV_MASTER: {
                struct dt_object        *obj;
 
@@ -4967,11 +4941,8 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env,
        if (IS_ERR(child))
                GOTO(log, rc = PTR_ERR(child));
 
-       rc = linkea_data_new(&ldata, &info->lti_linkea_buf2);
-       if (rc != 0)
-               GOTO(log, rc);
-
-       rc = linkea_add_buf(&ldata, cname, pfid);
+       rc = linkea_links_new(&ldata, &info->lti_linkea_buf2,
+                             cname, pfid);
        if (rc != 0)
                GOTO(log, rc);
 
@@ -5426,6 +5397,8 @@ nodata:
                        goto again;
                }
 
+               LASSERT(handle != NULL);
+
                if (dir == NULL) {
                        dir = lfsck_assistant_object_load(env, lfsck, lso);
                        if (IS_ERR(dir)) {
@@ -5461,37 +5434,8 @@ nodata:
                }
 
                rc = linkea_add_buf(&ldata, cname, pfid);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               rc = lfsck_links_write(env, obj, &ldata, handle);
-               if (unlikely(rc == -ENOSPC) &&
-                   S_ISREG(lfsck_object_type(obj)) && !dt_object_remote(obj)) {
-                       if (handle != NULL) {
-                               LASSERT(dt_write_locked(env, obj));
-
-                               dt_write_unlock(env, obj);
-                               dtlocked = false;
-
-                               dt_trans_stop(env, dev, handle);
-                               handle = NULL;
-
-                               lfsck_ibits_unlock(&lh, LCK_EX);
-                       }
-
-                       rc = lfsck_namespace_trace_update(env, com,
-                                       &lnr->lnr_fid, LNTF_SKIP_NLINK, true);
-                       if (rc != 0)
-                               /* If we cannot record this object in the
-                                * LFSCK tracing, we have to mark the LFSCK
-                                * as LF_INCOMPLETE, then the LFSCK will
-                                * skip nlink attribute verification for
-                                * all objects. */
-                               ns->ln_flags |= LF_INCOMPLETE;
-
-                       GOTO(out, rc = 0);
-               }
-
+               if (rc == 0)
+                       rc = lfsck_links_write(env, obj, &ldata, handle);
                if (rc != 0)
                        GOTO(stop, rc);
 
@@ -5601,9 +5545,10 @@ trace:
                        if (log)
                                CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
                                       "repaired the entry: "DFID", parent "DFID
-                                      ", name %.*s\n", lfsck_lfsck2name(lfsck),
+                                      ", name %.*s, type %d\n",
+                                      lfsck_lfsck2name(lfsck),
                                       PFID(&lnr->lnr_fid), PFID(pfid),
-                                      lnr->lnr_namelen, lnr->lnr_name);
+                                      lnr->lnr_namelen, lnr->lnr_name, type);
 
                        switch (type) {
                        case LNIT_DANGLING:
@@ -6191,8 +6136,9 @@ checkpoint:
                down_write(&com->lc_sem);
                com->lc_new_checked++;
                com->lc_new_scanned++;
-               if (rc >= 0 && fid_is_sane(&fid))
+               if (rc >= 0)
                        ns->ln_fid_latest_scanned_phase2 = fid;
+
                if (rc > 0)
                        ns->ln_objs_repaired_phase2++;
                else if (rc < 0)
@@ -6212,10 +6158,8 @@ checkpoint:
                        ns->ln_time_last_checkpoint = cfs_time_current_sec();
                        ns->ln_objs_checked_phase2 += com->lc_new_checked;
                        com->lc_new_checked = 0;
-                       rc = lfsck_namespace_store(env, com);
+                       lfsck_namespace_store(env, com);
                        up_write(&com->lc_sem);
-                       if (rc != 0)
-                               GOTO(put, rc);
 
                        com->lc_time_last_checkpoint = cfs_time_current();
                        com->lc_time_next_checkpoint =
@@ -6478,7 +6422,7 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_object *obj,
 
        LASSERT(S_ISDIR(lfsck_object_type(obj)));
 
-       rc = lfsck_links_read(env, obj, &ldata);
+       rc = lfsck_links_read_with_rec(env, obj, &ldata);
        if (rc == -ENODATA) {
                dirty = true;
        } else if (rc == 0) {
@@ -6495,11 +6439,8 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_object *obj,
        if (!dirty)
                RETURN(rc);
 
-       rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
-       if (rc != 0)
-               RETURN(rc);
-
-       rc = linkea_add_buf(&ldata, cname, pfid);
+       rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf,
+                             cname, pfid);
        if (rc != 0)
                RETURN(rc);
 
@@ -6550,14 +6491,11 @@ int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
        struct linkea_data        ldata = { NULL };
        int                       rc;
 
-       rc = lfsck_links_read(env, obj, &ldata);
-       if (rc != 0)
+       rc = lfsck_links_read_with_rec(env, obj, &ldata);
+       if (rc)
                return rc;
 
        linkea_first_entry(&ldata);
-       if (ldata.ld_lee == NULL)
-               return -ENODATA;
-
        linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
        /* To guarantee the 'name' is terminated with '0'. */
        memcpy(name, cname->ln_name, cname->ln_namelen);
@@ -6705,7 +6643,7 @@ int lfsck_namespace_setup(const struct lu_env *env,
        obj = local_index_find_or_create(env, lfsck->li_los, root,
                                         LFSCK_NAMESPACE,
                                         S_IFREG | S_IRUGO | S_IWUSR,
-                                        &dt_lfsck_features);
+                                        &dt_lfsck_namespace_features);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
@@ -6716,7 +6654,8 @@ int lfsck_namespace_setup(const struct lu_env *env,
        else if (rc < 0)
                rc = lfsck_namespace_reset(env, com, true);
        else
-               rc = lfsck_namespace_load_sub_trace_files(env, com, false);
+               rc = lfsck_load_sub_trace_files(env, com,
+                       &dt_lfsck_namespace_features, LFSCK_NAMESPACE, false);
        if (rc != 0)
                GOTO(out, rc);