Whamcloud - gitweb
LU-2914 lfsck: split LFSCK code from mdd to lfsck
authorFan Yong <yong.fan@whamcloud.com>
Thu, 9 May 2013 09:34:31 +0000 (17:34 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 13 Jun 2013 05:59:05 +0000 (01:59 -0400)
The new LFSCK is used not only for namespace consistency verification,
but also for some other purposes, such as file layout consistency, DNE
consistency, and ect.

So it is more reasonable to move the new LFSCK code from mdd module to
another independent module -- lfsck, which can directly talk with lod,
local storage lib and ldlm. MDD (and OFD in next pahse) will register
as lfsck instance for related lfsck component(s), such as 'namespace'.
The lfsck processing will be independent from MDD, except the control:
start/stop/pause.

Test-Parameters: testlist=sanity-scrub,sanity-lfsck

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Ia6cd619d8972f4a51effa1fe48099fced6875ab9
Reviewed-on: http://review.whamcloud.com/6321
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
29 files changed:
libcfs/include/libcfs/libcfs_debug.h
libcfs/libcfs/debug.c
lnet/utils/debug.c
lustre/Makefile.in
lustre/autoMakefile.am
lustre/autoconf/lustre-core.m4
lustre/include/Makefile.am
lustre/include/dt_object.h
lustre/include/lu_object.h
lustre/include/lustre_lfsck.h [new file with mode: 0644]
lustre/include/lustre_linkea.h
lustre/lfsck/Makefile.in [new file with mode: 0644]
lustre/lfsck/autoMakefile.am [new file with mode: 0644]
lustre/lfsck/lfsck_bookmark.c [new file with mode: 0644]
lustre/lfsck/lfsck_engine.c [new file with mode: 0644]
lustre/lfsck/lfsck_internal.h [new file with mode: 0644]
lustre/lfsck/lfsck_lib.c [new file with mode: 0644]
lustre/lfsck/lfsck_namespace.c [new file with mode: 0644]
lustre/mdd/Makefile.in
lustre/mdd/autoMakefile.am
lustre/mdd/mdd_device.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lfsck.c [deleted file]
lustre/mdd/mdd_lfsck.h [deleted file]
lustre/mdd/mdd_lproc.c
lustre/mdt/mdt_handler.c
lustre/tests/sanity-lfsck.sh
lustre/tests/test-framework.sh

index b8e0053..a87894b 100644 (file)
@@ -107,7 +107,7 @@ struct ptldebug_header {
 #define S_LOV         0x00020000
 #define S_LQUOTA      0x00040000
 #define S_OSD          0x00080000
-/* unused */
+#define S_LFSCK       0x00100000
 /* unused */
 /* unused */
 #define S_LMV         0x00800000 /* b_new_cmd */
index f503781..2dcb0ea 100644 (file)
@@ -166,6 +166,8 @@ libcfs_debug_subsys2str(int subsys)
                 return "lquota";
        case S_OSD:
                return "osd";
+       case S_LFSCK:
+               return "lfsck";
         case S_LMV:
                 return "lmv";
         case S_SEC:
index ec4393e..20a6ba4 100644 (file)
@@ -57,14 +57,14 @@ static int debug_mask = ~0;
 #define MAX_MARK_SIZE 256
 
 static const char *libcfs_debug_subsystems[] =
-        {"undefined", "mdc", "mds", "osc",
-         "ost", "class", "log", "llite",
-         "rpc", "mgmt", "lnet", "lnd",
-         "pinger", "filter", "", "echo",
-         "ldlm", "lov", "lquota", "",
-         "", "", "", "lmv",
-         "", "sec", "gss", "", 
-         "mgc", "mgs", "fid", "fld", NULL};
+       {"undefined", "mdc", "mds", "osc",
+        "ost", "class", "log", "llite",
+        "rpc", "mgmt", "lnet", "lnd",
+        "pinger", "filter", "", "echo",
+        "ldlm", "lov", "lquota", "osd",
+        "lfsck", "", "", "lmv",
+        "", "sec", "gss", "",
+        "mgc", "mgs", "fid", "fld", NULL};
 static const char *libcfs_debug_masks[] =
         {"trace", "inode", "super", "ext2",
          "malloc", "cache", "info", "ioctl",
@@ -892,6 +892,7 @@ static struct mod_paths {
         {"fld", "lustre/fld"},
        {"lod", "lustre/lod"},
        {"osp", "lustre/osp"},
+       { "lfsck", "lustre/lfsck" },
         {NULL, NULL}
 };
 
index 49199e5..732a9d8 100644 (file)
@@ -5,7 +5,7 @@ subdir-m += ptlrpc
 subdir-m += obdecho
 subdir-m += mgc
 
-@SERVER_TRUE@subdir-m += ost mgs mdt mdd ofd quota osp lod
+@SERVER_TRUE@subdir-m += ost mgs mdt mdd ofd quota osp lod lfsck
 @CLIENT_TRUE@subdir-m += lov osc mdc lmv llite fld
 @LDISKFS_ENABLED_TRUE@subdir-m += osd-ldiskfs
 @ZFS_ENABLED_TRUE@subdir-m += osd-zfs
index 099048a..56bb608 100644 (file)
@@ -43,7 +43,7 @@ ALWAYS_SUBDIRS = include lvfs obdclass ldlm ptlrpc obdecho \
        mgc fid fld doc utils tests scripts autoconf contrib conf
 
 SERVER_SUBDIRS = ost mgs mdt mdd ofd osd-zfs osd-ldiskfs \
-       quota osp lod target
+       quota osp lod target lfsck
 
 CLIENT_SUBDIRS = mdc lmv llite lclient lov osc
 
index 31e83bf..9e6fb87 100644 (file)
@@ -2882,6 +2882,8 @@ lustre/mdc/Makefile
 lustre/mdc/autoMakefile
 lustre/lmv/Makefile
 lustre/lmv/autoMakefile
+lustre/lfsck/Makefile
+lustre/lfsck/autoMakefile
 lustre/mdt/Makefile
 lustre/mdt/autoMakefile
 lustre/mdd/Makefile
index c118a3c..7b0b369 100644 (file)
@@ -51,4 +51,4 @@ EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h        \
              lustre_fid.h lustre_fld.h lustre_req_layout.h lustre_capa.h \
              lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h \
             lu_ref.h cl_object.h lustre_acl.h lclient.h lu_target.h \
-            lustre_update.h lustre_linkea.h
+            lustre_update.h lustre_linkea.h lustre_lfsck.h
index 284184d..fb81d4b 100644 (file)
@@ -697,6 +697,11 @@ struct local_oid_storage {
        __u32             los_last_oid;
 };
 
+static inline struct lu_device *dt2lu_dev(struct dt_device *d)
+{
+        return &d->dd_lu_dev;
+}
+
 static inline struct dt_object *lu2dt(struct lu_object *l)
 {
         LASSERT(l == NULL || IS_ERR(l) || lu_device_is_dt(l->lo_dev));
index c093d31..5a7551f 100644 (file)
@@ -509,7 +509,7 @@ enum lu_object_header_flags {
        /**
         * Mark this object has already been taken out of cache.
         */
-       LU_OBJECT_UNHASHED = 1
+       LU_OBJECT_UNHASHED = 1,
 };
 
 enum lu_object_header_attr {
diff --git a/lustre/include/lustre_lfsck.h b/lustre/include/lustre_lfsck.h
new file mode 100644 (file)
index 0000000..f75d507
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2013, Intel Corporation.
+ */
+/*
+ * lustre/include/lustre_lfsck.h
+ *
+ * Lustre LFSCK exported functions.
+ *
+ * Author: Fan, Yong <fan.yong@intel.com>
+ */
+
+#ifndef _LUSTRE_LFSCK_H
+# define _LUSTRE_LFSCK_H
+
+#include <lustre/lustre_lfsck_user.h>
+#include <lustre_dlm.h>
+#include <lu_object.h>
+#include <dt_object.h>
+
+struct lfsck_start_param {
+       struct lfsck_start      *lsp_start;
+       struct ldlm_namespace   *lsp_namespace;
+};
+
+int lfsck_register(const struct lu_env *env, struct dt_device *key,
+                  struct dt_device *next, bool master);
+void lfsck_degister(const struct lu_env *env, struct dt_device *key);
+
+int lfsck_start(const struct lu_env *env, struct dt_device *key,
+               struct lfsck_start_param *lsp);
+int lfsck_stop(const struct lu_env *env, struct dt_device *key,
+              bool pause);
+
+int lfsck_get_speed(struct dt_device *key, void *buf, int len);
+int lfsck_set_speed(struct dt_device *key, int val);
+
+int lfsck_dump(struct dt_device *key, void *buf, int len, __u16 type);
+
+#endif /* _LUSTRE_LFSCK_H */
index 5790be9..acb8442 100644 (file)
@@ -27,6 +27,8 @@
  * Author: di wang <di.wang@intel.com>
  */
 
+#define DEFAULT_LINKEA_SIZE    4096
+
 struct linkea_data {
        /**
         * Buffer to keep link EA body.
diff --git a/lustre/lfsck/Makefile.in b/lustre/lfsck/Makefile.in
new file mode 100644 (file)
index 0000000..a703886
--- /dev/null
@@ -0,0 +1,4 @@
+MODULES := lfsck
+lfsck-objs := lfsck_lib.o lfsck_engine.o lfsck_bookmark.o lfsck_namespace.o
+
+@INCLUDE_RULES@
diff --git a/lustre/lfsck/autoMakefile.am b/lustre/lfsck/autoMakefile.am
new file mode 100644 (file)
index 0000000..ee8091c
--- /dev/null
@@ -0,0 +1,28 @@
+#
+# GPL HEADER START
+#
+# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 only,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License version 2 for more details.  A copy is
+# included in the COPYING file that accompanied this code.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# GPL HEADER END
+#
+
+if MODULES
+modulefs_DATA = lfsck$(KMODEXT)
+endif
+
+MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
+EXTRA_DIST := $(lfsck-objs:%.o=%.c) lfsck_internal.h
diff --git a/lustre/lfsck/lfsck_bookmark.c b/lustre/lfsck/lfsck_bookmark.c
new file mode 100644 (file)
index 0000000..1f87bec
--- /dev/null
@@ -0,0 +1,180 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012, 2013, Intel Corporation.
+ */
+/*
+ * lustre/lfsck/lfsck_bookmark.c
+ *
+ * Author: Fan, Yong <fan.yong@intel.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_LFSCK
+
+#include <lu_object.h>
+#include <dt_object.h>
+#include <lustre_fid.h>
+#include <lustre/lustre_user.h>
+
+#include "lfsck_internal.h"
+
+#define LFSCK_BOOKMARK_MAGIC   0x20130C1D
+
+static const char lfsck_bookmark_name[] = "lfsck_bookmark";
+
+static void lfsck_bookmark_le_to_cpu(struct lfsck_bookmark *des,
+                                    struct lfsck_bookmark *src)
+{
+       des->lb_magic = le32_to_cpu(src->lb_magic);
+       des->lb_version = le16_to_cpu(src->lb_version);
+       des->lb_param = le16_to_cpu(src->lb_param);
+       des->lb_speed_limit = le32_to_cpu(src->lb_speed_limit);
+}
+
+static void lfsck_bookmark_cpu_to_le(struct lfsck_bookmark *des,
+                                    struct lfsck_bookmark *src)
+{
+       des->lb_magic = cpu_to_le32(src->lb_magic);
+       des->lb_version = cpu_to_le16(src->lb_version);
+       des->lb_param = cpu_to_le16(src->lb_param);
+       des->lb_speed_limit = cpu_to_le32(src->lb_speed_limit);
+}
+
+static int lfsck_bookmark_load(const struct lu_env *env,
+                              struct lfsck_instance *lfsck)
+{
+       loff_t pos = 0;
+       int    len = sizeof(struct lfsck_bookmark);
+       int    rc;
+
+       rc = dt_record_read(env, lfsck->li_bookmark_obj,
+                           lfsck_buf_get(env, &lfsck->li_bookmark_disk, len),
+                           &pos);
+       if (rc == 0) {
+               struct lfsck_bookmark *bm = &lfsck->li_bookmark_ram;
+
+               lfsck_bookmark_le_to_cpu(bm, &lfsck->li_bookmark_disk);
+               if (bm->lb_magic != LFSCK_BOOKMARK_MAGIC) {
+                       CWARN("%.16s: invalid lfsck_bookmark magic "
+                             "0x%x != 0x%x\n", lfsck_lfsck2name(lfsck),
+                             bm->lb_magic, LFSCK_BOOKMARK_MAGIC);
+                       /* Process it as new lfsck_bookmark. */
+                       rc = -ENODATA;
+               }
+       } else {
+               if (rc == -EFAULT && pos == 0)
+                       /* return -ENODATA for empty lfsck_bookmark. */
+                       rc = -ENODATA;
+               else
+                       CERROR("%.16s: fail to load lfsck_bookmark, "
+                              "expected = %d, rc = %d\n",
+                              lfsck_lfsck2name(lfsck), len, rc);
+       }
+       return rc;
+}
+
+int lfsck_bookmark_store(const struct lu_env *env, struct lfsck_instance *lfsck)
+{
+       struct thandle    *handle;
+       struct dt_object  *obj    = lfsck->li_bookmark_obj;
+       loff_t             pos    = 0;
+       int                len    = sizeof(struct lfsck_bookmark);
+       int                rc;
+       ENTRY;
+
+       lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk,
+                                &lfsck->li_bookmark_ram);
+       handle = dt_trans_create(env, lfsck->li_bottom);
+       if (IS_ERR(handle)) {
+               rc = PTR_ERR(handle);
+               CERROR("%.16s: fail to create trans for storing "
+                      "lfsck_bookmark: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               RETURN(rc);
+       }
+
+       rc = dt_declare_record_write(env, obj, len, 0, handle);
+       if (rc != 0) {
+               CERROR("%.16s: fail to declare trans for storing "
+                      "lfsck_bookmark: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               GOTO(out, rc);
+       }
+
+       rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
+       if (rc != 0) {
+               CERROR("%.16s: fail to start trans for storing "
+                      "lfsck_bookmark: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               GOTO(out, rc);
+       }
+
+       rc = dt_record_write(env, obj,
+                            lfsck_buf_get(env, &lfsck->li_bookmark_disk, len),
+                            &pos, handle);
+       if (rc != 0)
+               CERROR("%.16s: fail to store lfsck_bookmark, expected = %d, "
+                      "rc = %d\n", lfsck_lfsck2name(lfsck), len, rc);
+
+       GOTO(out, rc);
+
+out:
+       dt_trans_stop(env, lfsck->li_bottom, handle);
+       return rc;
+}
+
+static int lfsck_bookmark_init(const struct lu_env *env,
+                              struct lfsck_instance *lfsck)
+{
+       struct lfsck_bookmark *mb = &lfsck->li_bookmark_ram;
+       int rc;
+
+       memset(mb, 0, sizeof(*mb));
+       mb->lb_magic = LFSCK_BOOKMARK_MAGIC;
+       mb->lb_version = LFSCK_VERSION_V2;
+       mutex_lock(&lfsck->li_mutex);
+       rc = lfsck_bookmark_store(env, lfsck);
+       mutex_unlock(&lfsck->li_mutex);
+       return rc;
+}
+
+int lfsck_bookmark_setup(const struct lu_env *env,
+                        struct lfsck_instance *lfsck)
+{
+       struct dt_object *obj;
+       int               rc;
+       ENTRY;
+
+       obj = local_file_find_or_create(env, lfsck->li_los,
+                                       lfsck->li_local_root,
+                                       lfsck_bookmark_name,
+                                       S_IFREG | S_IRUGO | S_IWUSR);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
+
+       lfsck->li_bookmark_obj = obj;
+       rc = lfsck_bookmark_load(env, lfsck);
+       if (rc == -ENODATA)
+               rc = lfsck_bookmark_init(env, lfsck);
+
+       RETURN(rc);
+}
diff --git a/lustre/lfsck/lfsck_engine.c b/lustre/lfsck/lfsck_engine.c
new file mode 100644 (file)
index 0000000..ac7fb1c
--- /dev/null
@@ -0,0 +1,355 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012, 2013, Intel Corporation.
+ */
+/*
+ * lustre/lfsck/lfsck_engine.c
+ *
+ * Author: Fan, Yong <fan.yong@intel.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_LFSCK
+
+#include <lu_object.h>
+#include <dt_object.h>
+#include <lustre_net.h>
+#include <lustre_fid.h>
+#include <obd_support.h>
+#include <lustre_lib.h>
+
+#include "lfsck_internal.h"
+
+static void lfsck_unpack_ent(struct lu_dirent *ent)
+{
+       fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
+       ent->lde_hash = le64_to_cpu(ent->lde_hash);
+       ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
+       ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
+       ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
+
+       /* Make sure the name is terminated with '0'.
+        * The data (type) after ent::lde_name maybe
+        * broken, but we do not care. */
+       ent->lde_name[ent->lde_namelen] = 0;
+}
+
+static void lfsck_close_dir(const struct lu_env *env,
+                           struct lfsck_instance *lfsck)
+{
+       struct dt_object        *dir_obj  = lfsck->li_obj_dir;
+       const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
+       struct dt_it            *dir_di   = lfsck->li_di_dir;
+
+       spin_lock(&lfsck->li_lock);
+       lfsck->li_di_dir = NULL;
+       spin_unlock(&lfsck->li_lock);
+
+       dir_iops->put(env, dir_di);
+       dir_iops->fini(env, dir_di);
+       lfsck->li_obj_dir = NULL;
+       lfsck_object_put(env, dir_obj);
+}
+
+static int lfsck_master_dir_engine(const struct lu_env *env,
+                                  struct lfsck_instance *lfsck)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       const struct dt_it_ops          *iops   =
+                       &lfsck->li_obj_dir->do_index_ops->dio_it;
+       struct dt_it                    *di     = lfsck->li_di_dir;
+       struct lu_dirent                *ent    = &info->lti_ent;
+       struct lu_fid                   *fid    = &info->lti_fid;
+       struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
+       struct ptlrpc_thread            *thread = &lfsck->li_thread;
+       int                              rc;
+       ENTRY;
+
+       do {
+               struct dt_object *child;
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
+                   cfs_fail_val > 0) {
+                       struct l_wait_info lwi;
+
+                       lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
+                                         NULL, NULL);
+                       l_wait_event(thread->t_ctl_waitq,
+                                    !thread_is_running(thread),
+                                    &lwi);
+               }
+
+               lfsck->li_new_scanned++;
+               rc = iops->rec(env, di, (struct dt_rec *)ent,
+                              lfsck->li_args_dir);
+               if (rc != 0) {
+                       lfsck_fail(env, lfsck, true);
+                       if (bk->lb_param & LPF_FAILOUT)
+                               RETURN(rc);
+                       else
+                               goto checkpoint;
+               }
+
+               lfsck_unpack_ent(ent);
+               if (ent->lde_attrs & LUDA_IGNORE)
+                       goto checkpoint;
+
+               *fid = ent->lde_fid;
+               child = lfsck_object_find(env, lfsck, fid);
+               if (child == NULL) {
+                       goto checkpoint;
+               } else if (IS_ERR(child)) {
+                       lfsck_fail(env, lfsck, true);
+                       if (bk->lb_param & LPF_FAILOUT)
+                               RETURN(PTR_ERR(child));
+                       else
+                               goto checkpoint;
+               }
+
+               /* XXX: Currently, skip remote object, the consistency for
+                *      remote object will be processed in LFSCK phase III. */
+               if (dt_object_exists(child) && !dt_object_remote(child))
+                       rc = lfsck_exec_dir(env, lfsck, child, ent);
+               lfsck_object_put(env, child);
+               if (rc != 0 && bk->lb_param & LPF_FAILOUT)
+                       RETURN(rc);
+
+checkpoint:
+               rc = lfsck_checkpoint(env, lfsck);
+               if (rc != 0 && bk->lb_param & LPF_FAILOUT)
+                       RETURN(rc);
+
+               /* Rate control. */
+               lfsck_control_speed(lfsck);
+               if (unlikely(!thread_is_running(thread)))
+                       RETURN(0);
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
+                       spin_lock(&lfsck->li_lock);
+                       thread_set_flags(thread, SVC_STOPPING);
+                       spin_unlock(&lfsck->li_lock);
+                       RETURN(-EINVAL);
+               }
+
+               rc = iops->next(env, di);
+       } while (rc == 0);
+
+       if (rc > 0 && !lfsck->li_oit_over)
+               lfsck_close_dir(env, lfsck);
+
+       RETURN(rc);
+}
+
+static int lfsck_master_oit_engine(const struct lu_env *env,
+                                  struct lfsck_instance *lfsck)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       const struct dt_it_ops          *iops   =
+                               &lfsck->li_obj_oit->do_index_ops->dio_it;
+       struct dt_it                    *di     = lfsck->li_di_oit;
+       struct lu_fid                   *fid    = &info->lti_fid;
+       struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
+       struct ptlrpc_thread            *thread = &lfsck->li_thread;
+       int                              rc;
+       ENTRY;
+
+       do {
+               struct dt_object *target;
+
+               if (lfsck->li_di_dir != NULL) {
+                       rc = lfsck_master_dir_engine(env, lfsck);
+                       if (rc <= 0)
+                               RETURN(rc);
+               }
+
+               if (unlikely(lfsck->li_oit_over))
+                       RETURN(1);
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
+                   cfs_fail_val > 0) {
+                       struct l_wait_info lwi;
+
+                       lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
+                                         NULL, NULL);
+                       l_wait_event(thread->t_ctl_waitq,
+                                    !thread_is_running(thread),
+                                    &lwi);
+               }
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
+                       RETURN(0);
+
+               lfsck->li_current_oit_processed = 1;
+               lfsck->li_new_scanned++;
+               rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
+               if (rc != 0) {
+                       lfsck_fail(env, lfsck, true);
+                       if (bk->lb_param & LPF_FAILOUT)
+                               RETURN(rc);
+                       else
+                               goto checkpoint;
+               }
+
+               target = lfsck_object_find(env, lfsck, fid);
+               if (target == NULL) {
+                       goto checkpoint;
+               } else if (IS_ERR(target)) {
+                       lfsck_fail(env, lfsck, true);
+                       if (bk->lb_param & LPF_FAILOUT)
+                               RETURN(PTR_ERR(target));
+                       else
+                               goto checkpoint;
+               }
+
+               /* XXX: Currently, skip remote object, the consistency for
+                *      remote object will be processed in LFSCK phase III. */
+               if (dt_object_exists(target) && !dt_object_remote(target))
+                       rc = lfsck_exec_oit(env, lfsck, target);
+               lfsck_object_put(env, target);
+               if (rc != 0 && bk->lb_param & LPF_FAILOUT)
+                       RETURN(rc);
+
+checkpoint:
+               rc = lfsck_checkpoint(env, lfsck);
+               if (rc != 0 && bk->lb_param & LPF_FAILOUT)
+                       RETURN(rc);
+
+               /* Rate control. */
+               lfsck_control_speed(lfsck);
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
+                       spin_lock(&lfsck->li_lock);
+                       thread_set_flags(thread, SVC_STOPPING);
+                       spin_unlock(&lfsck->li_lock);
+                       RETURN(-EINVAL);
+               }
+
+               rc = iops->next(env, di);
+               if (unlikely(rc > 0))
+                       lfsck->li_oit_over = 1;
+               else if (likely(rc == 0))
+                       lfsck->li_current_oit_processed = 0;
+
+               if (unlikely(!thread_is_running(thread)))
+                       RETURN(0);
+       } while (rc == 0 || lfsck->li_di_dir != NULL);
+
+       RETURN(rc);
+}
+
+int lfsck_master_engine(void *args)
+{
+       struct lu_env            env;
+       struct lfsck_instance   *lfsck    = (struct lfsck_instance *)args;
+       struct ptlrpc_thread    *thread   = &lfsck->li_thread;
+       struct dt_object        *oit_obj  = lfsck->li_obj_oit;
+       const struct dt_it_ops  *oit_iops = &oit_obj->do_index_ops->dio_it;
+       struct dt_it            *oit_di;
+       int                      rc;
+       ENTRY;
+
+       cfs_daemonize("lfsck_master");
+       rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
+       if (rc != 0) {
+               CERROR("%s: LFSCK, fail to init env, rc = %d\n",
+                      lfsck_lfsck2name(lfsck), rc);
+               GOTO(noenv, rc);
+       }
+
+       oit_di = oit_iops->init(&env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
+       if (IS_ERR(oit_di)) {
+               rc = PTR_ERR(oit_di);
+               CERROR("%s: LFSCK, fail to init iteration, rc = %d\n",
+                      lfsck_lfsck2name(lfsck), rc);
+               GOTO(fini_env, rc);
+       }
+
+       spin_lock(&lfsck->li_lock);
+       lfsck->li_di_oit = oit_di;
+       spin_unlock(&lfsck->li_lock);
+       rc = lfsck_prep(&env, lfsck);
+       if (rc != 0)
+               GOTO(fini_oit, rc);
+
+       CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = 0x%x, dir_flags = 0x%x, "
+              "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
+              ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
+              lfsck->li_pos_current.lp_oit_cookie,
+              lfsck->li_pos_current.lp_dir_cookie,
+              PFID(&lfsck->li_pos_current.lp_dir_parent),
+              cfs_curproc_pid());
+
+       spin_lock(&lfsck->li_lock);
+       thread_set_flags(thread, SVC_RUNNING);
+       spin_unlock(&lfsck->li_lock);
+       cfs_waitq_broadcast(&thread->t_ctl_waitq);
+
+       if (!cfs_list_empty(&lfsck->li_list_scan) ||
+           cfs_list_empty(&lfsck->li_list_double_scan))
+               rc = lfsck_master_oit_engine(&env, lfsck);
+       else
+               rc = 1;
+
+       CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = 0x%x, dir_flags = 0x%x, "
+              "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
+              ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
+              lfsck->li_pos_current.lp_oit_cookie,
+              lfsck->li_pos_current.lp_dir_cookie,
+              PFID(&lfsck->li_pos_current.lp_dir_parent),
+              cfs_curproc_pid(), rc);
+
+       if (lfsck->li_paused && cfs_list_empty(&lfsck->li_list_scan))
+               oit_iops->put(&env, oit_di);
+
+       if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
+               rc = lfsck_post(&env, lfsck, rc);
+       if (lfsck->li_di_dir != NULL)
+               lfsck_close_dir(&env, lfsck);
+
+fini_oit:
+       spin_lock(&lfsck->li_lock);
+       lfsck->li_di_oit = NULL;
+       spin_unlock(&lfsck->li_lock);
+
+       oit_iops->fini(&env, oit_di);
+       if (rc == 1) {
+               if (!cfs_list_empty(&lfsck->li_list_double_scan))
+                       rc = lfsck_double_scan(&env, lfsck);
+               else
+                       rc = 0;
+       }
+
+       /* XXX: Purge the pinned objects in the future. */
+
+fini_env:
+       lu_env_fini(&env);
+
+noenv:
+       spin_lock(&lfsck->li_lock);
+       thread_set_flags(thread, SVC_STOPPED);
+       cfs_waitq_broadcast(&thread->t_ctl_waitq);
+       spin_unlock(&lfsck->li_lock);
+       return rc;
+}
diff --git a/lustre/lfsck/lfsck_internal.h b/lustre/lfsck/lfsck_internal.h
new file mode 100644 (file)
index 0000000..0a0e5e6
--- /dev/null
@@ -0,0 +1,544 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2013, Intel Corporation.
+ */
+/*
+ * lustre/lfsck/lfsck_internal.h
+ *
+ * Shared definitions and declarations for the LFSCK.
+ *
+ * Author: Fan, Yong <fan.yong@intel.com>
+ */
+
+#ifndef _LFSCK_INTERNAL_H
+# define _LFSCK_INTERNAL_H
+
+#include <lustre/lustre_lfsck_user.h>
+#include <lustre/lustre_user.h>
+#include <lustre/lustre_idl.h>
+#include <obd.h>
+#include <lu_object.h>
+#include <dt_object.h>
+#include <lustre_net.h>
+#include <lustre_dlm.h>
+#include <lustre_fid.h>
+
+#define HALF_SEC                       (CFS_HZ >> 1)
+#define LFSCK_CHECKPOINT_INTERVAL      60
+
+#define LFSCK_NAMEENTRY_DEAD           1 /* The object has been unlinked. */
+#define LFSCK_NAMEENTRY_REMOVED        2 /* The entry has been removed. */
+#define LFSCK_NAMEENTRY_RECREATED      3 /* The entry has been recreated. */
+
+enum lfsck_status {
+       /* The lfsck file is new created, for new MDT, upgrading from old disk,
+        * or re-creating the lfsck file manually. */
+       LS_INIT                 = 0,
+
+       /* The first-step system scanning. */
+       LS_SCANNING_PHASE1      = 1,
+
+       /* The second-step system scanning. */
+       LS_SCANNING_PHASE2      = 2,
+
+       /* The LFSCK processing has completed for all objects. */
+       LS_COMPLETED            = 3,
+
+       /* The LFSCK exited automatically for failure, will not auto restart. */
+       LS_FAILED               = 4,
+
+       /* The LFSCK is stopped manually, will not auto restart. */
+       LS_STOPPED              = 5,
+
+       /* LFSCK is paused automatically when umount,
+        * will be restarted automatically when remount. */
+       LS_PAUSED               = 6,
+
+       /* System crashed during the LFSCK,
+        * will be restarted automatically after recovery. */
+       LS_CRASHED              = 7,
+};
+
+enum lfsck_flags {
+       /* Finish to the cycle scanning. */
+       LF_SCANNED_ONCE = 0x00000001ULL,
+
+       /* There is some namespace inconsistency. */
+       LF_INCONSISTENT = 0x00000002ULL,
+
+       /* The device is upgraded from 1.8 format. */
+       LF_UPGRADE      = 0x00000004ULL,
+};
+
+struct lfsck_position {
+       /* low layer object table-based iteration position. */
+       __u64   lp_oit_cookie;
+
+       /* parent FID for directory traversal. */
+       struct lu_fid lp_dir_parent;
+
+       /* namespace-based directory traversal position. */
+       __u64   lp_dir_cookie;
+};
+
+struct lfsck_bookmark {
+       /* Magic number to detect that this struct contains valid data. */
+       __u32   lb_magic;
+
+       /* For compatible with old versions. */
+       __u16   lb_version;
+
+       /* See 'enum lfsck_param_flags' */
+       __u16   lb_param;
+
+       /* How many items can be scanned at most per second. */
+       __u32   lb_speed_limit;
+
+       /* For 64-bits aligned. */
+       __u32   lb_padding;
+
+       /* For future using. */
+       __u64   lb_reserved[6];
+};
+
+struct lfsck_namespace {
+       /* Magic number to detect that this struct contains valid data. */
+       __u32   ln_magic;
+
+       /* See 'enum lfsck_status'. */
+       __u32   ln_status;
+
+       /* See 'enum lfsck_flags'. */
+       __u32   ln_flags;
+
+       /* How many completed LFSCK runs on the device. */
+       __u32   ln_success_count;
+
+       /*  How long the LFSCK phase1 has run in seconds. */
+       __u32   ln_run_time_phase1;
+
+       /*  How long the LFSCK phase2 has run in seconds. */
+       __u32   ln_run_time_phase2;
+
+       /* Time for the last LFSCK completed in seconds since epoch. */
+       __u64   ln_time_last_complete;
+
+       /* Time for the latest LFSCK ran in seconds since epoch. */
+       __u64   ln_time_latest_start;
+
+       /* Time for the last LFSCK checkpoint in seconds since epoch. */
+       __u64   ln_time_last_checkpoint;
+
+       /* Position for the latest LFSCK started from. */
+       struct lfsck_position   ln_pos_latest_start;
+
+       /* Position for the last LFSCK checkpoint. */
+       struct lfsck_position   ln_pos_last_checkpoint;
+
+       /* Position for the first should be updated object. */
+       struct lfsck_position   ln_pos_first_inconsistent;
+
+       /* How many items (including dir) have been checked. */
+       __u64   ln_items_checked;
+
+       /* How many items have been repaired. */
+       __u64   ln_items_repaired;
+
+       /* How many items failed to be processed. */
+       __u64   ln_items_failed;
+
+       /* How many directories have been traversed. */
+       __u64   ln_dirs_checked;
+
+       /* How many multiple-linked objects have been checked. */
+       __u64   ln_mlinked_checked;
+
+       /* How many objects have been double scanned. */
+       __u64   ln_objs_checked_phase2;
+
+       /* How many objects have been reparied during double scan. */
+       __u64   ln_objs_repaired_phase2;
+
+       /* How many objects failed to be processed during double scan. */
+       __u64   ln_objs_failed_phase2;
+
+       /* How many objects with nlink fixed. */
+       __u64   ln_objs_nlink_repaired;
+
+       /* How many objects were lost before, but found back now. */
+       __u64   ln_objs_lost_found;
+
+       /* The latest object has been processed (failed) during double scan. */
+       struct lu_fid   ln_fid_latest_scanned_phase2;
+
+       /* For further using. 256-bytes aligned now. */
+       __u64   ln_reserved[2];
+};
+
+struct lfsck_component;
+
+struct lfsck_operations {
+       int (*lfsck_reset)(const struct lu_env *env,
+                          struct lfsck_component *com,
+                          bool init);
+
+       void (*lfsck_fail)(const struct lu_env *env,
+                          struct lfsck_component *com,
+                          bool new_checked);
+
+       int (*lfsck_checkpoint)(const struct lu_env *env,
+                               struct lfsck_component *com,
+                               bool init);
+
+       int (*lfsck_prep)(const struct lu_env *env,
+                         struct lfsck_component *com);
+
+       int (*lfsck_exec_oit)(const struct lu_env *env,
+                             struct lfsck_component *com,
+                             struct dt_object *obj);
+
+       int (*lfsck_exec_dir)(const struct lu_env *env,
+                             struct lfsck_component *com,
+                             struct dt_object *obj,
+                             struct lu_dirent *ent);
+
+       int (*lfsck_post)(const struct lu_env *env,
+                         struct lfsck_component *com,
+                         int result,
+                         bool init);
+
+       int (*lfsck_dump)(const struct lu_env *env,
+                         struct lfsck_component *com,
+                         char *buf,
+                         int len);
+
+       int (*lfsck_double_scan)(const struct lu_env *env,
+                                struct lfsck_component *com);
+};
+
+struct lfsck_component {
+       /* into lfsck_instance::li_list_(scan,double_scan,idle} */
+       cfs_list_t               lc_link;
+
+       /* into lfsck_instance::li_list_dir */
+       cfs_list_t               lc_link_dir;
+       struct rw_semaphore      lc_sem;
+       cfs_atomic_t             lc_ref;
+
+       struct lfsck_position    lc_pos_start;
+       struct lfsck_instance   *lc_lfsck;
+       struct dt_object        *lc_obj;
+       struct lfsck_operations *lc_ops;
+       void                    *lc_file_ram;
+       void                    *lc_file_disk;
+       __u32                    lc_file_size;
+
+       /* How many objects have been checked since last checkpoint. */
+       __u32                    lc_new_checked;
+       unsigned int             lc_journal:1;
+       __u16                    lc_type;
+};
+
+struct lfsck_instance {
+       struct mutex              li_mutex;
+       spinlock_t                li_lock;
+
+       /* Link into the lfsck_instance_list. */
+       cfs_list_t                li_link;
+
+       /* For the components in (first) scanning via otable-based iteration. */
+       cfs_list_t                li_list_scan;
+
+       /* For the components in scanning via directory traversal. Because
+        * directory traversal cannot guarantee all the object be scanned,
+        * so the component in the li_list_dir must be in li_list_scan. */
+       cfs_list_t                li_list_dir;
+
+       /* For the components in double scanning. */
+       cfs_list_t                li_list_double_scan;
+
+       /* For the components those are not scanning now. */
+       cfs_list_t                li_list_idle;
+
+       cfs_atomic_t              li_ref;
+       struct ptlrpc_thread      li_thread;
+
+       /* The time for last checkpoint, jiffies */
+       cfs_time_t                li_time_last_checkpoint;
+
+       /* The time for next checkpoint, jiffies */
+       cfs_time_t                li_time_next_checkpoint;
+
+       struct dt_device         *li_next;
+       struct dt_device         *li_bottom;
+       struct ldlm_namespace    *li_namespace;
+       struct local_oid_storage *li_los;
+       struct dt_object         *li_local_root; /* backend root "/" */
+       struct lu_fid             li_global_root_fid; /* /ROOT */
+       struct dt_object         *li_bookmark_obj;
+       struct lfsck_bookmark     li_bookmark_ram;
+       struct lfsck_bookmark     li_bookmark_disk;
+       struct lfsck_position     li_pos_current;
+
+       /* Obj for otable-based iteration */
+       struct dt_object         *li_obj_oit;
+
+       /* Obj for directory traversal */
+       struct dt_object         *li_obj_dir;
+
+       /* It for otable-based iteration */
+       struct dt_it             *li_di_oit;
+
+       /* It for directory traversal */
+       struct dt_it             *li_di_dir;
+
+       /* Arguments for low layer otable-based iteration. */
+       __u32                     li_args_oit;
+
+       /* Arugments for namespace-based directory traversal. */
+       __u32                     li_args_dir;
+
+       /* Schedule for every N objects. */
+       __u32                     li_sleep_rate;
+
+       /* Sleep N jiffies for each schedule. */
+       __u32                     li_sleep_jif;
+
+       /* How many objects have been scanned since last sleep. */
+       __u32                     li_new_scanned;
+
+       unsigned int              li_paused:1, /* The lfsck is paused. */
+                                 li_oit_over:1, /* oit is finished. */
+                                 li_drop_dryrun:1, /* Ever dryrun, not now. */
+                                 li_master:1, /* Master instance or not. */
+                                 li_current_oit_processed:1;
+};
+
+enum lfsck_linkea_flags {
+       /* The linkea entries does not match the object nlinks. */
+       LLF_UNMATCH_NLINKS      = 0x01,
+
+       /* Fail to repair the multiple-linked objects during the double scan. */
+       LLF_REPAIR_FAILED       = 0x02,
+};
+
+struct lfsck_thread_info {
+       struct lu_name          lti_name;
+       struct lu_buf           lti_buf;
+       struct lu_buf           lti_linkea_buf;
+       struct lu_fid           lti_fid;
+       struct lu_fid           lti_fid2;
+       struct lu_attr          lti_la;
+       /* lti_ent and lti_key must be conjoint,
+        * then lti_ent::lde_name will be lti_key. */
+       struct lu_dirent        lti_ent;
+       char                    lti_key[NAME_MAX + 16];
+};
+
+/* lfsck_lib.c */
+void lfsck_component_cleanup(const struct lu_env *env,
+                            struct lfsck_component *com);
+int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
+                   const char *prefix);
+int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix);
+int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
+                  const char *prefix);
+void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
+                   struct lfsck_position *pos, bool init);
+void lfsck_control_speed(struct lfsck_instance *lfsck);
+int lfsck_reset(const struct lu_env *env, struct lfsck_instance *lfsck,
+               bool init);
+void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
+               bool new_checked);
+int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck);
+int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck);
+int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
+                  struct dt_object *obj);
+int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
+                  struct dt_object *obj, struct lu_dirent *ent);
+int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
+              int result);
+int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck);
+
+/* lfsck_engine.c */
+int lfsck_master_engine(void *args);
+
+/* lfsck_bookmark.c */
+int lfsck_bookmark_store(const struct lu_env *env,
+                        struct lfsck_instance *lfsck);
+int lfsck_bookmark_setup(const struct lu_env *env,
+                        struct lfsck_instance *lfsck);
+
+/* lfsck_namespace.c */
+int lfsck_namespace_setup(const struct lu_env *env,
+                         struct lfsck_instance *lfsck);
+
+extern const char *lfsck_status_names[];
+extern const char *lfsck_flags_names[];
+extern const char *lfsck_param_names[];
+extern struct lu_context_key lfsck_thread_key;
+
+static inline struct lfsck_thread_info *
+lfsck_env_info(const struct lu_env *env)
+{
+       struct lfsck_thread_info *info;
+
+       info = lu_context_key_get(&env->le_ctx, &lfsck_thread_key);
+       LASSERT(info != NULL);
+       return info;
+}
+
+static inline const struct lu_name *
+lfsck_name_get_const(const struct lu_env *env, const void *area, ssize_t len)
+{
+       struct lu_name *lname;
+
+       lname = &lfsck_env_info(env)->lti_name;
+       lname->ln_name = area;
+       lname->ln_namelen = len;
+       return lname;
+}
+
+static inline struct lu_buf *
+lfsck_buf_get(const struct lu_env *env, void *area, ssize_t len)
+{
+       struct lu_buf *buf;
+
+       buf = &lfsck_env_info(env)->lti_buf;
+       buf->lb_buf = area;
+       buf->lb_len = len;
+       return buf;
+}
+
+static inline const struct lu_buf *
+lfsck_buf_get_const(const struct lu_env *env, const void *area, ssize_t len)
+{
+       struct lu_buf *buf;
+
+       buf = &lfsck_env_info(env)->lti_buf;
+       buf->lb_buf = (void *)area;
+       buf->lb_len = len;
+       return buf;
+}
+
+static inline char *lfsck_lfsck2name(struct lfsck_instance *lfsck)
+{
+       return lfsck->li_bottom->dd_lu_dev.ld_obd->obd_name;
+}
+
+static inline const struct lu_fid *lfsck_dto2fid(const struct dt_object *obj)
+{
+       return lu_object_fid(&obj->do_lu);
+}
+
+static inline void lfsck_pos_set_zero(struct lfsck_position *pos)
+{
+       memset(pos, 0, sizeof(*pos));
+}
+
+static inline int lfsck_pos_is_zero(const struct lfsck_position *pos)
+{
+       return pos->lp_oit_cookie == 0 && fid_is_zero(&pos->lp_dir_parent);
+}
+
+static inline int lfsck_pos_is_eq(const struct lfsck_position *pos1,
+                                 const struct lfsck_position *pos2)
+{
+       if (pos1->lp_oit_cookie < pos2->lp_oit_cookie)
+               return -1;
+
+       if (pos1->lp_oit_cookie > pos2->lp_oit_cookie)
+               return 1;
+
+       if (fid_is_zero(&pos1->lp_dir_parent) &&
+           !fid_is_zero(&pos2->lp_dir_parent))
+               return -1;
+
+       if (!fid_is_zero(&pos1->lp_dir_parent) &&
+           fid_is_zero(&pos2->lp_dir_parent))
+               return 1;
+
+       if (fid_is_zero(&pos1->lp_dir_parent) &&
+           fid_is_zero(&pos2->lp_dir_parent))
+               return 0;
+
+       LASSERT(lu_fid_eq(&pos1->lp_dir_parent, &pos2->lp_dir_parent));
+
+       if (pos1->lp_dir_cookie < pos2->lp_dir_cookie)
+               return -1;
+
+       if (pos1->lp_dir_cookie > pos2->lp_dir_cookie)
+               return 1;
+
+       return 0;
+}
+
+static void inline lfsck_position_le_to_cpu(struct lfsck_position *des,
+                                           struct lfsck_position *src)
+{
+       des->lp_oit_cookie = le64_to_cpu(src->lp_oit_cookie);
+       fid_le_to_cpu(&des->lp_dir_parent, &src->lp_dir_parent);
+       des->lp_dir_cookie = le64_to_cpu(src->lp_dir_cookie);
+}
+
+static void inline lfsck_position_cpu_to_le(struct lfsck_position *des,
+                                           struct lfsck_position *src)
+{
+       des->lp_oit_cookie = cpu_to_le64(src->lp_oit_cookie);
+       fid_cpu_to_le(&des->lp_dir_parent, &src->lp_dir_parent);
+       des->lp_dir_cookie = cpu_to_le64(src->lp_dir_cookie);
+}
+
+static inline cfs_umode_t lfsck_object_type(const struct dt_object *obj)
+{
+       return lu_object_attr(&obj->do_lu);
+}
+
+static inline int lfsck_is_dead_obj(const struct dt_object *obj)
+{
+       struct lu_object_header *loh = obj->do_lu.lo_header;
+
+       return !!test_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
+}
+
+static inline struct dt_object *lfsck_object_find(const struct lu_env *env,
+                                                 struct lfsck_instance *lfsck,
+                                                 const struct lu_fid *fid)
+{
+       return lu2dt(lu_object_find_slice(env, dt2lu_dev(lfsck->li_next),
+                    fid, NULL));
+}
+
+static inline struct dt_object *lfsck_object_get(struct dt_object *obj)
+{
+       lu_object_get(&obj->do_lu);
+       return obj;
+}
+
+static inline void lfsck_object_put(const struct lu_env *env,
+                                   struct dt_object *obj)
+{
+       lu_object_put(env, &obj->do_lu);
+}
+
+#endif /* _LFSCK_INTERNAL_H */
diff --git a/lustre/lfsck/lfsck_lib.c b/lustre/lfsck/lfsck_lib.c
new file mode 100644 (file)
index 0000000..83e9c54
--- /dev/null
@@ -0,0 +1,1206 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012, 2013, Intel Corporation.
+ */
+/*
+ * lustre/lfsck/lfsck_lib.c
+ *
+ * Author: Fan, Yong <fan.yong@intel.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_LFSCK
+
+#include <libcfs/list.h>
+#include <lu_object.h>
+#include <dt_object.h>
+#include <lustre_lib.h>
+#include <lustre_net.h>
+#include <lustre_lfsck.h>
+#include <lustre/lustre_lfsck_user.h>
+
+#include "lfsck_internal.h"
+
+/* define lfsck thread key */
+LU_KEY_INIT(lfsck, struct lfsck_thread_info);
+
+static void lfsck_key_fini(const struct lu_context *ctx,
+                          struct lu_context_key *key, void *data)
+{
+       struct lfsck_thread_info *info = data;
+
+       lu_buf_free(&info->lti_linkea_buf);
+       OBD_FREE_PTR(info);
+}
+
+LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
+LU_KEY_INIT_GENERIC(lfsck);
+
+static CFS_LIST_HEAD(lfsck_instance_list);
+static DEFINE_SPINLOCK(lfsck_instance_lock);
+
+const char *lfsck_status_names[] = {
+       "init",
+       "scanning-phase1",
+       "scanning-phase2",
+       "completed",
+       "failed",
+       "stopped",
+       "paused",
+       "crashed",
+       NULL
+};
+
+const char *lfsck_flags_names[] = {
+       "scanned-once",
+       "inconsistent",
+       "upgrade",
+       NULL
+};
+
+const char *lfsck_param_names[] = {
+       "failout",
+       "dryrun",
+       NULL
+};
+
+static inline mdsno_t lfsck_dev_idx(struct dt_device *dev)
+{
+       return dev->dd_lu_dev.ld_site->ld_seq_site->ss_node_id;
+}
+
+static inline void lfsck_component_get(struct lfsck_component *com)
+{
+       atomic_inc(&com->lc_ref);
+}
+
+static inline void lfsck_component_put(const struct lu_env *env,
+                                      struct lfsck_component *com)
+{
+       if (atomic_dec_and_test(&com->lc_ref)) {
+               if (com->lc_obj != NULL)
+                       lu_object_put(env, &com->lc_obj->do_lu);
+               if (com->lc_file_ram != NULL)
+                       OBD_FREE(com->lc_file_ram, com->lc_file_size);
+               if (com->lc_file_disk != NULL)
+                       OBD_FREE(com->lc_file_disk, com->lc_file_size);
+               OBD_FREE_PTR(com);
+       }
+}
+
+static inline struct lfsck_component *
+__lfsck_component_find(struct lfsck_instance *lfsck, __u16 type, cfs_list_t *list)
+{
+       struct lfsck_component *com;
+
+       cfs_list_for_each_entry(com, list, lc_link) {
+               if (com->lc_type == type)
+                       return com;
+       }
+       return NULL;
+}
+
+static struct lfsck_component *
+lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
+{
+       struct lfsck_component *com;
+
+       spin_lock(&lfsck->li_lock);
+       com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
+       if (com != NULL)
+               goto unlock;
+
+       com = __lfsck_component_find(lfsck, type,
+                                    &lfsck->li_list_double_scan);
+       if (com != NULL)
+               goto unlock;
+
+       com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
+
+unlock:
+       if (com != NULL)
+               lfsck_component_get(com);
+       spin_unlock(&lfsck->li_lock);
+       return com;
+}
+
+void lfsck_component_cleanup(const struct lu_env *env,
+                            struct lfsck_component *com)
+{
+       if (!cfs_list_empty(&com->lc_link))
+               cfs_list_del_init(&com->lc_link);
+       if (!cfs_list_empty(&com->lc_link_dir))
+               cfs_list_del_init(&com->lc_link_dir);
+
+       lfsck_component_put(env, com);
+}
+
+static void lfsck_instance_cleanup(const struct lu_env *env,
+                                  struct lfsck_instance *lfsck)
+{
+       struct ptlrpc_thread    *thread = &lfsck->li_thread;
+       struct lfsck_component  *com;
+       ENTRY;
+
+       LASSERT(list_empty(&lfsck->li_link));
+       LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
+
+       if (lfsck->li_obj_oit != NULL) {
+               lu_object_put(env, &lfsck->li_obj_oit->do_lu);
+               lfsck->li_obj_oit = NULL;
+       }
+
+       LASSERT(lfsck->li_obj_dir == NULL);
+
+       while (!cfs_list_empty(&lfsck->li_list_scan)) {
+               com = cfs_list_entry(lfsck->li_list_scan.next,
+                                    struct lfsck_component,
+                                    lc_link);
+               lfsck_component_cleanup(env, com);
+       }
+
+       LASSERT(cfs_list_empty(&lfsck->li_list_dir));
+
+       while (!cfs_list_empty(&lfsck->li_list_double_scan)) {
+               com = cfs_list_entry(lfsck->li_list_double_scan.next,
+                                    struct lfsck_component,
+                                    lc_link);
+               lfsck_component_cleanup(env, com);
+       }
+
+       while (!cfs_list_empty(&lfsck->li_list_idle)) {
+               com = cfs_list_entry(lfsck->li_list_idle.next,
+                                    struct lfsck_component,
+                                    lc_link);
+               lfsck_component_cleanup(env, com);
+       }
+
+       if (lfsck->li_bookmark_obj != NULL) {
+               lu_object_put(env, &lfsck->li_bookmark_obj->do_lu);
+               lfsck->li_bookmark_obj = NULL;
+       }
+
+       if (lfsck->li_los != NULL) {
+               local_oid_storage_fini(env, lfsck->li_los);
+               lfsck->li_los = NULL;
+       }
+
+       if (lfsck->li_local_root != NULL) {
+               lu_object_put(env, &lfsck->li_local_root->do_lu);
+               lfsck->li_local_root = NULL;
+       }
+
+       OBD_FREE_PTR(lfsck);
+}
+
+static inline void lfsck_instance_get(struct lfsck_instance *lfsck)
+{
+       atomic_inc(&lfsck->li_ref);
+}
+
+static inline void lfsck_instance_put(const struct lu_env *env,
+                                     struct lfsck_instance *lfsck)
+{
+       if (atomic_dec_and_test(&lfsck->li_ref))
+               lfsck_instance_cleanup(env, lfsck);
+}
+
+static inline struct lfsck_instance *lfsck_instance_find(struct dt_device *key,
+                                                        bool ref, bool unlink)
+{
+       struct lfsck_instance *lfsck;
+
+       spin_lock(&lfsck_instance_lock);
+       cfs_list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
+               if (lfsck->li_bottom == key) {
+                       if (ref)
+                               lfsck_instance_get(lfsck);
+                       if (unlink)
+                               list_del_init(&lfsck->li_link);
+                       spin_unlock(&lfsck_instance_lock);
+                       return lfsck;
+               }
+       }
+       spin_unlock(&lfsck_instance_lock);
+       return NULL;
+}
+
+static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
+{
+       struct lfsck_instance *tmp;
+
+       spin_lock(&lfsck_instance_lock);
+       cfs_list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
+               if (lfsck->li_bottom == tmp->li_bottom) {
+                       spin_unlock(&lfsck_instance_lock);
+                       return -EEXIST;
+               }
+       }
+
+       cfs_list_add_tail(&lfsck->li_link, &lfsck_instance_list);
+       spin_unlock(&lfsck_instance_lock);
+       return 0;
+}
+
+int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
+                   const char *prefix)
+{
+       int save = *len;
+       int flag;
+       int rc;
+       int i;
+
+       rc = snprintf(*buf, *len, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
+       if (rc <= 0)
+               return -ENOSPC;
+
+       *buf += rc;
+       *len -= rc;
+       for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
+               if (flag & bits) {
+                       bits &= ~flag;
+                       rc = snprintf(*buf, *len, "%s%c", names[i],
+                                     bits != 0 ? ',' : '\n');
+                       if (rc <= 0)
+                               return -ENOSPC;
+
+                       *buf += rc;
+                       *len -= rc;
+               }
+       }
+       return save - *len;
+}
+
+int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
+{
+       int rc;
+
+       if (time != 0)
+               rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
+                             cfs_time_current_sec() - time);
+       else
+               rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
+       if (rc <= 0)
+               return -ENOSPC;
+
+       *buf += rc;
+       *len -= rc;
+       return rc;
+}
+
+int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
+                  const char *prefix)
+{
+       int rc;
+
+       if (fid_is_zero(&pos->lp_dir_parent)) {
+               if (pos->lp_oit_cookie == 0)
+                       rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
+                                     prefix);
+               else
+                       rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
+                                     prefix, pos->lp_oit_cookie);
+       } else {
+               rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
+                             prefix, pos->lp_oit_cookie,
+                             PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
+       }
+       if (rc <= 0)
+               return -ENOSPC;
+
+       *buf += rc;
+       *len -= rc;
+       return rc;
+}
+
+void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
+                   struct lfsck_position *pos, bool init)
+{
+       const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
+
+       spin_lock(&lfsck->li_lock);
+       if (unlikely(lfsck->li_di_oit == NULL)) {
+               spin_unlock(&lfsck->li_lock);
+               memset(pos, 0, sizeof(*pos));
+               return;
+       }
+
+       pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
+       if (!lfsck->li_current_oit_processed && !init)
+               pos->lp_oit_cookie--;
+
+       LASSERT(pos->lp_oit_cookie > 0);
+
+       if (lfsck->li_di_dir != NULL) {
+               struct dt_object *dto = lfsck->li_obj_dir;
+
+               pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
+                                                       lfsck->li_di_dir);
+
+               if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
+                       fid_zero(&pos->lp_dir_parent);
+                       pos->lp_dir_cookie = 0;
+               } else {
+                       pos->lp_dir_parent = *lu_object_fid(&dto->do_lu);
+               }
+       } else {
+               fid_zero(&pos->lp_dir_parent);
+               pos->lp_dir_cookie = 0;
+       }
+       spin_unlock(&lfsck->li_lock);
+}
+
+static void __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
+{
+       lfsck->li_bookmark_ram.lb_speed_limit = limit;
+       if (limit != LFSCK_SPEED_NO_LIMIT) {
+               if (limit > CFS_HZ) {
+                       lfsck->li_sleep_rate = limit / CFS_HZ;
+                       lfsck->li_sleep_jif = 1;
+               } else {
+                       lfsck->li_sleep_rate = 1;
+                       lfsck->li_sleep_jif = CFS_HZ / limit;
+               }
+       } else {
+               lfsck->li_sleep_jif = 0;
+               lfsck->li_sleep_rate = 0;
+       }
+}
+
+void lfsck_control_speed(struct lfsck_instance *lfsck)
+{
+       struct ptlrpc_thread *thread = &lfsck->li_thread;
+       struct l_wait_info    lwi;
+
+       if (lfsck->li_sleep_jif > 0 &&
+           lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
+               spin_lock(&lfsck->li_lock);
+               if (likely(lfsck->li_sleep_jif > 0 &&
+                          lfsck->li_new_scanned >= lfsck->li_sleep_rate)) {
+                       lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
+                                              LWI_ON_SIGNAL_NOOP, NULL);
+                       spin_unlock(&lfsck->li_lock);
+
+                       l_wait_event(thread->t_ctl_waitq,
+                                    !thread_is_running(thread),
+                                    &lwi);
+                       lfsck->li_new_scanned = 0;
+               } else {
+                       spin_unlock(&lfsck->li_lock);
+               }
+       }
+}
+
+static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
+                           struct lu_fid *fid)
+{
+       if (unlikely(!S_ISDIR(lfsck_object_type(obj)) ||
+                    !dt_try_as_dir(env, obj)))
+               return -ENOTDIR;
+
+       return dt_lookup(env, obj, (struct dt_rec *)fid,
+                        (const struct dt_key *)"..", BYPASS_CAPA);
+}
+
+static int lfsck_needs_scan_dir(const struct lu_env *env,
+                               struct lfsck_instance *lfsck,
+                               struct dt_object *obj)
+{
+       struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
+       int            depth = 0;
+       int            rc;
+
+       if (!lfsck->li_master || !S_ISDIR(lfsck_object_type(obj)) ||
+           cfs_list_empty(&lfsck->li_list_dir))
+              RETURN(0);
+
+       while (1) {
+               /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
+                *      which is the agent directory to manage the objects
+                *      which name entries reside on remote MDTs. Related
+                *      consistency verification will be processed in LFSCK
+                *      phase III. */
+               if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
+                       if (depth > 0)
+                               lfsck_object_put(env, obj);
+                       return 1;
+               }
+
+               /* .lustre doesn't contain "real" user objects, no need lfsck */
+               if (fid_is_dot_lustre(lfsck_dto2fid(obj))) {
+                       if (depth > 0)
+                               lfsck_object_put(env, obj);
+                       return 0;
+               }
+
+               dt_read_lock(env, obj, MOR_TGT_CHILD);
+               if (unlikely(lfsck_is_dead_obj(obj))) {
+                       dt_read_unlock(env, obj);
+                       if (depth > 0)
+                               lfsck_object_put(env, obj);
+                       return 0;
+               }
+
+               rc = dt_xattr_get(env, obj,
+                                 lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
+                                 BYPASS_CAPA);
+               dt_read_unlock(env, obj);
+               if (rc >= 0) {
+                       if (depth > 0)
+                               lfsck_object_put(env, obj);
+                       return 1;
+               }
+
+               if (rc < 0 && rc != -ENODATA) {
+                       if (depth > 0)
+                               lfsck_object_put(env, obj);
+                       return rc;
+               }
+
+               rc = lfsck_parent_fid(env, obj, fid);
+               if (depth > 0)
+                       lfsck_object_put(env, obj);
+               if (rc != 0)
+                       return rc;
+
+               if (unlikely(lu_fid_eq(fid,
+                                      lfsck_dto2fid(lfsck->li_local_root))))
+                       return 0;
+
+               obj = lfsck_object_find(env, lfsck, fid);
+               if (obj == NULL)
+                       return 0;
+               else if (IS_ERR(obj))
+                       return PTR_ERR(obj);
+
+               if (!dt_object_exists(obj)) {
+                       lfsck_object_put(env, obj);
+                       return 0;
+               }
+
+               /* Currently, only client visible directory can be remote. */
+               if (dt_object_remote(obj)) {
+                       lfsck_object_put(env, obj);
+                       return 1;
+               }
+
+               depth++;
+       }
+       return 0;
+}
+
+/* LFSCK wrap functions */
+
+void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
+               bool new_checked)
+{
+       struct lfsck_component *com;
+
+       cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
+               com->lc_ops->lfsck_fail(env, com, new_checked);
+       }
+}
+
+int lfsck_checkpoint(const struct lu_env *env, struct lfsck_instance *lfsck)
+{
+       struct lfsck_component *com;
+       int                     rc;
+
+       if (likely(cfs_time_beforeq(cfs_time_current(),
+                                   lfsck->li_time_next_checkpoint)))
+               return 0;
+
+       lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
+       cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
+               rc = com->lc_ops->lfsck_checkpoint(env, com, false);
+               if (rc != 0)
+                       return rc;;
+       }
+
+       lfsck->li_time_last_checkpoint = cfs_time_current();
+       lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+       return 0;
+}
+
+int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck)
+{
+       struct dt_object       *obj     = NULL;
+       struct lfsck_component *com;
+       struct lfsck_component *next;
+       struct lfsck_position  *pos     = NULL;
+       const struct dt_it_ops *iops    =
+                               &lfsck->li_obj_oit->do_index_ops->dio_it;
+       struct dt_it           *di;
+       int                     rc;
+       ENTRY;
+
+       LASSERT(lfsck->li_obj_dir == NULL);
+       LASSERT(lfsck->li_di_dir == NULL);
+
+       lfsck->li_current_oit_processed = 0;
+       cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
+               com->lc_new_checked = 0;
+               if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+                       com->lc_journal = 0;
+
+               rc = com->lc_ops->lfsck_prep(env, com);
+               if (rc != 0)
+                       RETURN(rc);
+
+               if ((pos == NULL) ||
+                   (!lfsck_pos_is_zero(&com->lc_pos_start) &&
+                    lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
+                       pos = &com->lc_pos_start;
+       }
+
+       /* Init otable-based iterator. */
+       if (pos == NULL) {
+               rc = iops->load(env, lfsck->li_di_oit, 0);
+               if (rc > 0) {
+                       lfsck->li_oit_over = 1;
+                       rc = 0;
+               }
+
+               GOTO(out, rc);
+       }
+
+       rc = iops->load(env, lfsck->li_di_oit, pos->lp_oit_cookie);
+       if (rc < 0)
+               GOTO(out, rc);
+       else if (rc > 0)
+               lfsck->li_oit_over = 1;
+
+       if (!lfsck->li_master || fid_is_zero(&pos->lp_dir_parent))
+               GOTO(out, rc = 0);
+
+       /* Find the directory for namespace-based traverse. */
+       obj = lfsck_object_find(env, lfsck, &pos->lp_dir_parent);
+       if (obj == NULL)
+               GOTO(out, rc = 0);
+       else if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
+
+       /* XXX: Currently, skip remote object, the consistency for
+        *      remote object will be processed in LFSCK phase III. */
+       if (!dt_object_exists(obj) || dt_object_remote(obj) ||
+           unlikely(!S_ISDIR(lfsck_object_type(obj))))
+               GOTO(out, rc = 0);
+
+       if (unlikely(!dt_try_as_dir(env, obj)))
+               GOTO(out, rc = -ENOTDIR);
+
+       /* Init the namespace-based directory traverse. */
+       iops = &obj->do_index_ops->dio_it;
+       di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
+       if (IS_ERR(di))
+               GOTO(out, rc = PTR_ERR(di));
+
+       LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
+
+       rc = iops->load(env, di, pos->lp_dir_cookie);
+       if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
+               rc = iops->next(env, di);
+       else if (rc > 0)
+               rc = 0;
+
+       if (rc != 0) {
+               iops->put(env, di);
+               iops->fini(env, di);
+               GOTO(out, rc);
+       }
+
+       lfsck->li_obj_dir = lfsck_object_get(obj);
+       spin_lock(&lfsck->li_lock);
+       lfsck->li_di_dir = di;
+       spin_unlock(&lfsck->li_lock);
+
+       GOTO(out, rc = 0);
+
+out:
+       if (obj != NULL)
+               lfsck_object_put(env, obj);
+
+       if (rc < 0) {
+               cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan,
+                                            lc_link)
+                       com->lc_ops->lfsck_post(env, com, rc, true);
+
+               return rc;
+       }
+
+       rc = 0;
+       lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, true);
+       cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
+               rc = com->lc_ops->lfsck_checkpoint(env, com, true);
+               if (rc != 0)
+                       break;
+       }
+
+       lfsck->li_time_last_checkpoint = cfs_time_current();
+       lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+       return rc;
+}
+
+int lfsck_exec_oit(const struct lu_env *env, struct lfsck_instance *lfsck,
+                  struct dt_object *obj)
+{
+       struct lfsck_component *com;
+       const struct dt_it_ops *iops;
+       struct dt_it           *di;
+       int                     rc;
+       ENTRY;
+
+       LASSERT(lfsck->li_obj_dir == NULL);
+
+       cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
+               rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       rc = lfsck_needs_scan_dir(env, lfsck, obj);
+       if (rc <= 0)
+               GOTO(out, rc);
+
+       if (unlikely(!dt_try_as_dir(env, obj)))
+               GOTO(out, rc = -ENOTDIR);
+
+       iops = &obj->do_index_ops->dio_it;
+       di = iops->init(env, obj, lfsck->li_args_dir, BYPASS_CAPA);
+       if (IS_ERR(di))
+               GOTO(out, rc = PTR_ERR(di));
+
+       rc = iops->load(env, di, 0);
+       if (rc == 0)
+               rc = iops->next(env, di);
+       else if (rc > 0)
+               rc = 0;
+
+       if (rc != 0) {
+               iops->put(env, di);
+               iops->fini(env, di);
+               GOTO(out, rc);
+       }
+
+       lfsck->li_obj_dir = lfsck_object_get(obj);
+       spin_lock(&lfsck->li_lock);
+       lfsck->li_di_dir = di;
+       spin_unlock(&lfsck->li_lock);
+
+       GOTO(out, rc = 0);
+
+out:
+       if (rc < 0)
+               lfsck_fail(env, lfsck, false);
+       return (rc > 0 ? 0 : rc);
+}
+
+int lfsck_exec_dir(const struct lu_env *env, struct lfsck_instance *lfsck,
+                  struct dt_object *obj, struct lu_dirent *ent)
+{
+       struct lfsck_component *com;
+       int                     rc;
+
+       cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
+               rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
+               if (rc != 0)
+                       return rc;
+       }
+       return 0;
+}
+
+int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
+              int result)
+{
+       struct lfsck_component *com;
+       struct lfsck_component *next;
+       int                     rc;
+
+       lfsck_pos_fill(env, lfsck, &lfsck->li_pos_current, false);
+       cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
+               rc = com->lc_ops->lfsck_post(env, com, result, false);
+               if (rc != 0)
+                       return rc;
+       }
+
+       lfsck->li_time_last_checkpoint = cfs_time_current();
+       lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+       return result;
+}
+
+int lfsck_double_scan(const struct lu_env *env, struct lfsck_instance *lfsck)
+{
+       struct lfsck_component *com;
+       struct lfsck_component *next;
+       int                     rc;
+
+       cfs_list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
+                                    lc_link) {
+               if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+                       com->lc_journal = 0;
+
+               rc = com->lc_ops->lfsck_double_scan(env, com);
+               if (rc != 0)
+                       return rc;
+       }
+       return 0;
+}
+
+/* external interfaces */
+
+int lfsck_get_speed(struct dt_device *key, void *buf, int len)
+{
+       struct lu_env           env;
+       struct lfsck_instance  *lfsck;
+       int                     rc;
+       ENTRY;
+
+       lfsck = lfsck_instance_find(key, true, false);
+       if (unlikely(lfsck == NULL))
+               RETURN(-ENODEV);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = snprintf(buf, len, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
+       lu_env_fini(&env);
+
+       GOTO(out, rc);
+
+out:
+       lfsck_instance_put(&env, lfsck);
+       return rc;
+}
+EXPORT_SYMBOL(lfsck_get_speed);
+
+int lfsck_set_speed(struct dt_device *key, int val)
+{
+       struct lu_env           env;
+       struct lfsck_instance  *lfsck;
+       int                     rc;
+       ENTRY;
+
+       lfsck = lfsck_instance_find(key, true, false);
+       if (unlikely(lfsck == NULL))
+               RETURN(-ENODEV);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       mutex_lock(&lfsck->li_mutex);
+       __lfsck_set_speed(lfsck, val);
+       rc = lfsck_bookmark_store(&env, lfsck);
+       mutex_unlock(&lfsck->li_mutex);
+       lu_env_fini(&env);
+
+       GOTO(out, rc);
+
+out:
+       lfsck_instance_put(&env, lfsck);
+       return rc;
+}
+EXPORT_SYMBOL(lfsck_set_speed);
+
+int lfsck_dump(struct dt_device *key, void *buf, int len, __u16 type)
+{
+       struct lu_env           env;
+       struct lfsck_instance  *lfsck;
+       struct lfsck_component *com   = NULL;
+       int                     rc;
+       ENTRY;
+
+       lfsck = lfsck_instance_find(key, true, false);
+       if (unlikely(lfsck == NULL))
+               RETURN(-ENODEV);
+
+       com = lfsck_component_find(lfsck, type);
+       if (com == NULL)
+               GOTO(out, rc = -ENOTSUPP);
+
+       rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = com->lc_ops->lfsck_dump(&env, com, buf, len);
+       lu_env_fini(&env);
+
+       GOTO(out, rc);
+
+out:
+       if (com != NULL)
+               lfsck_component_put(&env, com);
+       lfsck_instance_put(&env, lfsck);
+       return rc;
+}
+EXPORT_SYMBOL(lfsck_dump);
+
+int lfsck_start(const struct lu_env *env, struct dt_device *key,
+               struct lfsck_start_param *lsp)
+{
+       struct lfsck_start     *start  = lsp->lsp_start;
+       struct lfsck_instance  *lfsck;
+       struct lfsck_bookmark  *bk;
+       struct ptlrpc_thread   *thread;
+       struct lfsck_component *com;
+       struct l_wait_info      lwi    = { 0 };
+       bool                    dirty  = false;
+       int                     rc     = 0;
+       __u16                   valid  = 0;
+       __u16                   flags  = 0;
+       ENTRY;
+
+       lfsck = lfsck_instance_find(key, true, false);
+       if (unlikely(lfsck == NULL))
+               RETURN(-ENODEV);
+
+       /* start == NULL means auto trigger paused LFSCK. */
+       if ((start == NULL) &&
+           (cfs_list_empty(&lfsck->li_list_scan) ||
+            OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
+               GOTO(put, rc = 0);
+
+       bk = &lfsck->li_bookmark_ram;
+       thread = &lfsck->li_thread;
+       mutex_lock(&lfsck->li_mutex);
+       spin_lock(&lfsck->li_lock);
+       if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
+               spin_unlock(&lfsck->li_lock);
+               GOTO(out, rc = -EALREADY);
+       }
+
+       spin_unlock(&lfsck->li_lock);
+
+       lfsck->li_namespace = lsp->lsp_namespace;
+       lfsck->li_paused = 0;
+       lfsck->li_oit_over = 0;
+       lfsck->li_drop_dryrun = 0;
+       lfsck->li_new_scanned = 0;
+
+       /* For auto trigger. */
+       if (start == NULL)
+               goto trigger;
+
+       start->ls_version = bk->lb_version;
+       if (start->ls_valid & LSV_SPEED_LIMIT) {
+               __lfsck_set_speed(lfsck, start->ls_speed_limit);
+               dirty = true;
+       }
+
+       if (start->ls_valid & LSV_ERROR_HANDLE) {
+               valid |= DOIV_ERROR_HANDLE;
+               if (start->ls_flags & LPF_FAILOUT)
+                       flags |= DOIF_FAILOUT;
+
+               if ((start->ls_flags & LPF_FAILOUT) &&
+                   !(bk->lb_param & LPF_FAILOUT)) {
+                       bk->lb_param |= LPF_FAILOUT;
+                       dirty = true;
+               } else if (!(start->ls_flags & LPF_FAILOUT) &&
+                          (bk->lb_param & LPF_FAILOUT)) {
+                       bk->lb_param &= ~LPF_FAILOUT;
+                       dirty = true;
+               }
+       }
+
+       if (start->ls_valid & LSV_DRYRUN) {
+               if ((start->ls_flags & LPF_DRYRUN) &&
+                   !(bk->lb_param & LPF_DRYRUN)) {
+                       bk->lb_param |= LPF_DRYRUN;
+                       dirty = true;
+               } else if (!(start->ls_flags & LPF_DRYRUN) &&
+                          (bk->lb_param & LPF_DRYRUN)) {
+                       bk->lb_param &= ~LPF_DRYRUN;
+                       lfsck->li_drop_dryrun = 1;
+                       dirty = true;
+               }
+       }
+
+       if (dirty) {
+               rc = lfsck_bookmark_store(env, lfsck);
+               if (rc != 0)
+                       GOTO(out, rc);
+       }
+
+       if (start->ls_flags & LPF_RESET)
+               flags |= DOIF_RESET;
+
+       if (start->ls_active != 0) {
+               struct lfsck_component *next;
+               __u16 type = 1;
+
+               if (start->ls_active == LFSCK_TYPES_ALL)
+                       start->ls_active = LFSCK_TYPES_SUPPORTED;
+
+               if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
+                       start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
+                       GOTO(out, rc = -ENOTSUPP);
+               }
+
+               cfs_list_for_each_entry_safe(com, next,
+                                            &lfsck->li_list_scan, lc_link) {
+                       if (!(com->lc_type & start->ls_active)) {
+                               rc = com->lc_ops->lfsck_post(env, com, 0,
+                                                            false);
+                               if (rc != 0)
+                                       GOTO(out, rc);
+                       }
+               }
+
+               while (start->ls_active != 0) {
+                       if (type & start->ls_active) {
+                               com = __lfsck_component_find(lfsck, type,
+                                                       &lfsck->li_list_idle);
+                               if (com != NULL) {
+                                       /* The component status will be updated
+                                        * when its prep() is called later by
+                                        * the LFSCK main engine. */
+                                       cfs_list_del_init(&com->lc_link);
+                                       cfs_list_add_tail(&com->lc_link,
+                                                         &lfsck->li_list_scan);
+                               }
+                               start->ls_active &= ~type;
+                       }
+                       type <<= 1;
+               }
+       }
+
+       cfs_list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
+               start->ls_active |= com->lc_type;
+               if (flags & DOIF_RESET) {
+                       rc = com->lc_ops->lfsck_reset(env, com, false);
+                       if (rc != 0)
+                               GOTO(out, rc);
+               }
+       }
+
+trigger:
+       lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
+       if (bk->lb_param & LPF_DRYRUN)
+               lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
+
+       if (bk->lb_param & LPF_FAILOUT) {
+               valid |= DOIV_ERROR_HANDLE;
+               flags |= DOIF_FAILOUT;
+       }
+
+       if (!cfs_list_empty(&lfsck->li_list_scan))
+               flags |= DOIF_OUTUSED;
+
+       lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
+       thread_set_flags(thread, 0);
+       if (lfsck->li_master)
+               rc = cfs_create_thread(lfsck_master_engine, lfsck, 0);
+       if (rc < 0)
+               CERROR("%s: cannot start LFSCK thread, rc = %d\n",
+                      lfsck_lfsck2name(lfsck), rc);
+       else
+               l_wait_event(thread->t_ctl_waitq,
+                            thread_is_running(thread) ||
+                            thread_is_stopped(thread),
+                            &lwi);
+
+       GOTO(out, rc = 0);
+
+out:
+       mutex_unlock(&lfsck->li_mutex);
+put:
+       lfsck_instance_put(env, lfsck);
+       return (rc < 0 ? rc : 0);
+}
+EXPORT_SYMBOL(lfsck_start);
+
+int lfsck_stop(const struct lu_env *env, struct dt_device *key, bool pause)
+{
+       struct lfsck_instance   *lfsck;
+       struct ptlrpc_thread    *thread;
+       struct l_wait_info       lwi    = { 0 };
+       ENTRY;
+
+       lfsck = lfsck_instance_find(key, true, false);
+       if (unlikely(lfsck == NULL))
+               RETURN(-ENODEV);
+
+       thread = &lfsck->li_thread;
+       mutex_lock(&lfsck->li_mutex);
+       spin_lock(&lfsck->li_lock);
+       if (thread_is_init(thread) || thread_is_stopped(thread)) {
+               spin_unlock(&lfsck->li_lock);
+               mutex_unlock(&lfsck->li_mutex);
+               lfsck_instance_put(env, lfsck);
+               RETURN(-EALREADY);
+       }
+
+       if (pause)
+               lfsck->li_paused = 1;
+       thread_set_flags(thread, SVC_STOPPING);
+       /* The LFSCK thread may be sleeping on low layer wait queue,
+        * wake it up. */
+       if (likely(lfsck->li_di_oit != NULL))
+               lfsck->li_obj_oit->do_index_ops->dio_it.put(env,
+                                                           lfsck->li_di_oit);
+       spin_unlock(&lfsck->li_lock);
+
+       cfs_waitq_broadcast(&thread->t_ctl_waitq);
+       l_wait_event(thread->t_ctl_waitq,
+                    thread_is_stopped(thread),
+                    &lwi);
+       mutex_unlock(&lfsck->li_mutex);
+       lfsck_instance_put(env, lfsck);
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(lfsck_stop);
+
+int lfsck_register(const struct lu_env *env, struct dt_device *key,
+                  struct dt_device *next, bool master)
+{
+       struct lfsck_instance   *lfsck;
+       struct dt_object        *root;
+       struct dt_object        *obj;
+       struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
+       int                      rc;
+       ENTRY;
+
+       lfsck = lfsck_instance_find(key, false, false);
+       if (unlikely(lfsck != NULL))
+               RETURN(-EEXIST);
+
+       OBD_ALLOC_PTR(lfsck);
+       if (lfsck == NULL)
+               RETURN(-ENOMEM);
+
+       mutex_init(&lfsck->li_mutex);
+       spin_lock_init(&lfsck->li_lock);
+       CFS_INIT_LIST_HEAD(&lfsck->li_link);
+       CFS_INIT_LIST_HEAD(&lfsck->li_list_scan);
+       CFS_INIT_LIST_HEAD(&lfsck->li_list_dir);
+       CFS_INIT_LIST_HEAD(&lfsck->li_list_double_scan);
+       CFS_INIT_LIST_HEAD(&lfsck->li_list_idle);
+       atomic_set(&lfsck->li_ref, 1);
+       cfs_waitq_init(&lfsck->li_thread.t_ctl_waitq);
+       lfsck->li_next = next;
+       lfsck->li_bottom = key;
+
+       fid->f_seq = FID_SEQ_LOCAL_NAME;
+       fid->f_oid = 1;
+       fid->f_ver = 0;
+       rc = local_oid_storage_init(env, lfsck->li_bottom, fid, &lfsck->li_los);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = dt_root_get(env, key, fid);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       root = dt_locate(env, lfsck->li_bottom, fid);
+       if (IS_ERR(root))
+               GOTO(out, rc = PTR_ERR(root));
+
+       lfsck->li_local_root = root;
+        dt_try_as_dir(env, root);
+       if (master) {
+               lfsck->li_master = 1;
+               if (lfsck_dev_idx(lfsck->li_bottom) == 0) {
+                       rc = dt_lookup(env, root,
+                               (struct dt_rec *)(&lfsck->li_global_root_fid),
+                               (const struct dt_key *)"ROOT", BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(out, rc);
+               }
+       }
+
+       fid->f_seq = FID_SEQ_LOCAL_FILE;
+       fid->f_oid = OTABLE_IT_OID;
+       fid->f_ver = 0;
+       obj = dt_locate(env, lfsck->li_bottom, fid);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
+
+       lfsck->li_obj_oit = obj;
+       rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
+       if (rc != 0) {
+               if (rc == -ENOTSUPP)
+                       GOTO(add, rc = 0);
+
+               GOTO(out, rc);
+       }
+
+       rc = lfsck_bookmark_setup(env, lfsck);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       if (master) {
+               rc = lfsck_namespace_setup(env, lfsck);
+               if (rc < 0)
+                       GOTO(out, rc);
+       }
+
+       /* XXX: more LFSCK components initialization to be added here. */
+
+add:
+       rc = lfsck_instance_add(lfsck);
+out:
+       if (rc != 0)
+               lfsck_instance_cleanup(env, lfsck);
+       return rc;
+}
+EXPORT_SYMBOL(lfsck_register);
+
+void lfsck_degister(const struct lu_env *env, struct dt_device *key)
+{
+       struct lfsck_instance *lfsck;
+
+       lfsck = lfsck_instance_find(key, false, true);
+       if (lfsck != NULL)
+               lfsck_instance_put(env, lfsck);
+}
+EXPORT_SYMBOL(lfsck_degister);
+
+static int __init lfsck_init(void)
+{
+       int rc;
+
+       lfsck_key_init_generic(&lfsck_thread_key, NULL);
+       rc = lu_context_key_register(&lfsck_thread_key);
+       return rc;
+}
+
+static void __exit lfsck_exit(void)
+{
+       LASSERT(cfs_list_empty(&lfsck_instance_list));
+
+       lu_context_key_degister(&lfsck_thread_key);
+}
+
+MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
+MODULE_DESCRIPTION("LFSCK");
+MODULE_LICENSE("GPL");
+
+cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);
diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c
new file mode 100644 (file)
index 0000000..1ebb714
--- /dev/null
@@ -0,0 +1,1541 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012, 2013, Intel Corporation.
+ */
+/*
+ * lustre/lfsck/lfsck_namespace.c
+ *
+ * Author: Fan, Yong <fan.yong@intel.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_LFSCK
+
+#include <lustre/lustre_idl.h>
+#include <lu_object.h>
+#include <dt_object.h>
+#include <lustre_linkea.h>
+#include <lustre_fid.h>
+#include <lustre_lib.h>
+#include <lustre_net.h>
+#include <lustre/lustre_user.h>
+
+#include "lfsck_internal.h"
+
+#define LFSCK_NAMESPACE_MAGIC  0xA0629D03
+
+static const char lfsck_namespace_name[] = "lfsck_namespace";
+
+static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *des,
+                                     struct lfsck_namespace *src)
+{
+       des->ln_magic = le32_to_cpu(src->ln_magic);
+       des->ln_status = le32_to_cpu(src->ln_status);
+       des->ln_flags = le32_to_cpu(src->ln_flags);
+       des->ln_success_count = le32_to_cpu(src->ln_success_count);
+       des->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
+       des->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
+       des->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
+       des->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
+       des->ln_time_last_checkpoint =
+                               le64_to_cpu(src->ln_time_last_checkpoint);
+       lfsck_position_le_to_cpu(&des->ln_pos_latest_start,
+                                &src->ln_pos_latest_start);
+       lfsck_position_le_to_cpu(&des->ln_pos_last_checkpoint,
+                                &src->ln_pos_last_checkpoint);
+       lfsck_position_le_to_cpu(&des->ln_pos_first_inconsistent,
+                                &src->ln_pos_first_inconsistent);
+       des->ln_items_checked = le64_to_cpu(src->ln_items_checked);
+       des->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
+       des->ln_items_failed = le64_to_cpu(src->ln_items_failed);
+       des->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
+       des->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
+       des->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
+       des->ln_objs_repaired_phase2 =
+                               le64_to_cpu(src->ln_objs_repaired_phase2);
+       des->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
+       des->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
+       des->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
+       fid_le_to_cpu(&des->ln_fid_latest_scanned_phase2,
+                     &src->ln_fid_latest_scanned_phase2);
+}
+
+static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *des,
+                                     struct lfsck_namespace *src)
+{
+       des->ln_magic = cpu_to_le32(src->ln_magic);
+       des->ln_status = cpu_to_le32(src->ln_status);
+       des->ln_flags = cpu_to_le32(src->ln_flags);
+       des->ln_success_count = cpu_to_le32(src->ln_success_count);
+       des->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
+       des->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
+       des->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
+       des->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
+       des->ln_time_last_checkpoint =
+                               cpu_to_le64(src->ln_time_last_checkpoint);
+       lfsck_position_cpu_to_le(&des->ln_pos_latest_start,
+                                &src->ln_pos_latest_start);
+       lfsck_position_cpu_to_le(&des->ln_pos_last_checkpoint,
+                                &src->ln_pos_last_checkpoint);
+       lfsck_position_cpu_to_le(&des->ln_pos_first_inconsistent,
+                                &src->ln_pos_first_inconsistent);
+       des->ln_items_checked = cpu_to_le64(src->ln_items_checked);
+       des->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
+       des->ln_items_failed = cpu_to_le64(src->ln_items_failed);
+       des->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
+       des->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
+       des->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
+       des->ln_objs_repaired_phase2 =
+                               cpu_to_le64(src->ln_objs_repaired_phase2);
+       des->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
+       des->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
+       des->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
+       fid_cpu_to_le(&des->ln_fid_latest_scanned_phase2,
+                     &src->ln_fid_latest_scanned_phase2);
+}
+
+/**
+ * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
+ * \retval 0: succeed.
+ * \retval -ve: failed cases.
+ */
+static int lfsck_namespace_load(const struct lu_env *env,
+                               struct lfsck_component *com)
+{
+       int len = com->lc_file_size;
+       int rc;
+
+       rc = dt_xattr_get(env, com->lc_obj,
+                         lfsck_buf_get(env, com->lc_file_disk, len),
+                         XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
+       if (rc == len) {
+               struct lfsck_namespace *ns = com->lc_file_ram;
+
+               lfsck_namespace_le_to_cpu(ns,
+                               (struct lfsck_namespace *)com->lc_file_disk);
+               if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
+                       CWARN("%.16s: invalid lfsck_namespace magic "
+                             "0x%x != 0x%x\n",
+                             lfsck_lfsck2name(com->lc_lfsck),
+                             ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
+                       rc = 1;
+               } else {
+                       rc = 0;
+               }
+       } else if (rc != -ENODATA) {
+               CERROR("%.16s: fail to load lfsck_namespace, expected = %d, "
+                      "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck), len, rc);
+               if (rc >= 0)
+                       rc = 1;
+       }
+       return rc;
+}
+
+static int lfsck_namespace_store(const struct lu_env *env,
+                                struct lfsck_component *com, bool init)
+{
+       struct dt_object        *obj    = com->lc_obj;
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct thandle          *handle;
+       int                      len    = com->lc_file_size;
+       int                      rc;
+       ENTRY;
+
+       lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
+                                 (struct lfsck_namespace *)com->lc_file_ram);
+       handle = dt_trans_create(env, lfsck->li_bottom);
+       if (IS_ERR(handle)) {
+               rc = PTR_ERR(handle);
+               CERROR("%.16s: fail to create trans for storing "
+                      "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               RETURN(rc);
+       }
+
+       rc = dt_declare_xattr_set(env, obj,
+                                 lfsck_buf_get(env, com->lc_file_disk, len),
+                                 XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
+       if (rc != 0) {
+               CERROR("%.16s: fail to declare trans for storing "
+                      "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               GOTO(out, rc);
+       }
+
+       rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
+       if (rc != 0) {
+               CERROR("%.16s: fail to start trans for storing "
+                      "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
+               GOTO(out, rc);
+       }
+
+       rc = dt_xattr_set(env, obj,
+                         lfsck_buf_get(env, com->lc_file_disk, len),
+                         XATTR_NAME_LFSCK_NAMESPACE,
+                         init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
+                         handle, BYPASS_CAPA);
+       if (rc != 0)
+               CERROR("%.16s: fail to store lfsck_namespace, len = %d, "
+                      "rc = %d\n", lfsck_lfsck2name(lfsck), len, rc);
+
+       GOTO(out, rc);
+
+out:
+       dt_trans_stop(env, lfsck->li_bottom, handle);
+       return rc;
+}
+
+static int lfsck_namespace_init(const struct lu_env *env,
+                               struct lfsck_component *com)
+{
+       struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
+       int rc;
+
+       memset(ns, 0, sizeof(*ns));
+       ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
+       ns->ln_status = LS_INIT;
+       down_write(&com->lc_sem);
+       rc = lfsck_namespace_store(env, com, true);
+       up_write(&com->lc_sem);
+       return rc;
+}
+
+static int lfsck_namespace_lookup(const struct lu_env *env,
+                                 struct lfsck_component *com,
+                                 const struct lu_fid *fid, __u8 *flags)
+{
+       struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
+       int            rc;
+
+       fid_cpu_to_be(key, fid);
+       rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
+                      (const struct dt_key *)key, BYPASS_CAPA);
+       return rc;
+}
+
+static int lfsck_namespace_delete(const struct lu_env *env,
+                                 struct lfsck_component *com,
+                                 const struct lu_fid *fid)
+{
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
+       struct thandle          *handle;
+       struct dt_object        *obj    = com->lc_obj;
+       int                      rc;
+       ENTRY;
+
+       handle = dt_trans_create(env, lfsck->li_bottom);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       fid_cpu_to_be(key, fid);
+       rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
+                      BYPASS_CAPA);
+
+       GOTO(out, rc);
+
+out:
+       dt_trans_stop(env, lfsck->li_bottom, handle);
+       return rc;
+}
+
+static int lfsck_namespace_update(const struct lu_env *env,
+                                 struct lfsck_component *com,
+                                 const struct lu_fid *fid,
+                                 __u8 flags, bool force)
+{
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
+       struct thandle          *handle;
+       struct dt_object        *obj    = com->lc_obj;
+       int                      rc;
+       bool                     exist  = false;
+       __u8                     tf;
+       ENTRY;
+
+       rc = lfsck_namespace_lookup(env, com, fid, &tf);
+       if (rc != 0 && rc != -ENOENT)
+               RETURN(rc);
+
+       if (rc == 0) {
+               if (!force || flags == tf)
+                       RETURN(0);
+
+               exist = true;
+               handle = dt_trans_create(env, lfsck->li_bottom);
+               if (IS_ERR(handle))
+                       RETURN(PTR_ERR(handle));
+
+               rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
+                                      handle);
+               if (rc != 0)
+                       GOTO(out, rc);
+       } else {
+               handle = dt_trans_create(env, lfsck->li_bottom);
+               if (IS_ERR(handle))
+                       RETURN(PTR_ERR(handle));
+       }
+
+       rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
+                              (const struct dt_key *)fid, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       fid_cpu_to_be(key, fid);
+       if (exist) {
+               rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
+                              BYPASS_CAPA);
+               if (rc != 0) {
+                       CERROR("%s: fail to insert "DFID", rc = %d\n",
+                              lfsck_lfsck2name(com->lc_lfsck), PFID(fid), rc);
+                       GOTO(out, rc);
+               }
+       }
+
+       rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
+                      (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
+
+       GOTO(out, rc);
+
+out:
+       dt_trans_stop(env, lfsck->li_bottom, handle);
+       return rc;
+}
+
+static int lfsck_namespace_check_exist(const struct lu_env *env,
+                                      struct lfsck_instance *lfsck,
+                                      struct dt_object *obj, const char *name)
+{
+       struct dt_object *dir = lfsck->li_obj_dir;
+       struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
+       int               rc;
+       ENTRY;
+
+       if (unlikely(lfsck_is_dead_obj(obj)))
+               RETURN(LFSCK_NAMEENTRY_DEAD);
+
+       rc = dt_lookup(env, dir, (struct dt_rec *)fid,
+                      (const struct dt_key *)name, BYPASS_CAPA);
+       if (rc == -ENOENT)
+               RETURN(LFSCK_NAMEENTRY_REMOVED);
+
+       if (rc < 0)
+               RETURN(rc);
+
+       if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
+               RETURN(LFSCK_NAMEENTRY_RECREATED);
+
+       RETURN(0);
+}
+
+static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
+                                           struct dt_object *obj,
+                                           struct thandle *handle)
+{
+       int rc;
+
+       /* For destroying all invalid linkEA entries. */
+       rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
+       if (rc != 0)
+               return rc;
+
+       /* For insert new linkEA entry. */
+       rc = dt_declare_xattr_set(env, obj,
+                       lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
+                       XATTR_NAME_LINK, 0, handle);
+       return rc;
+}
+
+static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
+                           struct linkea_data *ldata)
+{
+       int rc;
+
+       ldata->ld_buf =
+               lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
+                                      CFS_PAGE_SIZE);
+       if (ldata->ld_buf->lb_buf == NULL)
+               return -ENOMEM;
+
+       if (!dt_object_exists(obj))
+               return -ENODATA;
+
+       rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
+       if (rc == -ERANGE) {
+               /* Buf was too small, figure out what we need. */
+               lu_buf_free(ldata->ld_buf);
+               rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
+                                 BYPASS_CAPA);
+               if (rc < 0)
+                       return rc;
+
+               ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
+               if (ldata->ld_buf->lb_buf == NULL)
+                       return -ENOMEM;
+
+               rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
+                                 BYPASS_CAPA);
+       }
+       if (rc < 0)
+               return rc;
+
+       linkea_init(ldata);
+
+       return 0;
+}
+
+static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
+                            struct linkea_data *ldata, struct thandle *handle)
+{
+       const struct lu_buf *buf = lfsck_buf_get_const(env,
+                                                      ldata->ld_buf->lb_buf,
+                                                      ldata->ld_leh->leh_len);
+
+       return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
+                           BYPASS_CAPA);
+}
+
+/**
+ * \retval +ve repaired
+ * \retval 0   no need to repair
+ * \retval -ve error cases
+ */
+static int lfsck_namespace_double_scan_one(const struct lu_env *env,
+                                          struct lfsck_component *com,
+                                          struct dt_object *child, __u8 flags)
+{
+       struct lfsck_thread_info *info    = lfsck_env_info(env);
+       struct lu_attr           *la      = &info->lti_la;
+       struct lu_name           *cname   = &info->lti_name;
+       struct lu_fid            *pfid    = &info->lti_fid;
+       struct lu_fid            *cfid    = &info->lti_fid2;
+       struct lfsck_instance   *lfsck    = com->lc_lfsck;
+       struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
+       struct lfsck_namespace  *ns       =
+                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct linkea_data       ldata    = { 0 };
+       struct thandle          *handle   = NULL;
+       bool                     locked   = false;
+       bool                     update   = false;
+       int                      count;
+       int                      rc;
+       ENTRY;
+
+       if (com->lc_journal) {
+
+again:
+               LASSERT(!locked);
+
+               com->lc_journal = 1;
+               handle = dt_trans_create(env, lfsck->li_next);
+               if (IS_ERR(handle))
+                       RETURN(rc = PTR_ERR(handle));
+
+               rc = dt_declare_xattr_set(env, child,
+                       lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
+                       XATTR_NAME_LINK, 0, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               rc = dt_trans_start(env, lfsck->li_next, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               dt_write_lock(env, child, MOR_TGT_CHILD);
+               locked = true;
+       }
+
+       if (unlikely(lfsck_is_dead_obj(child)))
+               GOTO(stop, rc = 0);
+
+       rc = dt_attr_get(env, child, la, BYPASS_CAPA);
+       if (rc == 0)
+               rc = lfsck_links_read(env, child, &ldata);
+       if (rc != 0) {
+               if ((bk->lb_param & LPF_DRYRUN) &&
+                   (rc == -EINVAL || rc == -ENODATA))
+                       rc = 1;
+
+               GOTO(stop, rc);
+       }
+
+       ldata.ld_lee = LINKEA_FIRST_ENTRY(ldata);
+       count = ldata.ld_leh->leh_reccount;
+       while (count-- > 0) {
+               struct dt_object *parent = NULL;
+
+               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname,
+                                   pfid);
+               if (!fid_is_sane(pfid))
+                       goto shrink;
+
+               parent = lfsck_object_find(env, lfsck, pfid);
+               if (parent == NULL)
+                       goto shrink;
+               else if (IS_ERR(parent))
+                       GOTO(stop, rc = PTR_ERR(parent));
+
+               if (!dt_object_exists(parent))
+                       goto shrink;
+
+               /* XXX: Currently, skip remote object, the consistency for
+                *      remote object will be processed in LFSCK phase III. */
+               if (dt_object_remote(parent)) {
+                       lfsck_object_put(env, parent);
+                       ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
+                       continue;
+               }
+
+               if (unlikely(!dt_try_as_dir(env, parent)))
+                       goto shrink;
+
+               /* To guarantee the 'name' is terminated with '0'. */
+               memcpy(info->lti_key, cname->ln_name, cname->ln_namelen);
+               info->lti_key[cname->ln_namelen] = 0;
+               cname->ln_name = info->lti_key;
+               rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
+                              (const struct dt_key *)cname->ln_name,
+                              BYPASS_CAPA);
+               if (rc != 0 && rc != -ENOENT) {
+                       lfsck_object_put(env, parent);
+                       GOTO(stop, rc);
+               }
+
+               if (rc == 0) {
+                       if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
+                               lfsck_object_put(env, parent);
+                               ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
+                               continue;
+                       }
+
+                       goto shrink;
+               }
+
+               if (ldata.ld_leh->leh_reccount > la->la_nlink)
+                       goto shrink;
+
+               /* XXX: For the case of there is linkea entry, but without name
+                *      entry pointing to the object, and the object link count
+                *      isn't less than the count of name entries, then add the
+                *      name entry back to namespace.
+                *
+                *      It is out of LFSCK 1.5 scope, will implement it in the
+                *      future. Keep the linkEA entry. */
+               lfsck_object_put(env, parent);
+               ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
+               continue;
+
+shrink:
+               if (parent != NULL)
+                       lfsck_object_put(env, parent);
+               if (bk->lb_param & LPF_DRYRUN)
+                       RETURN(1);
+
+               CDEBUG(D_LFSCK, "Remove linkEA: "DFID"[%.*s], "DFID"\n",
+                      PFID(lfsck_dto2fid(child)), cname->ln_namelen, cname->ln_name,
+                      PFID(pfid));
+               linkea_del_buf(&ldata, cname);
+               update = true;
+       }
+
+       if (update) {
+               if (!com->lc_journal) {
+                       com->lc_journal = 1;
+                       goto again;
+               }
+
+               rc = lfsck_links_write(env, child, &ldata, handle);
+       }
+
+       GOTO(stop, rc);
+
+stop:
+       if (locked)
+               dt_write_unlock(env, child);
+
+       if (handle != NULL)
+               dt_trans_stop(env, lfsck->li_next, handle);
+
+       if (rc == 0 && update) {
+               ns->ln_objs_nlink_repaired++;
+               rc = 1;
+       }
+       return rc;
+}
+
+/* namespace APIs */
+
+static int lfsck_namespace_reset(const struct lu_env *env,
+                                struct lfsck_component *com, bool init)
+{
+       struct lfsck_instance   *lfsck = com->lc_lfsck;
+       struct lfsck_namespace  *ns    =
+                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct dt_object        *dto;
+       int                      rc;
+       ENTRY;
+
+       down_write(&com->lc_sem);
+       if (init) {
+               memset(ns, 0, sizeof(*ns));
+       } else {
+               __u32 count = ns->ln_success_count;
+               __u64 last_time = ns->ln_time_last_complete;
+
+               memset(ns, 0, sizeof(*ns));
+               ns->ln_success_count = count;
+               ns->ln_time_last_complete = last_time;
+       }
+       ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
+       ns->ln_status = LS_INIT;
+
+       rc = local_object_unlink(env, lfsck->li_bottom, lfsck->li_local_root,
+                                lfsck_namespace_name);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       dto = local_index_find_or_create(env, lfsck->li_los, lfsck->li_local_root,
+                                        lfsck_namespace_name,
+                                        S_IFREG | S_IRUGO | S_IWUSR,
+                                        &dt_lfsck_features);
+       if (IS_ERR(dto))
+               GOTO(out, rc = PTR_ERR(dto));
+
+       rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       com->lc_obj = dto;
+       rc = lfsck_namespace_store(env, com, true);
+
+       GOTO(out, rc);
+
+out:
+       up_write(&com->lc_sem);
+       return rc;
+}
+
+static void
+lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
+                    bool new_checked)
+{
+       struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
+
+       down_write(&com->lc_sem);
+       if (new_checked)
+               com->lc_new_checked++;
+       ns->ln_items_failed++;
+       if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
+               lfsck_pos_fill(env, com->lc_lfsck,
+                              &ns->ln_pos_first_inconsistent, false);
+       up_write(&com->lc_sem);
+}
+
+static int lfsck_namespace_checkpoint(const struct lu_env *env,
+                                     struct lfsck_component *com, bool init)
+{
+       struct lfsck_instance   *lfsck = com->lc_lfsck;
+       struct lfsck_namespace  *ns    =
+                               (struct lfsck_namespace *)com->lc_file_ram;
+       int                      rc;
+
+       if (com->lc_new_checked == 0 && !init)
+               return 0;
+
+       down_write(&com->lc_sem);
+
+       if (init) {
+               ns->ln_pos_latest_start = lfsck->li_pos_current;
+       } else {
+               ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
+               ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
+                               HALF_SEC - lfsck->li_time_last_checkpoint);
+               ns->ln_time_last_checkpoint = cfs_time_current_sec();
+               ns->ln_items_checked += com->lc_new_checked;
+               com->lc_new_checked = 0;
+       }
+
+       rc = lfsck_namespace_store(env, com, false);
+
+       up_write(&com->lc_sem);
+       return rc;
+}
+
+static int lfsck_namespace_prep(const struct lu_env *env,
+                               struct lfsck_component *com)
+{
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct lfsck_namespace  *ns     =
+                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct lfsck_position   *pos    = &com->lc_pos_start;
+
+       if (ns->ln_status == LS_COMPLETED) {
+               int rc;
+
+               rc = lfsck_namespace_reset(env, com, false);
+               if (rc != 0)
+                       return rc;
+       }
+
+       down_write(&com->lc_sem);
+
+       ns->ln_time_latest_start = cfs_time_current_sec();
+
+       spin_lock(&lfsck->li_lock);
+       if (ns->ln_flags & LF_SCANNED_ONCE) {
+               if (!lfsck->li_drop_dryrun ||
+                   lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
+                       ns->ln_status = LS_SCANNING_PHASE2;
+                       cfs_list_del_init(&com->lc_link);
+                       cfs_list_add_tail(&com->lc_link,
+                                         &lfsck->li_list_double_scan);
+                       if (!cfs_list_empty(&com->lc_link_dir))
+                               cfs_list_del_init(&com->lc_link_dir);
+                       lfsck_pos_set_zero(pos);
+               } else {
+                       ns->ln_status = LS_SCANNING_PHASE1;
+                       ns->ln_run_time_phase1 = 0;
+                       ns->ln_run_time_phase2 = 0;
+                       ns->ln_items_checked = 0;
+                       ns->ln_items_repaired = 0;
+                       ns->ln_items_failed = 0;
+                       ns->ln_dirs_checked = 0;
+                       ns->ln_mlinked_checked = 0;
+                       ns->ln_objs_checked_phase2 = 0;
+                       ns->ln_objs_repaired_phase2 = 0;
+                       ns->ln_objs_failed_phase2 = 0;
+                       ns->ln_objs_nlink_repaired = 0;
+                       ns->ln_objs_lost_found = 0;
+                       fid_zero(&ns->ln_fid_latest_scanned_phase2);
+                       if (cfs_list_empty(&com->lc_link_dir))
+                               cfs_list_add_tail(&com->lc_link_dir,
+                                                 &lfsck->li_list_dir);
+                       *pos = ns->ln_pos_first_inconsistent;
+               }
+       } else {
+               ns->ln_status = LS_SCANNING_PHASE1;
+               if (cfs_list_empty(&com->lc_link_dir))
+                       cfs_list_add_tail(&com->lc_link_dir,
+                                         &lfsck->li_list_dir);
+               if (!lfsck->li_drop_dryrun ||
+                   lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
+                       *pos = ns->ln_pos_last_checkpoint;
+                       pos->lp_oit_cookie++;
+               } else {
+                       *pos = ns->ln_pos_first_inconsistent;
+               }
+       }
+       spin_unlock(&lfsck->li_lock);
+
+       up_write(&com->lc_sem);
+       return 0;
+}
+
+static int lfsck_namespace_exec_oit(const struct lu_env *env,
+                                   struct lfsck_component *com,
+                                   struct dt_object *obj)
+{
+       down_write(&com->lc_sem);
+       com->lc_new_checked++;
+       if (S_ISDIR(lfsck_object_type(obj)))
+               ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
+       up_write(&com->lc_sem);
+       return 0;
+}
+
+static int lfsck_namespace_exec_dir(const struct lu_env *env,
+                                   struct lfsck_component *com,
+                                   struct dt_object *obj,
+                                   struct lu_dirent *ent)
+{
+       struct lfsck_thread_info   *info     = lfsck_env_info(env);
+       struct lu_attr             *la       = &info->lti_la;
+       struct lfsck_instance      *lfsck    = com->lc_lfsck;
+       struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
+       struct lfsck_namespace     *ns       =
+                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct linkea_data          ldata    = { 0 };
+       const struct lu_fid        *pfid     =
+                               lu_object_fid(&lfsck->li_obj_dir->do_lu);
+       const struct lu_fid        *cfid     = lfsck_dto2fid(obj);
+       const struct lu_name       *cname;
+       struct thandle             *handle   = NULL;
+       bool                        repaired = false;
+       bool                        locked   = false;
+       int                         count    = 0;
+       int                         rc;
+       ENTRY;
+
+       cname = lfsck_name_get_const(env, ent->lde_name, ent->lde_namelen);
+       down_write(&com->lc_sem);
+       com->lc_new_checked++;
+
+       if (ent->lde_attrs & LUDA_UPGRADE) {
+               ns->ln_flags |= LF_UPGRADE;
+               repaired = true;
+       } else if (ent->lde_attrs & LUDA_REPAIR) {
+               ns->ln_flags |= LF_INCONSISTENT;
+               repaired = true;
+       }
+
+       if (ent->lde_name[0] == '.' &&
+           (ent->lde_namelen == 1 ||
+            (ent->lde_namelen == 2 && ent->lde_name[1] == '.') ||
+            fid_is_dot_lustre(&ent->lde_fid)))
+               GOTO(out, rc = 0);
+
+       if (!(bk->lb_param & LPF_DRYRUN) &&
+           (com->lc_journal || repaired)) {
+
+again:
+               LASSERT(!locked);
+
+               com->lc_journal = 1;
+               handle = dt_trans_create(env, lfsck->li_next);
+               if (IS_ERR(handle))
+                       GOTO(out, rc = PTR_ERR(handle));
+
+               rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               rc = dt_trans_start(env, lfsck->li_next, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               dt_write_lock(env, obj, MOR_TGT_CHILD);
+               locked = true;
+       }
+
+       rc = lfsck_namespace_check_exist(env, lfsck, obj, ent->lde_name);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = lfsck_links_read(env, obj, &ldata);
+       if (rc == 0) {
+               count = ldata.ld_leh->leh_reccount;
+               rc = linkea_links_find(&ldata, cname, pfid);
+               if (rc == 0) {
+                       /* For dir, if there are more than one linkea entries,
+                        * then remove all the other redundant linkea entries.*/
+                       if (unlikely(count > 1 &&
+                                    S_ISDIR(lfsck_object_type(obj))))
+                               goto unmatch;
+
+                       goto record;
+               } else {
+
+unmatch:
+                       ns->ln_flags |= LF_INCONSISTENT;
+                       if (bk->lb_param & LPF_DRYRUN) {
+                               repaired = true;
+                               goto record;
+                       }
+
+                       /*For dir, remove the unmatched linkea entry directly.*/
+                       if (S_ISDIR(lfsck_object_type(obj))) {
+                               if (!com->lc_journal)
+                                       goto again;
+
+                               rc = dt_xattr_del(env, obj, XATTR_NAME_LINK,
+                                                 handle, BYPASS_CAPA);
+                               if (rc != 0)
+                                       GOTO(stop, rc);
+
+                               goto nodata;
+                       } else {
+                               goto add;
+                       }
+               }
+       } else if (unlikely(rc == -EINVAL)) {
+               ns->ln_flags |= LF_INCONSISTENT;
+               if (bk->lb_param & LPF_DRYRUN) {
+                       count = 1;
+                       repaired = true;
+                       goto record;
+               }
+
+               if (!com->lc_journal)
+                       goto again;
+
+               /* The magic crashed, we are not sure whether there are more
+                * corrupt data in the linkea, so remove all linkea entries. */
+               rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
+                                 BYPASS_CAPA);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               goto nodata;
+       } else if (rc == -ENODATA) {
+               ns->ln_flags |= LF_UPGRADE;
+               if (bk->lb_param & LPF_DRYRUN) {
+                       count = 1;
+                       repaired = true;
+                       goto record;
+               }
+
+nodata:
+               rc = linkea_data_new(&ldata,
+                                    &lfsck_env_info(env)->lti_linkea_buf);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+add:
+               if (!com->lc_journal)
+                       goto again;
+
+               rc = linkea_add_buf(&ldata, cname, pfid);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               rc = lfsck_links_write(env, obj, &ldata, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               count = ldata.ld_leh->leh_reccount;
+               repaired = true;
+       } else {
+               GOTO(stop, rc);
+       }
+
+record:
+       LASSERT(count > 0);
+
+       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       if ((count == 1) &&
+           (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
+               /* Usually, it is for single linked object or dir, do nothing.*/
+               GOTO(stop, rc);
+
+       /* Following modification will be in another transaction.  */
+       if (handle != NULL) {
+               LASSERT(dt_write_locked(env, obj));
+
+               dt_write_unlock(env, obj);
+               locked = false;
+
+               dt_trans_stop(env, lfsck->li_next, handle);
+               handle = NULL;
+       }
+
+       ns->ln_mlinked_checked++;
+       rc = lfsck_namespace_update(env, com, cfid,
+                       count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
+
+       GOTO(out, rc);
+
+stop:
+       if (locked)
+               dt_write_unlock(env, obj);
+
+       if (handle != NULL)
+               dt_trans_stop(env, lfsck->li_next, handle);
+
+out:
+       if (rc < 0) {
+               ns->ln_items_failed++;
+               if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
+                       lfsck_pos_fill(env, lfsck,
+                                      &ns->ln_pos_first_inconsistent, false);
+               if (!(bk->lb_param & LPF_FAILOUT))
+                       rc = 0;
+       } else {
+               if (repaired)
+                       ns->ln_items_repaired++;
+               else
+                       com->lc_journal = 0;
+               rc = 0;
+       }
+       up_write(&com->lc_sem);
+       return rc;
+}
+
+static int lfsck_namespace_post(const struct lu_env *env,
+                               struct lfsck_component *com,
+                               int result, bool init)
+{
+       struct lfsck_instance   *lfsck = com->lc_lfsck;
+       struct lfsck_namespace  *ns    =
+                               (struct lfsck_namespace *)com->lc_file_ram;
+       int                      rc;
+
+       down_write(&com->lc_sem);
+
+       spin_lock(&lfsck->li_lock);
+       if (!init)
+               ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
+       if (result > 0) {
+               ns->ln_status = LS_SCANNING_PHASE2;
+               ns->ln_flags |= LF_SCANNED_ONCE;
+               ns->ln_flags &= ~LF_UPGRADE;
+               cfs_list_del_init(&com->lc_link);
+               cfs_list_del_init(&com->lc_link_dir);
+               cfs_list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
+       } else if (result == 0) {
+               if (lfsck->li_paused) {
+                       ns->ln_status = LS_PAUSED;
+               } else {
+                       ns->ln_status = LS_STOPPED;
+                       cfs_list_del_init(&com->lc_link);
+                       cfs_list_del_init(&com->lc_link_dir);
+                       cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
+               }
+       } else {
+               ns->ln_status = LS_FAILED;
+               cfs_list_del_init(&com->lc_link);
+               cfs_list_del_init(&com->lc_link_dir);
+               cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
+       }
+       spin_unlock(&lfsck->li_lock);
+
+       if (!init) {
+               ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
+                               HALF_SEC - lfsck->li_time_last_checkpoint);
+               ns->ln_time_last_checkpoint = cfs_time_current_sec();
+               ns->ln_items_checked += com->lc_new_checked;
+               com->lc_new_checked = 0;
+       }
+
+       rc = lfsck_namespace_store(env, com, false);
+
+       up_write(&com->lc_sem);
+       return rc;
+}
+
+static int
+lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
+                    char *buf, int len)
+{
+       struct lfsck_instance   *lfsck = com->lc_lfsck;
+       struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
+       struct lfsck_namespace  *ns    =
+                               (struct lfsck_namespace *)com->lc_file_ram;
+       int                      save  = len;
+       int                      ret   = -ENOSPC;
+       int                      rc;
+
+       down_read(&com->lc_sem);
+       rc = snprintf(buf, len,
+                     "name: lfsck_namespace\n"
+                     "magic: 0x%x\n"
+                     "version: %d\n"
+                     "status: %s\n",
+                     ns->ln_magic,
+                     bk->lb_version,
+                     lfsck_status_names[ns->ln_status]);
+       if (rc <= 0)
+               goto out;
+
+       buf += rc;
+       len -= rc;
+       rc = lfsck_bits_dump(&buf, &len, ns->ln_flags, lfsck_flags_names,
+                            "flags");
+       if (rc < 0)
+               goto out;
+
+       rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
+                            "param");
+       if (rc < 0)
+               goto out;
+
+       rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_complete,
+                            "time_since_last_completed");
+       if (rc < 0)
+               goto out;
+
+       rc = lfsck_time_dump(&buf, &len, ns->ln_time_latest_start,
+                            "time_since_latest_start");
+       if (rc < 0)
+               goto out;
+
+       rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_checkpoint,
+                            "time_since_last_checkpoint");
+       if (rc < 0)
+               goto out;
+
+       rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_latest_start,
+                           "latest_start_position");
+       if (rc < 0)
+               goto out;
+
+       rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_last_checkpoint,
+                           "last_checkpoint_position");
+       if (rc < 0)
+               goto out;
+
+       rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_first_inconsistent,
+                           "first_failure_position");
+       if (rc < 0)
+               goto out;
+
+       if (ns->ln_status == LS_SCANNING_PHASE1) {
+               struct lfsck_position pos;
+               cfs_duration_t duration = cfs_time_current() -
+                                         lfsck->li_time_last_checkpoint;
+               __u64 checked = ns->ln_items_checked + com->lc_new_checked;
+               __u64 speed = checked;
+               __u64 new_checked = com->lc_new_checked * CFS_HZ;
+               __u32 rtime = ns->ln_run_time_phase1 +
+                             cfs_duration_sec(duration + HALF_SEC);
+
+               if (duration != 0)
+                       do_div(new_checked, duration);
+               if (rtime != 0)
+                       do_div(speed, rtime);
+               rc = snprintf(buf, len,
+                             "checked_phase1: "LPU64"\n"
+                             "checked_phase2: "LPU64"\n"
+                             "updated_phase1: "LPU64"\n"
+                             "updated_phase2: "LPU64"\n"
+                             "failed_phase1: "LPU64"\n"
+                             "failed_phase2: "LPU64"\n"
+                             "dirs: "LPU64"\n"
+                             "M-linked: "LPU64"\n"
+                             "nlinks_repaired: "LPU64"\n"
+                             "lost_found: "LPU64"\n"
+                             "success_count: %u\n"
+                             "run_time_phase1: %u seconds\n"
+                             "run_time_phase2: %u seconds\n"
+                             "average_speed_phase1: "LPU64" items/sec\n"
+                             "average_speed_phase2: N/A\n"
+                             "real-time_speed_phase1: "LPU64" items/sec\n"
+                             "real-time_speed_phase2: N/A\n",
+                             checked,
+                             ns->ln_objs_checked_phase2,
+                             ns->ln_items_repaired,
+                             ns->ln_objs_repaired_phase2,
+                             ns->ln_items_failed,
+                             ns->ln_objs_failed_phase2,
+                             ns->ln_dirs_checked,
+                             ns->ln_mlinked_checked,
+                             ns->ln_objs_nlink_repaired,
+                             ns->ln_objs_lost_found,
+                             ns->ln_success_count,
+                             rtime,
+                             ns->ln_run_time_phase2,
+                             speed,
+                             new_checked);
+               if (rc <= 0)
+                       goto out;
+
+               buf += rc;
+               len -= rc;
+               lfsck_pos_fill(env, lfsck, &pos, false);
+               rc = lfsck_pos_dump(&buf, &len, &pos, "current_position");
+               if (rc <= 0)
+                       goto out;
+       } else if (ns->ln_status == LS_SCANNING_PHASE2) {
+               cfs_duration_t duration = cfs_time_current() -
+                                         lfsck->li_time_last_checkpoint;
+               __u64 checked = ns->ln_objs_checked_phase2 +
+                               com->lc_new_checked;
+               __u64 speed1 = ns->ln_items_checked;
+               __u64 speed2 = checked;
+               __u64 new_checked = com->lc_new_checked * CFS_HZ;
+               __u32 rtime = ns->ln_run_time_phase2 +
+                             cfs_duration_sec(duration + HALF_SEC);
+
+               if (duration != 0)
+                       do_div(new_checked, duration);
+               if (ns->ln_run_time_phase1 != 0)
+                       do_div(speed1, ns->ln_run_time_phase1);
+               if (rtime != 0)
+                       do_div(speed2, rtime);
+               rc = snprintf(buf, len,
+                             "checked_phase1: "LPU64"\n"
+                             "checked_phase2: "LPU64"\n"
+                             "updated_phase1: "LPU64"\n"
+                             "updated_phase2: "LPU64"\n"
+                             "failed_phase1: "LPU64"\n"
+                             "failed_phase2: "LPU64"\n"
+                             "dirs: "LPU64"\n"
+                             "M-linked: "LPU64"\n"
+                             "nlinks_repaired: "LPU64"\n"
+                             "lost_found: "LPU64"\n"
+                             "success_count: %u\n"
+                             "run_time_phase1: %u seconds\n"
+                             "run_time_phase2: %u seconds\n"
+                             "average_speed_phase1: "LPU64" items/sec\n"
+                             "average_speed_phase2: "LPU64" objs/sec\n"
+                             "real-time_speed_phase1: N/A\n"
+                             "real-time_speed_phase2: "LPU64" objs/sec\n"
+                             "current_position: "DFID"\n",
+                             ns->ln_items_checked,
+                             checked,
+                             ns->ln_items_repaired,
+                             ns->ln_objs_repaired_phase2,
+                             ns->ln_items_failed,
+                             ns->ln_objs_failed_phase2,
+                             ns->ln_dirs_checked,
+                             ns->ln_mlinked_checked,
+                             ns->ln_objs_nlink_repaired,
+                             ns->ln_objs_lost_found,
+                             ns->ln_success_count,
+                             ns->ln_run_time_phase1,
+                             rtime,
+                             speed1,
+                             speed2,
+                             new_checked,
+                             PFID(&ns->ln_fid_latest_scanned_phase2));
+               if (rc <= 0)
+                       goto out;
+
+               buf += rc;
+               len -= rc;
+       } else {
+               __u64 speed1 = ns->ln_items_checked;
+               __u64 speed2 = ns->ln_objs_checked_phase2;
+
+               if (ns->ln_run_time_phase1 != 0)
+                       do_div(speed1, ns->ln_run_time_phase1);
+               if (ns->ln_run_time_phase2 != 0)
+                       do_div(speed2, ns->ln_run_time_phase2);
+               rc = snprintf(buf, len,
+                             "checked_phase1: "LPU64"\n"
+                             "checked_phase2: "LPU64"\n"
+                             "updated_phase1: "LPU64"\n"
+                             "updated_phase2: "LPU64"\n"
+                             "failed_phase1: "LPU64"\n"
+                             "failed_phase2: "LPU64"\n"
+                             "dirs: "LPU64"\n"
+                             "M-linked: "LPU64"\n"
+                             "nlinks_repaired: "LPU64"\n"
+                             "lost_found: "LPU64"\n"
+                             "success_count: %u\n"
+                             "run_time_phase1: %u seconds\n"
+                             "run_time_phase2: %u seconds\n"
+                             "average_speed_phase1: "LPU64" items/sec\n"
+                             "average_speed_phase2: "LPU64" objs/sec\n"
+                             "real-time_speed_phase1: N/A\n"
+                             "real-time_speed_phase2: N/A\n"
+                             "current_position: N/A\n",
+                             ns->ln_items_checked,
+                             ns->ln_objs_checked_phase2,
+                             ns->ln_items_repaired,
+                             ns->ln_objs_repaired_phase2,
+                             ns->ln_items_failed,
+                             ns->ln_objs_failed_phase2,
+                             ns->ln_dirs_checked,
+                             ns->ln_mlinked_checked,
+                             ns->ln_objs_nlink_repaired,
+                             ns->ln_objs_lost_found,
+                             ns->ln_success_count,
+                             ns->ln_run_time_phase1,
+                             ns->ln_run_time_phase2,
+                             speed1,
+                             speed2);
+               if (rc <= 0)
+                       goto out;
+
+               buf += rc;
+               len -= rc;
+       }
+       ret = save - len;
+
+out:
+       up_read(&com->lc_sem);
+       return ret;
+}
+
+static int lfsck_namespace_double_scan(const struct lu_env *env,
+                                      struct lfsck_component *com)
+{
+       struct lfsck_instance   *lfsck  = com->lc_lfsck;
+       struct ptlrpc_thread    *thread = &lfsck->li_thread;
+       struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
+       struct lfsck_namespace  *ns     =
+                               (struct lfsck_namespace *)com->lc_file_ram;
+       struct dt_object        *obj    = com->lc_obj;
+       const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
+       struct dt_object        *target;
+       struct dt_it            *di;
+       struct dt_key           *key;
+       struct lu_fid            fid;
+       int                      rc;
+       __u8                     flags;
+       ENTRY;
+
+       lfsck->li_new_scanned = 0;
+       lfsck->li_time_last_checkpoint = cfs_time_current();
+       lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+
+       di = iops->init(env, obj, 0, BYPASS_CAPA);
+       if (IS_ERR(di))
+               RETURN(PTR_ERR(di));
+
+       fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
+       rc = iops->get(env, di, (const struct dt_key *)&fid);
+       if (rc < 0)
+               GOTO(fini, rc);
+
+       /* Skip the start one, which either has been processed or non-exist. */
+       rc = iops->next(env, di);
+       if (rc != 0)
+               GOTO(put, rc);
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
+               GOTO(put, rc = 0);
+
+       do {
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
+                   cfs_fail_val > 0) {
+                       struct l_wait_info lwi;
+
+                       lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
+                                         NULL, NULL);
+                       l_wait_event(thread->t_ctl_waitq,
+                                    !thread_is_running(thread),
+                                    &lwi);
+               }
+
+               key = iops->key(env, di);
+               fid_be_to_cpu(&fid, (const struct lu_fid *)key);
+               target = lfsck_object_find(env, lfsck, &fid);
+               down_write(&com->lc_sem);
+               if (target == NULL) {
+                       rc = 0;
+                       goto checkpoint;
+               } else if (IS_ERR(target)) {
+                       rc = PTR_ERR(target);
+                       goto checkpoint;
+               }
+
+               /* XXX: Currently, skip remote object, the consistency for
+                *      remote object will be processed in LFSCK phase III. */
+               if (!dt_object_exists(target) || dt_object_remote(target))
+                       goto obj_put;
+
+               rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
+               if (rc == 0)
+                       rc = lfsck_namespace_double_scan_one(env, com,
+                                                            target, flags);
+
+obj_put:
+               lfsck_object_put(env, target);
+
+checkpoint:
+               lfsck->li_new_scanned++;
+               com->lc_new_checked++;
+               ns->ln_fid_latest_scanned_phase2 = fid;
+               if (rc > 0)
+                       ns->ln_objs_repaired_phase2++;
+               else if (rc < 0)
+                       ns->ln_objs_failed_phase2++;
+               up_write(&com->lc_sem);
+
+               if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
+                       lfsck_namespace_delete(env, com, &fid);
+               } else if (rc < 0) {
+                       flags |= LLF_REPAIR_FAILED;
+                       lfsck_namespace_update(env, com, &fid, flags, true);
+               }
+
+               if (rc < 0 && bk->lb_param & LPF_FAILOUT)
+                       GOTO(put, rc);
+
+               if (likely(cfs_time_beforeq(cfs_time_current(),
+                                           lfsck->li_time_next_checkpoint)) ||
+                   com->lc_new_checked == 0)
+                       goto speed;
+
+               down_write(&com->lc_sem);
+               ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
+                               HALF_SEC - lfsck->li_time_last_checkpoint);
+               ns->ln_time_last_checkpoint = cfs_time_current_sec();
+               ns->ln_objs_checked_phase2 += com->lc_new_checked;
+               com->lc_new_checked = 0;
+               rc = lfsck_namespace_store(env, com, false);
+               up_write(&com->lc_sem);
+               if (rc != 0)
+                       GOTO(put, rc);
+
+               lfsck->li_time_last_checkpoint = cfs_time_current();
+               lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+
+speed:
+               lfsck_control_speed(lfsck);
+               if (unlikely(!thread_is_running(thread)))
+                       GOTO(put, rc = 0);
+
+               rc = iops->next(env, di);
+       } while (rc == 0);
+
+       GOTO(put, rc);
+
+put:
+       iops->put(env, di);
+
+fini:
+       iops->fini(env, di);
+       down_write(&com->lc_sem);
+
+       ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
+                               HALF_SEC - lfsck->li_time_last_checkpoint);
+       ns->ln_time_last_checkpoint = cfs_time_current_sec();
+       ns->ln_objs_checked_phase2 += com->lc_new_checked;
+       com->lc_new_checked = 0;
+
+       if (rc > 0) {
+               com->lc_journal = 0;
+               ns->ln_status = LS_COMPLETED;
+               if (!(bk->lb_param & LPF_DRYRUN))
+                       ns->ln_flags &=
+                       ~(LF_SCANNED_ONCE | LF_INCONSISTENT | LF_UPGRADE);
+               ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
+               ns->ln_success_count++;
+       } else if (rc == 0) {
+               if (lfsck->li_paused)
+                       ns->ln_status = LS_PAUSED;
+               else
+                       ns->ln_status = LS_STOPPED;
+       } else {
+               ns->ln_status = LS_FAILED;
+       }
+
+       if (ns->ln_status != LS_PAUSED) {
+               spin_lock(&lfsck->li_lock);
+               cfs_list_del_init(&com->lc_link);
+               cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
+               spin_unlock(&lfsck->li_lock);
+       }
+
+       rc = lfsck_namespace_store(env, com, false);
+
+       up_write(&com->lc_sem);
+       return rc;
+}
+
+static struct lfsck_operations lfsck_namespace_ops = {
+       .lfsck_reset            = lfsck_namespace_reset,
+       .lfsck_fail             = lfsck_namespace_fail,
+       .lfsck_checkpoint       = lfsck_namespace_checkpoint,
+       .lfsck_prep             = lfsck_namespace_prep,
+       .lfsck_exec_oit         = lfsck_namespace_exec_oit,
+       .lfsck_exec_dir         = lfsck_namespace_exec_dir,
+       .lfsck_post             = lfsck_namespace_post,
+       .lfsck_dump             = lfsck_namespace_dump,
+       .lfsck_double_scan      = lfsck_namespace_double_scan,
+};
+
+int lfsck_namespace_setup(const struct lu_env *env,
+                         struct lfsck_instance *lfsck)
+{
+       struct lfsck_component  *com;
+       struct lfsck_namespace  *ns;
+       struct dt_object        *obj;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(lfsck->li_master);
+
+       OBD_ALLOC_PTR(com);
+       if (com == NULL)
+               RETURN(-ENOMEM);
+
+       CFS_INIT_LIST_HEAD(&com->lc_link);
+       CFS_INIT_LIST_HEAD(&com->lc_link_dir);
+       init_rwsem(&com->lc_sem);
+       atomic_set(&com->lc_ref, 1);
+       com->lc_lfsck = lfsck;
+       com->lc_type = LT_NAMESPACE;
+       com->lc_ops = &lfsck_namespace_ops;
+       com->lc_file_size = sizeof(struct lfsck_namespace);
+       OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
+       if (com->lc_file_ram == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
+       if (com->lc_file_disk == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       obj = local_index_find_or_create(env, lfsck->li_los,
+                                        lfsck->li_local_root,
+                                        lfsck_namespace_name,
+                                        S_IFREG | S_IRUGO | S_IWUSR,
+                                        &dt_lfsck_features);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
+
+       com->lc_obj = obj;
+       rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = lfsck_namespace_load(env, com);
+       if (rc > 0)
+               rc = lfsck_namespace_reset(env, com, true);
+       else if (rc == -ENODATA)
+               rc = lfsck_namespace_init(env, com);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       ns = (struct lfsck_namespace *)com->lc_file_ram;
+       switch (ns->ln_status) {
+       case LS_INIT:
+       case LS_COMPLETED:
+       case LS_FAILED:
+       case LS_STOPPED:
+               cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
+               break;
+       default:
+               CERROR("%s: unknown status: %u\n",
+                      lfsck_lfsck2name(lfsck), ns->ln_status);
+               /* fall through */
+       case LS_SCANNING_PHASE1:
+       case LS_SCANNING_PHASE2:
+               /* No need to store the status to disk right now.
+                * If the system crashed before the status stored,
+                * it will be loaded back when next time. */
+               ns->ln_status = LS_CRASHED;
+               /* fall through */
+       case LS_PAUSED:
+       case LS_CRASHED:
+               cfs_list_add_tail(&com->lc_link, &lfsck->li_list_scan);
+               cfs_list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
+               break;
+       }
+
+       GOTO(out, rc = 0);
+
+out:
+       if (rc != 0)
+               lfsck_component_cleanup(env, com);
+       return rc;
+}
index 62f9606..5fe2c02 100644 (file)
@@ -1,7 +1,7 @@
 MODULES := mdd
 mdd-objs := mdd_object.o mdd_lov.o mdd_orphans.o mdd_lproc.o mdd_dir.o
 mdd-objs += mdd_device.o mdd_trans.o mdd_permission.o mdd_lock.o
-mdd-objs += mdd_lfsck.o mdd_compat.o
+mdd-objs += mdd_compat.o
 
 EXTRA_PRE_CFLAGS := -I@LINUX@/fs -I@LDISKFS_DIR@ -I@LDISKFS_DIR@/ldiskfs
 
index 7f4aaad..81c2229 100644 (file)
@@ -41,4 +41,4 @@ modulefs_DATA = mdd$(KMODEXT)
 endif
 
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
-EXTRA_DIST := $(mdd-objs:%.o=%.c) mdd_internal.h mdd_lfsck.h
+EXTRA_DIST := $(mdd-objs:%.o=%.c) mdd_internal.h
index 237d575..37f6508 100644 (file)
@@ -710,7 +710,7 @@ out:
 static void mdd_device_shutdown(const struct lu_env *env, struct mdd_device *m,
                                struct lustre_cfg *cfg)
 {
-       mdd_lfsck_cleanup(env, m);
+       lfsck_degister(env, m->mdd_bottom);
        mdd_changelog_fini(env, m);
        orph_index_fini(env, m);
        if (m->mdd_dot_lustre_objs.mdd_obf)
@@ -880,7 +880,7 @@ static int mdd_prepare(const struct lu_env *env,
                GOTO(out_orph, rc);
        }
 
-       rc = mdd_lfsck_setup(env, mdd);
+       rc = lfsck_register(env, mdd->mdd_bottom, mdd->mdd_child, true);
        if (rc != 0) {
                CERROR("%s: failed to initialize lfsck: rc = %d\n",
                       mdd2obd_dev(mdd)->obd_name, rc);
@@ -1309,16 +1309,16 @@ static int mdd_iocontrol(const struct lu_env *env, struct md_device *m,
                 RETURN(0);
         }
        case OBD_IOC_START_LFSCK: {
-               rc = mdd_lfsck_start(env, &mdd->mdd_lfsck,
-                                    (struct lfsck_start *)karg);
+               rc = lfsck_start(env, mdd->mdd_bottom,
+                                (struct lfsck_start_param *)karg);
                RETURN(rc);
        }
        case OBD_IOC_STOP_LFSCK: {
-               rc = mdd_lfsck_stop(env, &mdd->mdd_lfsck, false);
+               rc = lfsck_stop(env, mdd->mdd_bottom, false);
                RETURN(rc);
        }
        case OBD_IOC_PAUSE_LFSCK: {
-               rc = mdd_lfsck_stop(env, &mdd->mdd_lfsck, true);
+               rc = lfsck_stop(env, mdd->mdd_bottom, true);
                RETURN(rc);
        }
         }
index b835757..398f146 100644 (file)
@@ -1108,7 +1108,7 @@ int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
                ea_len = ldata->ld_leh->leh_len;
                linkea = ldata->ld_buf->lb_buf;
        } else {
-               ea_len = 4096;
+               ea_len = DEFAULT_LINKEA_SIZE;
                linkea = NULL;
        }
 
@@ -1289,6 +1289,7 @@ int mdd_finish_unlink(const struct lu_env *env,
 
        if (ma->ma_attr.la_nlink == 0 || is_dir) {
                 obj->mod_flags |= DEAD_OBJ;
+
                 /* add new orphan and the object
                  * will be deleted during mdd_close() */
                 if (obj->mod_count) {
index cd20421..2c34466 100644 (file)
 #include <md_object.h>
 #include <dt_object.h>
 #include <lustre_fsfilt.h>
-#include <lustre/lustre_lfsck_user.h>
+#include <lustre_lfsck.h>
 #include <lustre_fid.h>
 #include <lustre_capa.h>
 #include <lprocfs_status.h>
 #include <lustre_log.h>
 #include <lustre_linkea.h>
 
-#include "mdd_lfsck.h"
-
 /* PDO lock is unnecessary for current MDT stack because operations
  * are already protected by ldlm lock */
 #define MDD_DISABLE_PDO_LOCK    1
@@ -94,9 +92,6 @@ struct mdd_dot_lustre_objs {
         struct mdd_object *mdd_obf;
 };
 
-extern const char lfsck_bookmark_name[];
-extern const char lfsck_namespace_name[];
-
 struct mdd_device {
         struct md_device                 mdd_md_dev;
        struct obd_export               *mdd_child_exp;
@@ -111,7 +106,6 @@ struct mdd_device {
         unsigned long                    mdd_atime_diff;
         struct mdd_object               *mdd_dot_lustre;
         struct mdd_dot_lustre_objs       mdd_dot_lustre_objs;
-       struct md_lfsck                  mdd_lfsck;
        unsigned int                     mdd_sync_permission;
        int                              mdd_connects;
        struct local_oid_storage        *mdd_los;
@@ -434,18 +428,6 @@ int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
 int mdd_txn_start_cb(const struct lu_env *env, struct thandle *,
                      void *cookie);
 
-/* mdd_lfsck.c */
-int mdd_lfsck_set_speed(const struct lu_env *env, struct md_lfsck *lfsck,
-                       __u32 limit);
-int mdd_lfsck_start(const struct lu_env *env, struct md_lfsck *lfsck,
-                   struct lfsck_start *start);
-int mdd_lfsck_stop(const struct lu_env *env, struct md_lfsck *lfsck,
-                  bool pause);
-int mdd_lfsck_setup(const struct lu_env *env, struct mdd_device *mdd);
-void mdd_lfsck_cleanup(const struct lu_env *env, struct mdd_device *mdd);
-int mdd_lfsck_dump(const struct lu_env *env, struct md_lfsck *lfsck,
-                  __u16 type, char *buf, int len);
-
 /* mdd_device.c */
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
diff --git a/lustre/mdd/mdd_lfsck.c b/lustre/mdd/mdd_lfsck.c
deleted file mode 100644 (file)
index 3adfda1..0000000
+++ /dev/null
@@ -1,2971 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
-
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License version 2 for more details.  A copy is
- * included in the COPYING file that accompanied this code.
-
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2012, 2013, Intel Corporation.
- */
-/*
- * lustre/mdd/mdd_lfsck.c
- *
- * Top-level entry points into mdd module
- *
- * LFSCK controller, which scans the whole device through low layer
- * iteration APIs, drives all lfsck compeonents, controls the speed.
- *
- * Author: Fan Yong <yong.fan@whamcloud.com>
- */
-
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-#define DEBUG_SUBSYSTEM S_MDS
-
-#include <lustre/lustre_idl.h>
-#include <lustre_fid.h>
-#include <obd_support.h>
-
-#include "mdd_internal.h"
-#include "mdd_lfsck.h"
-
-#define HALF_SEC                       (CFS_HZ >> 1)
-#define LFSCK_CHECKPOINT_INTERVAL      60
-
-#define LFSCK_NAMEENTRY_DEAD           1 /* The object has been unlinked. */
-#define LFSCK_NAMEENTRY_REMOVED        2 /* The entry has been removed. */
-#define LFSCK_NAMEENTRY_RECREATED      3 /* The entry has been recreated. */
-
-const char lfsck_bookmark_name[] = "lfsck_bookmark";
-const char lfsck_namespace_name[] = "lfsck_namespace";
-
-static const char *lfsck_status_names[] = {
-       "init",
-       "scanning-phase1",
-       "scanning-phase2",
-       "completed",
-       "failed",
-       "stopped",
-       "paused",
-       "crashed",
-       NULL
-};
-
-static const char *lfsck_flags_names[] = {
-       "scanned-once",
-       "inconsistent",
-       "upgrade",
-       NULL
-};
-
-static const char *lfsck_param_names[] = {
-       "failout",
-       "dryrun",
-       NULL
-};
-
-/* misc functions */
-
-static inline struct mdd_device *mdd_lfsck2mdd(struct md_lfsck *lfsck)
-{
-       return container_of0(lfsck, struct mdd_device, mdd_lfsck);
-}
-
-static inline char *mdd_lfsck2name(struct md_lfsck *lfsck)
-{
-       struct mdd_device *mdd = mdd_lfsck2mdd(lfsck);
-
-       return mdd2obd_dev(mdd)->obd_name;
-}
-
-static inline void mdd_lfsck_component_get(struct lfsck_component *com)
-{
-       atomic_inc(&com->lc_ref);
-}
-
-static inline void mdd_lfsck_component_put(const struct lu_env *env,
-                                          struct lfsck_component *com)
-{
-       if (atomic_dec_and_test(&com->lc_ref)) {
-               if (com->lc_obj != NULL)
-                       lu_object_put(env, &com->lc_obj->do_lu);
-               if (com->lc_file_ram != NULL)
-                       OBD_FREE(com->lc_file_ram, com->lc_file_size);
-               if (com->lc_file_disk != NULL)
-                       OBD_FREE(com->lc_file_disk, com->lc_file_size);
-               OBD_FREE_PTR(com);
-       }
-}
-
-static inline struct lfsck_component *
-__mdd_lfsck_component_find(struct md_lfsck *lfsck, __u16 type, cfs_list_t *list)
-{
-       struct lfsck_component *com;
-
-       cfs_list_for_each_entry(com, list, lc_link) {
-               if (com->lc_type == type)
-                       return com;
-       }
-       return NULL;
-}
-
-static struct lfsck_component *
-mdd_lfsck_component_find(struct md_lfsck *lfsck, __u16 type)
-{
-       struct lfsck_component *com;
-
-       spin_lock(&lfsck->ml_lock);
-       com = __mdd_lfsck_component_find(lfsck, type, &lfsck->ml_list_scan);
-       if (com != NULL)
-               goto unlock;
-
-       com = __mdd_lfsck_component_find(lfsck, type,
-                                        &lfsck->ml_list_double_scan);
-       if (com != NULL)
-               goto unlock;
-
-       com = __mdd_lfsck_component_find(lfsck, type, &lfsck->ml_list_idle);
-
-unlock:
-       if (com != NULL)
-               mdd_lfsck_component_get(com);
-       spin_unlock(&lfsck->ml_lock);
-       return com;
-}
-
-static void mdd_lfsck_component_cleanup(const struct lu_env *env,
-                                       struct lfsck_component *com)
-{
-       if (!cfs_list_empty(&com->lc_link))
-               cfs_list_del_init(&com->lc_link);
-       if (!cfs_list_empty(&com->lc_link_dir))
-               cfs_list_del_init(&com->lc_link_dir);
-
-       mdd_lfsck_component_put(env, com);
-}
-
-static int lfsck_bits_dump(char **buf, int *len, int bits, const char *names[],
-                          const char *prefix)
-{
-       int save = *len;
-       int flag;
-       int rc;
-       int i;
-
-       rc = snprintf(*buf, *len, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
-       if (rc <= 0)
-               return -ENOSPC;
-
-       *buf += rc;
-       *len -= rc;
-       for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
-               if (flag & bits) {
-                       bits &= ~flag;
-                       rc = snprintf(*buf, *len, "%s%c", names[i],
-                                     bits != 0 ? ',' : '\n');
-                       if (rc <= 0)
-                               return -ENOSPC;
-
-                       *buf += rc;
-                       *len -= rc;
-               }
-       }
-       return save - *len;
-}
-
-static int lfsck_time_dump(char **buf, int *len, __u64 time, const char *prefix)
-{
-       int rc;
-
-       if (time != 0)
-               rc = snprintf(*buf, *len, "%s: "LPU64" seconds\n", prefix,
-                             cfs_time_current_sec() - time);
-       else
-               rc = snprintf(*buf, *len, "%s: N/A\n", prefix);
-       if (rc <= 0)
-               return -ENOSPC;
-
-       *buf += rc;
-       *len -= rc;
-       return rc;
-}
-
-static int lfsck_pos_dump(char **buf, int *len, struct lfsck_position *pos,
-                         const char *prefix)
-{
-       int rc;
-
-       if (fid_is_zero(&pos->lp_dir_parent)) {
-               if (pos->lp_oit_cookie == 0)
-                       rc = snprintf(*buf, *len, "%s: N/A, N/A, N/A\n",
-                                     prefix);
-               else
-                       rc = snprintf(*buf, *len, "%s: "LPU64", N/A, N/A\n",
-                                     prefix, pos->lp_oit_cookie);
-       } else {
-               rc = snprintf(*buf, *len, "%s: "LPU64", "DFID", "LPU64"\n",
-                             prefix, pos->lp_oit_cookie,
-                             PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
-       }
-       if (rc <= 0)
-               return -ENOSPC;
-
-       *buf += rc;
-       *len -= rc;
-       return rc;
-}
-
-static void mdd_lfsck_pos_fill(const struct lu_env *env, struct md_lfsck *lfsck,
-                              struct lfsck_position *pos, bool init)
-{
-       const struct dt_it_ops *iops = &lfsck->ml_obj_oit->do_index_ops->dio_it;
-
-       spin_lock(&lfsck->ml_lock);
-       if (unlikely(lfsck->ml_di_oit == NULL)) {
-               spin_unlock(&lfsck->ml_lock);
-               memset(pos, 0, sizeof(*pos));
-               return;
-       }
-
-       pos->lp_oit_cookie = iops->store(env, lfsck->ml_di_oit);
-       if (!lfsck->ml_current_oit_processed && !init)
-               pos->lp_oit_cookie--;
-
-       LASSERT(pos->lp_oit_cookie > 0);
-
-       if (lfsck->ml_di_dir != NULL) {
-               struct dt_object *dto = lfsck->ml_obj_dir;
-
-               pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
-                                                       lfsck->ml_di_dir);
-
-               if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
-                       fid_zero(&pos->lp_dir_parent);
-                       pos->lp_dir_cookie = 0;
-               } else {
-                       pos->lp_dir_parent = *lu_object_fid(&dto->do_lu);
-               }
-       } else {
-               fid_zero(&pos->lp_dir_parent);
-               pos->lp_dir_cookie = 0;
-       }
-       spin_unlock(&lfsck->ml_lock);
-}
-
-static inline void mdd_lfsck_pos_set_zero(struct lfsck_position *pos)
-{
-       memset(pos, 0, sizeof(*pos));
-}
-
-static inline int mdd_lfsck_pos_is_zero(const struct lfsck_position *pos)
-{
-       return pos->lp_oit_cookie == 0 && fid_is_zero(&pos->lp_dir_parent);
-}
-
-static inline int mdd_lfsck_pos_is_eq(const struct lfsck_position *pos1,
-                                     const struct lfsck_position *pos2)
-{
-       if (pos1->lp_oit_cookie < pos2->lp_oit_cookie)
-               return -1;
-
-       if (pos1->lp_oit_cookie > pos2->lp_oit_cookie)
-               return 1;
-
-       if (fid_is_zero(&pos1->lp_dir_parent) &&
-           !fid_is_zero(&pos2->lp_dir_parent))
-               return -1;
-
-       if (!fid_is_zero(&pos1->lp_dir_parent) &&
-           fid_is_zero(&pos2->lp_dir_parent))
-               return 1;
-
-       if (fid_is_zero(&pos1->lp_dir_parent) &&
-           fid_is_zero(&pos2->lp_dir_parent))
-               return 0;
-
-       LASSERT(lu_fid_eq(&pos1->lp_dir_parent, &pos2->lp_dir_parent));
-
-       if (pos1->lp_dir_cookie < pos2->lp_dir_cookie)
-               return -1;
-
-       if (pos1->lp_dir_cookie > pos2->lp_dir_cookie)
-               return 1;
-
-       return 0;
-}
-
-static void mdd_lfsck_close_dir(const struct lu_env *env,
-                               struct md_lfsck *lfsck)
-{
-       struct dt_object        *dir_obj  = lfsck->ml_obj_dir;
-       const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
-       struct dt_it            *dir_di   = lfsck->ml_di_dir;
-
-       spin_lock(&lfsck->ml_lock);
-       lfsck->ml_di_dir = NULL;
-       spin_unlock(&lfsck->ml_lock);
-
-       dir_iops->put(env, dir_di);
-       dir_iops->fini(env, dir_di);
-       lfsck->ml_obj_dir = NULL;
-       lu_object_put(env, &dir_obj->do_lu);
-}
-
-static void __mdd_lfsck_set_speed(struct md_lfsck *lfsck, __u32 limit)
-{
-       lfsck->ml_bookmark_ram.lb_speed_limit = limit;
-       if (limit != LFSCK_SPEED_NO_LIMIT) {
-               if (limit > CFS_HZ) {
-                       lfsck->ml_sleep_rate = limit / CFS_HZ;
-                       lfsck->ml_sleep_jif = 1;
-               } else {
-                       lfsck->ml_sleep_rate = 1;
-                       lfsck->ml_sleep_jif = CFS_HZ / limit;
-               }
-       } else {
-               lfsck->ml_sleep_jif = 0;
-               lfsck->ml_sleep_rate = 0;
-       }
-}
-
-static void mdd_lfsck_control_speed(struct md_lfsck *lfsck)
-{
-       struct ptlrpc_thread *thread = &lfsck->ml_thread;
-       struct l_wait_info    lwi;
-
-       if (lfsck->ml_sleep_jif > 0 &&
-           lfsck->ml_new_scanned >= lfsck->ml_sleep_rate) {
-               spin_lock(&lfsck->ml_lock);
-               if (likely(lfsck->ml_sleep_jif > 0 &&
-                          lfsck->ml_new_scanned >= lfsck->ml_sleep_rate)) {
-                       lwi = LWI_TIMEOUT_INTR(lfsck->ml_sleep_jif, NULL,
-                                              LWI_ON_SIGNAL_NOOP, NULL);
-                       spin_unlock(&lfsck->ml_lock);
-
-                       l_wait_event(thread->t_ctl_waitq,
-                                    !thread_is_running(thread),
-                                    &lwi);
-                       lfsck->ml_new_scanned = 0;
-               } else {
-                       spin_unlock(&lfsck->ml_lock);
-               }
-       }
-}
-
-/* lfsck_bookmark file ops */
-
-static void inline mdd_lfsck_bookmark_to_cpu(struct lfsck_bookmark *des,
-                                            struct lfsck_bookmark *src)
-{
-       des->lb_magic = le32_to_cpu(src->lb_magic);
-       des->lb_version = le16_to_cpu(src->lb_version);
-       des->lb_param = le16_to_cpu(src->lb_param);
-       des->lb_speed_limit = le32_to_cpu(src->lb_speed_limit);
-}
-
-static void inline mdd_lfsck_bookmark_to_le(struct lfsck_bookmark *des,
-                                           struct lfsck_bookmark *src)
-{
-       des->lb_magic = cpu_to_le32(src->lb_magic);
-       des->lb_version = cpu_to_le16(src->lb_version);
-       des->lb_param = cpu_to_le16(src->lb_param);
-       des->lb_speed_limit = cpu_to_le32(src->lb_speed_limit);
-}
-
-static int mdd_lfsck_bookmark_load(const struct lu_env *env,
-                                  struct md_lfsck *lfsck)
-{
-       loff_t pos = 0;
-       int    len = sizeof(struct lfsck_bookmark);
-       int    rc;
-
-       rc = dt_record_read(env, lfsck->ml_bookmark_obj,
-                           mdd_buf_get(env, &lfsck->ml_bookmark_disk, len),
-                           &pos);
-       if (rc == 0) {
-               struct lfsck_bookmark *bm = &lfsck->ml_bookmark_ram;
-
-               mdd_lfsck_bookmark_to_cpu(bm, &lfsck->ml_bookmark_disk);
-               if (bm->lb_magic != LFSCK_BOOKMARK_MAGIC) {
-                       CWARN("%.16s: invalid lfsck_bookmark magic "
-                             "0x%x != 0x%x\n", mdd_lfsck2name(lfsck),
-                             bm->lb_magic, LFSCK_BOOKMARK_MAGIC);
-                       /* Process it as new lfsck_bookmark. */
-                       rc = -ENODATA;
-               }
-       } else {
-               if (rc == -EFAULT && pos == 0)
-                       /* return -ENODATA for empty lfsck_bookmark. */
-                       rc = -ENODATA;
-               else
-                       CERROR("%.16s: fail to load lfsck_bookmark, "
-                              "expected = %d, rc = %d\n",
-                              mdd_lfsck2name(lfsck), len, rc);
-       }
-       return rc;
-}
-
-static int mdd_lfsck_bookmark_store(const struct lu_env *env,
-                                   struct md_lfsck *lfsck)
-{
-       struct mdd_device *mdd    = mdd_lfsck2mdd(lfsck);
-       struct thandle    *handle;
-       struct dt_object  *obj    = lfsck->ml_bookmark_obj;
-       loff_t             pos    = 0;
-       int                len    = sizeof(struct lfsck_bookmark);
-       int                rc;
-       ENTRY;
-
-       mdd_lfsck_bookmark_to_le(&lfsck->ml_bookmark_disk,
-                                &lfsck->ml_bookmark_ram);
-       handle = dt_trans_create(env, mdd->mdd_bottom);
-       if (IS_ERR(handle)) {
-               rc = PTR_ERR(handle);
-               CERROR("%.16s: fail to create trans for storing "
-                      "lfsck_bookmark: %d\n,", mdd_lfsck2name(lfsck), rc);
-               RETURN(rc);
-       }
-
-       rc = dt_declare_record_write(env, obj, len, 0, handle);
-       if (rc != 0) {
-               CERROR("%.16s: fail to declare trans for storing "
-                      "lfsck_bookmark: %d\n,", mdd_lfsck2name(lfsck), rc);
-               GOTO(out, rc);
-       }
-
-       rc = dt_trans_start_local(env, mdd->mdd_bottom, handle);
-       if (rc != 0) {
-               CERROR("%.16s: fail to start trans for storing "
-                      "lfsck_bookmark: %d\n,", mdd_lfsck2name(lfsck), rc);
-               GOTO(out, rc);
-       }
-
-       rc = dt_record_write(env, obj,
-                            mdd_buf_get(env, &lfsck->ml_bookmark_disk, len),
-                            &pos, handle);
-       if (rc != 0)
-               CERROR("%.16s: fail to store lfsck_bookmark, expected = %d, "
-                      "rc = %d\n", mdd_lfsck2name(lfsck), len, rc);
-
-       GOTO(out, rc);
-
-out:
-       dt_trans_stop(env, mdd->mdd_bottom, handle);
-       return rc;
-}
-
-static int mdd_lfsck_bookmark_init(const struct lu_env *env,
-                                  struct md_lfsck *lfsck)
-{
-       struct lfsck_bookmark *mb = &lfsck->ml_bookmark_ram;
-       int rc;
-
-       memset(mb, 0, sizeof(*mb));
-       mb->lb_magic = LFSCK_BOOKMARK_MAGIC;
-       mb->lb_version = LFSCK_VERSION_V2;
-       mutex_lock(&lfsck->ml_mutex);
-       rc = mdd_lfsck_bookmark_store(env, lfsck);
-       mutex_unlock(&lfsck->ml_mutex);
-       return rc;
-}
-
-/* lfsck_namespace file ops */
-
-static void inline mdd_lfsck_position_to_cpu(struct lfsck_position *des,
-                                            struct lfsck_position *src)
-{
-       des->lp_oit_cookie = le64_to_cpu(src->lp_oit_cookie);
-       fid_le_to_cpu(&des->lp_dir_parent, &src->lp_dir_parent);
-       des->lp_dir_cookie = le64_to_cpu(src->lp_dir_cookie);
-}
-
-static void inline mdd_lfsck_position_to_le(struct lfsck_position *des,
-                                            struct lfsck_position *src)
-{
-       des->lp_oit_cookie = cpu_to_le64(src->lp_oit_cookie);
-       fid_cpu_to_le(&des->lp_dir_parent, &src->lp_dir_parent);
-       des->lp_dir_cookie = cpu_to_le64(src->lp_dir_cookie);
-}
-
-static void inline mdd_lfsck_namespace_to_cpu(struct lfsck_namespace *des,
-                                             struct lfsck_namespace *src)
-{
-       des->ln_magic = le32_to_cpu(src->ln_magic);
-       des->ln_status = le32_to_cpu(src->ln_status);
-       des->ln_flags = le32_to_cpu(src->ln_flags);
-       des->ln_success_count = le32_to_cpu(src->ln_success_count);
-       des->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
-       des->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
-       des->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
-       des->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
-       des->ln_time_last_checkpoint =
-                               le64_to_cpu(src->ln_time_last_checkpoint);
-       mdd_lfsck_position_to_cpu(&des->ln_pos_latest_start,
-                                 &src->ln_pos_latest_start);
-       mdd_lfsck_position_to_cpu(&des->ln_pos_last_checkpoint,
-                                 &src->ln_pos_last_checkpoint);
-       mdd_lfsck_position_to_cpu(&des->ln_pos_first_inconsistent,
-                                 &src->ln_pos_first_inconsistent);
-       des->ln_items_checked = le64_to_cpu(src->ln_items_checked);
-       des->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
-       des->ln_items_failed = le64_to_cpu(src->ln_items_failed);
-       des->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
-       des->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
-       des->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
-       des->ln_objs_repaired_phase2 =
-                               le64_to_cpu(src->ln_objs_repaired_phase2);
-       des->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
-       des->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
-       des->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
-       fid_le_to_cpu(&des->ln_fid_latest_scanned_phase2,
-                     &src->ln_fid_latest_scanned_phase2);
-}
-
-static void inline mdd_lfsck_namespace_to_le(struct lfsck_namespace *des,
-                                            struct lfsck_namespace *src)
-{
-       des->ln_magic = cpu_to_le32(src->ln_magic);
-       des->ln_status = cpu_to_le32(src->ln_status);
-       des->ln_flags = cpu_to_le32(src->ln_flags);
-       des->ln_success_count = cpu_to_le32(src->ln_success_count);
-       des->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
-       des->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
-       des->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
-       des->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
-       des->ln_time_last_checkpoint =
-                               cpu_to_le64(src->ln_time_last_checkpoint);
-       mdd_lfsck_position_to_le(&des->ln_pos_latest_start,
-                                &src->ln_pos_latest_start);
-       mdd_lfsck_position_to_le(&des->ln_pos_last_checkpoint,
-                                &src->ln_pos_last_checkpoint);
-       mdd_lfsck_position_to_le(&des->ln_pos_first_inconsistent,
-                                &src->ln_pos_first_inconsistent);
-       des->ln_items_checked = cpu_to_le64(src->ln_items_checked);
-       des->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
-       des->ln_items_failed = cpu_to_le64(src->ln_items_failed);
-       des->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
-       des->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
-       des->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
-       des->ln_objs_repaired_phase2 =
-                               cpu_to_le64(src->ln_objs_repaired_phase2);
-       des->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
-       des->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
-       des->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
-       fid_cpu_to_le(&des->ln_fid_latest_scanned_phase2,
-                     &src->ln_fid_latest_scanned_phase2);
-}
-
-/**
- * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
- * \retval 0: succeed.
- * \retval -ve: failed cases.
- */
-static int mdd_lfsck_namespace_load(const struct lu_env *env,
-                                   struct lfsck_component *com)
-{
-       int len = com->lc_file_size;
-       int rc;
-
-       rc = dt_xattr_get(env, com->lc_obj,
-                         mdd_buf_get(env, com->lc_file_disk, len),
-                         XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
-       if (rc == len) {
-               struct lfsck_namespace *ns = com->lc_file_ram;
-
-               mdd_lfsck_namespace_to_cpu(ns,
-                               (struct lfsck_namespace *)com->lc_file_disk);
-               if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
-                       CWARN("%.16s: invalid lfsck_namespace magic "
-                             "0x%x != 0x%x\n",
-                             mdd_lfsck2name(com->lc_lfsck),
-                             ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
-                       rc = 1;
-               } else {
-                       rc = 0;
-               }
-       } else if (rc != -ENODATA) {
-               CERROR("%.16s: fail to load lfsck_namespace, expected = %d, "
-                      "rc = %d\n", mdd_lfsck2name(com->lc_lfsck), len, rc);
-               if (rc >= 0)
-                       rc = 1;
-       }
-       return rc;
-}
-
-static int mdd_lfsck_namespace_store(const struct lu_env *env,
-                                    struct lfsck_component *com, bool init)
-{
-       struct dt_object  *obj    = com->lc_obj;
-       struct md_lfsck   *lfsck  = com->lc_lfsck;
-       struct mdd_device *mdd    = mdd_lfsck2mdd(lfsck);
-       struct thandle    *handle;
-       int                len    = com->lc_file_size;
-       int                rc;
-       ENTRY;
-
-       mdd_lfsck_namespace_to_le((struct lfsck_namespace *)com->lc_file_disk,
-                                 (struct lfsck_namespace *)com->lc_file_ram);
-       handle = dt_trans_create(env, mdd->mdd_bottom);
-       if (IS_ERR(handle)) {
-               rc = PTR_ERR(handle);
-               CERROR("%.16s: fail to create trans for storing "
-                      "lfsck_namespace: %d\n,", mdd_lfsck2name(lfsck), rc);
-               RETURN(rc);
-       }
-
-       rc = dt_declare_xattr_set(env, obj,
-                                 mdd_buf_get(env, com->lc_file_disk, len),
-                                 XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
-       if (rc != 0) {
-               CERROR("%.16s: fail to declare trans for storing "
-                      "lfsck_namespace: %d\n,", mdd_lfsck2name(lfsck), rc);
-               GOTO(out, rc);
-       }
-
-       rc = dt_trans_start_local(env, mdd->mdd_bottom, handle);
-       if (rc != 0) {
-               CERROR("%.16s: fail to start trans for storing "
-                      "lfsck_namespace: %d\n,", mdd_lfsck2name(lfsck), rc);
-               GOTO(out, rc);
-       }
-
-       rc = dt_xattr_set(env, obj,
-                         mdd_buf_get(env, com->lc_file_disk, len),
-                         XATTR_NAME_LFSCK_NAMESPACE,
-                         init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
-                         handle, BYPASS_CAPA);
-       if (rc != 0)
-               CERROR("%.16s: fail to store lfsck_namespace, len = %d, "
-                      "rc = %d\n", mdd_lfsck2name(lfsck), len, rc);
-
-       GOTO(out, rc);
-
-out:
-       dt_trans_stop(env, mdd->mdd_bottom, handle);
-       return rc;
-}
-
-static int mdd_lfsck_namespace_init(const struct lu_env *env,
-                                   struct lfsck_component *com)
-{
-       struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
-       int rc;
-
-       memset(ns, 0, sizeof(*ns));
-       ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
-       ns->ln_status = LS_INIT;
-       down_write(&com->lc_sem);
-       rc = mdd_lfsck_namespace_store(env, com, true);
-       up_write(&com->lc_sem);
-       return rc;
-}
-
-static int mdd_lfsck_namespace_lookup(const struct lu_env *env,
-                                     struct lfsck_component *com,
-                                     const struct lu_fid *fid,
-                                     __u8 *flags)
-{
-       struct lu_fid *key = &mdd_env_info(env)->mti_fid;
-       int            rc;
-
-       fid_cpu_to_be(key, fid);
-       rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
-                      (const struct dt_key *)key, BYPASS_CAPA);
-       return rc;
-}
-
-static int mdd_lfsck_namespace_delete(const struct lu_env *env,
-                                     struct lfsck_component *com,
-                                     const struct lu_fid *fid)
-{
-       struct mdd_device *mdd    = mdd_lfsck2mdd(com->lc_lfsck);
-       struct lu_fid     *key    = &mdd_env_info(env)->mti_fid;
-       struct thandle    *handle;
-       struct dt_object *obj     = com->lc_obj;
-       int               rc;
-       ENTRY;
-
-       handle = dt_trans_create(env, mdd->mdd_bottom);
-       if (IS_ERR(handle))
-               RETURN(PTR_ERR(handle));
-
-       rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       rc = dt_trans_start_local(env, mdd->mdd_bottom, handle);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       fid_cpu_to_be(key, fid);
-       rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
-                      BYPASS_CAPA);
-
-       GOTO(out, rc);
-
-out:
-       dt_trans_stop(env, mdd->mdd_bottom, handle);
-       return rc;
-}
-
-static int mdd_lfsck_namespace_update(const struct lu_env *env,
-                                     struct lfsck_component *com,
-                                     const struct lu_fid *fid,
-                                     __u8 flags, bool force)
-{
-       struct mdd_device *mdd    = mdd_lfsck2mdd(com->lc_lfsck);
-       struct lu_fid     *key    = &mdd_env_info(env)->mti_fid;
-       struct thandle    *handle;
-       struct dt_object *obj     = com->lc_obj;
-       int               rc;
-       bool              exist   = false;
-       __u8              tf;
-       ENTRY;
-
-       rc = mdd_lfsck_namespace_lookup(env, com, fid, &tf);
-       if (rc != 0 && rc != -ENOENT)
-               RETURN(rc);
-
-       if (rc == 0) {
-               if (!force || flags == tf)
-                       RETURN(0);
-
-               exist = true;
-               handle = dt_trans_create(env, mdd->mdd_bottom);
-               if (IS_ERR(handle))
-                       RETURN(PTR_ERR(handle));
-
-               rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
-                                      handle);
-               if (rc != 0)
-                       GOTO(out, rc);
-       } else {
-               handle = dt_trans_create(env, mdd->mdd_bottom);
-               if (IS_ERR(handle))
-                       RETURN(PTR_ERR(handle));
-       }
-
-       rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
-                              (const struct dt_key *)fid, handle);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       rc = dt_trans_start_local(env, mdd->mdd_bottom, handle);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       fid_cpu_to_be(key, fid);
-       if (exist) {
-               rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
-                              BYPASS_CAPA);
-               if (rc != 0) {
-                       CERROR("%s: fail to insert "DFID", rc = %d\n",
-                              mdd_lfsck2name(com->lc_lfsck), PFID(fid), rc);
-                       GOTO(out, rc);
-               }
-       }
-
-       rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
-                      (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
-
-       GOTO(out, rc);
-
-out:
-       dt_trans_stop(env, mdd->mdd_bottom, handle);
-       return rc;
-}
-
-/**
- * \retval +ve repaired
- * \retval 0   no need to repair
- * \retval -ve error cases
- */
-static int mdd_lfsck_namespace_double_scan_one(const struct lu_env *env,
-                                              struct lfsck_component *com,
-                                              struct mdd_object *child,
-                                              __u8 flags)
-{
-       struct mdd_thread_info  *info     = mdd_env_info(env);
-       struct lu_attr          *la       = &info->mti_la;
-       struct lu_name          *cname    = &info->mti_name;
-       struct lu_fid           *pfid     = &info->mti_fid;
-       struct lu_fid           *cfid     = &info->mti_fid2;
-       struct md_lfsck         *lfsck    = com->lc_lfsck;
-       struct mdd_device       *mdd      = mdd_lfsck2mdd(lfsck);
-       struct lfsck_bookmark   *bk       = &lfsck->ml_bookmark_ram;
-       struct lfsck_namespace  *ns       =
-                               (struct lfsck_namespace *)com->lc_file_ram;
-       struct linkea_data       ldata    = { 0 };
-       struct thandle          *handle   = NULL;
-       bool                     locked   = false;
-       bool                     update   = false;
-       int                      count;
-       int                      rc;
-       ENTRY;
-
-       if (com->lc_journal) {
-
-again:
-               LASSERT(!locked);
-
-               com->lc_journal = 1;
-               handle = mdd_trans_create(env, mdd);
-               if (IS_ERR(handle))
-                       RETURN(rc = PTR_ERR(handle));
-
-               rc = mdd_declare_links_add(env, child, handle, NULL);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               rc = mdd_trans_start(env, mdd, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               mdd_write_lock(env, child, MOR_TGT_CHILD);
-               locked = true;
-       }
-
-       if (unlikely(mdd_is_dead_obj(child)))
-               GOTO(stop, rc = 0);
-
-       rc = mdd_links_read(env, child, &ldata);
-       if (rc != 0) {
-               if ((bk->lb_param & LPF_DRYRUN) &&
-                   (rc == -EINVAL || rc == -ENODATA))
-                       rc = 1;
-
-               GOTO(stop, rc);
-       }
-
-       rc = mdd_la_get(env, child, la, BYPASS_CAPA);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       ldata.ld_lee = LINKEA_FIRST_ENTRY(ldata);
-       count = ldata.ld_leh->leh_reccount;
-       while (count-- > 0) {
-               struct mdd_object *parent = NULL;
-               struct dt_object *dir;
-
-               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname,
-                                   pfid);
-               if (!fid_is_sane(pfid))
-                       goto shrink;
-
-               parent = mdd_object_find(env, mdd, pfid);
-               if (parent == NULL)
-                       goto shrink;
-               else if (IS_ERR(parent))
-                       GOTO(stop, rc = PTR_ERR(parent));
-
-               if (!mdd_object_exists(parent))
-                       goto shrink;
-
-               /* XXX: Currently, skip remote object, the consistency for
-                *      remote object will be processed in LFSCK phase III. */
-               if (mdd_object_remote(parent)) {
-                       mdd_object_put(env, parent);
-                       ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
-                       continue;
-               }
-
-               dir = mdd_object_child(parent);
-               if (unlikely(!dt_try_as_dir(env, dir)))
-                       goto shrink;
-
-               /* To guarantee the 'name' is terminated with '0'. */
-               memcpy(info->mti_key, cname->ln_name, cname->ln_namelen);
-               info->mti_key[cname->ln_namelen] = 0;
-               cname->ln_name = info->mti_key;
-               rc = dt_lookup(env, dir, (struct dt_rec *)cfid,
-                              (const struct dt_key *)cname->ln_name,
-                              BYPASS_CAPA);
-               if (rc != 0 && rc != -ENOENT) {
-                       mdd_object_put(env, parent);
-                       GOTO(stop, rc);
-               }
-
-               if (rc == 0) {
-                       if (lu_fid_eq(cfid, mdo2fid(child))) {
-                               mdd_object_put(env, parent);
-                               ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
-                               continue;
-                       }
-
-                       goto shrink;
-               }
-
-               if (ldata.ld_leh->leh_reccount > la->la_nlink)
-                       goto shrink;
-
-               /* XXX: For the case of there is linkea entry, but without name
-                *      entry pointing to the object, and the object link count
-                *      isn't less than the count of name entries, then add the
-                *      name entry back to namespace.
-                *
-                *      It is out of LFSCK 1.5 scope, will implement it in the
-                *      future. Keep the linkEA entry. */
-               mdd_object_put(env, parent);
-               ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
-               continue;
-
-shrink:
-               if (parent != NULL)
-                       mdd_object_put(env, parent);
-               if (bk->lb_param & LPF_DRYRUN)
-                       RETURN(1);
-
-               CDEBUG(D_LFSCK, "Remove linkEA: "DFID"[%.*s], "DFID"\n",
-                      PFID(mdo2fid(child)), cname->ln_namelen, cname->ln_name,
-                      PFID(pfid));
-               linkea_del_buf(&ldata, cname);
-               update = true;
-       }
-
-       if (update) {
-               if (!com->lc_journal) {
-                       com->lc_journal = 1;
-                       goto again;
-               }
-
-               rc = mdd_links_write(env, child, &ldata, handle);
-       }
-
-       GOTO(stop, rc);
-
-stop:
-       if (locked)
-               mdd_write_unlock(env, child);
-
-       if (handle != NULL)
-               mdd_trans_stop(env, mdd, rc, handle);
-
-       if (rc == 0 && update) {
-               ns->ln_objs_nlink_repaired++;
-               rc = 1;
-       }
-       return rc;
-}
-
-/* namespace APIs */
-
-static int mdd_lfsck_namespace_reset(const struct lu_env *env,
-                                    struct lfsck_component *com, bool init)
-{
-       struct lfsck_namespace  *ns   = (struct lfsck_namespace *)com->lc_file_ram;
-       struct mdd_device       *mdd  = mdd_lfsck2mdd(com->lc_lfsck);
-       struct dt_object        *dto, *root;
-       int                      rc;
-       ENTRY;
-
-       down_write(&com->lc_sem);
-       if (init) {
-               memset(ns, 0, sizeof(*ns));
-       } else {
-               __u32 count = ns->ln_success_count;
-               __u64 last_time = ns->ln_time_last_complete;
-
-               memset(ns, 0, sizeof(*ns));
-               ns->ln_success_count = count;
-               ns->ln_time_last_complete = last_time;
-       }
-       ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
-       ns->ln_status = LS_INIT;
-
-       root = dt_locate(env, mdd->mdd_bottom, &mdd->mdd_local_root_fid);
-       if (unlikely(IS_ERR(root)))
-               GOTO(out, rc = PTR_ERR(root));
-
-       rc = local_object_unlink(env, mdd->mdd_bottom, root,
-                                lfsck_namespace_name);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       dto = local_index_find_or_create(env, mdd->mdd_los, root,
-                                        lfsck_namespace_name,
-                                        S_IFREG | S_IRUGO | S_IWUSR,
-                                        &dt_lfsck_features);
-       if (IS_ERR(dto))
-               GOTO(out, rc = PTR_ERR(dto));
-
-       rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
-       if (rc != 0)
-               GOTO(out, rc);
-       com->lc_obj = dto;
-
-       rc = mdd_lfsck_namespace_store(env, com, true);
-
-       GOTO(out, rc);
-out:
-       lu_object_put(env, &root->do_lu);
-       up_write(&com->lc_sem);
-       return rc;
-}
-
-static void
-mdd_lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
-                        bool new_checked)
-{
-       struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
-
-       down_write(&com->lc_sem);
-       if (new_checked)
-               com->lc_new_checked++;
-       ns->ln_items_failed++;
-       if (mdd_lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
-               mdd_lfsck_pos_fill(env, com->lc_lfsck,
-                                  &ns->ln_pos_first_inconsistent, false);
-       up_write(&com->lc_sem);
-}
-
-static int mdd_lfsck_namespace_checkpoint(const struct lu_env *env,
-                                         struct lfsck_component *com,
-                                         bool init)
-{
-       struct md_lfsck         *lfsck = com->lc_lfsck;
-       struct lfsck_namespace  *ns    =
-                               (struct lfsck_namespace *)com->lc_file_ram;
-       int                      rc;
-
-       if (com->lc_new_checked == 0 && !init)
-               return 0;
-
-       down_write(&com->lc_sem);
-
-       if (init) {
-               ns->ln_pos_latest_start = lfsck->ml_pos_current;
-       } else {
-               ns->ln_pos_last_checkpoint = lfsck->ml_pos_current;
-               ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
-                               HALF_SEC - lfsck->ml_time_last_checkpoint);
-               ns->ln_time_last_checkpoint = cfs_time_current_sec();
-               ns->ln_items_checked += com->lc_new_checked;
-               com->lc_new_checked = 0;
-       }
-
-       rc = mdd_lfsck_namespace_store(env, com, false);
-
-       up_write(&com->lc_sem);
-       return rc;
-}
-
-static int mdd_lfsck_namespace_prep(const struct lu_env *env,
-                                   struct lfsck_component *com)
-{
-       struct md_lfsck         *lfsck  = com->lc_lfsck;
-       struct lfsck_namespace  *ns     =
-                               (struct lfsck_namespace *)com->lc_file_ram;
-       struct lfsck_position   *pos    = &com->lc_pos_start;
-
-       if (ns->ln_status == LS_COMPLETED) {
-               int rc;
-
-               rc = mdd_lfsck_namespace_reset(env, com, false);
-               if (rc != 0)
-                       return rc;
-       }
-
-       down_write(&com->lc_sem);
-
-       ns->ln_time_latest_start = cfs_time_current_sec();
-
-       spin_lock(&lfsck->ml_lock);
-       if (ns->ln_flags & LF_SCANNED_ONCE) {
-               if (!lfsck->ml_drop_dryrun ||
-                   mdd_lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
-                       ns->ln_status = LS_SCANNING_PHASE2;
-                       cfs_list_del_init(&com->lc_link);
-                       cfs_list_add_tail(&com->lc_link,
-                                         &lfsck->ml_list_double_scan);
-                       if (!cfs_list_empty(&com->lc_link_dir))
-                               cfs_list_del_init(&com->lc_link_dir);
-                       mdd_lfsck_pos_set_zero(pos);
-               } else {
-                       ns->ln_status = LS_SCANNING_PHASE1;
-                       ns->ln_run_time_phase1 = 0;
-                       ns->ln_run_time_phase2 = 0;
-                       ns->ln_items_checked = 0;
-                       ns->ln_items_repaired = 0;
-                       ns->ln_items_failed = 0;
-                       ns->ln_dirs_checked = 0;
-                       ns->ln_mlinked_checked = 0;
-                       ns->ln_objs_checked_phase2 = 0;
-                       ns->ln_objs_repaired_phase2 = 0;
-                       ns->ln_objs_failed_phase2 = 0;
-                       ns->ln_objs_nlink_repaired = 0;
-                       ns->ln_objs_lost_found = 0;
-                       fid_zero(&ns->ln_fid_latest_scanned_phase2);
-                       if (cfs_list_empty(&com->lc_link_dir))
-                               cfs_list_add_tail(&com->lc_link_dir,
-                                                 &lfsck->ml_list_dir);
-                       *pos = ns->ln_pos_first_inconsistent;
-               }
-       } else {
-               ns->ln_status = LS_SCANNING_PHASE1;
-               if (cfs_list_empty(&com->lc_link_dir))
-                       cfs_list_add_tail(&com->lc_link_dir,
-                                         &lfsck->ml_list_dir);
-               if (!lfsck->ml_drop_dryrun ||
-                   mdd_lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
-                       *pos = ns->ln_pos_last_checkpoint;
-                       pos->lp_oit_cookie++;
-               } else {
-                       *pos = ns->ln_pos_first_inconsistent;
-               }
-       }
-       spin_unlock(&lfsck->ml_lock);
-
-       up_write(&com->lc_sem);
-       return 0;
-}
-
-static int mdd_lfsck_namespace_exec_oit(const struct lu_env *env,
-                                       struct lfsck_component *com,
-                                       struct mdd_object *obj)
-{
-       down_write(&com->lc_sem);
-       com->lc_new_checked++;
-       if (S_ISDIR(mdd_object_type(obj)))
-               ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
-       up_write(&com->lc_sem);
-       return 0;
-}
-
-static int mdd_declare_lfsck_namespace_exec_dir(const struct lu_env *env,
-                                               struct mdd_object *obj,
-                                               struct thandle *handle)
-{
-       int rc;
-
-       /* For destroying all invalid linkEA entries. */
-       rc = mdo_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
-       if (rc != 0)
-               return rc;
-
-       /* For insert new linkEA entry. */
-       rc = mdd_declare_links_add(env, obj, handle, NULL);
-       return rc;
-}
-
-static int mdd_lfsck_namespace_check_exist(const struct lu_env *env,
-                                          struct md_lfsck *lfsck,
-                                          struct mdd_object *obj,
-                                          const char *name)
-{
-       struct dt_object *dir = lfsck->ml_obj_dir;
-       struct lu_fid    *fid = &mdd_env_info(env)->mti_fid;
-       int               rc;
-       ENTRY;
-
-       if (unlikely(mdd_is_dead_obj(obj)))
-               RETURN(LFSCK_NAMEENTRY_DEAD);
-
-       rc = dt_lookup(env, dir, (struct dt_rec *)fid,
-                      (const struct dt_key *)name, BYPASS_CAPA);
-       if (rc == -ENOENT)
-               RETURN(LFSCK_NAMEENTRY_REMOVED);
-
-       if (rc < 0)
-               RETURN(rc);
-
-       if (!lu_fid_eq(fid, mdo2fid(obj)))
-               RETURN(LFSCK_NAMEENTRY_RECREATED);
-
-       RETURN(0);
-}
-
-static int mdd_lfsck_namespace_exec_dir(const struct lu_env *env,
-                                       struct lfsck_component *com,
-                                       struct mdd_object *obj,
-                                       struct lu_dirent *ent)
-{
-       struct mdd_thread_info     *info     = mdd_env_info(env);
-       struct lu_attr             *la       = &info->mti_la;
-       struct md_lfsck            *lfsck    = com->lc_lfsck;
-       struct lfsck_bookmark      *bk       = &lfsck->ml_bookmark_ram;
-       struct lfsck_namespace     *ns       =
-                               (struct lfsck_namespace *)com->lc_file_ram;
-       struct mdd_device          *mdd      = mdd_lfsck2mdd(lfsck);
-       struct linkea_data          ldata    = { 0 };
-       const struct lu_fid        *pfid     =
-                               lu_object_fid(&lfsck->ml_obj_dir->do_lu);
-       const struct lu_fid        *cfid     = mdo2fid(obj);
-       const struct lu_name       *cname;
-       struct thandle             *handle   = NULL;
-       bool                        repaired = false;
-       bool                        locked   = false;
-       int                         count    = 0;
-       int                         rc;
-       ENTRY;
-
-       cname = mdd_name_get_const(env, ent->lde_name, ent->lde_namelen);
-       down_write(&com->lc_sem);
-       com->lc_new_checked++;
-
-       if (ent->lde_attrs & LUDA_UPGRADE) {
-               ns->ln_flags |= LF_UPGRADE;
-               repaired = true;
-       } else if (ent->lde_attrs & LUDA_REPAIR) {
-               ns->ln_flags |= LF_INCONSISTENT;
-               repaired = true;
-       }
-
-       if (ent->lde_name[0] == '.' &&
-           (ent->lde_namelen == 1 ||
-            (ent->lde_namelen == 2 && ent->lde_name[1] == '.') ||
-            fid_is_dot_lustre(&ent->lde_fid)))
-               GOTO(out, rc = 0);
-
-       if (!(bk->lb_param & LPF_DRYRUN) &&
-           (com->lc_journal || repaired)) {
-
-again:
-               LASSERT(!locked);
-
-               com->lc_journal = 1;
-               handle = mdd_trans_create(env, mdd);
-               if (IS_ERR(handle))
-                       GOTO(out, rc = PTR_ERR(handle));
-
-               rc = mdd_declare_lfsck_namespace_exec_dir(env, obj, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               rc = mdd_trans_start(env, mdd, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               mdd_write_lock(env, obj, MOR_TGT_CHILD);
-               locked = true;
-       }
-
-       rc = mdd_lfsck_namespace_check_exist(env, lfsck, obj, ent->lde_name);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       rc = mdd_links_read(env, obj, &ldata);
-       if (rc == 0) {
-               count = ldata.ld_leh->leh_reccount;
-               rc = linkea_links_find(&ldata, cname, pfid);
-               if (rc == 0) {
-                       /* For dir, if there are more than one linkea entries,
-                        * then remove all the other redundant linkea entries.*/
-                       if (unlikely(count > 1 &&
-                                    S_ISDIR(mdd_object_type(obj))))
-                               goto unmatch;
-
-                       goto record;
-               } else {
-
-unmatch:
-                       ns->ln_flags |= LF_INCONSISTENT;
-                       if (bk->lb_param & LPF_DRYRUN) {
-                               repaired = true;
-                               goto record;
-                       }
-
-                       /*For dir, remove the unmatched linkea entry directly.*/
-                       if (S_ISDIR(mdd_object_type(obj))) {
-                               if (!com->lc_journal)
-                                       goto again;
-
-                               rc = mdo_xattr_del(env, obj, XATTR_NAME_LINK,
-                                                  handle, BYPASS_CAPA);
-                               if (rc != 0)
-                                       GOTO(stop, rc);
-
-                               goto nodata;
-                       } else {
-                               goto add;
-                       }
-               }
-       } else if (unlikely(rc == -EINVAL)) {
-               ns->ln_flags |= LF_INCONSISTENT;
-               if (bk->lb_param & LPF_DRYRUN) {
-                       count = 1;
-                       repaired = true;
-                       goto record;
-               }
-
-               if (!com->lc_journal)
-                       goto again;
-
-               /* The magic crashed, we are not sure whether there are more
-                * corrupt data in the linkea, so remove all linkea entries. */
-               rc = mdo_xattr_del(env, obj, XATTR_NAME_LINK, handle,
-                                  BYPASS_CAPA);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               goto nodata;
-       } else if (rc == -ENODATA) {
-               ns->ln_flags |= LF_UPGRADE;
-               if (bk->lb_param & LPF_DRYRUN) {
-                       count = 1;
-                       repaired = true;
-                       goto record;
-               }
-
-nodata:
-               rc = linkea_data_new(&ldata, &mdd_env_info(env)->mti_link_buf);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-add:
-               if (!com->lc_journal)
-                       goto again;
-
-               rc = linkea_add_buf(&ldata, cname, pfid);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               rc = mdd_links_write(env, obj, &ldata, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
-
-               count = ldata.ld_leh->leh_reccount;
-               repaired = true;
-       } else {
-               GOTO(stop, rc);
-       }
-
-record:
-       LASSERT(count > 0);
-
-       rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
-       if (rc != 0)
-               GOTO(stop, rc);
-
-       if ((count == 1) &&
-           (la->la_nlink == 1 || S_ISDIR(mdd_object_type(obj))))
-               /* Usually, it is for single linked object or dir, do nothing.*/
-               GOTO(stop, rc);
-
-       /* Following modification will be in another transaction.  */
-       if (handle != NULL) {
-               LASSERT(mdd_write_locked(env, obj));
-
-               mdd_write_unlock(env, obj);
-               locked = false;
-
-               mdd_trans_stop(env, mdd, 0, handle);
-               handle = NULL;
-       }
-
-       ns->ln_mlinked_checked++;
-       rc = mdd_lfsck_namespace_update(env, com, cfid,
-                       count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
-
-       GOTO(out, rc);
-
-stop:
-       if (locked)
-               mdd_write_unlock(env, obj);
-
-       if (handle != NULL)
-               mdd_trans_stop(env, mdd, rc, handle);
-
-out:
-       if (rc < 0) {
-               ns->ln_items_failed++;
-               if (mdd_lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
-                       mdd_lfsck_pos_fill(env, lfsck,
-                                          &ns->ln_pos_first_inconsistent,
-                                          false);
-               if (!(bk->lb_param & LPF_FAILOUT))
-                       rc = 0;
-       } else {
-               if (repaired)
-                       ns->ln_items_repaired++;
-               else
-                       com->lc_journal = 0;
-               rc = 0;
-       }
-       up_write(&com->lc_sem);
-       return rc;
-}
-
-static int mdd_lfsck_namespace_post(const struct lu_env *env,
-                                   struct lfsck_component *com,
-                                   int result, bool init)
-{
-       struct md_lfsck         *lfsck = com->lc_lfsck;
-       struct lfsck_namespace  *ns    =
-                               (struct lfsck_namespace *)com->lc_file_ram;
-       int                      rc;
-
-       down_write(&com->lc_sem);
-
-       spin_lock(&lfsck->ml_lock);
-       if (!init)
-               ns->ln_pos_last_checkpoint = lfsck->ml_pos_current;
-       if (result > 0) {
-               ns->ln_status = LS_SCANNING_PHASE2;
-               ns->ln_flags |= LF_SCANNED_ONCE;
-               ns->ln_flags &= ~LF_UPGRADE;
-               cfs_list_del_init(&com->lc_link);
-               cfs_list_del_init(&com->lc_link_dir);
-               cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_double_scan);
-       } else if (result == 0) {
-               if (lfsck->ml_paused) {
-                       ns->ln_status = LS_PAUSED;
-               } else {
-                       ns->ln_status = LS_STOPPED;
-                       cfs_list_del_init(&com->lc_link);
-                       cfs_list_del_init(&com->lc_link_dir);
-                       cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_idle);
-               }
-       } else {
-               ns->ln_status = LS_FAILED;
-               cfs_list_del_init(&com->lc_link);
-               cfs_list_del_init(&com->lc_link_dir);
-               cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_idle);
-       }
-       spin_unlock(&lfsck->ml_lock);
-
-       if (!init) {
-               ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
-                               HALF_SEC - lfsck->ml_time_last_checkpoint);
-               ns->ln_time_last_checkpoint = cfs_time_current_sec();
-               ns->ln_items_checked += com->lc_new_checked;
-               com->lc_new_checked = 0;
-       }
-
-       rc = mdd_lfsck_namespace_store(env, com, false);
-
-       up_write(&com->lc_sem);
-       return rc;
-}
-
-static int
-mdd_lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
-                        char *buf, int len)
-{
-       struct md_lfsck         *lfsck = com->lc_lfsck;
-       struct lfsck_bookmark   *bk    = &lfsck->ml_bookmark_ram;
-       struct lfsck_namespace  *ns    =
-                               (struct lfsck_namespace *)com->lc_file_ram;
-       int                      save  = len;
-       int                      ret   = -ENOSPC;
-       int                      rc;
-
-       down_read(&com->lc_sem);
-       rc = snprintf(buf, len,
-                     "name: lfsck_namespace\n"
-                     "magic: 0x%x\n"
-                     "version: %d\n"
-                     "status: %s\n",
-                     ns->ln_magic,
-                     bk->lb_version,
-                     lfsck_status_names[ns->ln_status]);
-       if (rc <= 0)
-               goto out;
-
-       buf += rc;
-       len -= rc;
-       rc = lfsck_bits_dump(&buf, &len, ns->ln_flags, lfsck_flags_names,
-                            "flags");
-       if (rc < 0)
-               goto out;
-
-       rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
-                            "param");
-       if (rc < 0)
-               goto out;
-
-       rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_complete,
-                            "time_since_last_completed");
-       if (rc < 0)
-               goto out;
-
-       rc = lfsck_time_dump(&buf, &len, ns->ln_time_latest_start,
-                            "time_since_latest_start");
-       if (rc < 0)
-               goto out;
-
-       rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_checkpoint,
-                            "time_since_last_checkpoint");
-       if (rc < 0)
-               goto out;
-
-       rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_latest_start,
-                           "latest_start_position");
-       if (rc < 0)
-               goto out;
-
-       rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_last_checkpoint,
-                           "last_checkpoint_position");
-       if (rc < 0)
-               goto out;
-
-       rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_first_inconsistent,
-                           "first_failure_position");
-       if (rc < 0)
-               goto out;
-
-       if (ns->ln_status == LS_SCANNING_PHASE1) {
-               struct lfsck_position pos;
-               cfs_duration_t duration = cfs_time_current() -
-                                         lfsck->ml_time_last_checkpoint;
-               __u64 checked = ns->ln_items_checked + com->lc_new_checked;
-               __u64 speed = checked;
-               __u64 new_checked = com->lc_new_checked * CFS_HZ;
-               __u32 rtime = ns->ln_run_time_phase1 +
-                             cfs_duration_sec(duration + HALF_SEC);
-
-               if (duration != 0)
-                       do_div(new_checked, duration);
-               if (rtime != 0)
-                       do_div(speed, rtime);
-               rc = snprintf(buf, len,
-                             "checked_phase1: "LPU64"\n"
-                             "checked_phase2: "LPU64"\n"
-                             "updated_phase1: "LPU64"\n"
-                             "updated_phase2: "LPU64"\n"
-                             "failed_phase1: "LPU64"\n"
-                             "failed_phase2: "LPU64"\n"
-                             "dirs: "LPU64"\n"
-                             "M-linked: "LPU64"\n"
-                             "nlinks_repaired: "LPU64"\n"
-                             "lost_found: "LPU64"\n"
-                             "success_count: %u\n"
-                             "run_time_phase1: %u seconds\n"
-                             "run_time_phase2: %u seconds\n"
-                             "average_speed_phase1: "LPU64" items/sec\n"
-                             "average_speed_phase2: N/A\n"
-                             "real-time_speed_phase1: "LPU64" items/sec\n"
-                             "real-time_speed_phase2: N/A\n",
-                             checked,
-                             ns->ln_objs_checked_phase2,
-                             ns->ln_items_repaired,
-                             ns->ln_objs_repaired_phase2,
-                             ns->ln_items_failed,
-                             ns->ln_objs_failed_phase2,
-                             ns->ln_dirs_checked,
-                             ns->ln_mlinked_checked,
-                             ns->ln_objs_nlink_repaired,
-                             ns->ln_objs_lost_found,
-                             ns->ln_success_count,
-                             rtime,
-                             ns->ln_run_time_phase2,
-                             speed,
-                             new_checked);
-               if (rc <= 0)
-                       goto out;
-
-               buf += rc;
-               len -= rc;
-               mdd_lfsck_pos_fill(env, lfsck, &pos, false);
-               rc = lfsck_pos_dump(&buf, &len, &pos, "current_position");
-               if (rc <= 0)
-                       goto out;
-       } else if (ns->ln_status == LS_SCANNING_PHASE2) {
-               cfs_duration_t duration = cfs_time_current() -
-                                         lfsck->ml_time_last_checkpoint;
-               __u64 checked = ns->ln_objs_checked_phase2 +
-                               com->lc_new_checked;
-               __u64 speed1 = ns->ln_items_checked;
-               __u64 speed2 = checked;
-               __u64 new_checked = com->lc_new_checked * CFS_HZ;
-               __u32 rtime = ns->ln_run_time_phase2 +
-                             cfs_duration_sec(duration + HALF_SEC);
-
-               if (duration != 0)
-                       do_div(new_checked, duration);
-               if (ns->ln_run_time_phase1 != 0)
-                       do_div(speed1, ns->ln_run_time_phase1);
-               if (rtime != 0)
-                       do_div(speed2, rtime);
-               rc = snprintf(buf, len,
-                             "checked_phase1: "LPU64"\n"
-                             "checked_phase2: "LPU64"\n"
-                             "updated_phase1: "LPU64"\n"
-                             "updated_phase2: "LPU64"\n"
-                             "failed_phase1: "LPU64"\n"
-                             "failed_phase2: "LPU64"\n"
-                             "dirs: "LPU64"\n"
-                             "M-linked: "LPU64"\n"
-                             "nlinks_repaired: "LPU64"\n"
-                             "lost_found: "LPU64"\n"
-                             "success_count: %u\n"
-                             "run_time_phase1: %u seconds\n"
-                             "run_time_phase2: %u seconds\n"
-                             "average_speed_phase1: "LPU64" items/sec\n"
-                             "average_speed_phase2: "LPU64" objs/sec\n"
-                             "real-time_speed_phase1: N/A\n"
-                             "real-time_speed_phase2: "LPU64" objs/sec\n"
-                             "current_position: "DFID"\n",
-                             ns->ln_items_checked,
-                             checked,
-                             ns->ln_items_repaired,
-                             ns->ln_objs_repaired_phase2,
-                             ns->ln_items_failed,
-                             ns->ln_objs_failed_phase2,
-                             ns->ln_dirs_checked,
-                             ns->ln_mlinked_checked,
-                             ns->ln_objs_nlink_repaired,
-                             ns->ln_objs_lost_found,
-                             ns->ln_success_count,
-                             ns->ln_run_time_phase1,
-                             rtime,
-                             speed1,
-                             speed2,
-                             new_checked,
-                             PFID(&ns->ln_fid_latest_scanned_phase2));
-               if (rc <= 0)
-                       goto out;
-
-               buf += rc;
-               len -= rc;
-       } else {
-               __u64 speed1 = ns->ln_items_checked;
-               __u64 speed2 = ns->ln_objs_checked_phase2;
-
-               if (ns->ln_run_time_phase1 != 0)
-                       do_div(speed1, ns->ln_run_time_phase1);
-               if (ns->ln_run_time_phase2 != 0)
-                       do_div(speed2, ns->ln_run_time_phase2);
-               rc = snprintf(buf, len,
-                             "checked_phase1: "LPU64"\n"
-                             "checked_phase2: "LPU64"\n"
-                             "updated_phase1: "LPU64"\n"
-                             "updated_phase2: "LPU64"\n"
-                             "failed_phase1: "LPU64"\n"
-                             "failed_phase2: "LPU64"\n"
-                             "dirs: "LPU64"\n"
-                             "M-linked: "LPU64"\n"
-                             "nlinks_repaired: "LPU64"\n"
-                             "lost_found: "LPU64"\n"
-                             "success_count: %u\n"
-                             "run_time_phase1: %u seconds\n"
-                             "run_time_phase2: %u seconds\n"
-                             "average_speed_phase1: "LPU64" items/sec\n"
-                             "average_speed_phase2: "LPU64" objs/sec\n"
-                             "real-time_speed_phase1: N/A\n"
-                             "real-time_speed_phase2: N/A\n"
-                             "current_position: N/A\n",
-                             ns->ln_items_checked,
-                             ns->ln_objs_checked_phase2,
-                             ns->ln_items_repaired,
-                             ns->ln_objs_repaired_phase2,
-                             ns->ln_items_failed,
-                             ns->ln_objs_failed_phase2,
-                             ns->ln_dirs_checked,
-                             ns->ln_mlinked_checked,
-                             ns->ln_objs_nlink_repaired,
-                             ns->ln_objs_lost_found,
-                             ns->ln_success_count,
-                             ns->ln_run_time_phase1,
-                             ns->ln_run_time_phase2,
-                             speed1,
-                             speed2);
-               if (rc <= 0)
-                       goto out;
-
-               buf += rc;
-               len -= rc;
-       }
-       ret = save - len;
-
-out:
-       up_read(&com->lc_sem);
-       return ret;
-}
-
-static int mdd_lfsck_namespace_double_scan(const struct lu_env *env,
-                                          struct lfsck_component *com)
-{
-       struct md_lfsck         *lfsck  = com->lc_lfsck;
-       struct ptlrpc_thread    *thread = &lfsck->ml_thread;
-       struct mdd_device       *mdd    = mdd_lfsck2mdd(lfsck);
-       struct lfsck_bookmark   *bk     = &lfsck->ml_bookmark_ram;
-       struct lfsck_namespace  *ns     =
-                               (struct lfsck_namespace *)com->lc_file_ram;
-       struct dt_object        *obj    = com->lc_obj;
-       const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
-       struct mdd_object       *target;
-       struct dt_it            *di;
-       struct dt_key           *key;
-       struct lu_fid            fid;
-       int                      rc;
-       __u8                     flags;
-       ENTRY;
-
-       lfsck->ml_new_scanned = 0;
-       lfsck->ml_time_last_checkpoint = cfs_time_current();
-       lfsck->ml_time_next_checkpoint = lfsck->ml_time_last_checkpoint +
-                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
-
-       di = iops->init(env, obj, 0, BYPASS_CAPA);
-       if (IS_ERR(di))
-               RETURN(PTR_ERR(di));
-
-       fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
-       rc = iops->get(env, di, (const struct dt_key *)&fid);
-       if (rc < 0)
-               GOTO(fini, rc);
-
-       /* Skip the start one, which either has been processed or non-exist. */
-       rc = iops->next(env, di);
-       if (rc != 0)
-               GOTO(put, rc);
-
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
-               GOTO(put, rc = 0);
-
-       do {
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
-                   cfs_fail_val > 0) {
-                       struct l_wait_info lwi;
-
-                       lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
-                                         NULL, NULL);
-                       l_wait_event(thread->t_ctl_waitq,
-                                    !thread_is_running(thread),
-                                    &lwi);
-               }
-
-               key = iops->key(env, di);
-               fid_be_to_cpu(&fid, (const struct lu_fid *)key);
-               target = mdd_object_find(env, mdd, &fid);
-               down_write(&com->lc_sem);
-               if (target == NULL) {
-                       rc = 0;
-                       goto checkpoint;
-               } else if (IS_ERR(target)) {
-                       rc = PTR_ERR(target);
-                       goto checkpoint;
-               }
-
-               /* XXX: Currently, skip remote object, the consistency for
-                *      remote object will be processed in LFSCK phase III. */
-               if (!mdd_object_exists(target) || mdd_object_remote(target))
-                       goto obj_put;
-
-               rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
-               if (rc == 0)
-                       rc = mdd_lfsck_namespace_double_scan_one(env, com,
-                                                                target, flags);
-
-obj_put:
-               mdd_object_put(env, target);
-
-checkpoint:
-               lfsck->ml_new_scanned++;
-               com->lc_new_checked++;
-               ns->ln_fid_latest_scanned_phase2 = fid;
-               if (rc > 0)
-                       ns->ln_objs_repaired_phase2++;
-               else if (rc < 0)
-                       ns->ln_objs_failed_phase2++;
-               up_write(&com->lc_sem);
-
-               if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
-                       mdd_lfsck_namespace_delete(env, com, &fid);
-               } else if (rc < 0) {
-                       flags |= LLF_REPAIR_FAILED;
-                       mdd_lfsck_namespace_update(env, com, &fid, flags, true);
-               }
-
-               if (rc < 0 && bk->lb_param & LPF_FAILOUT)
-                       GOTO(put, rc);
-
-               if (likely(cfs_time_beforeq(cfs_time_current(),
-                                           lfsck->ml_time_next_checkpoint)) ||
-                   com->lc_new_checked == 0)
-                       goto speed;
-
-               down_write(&com->lc_sem);
-               ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
-                               HALF_SEC - lfsck->ml_time_last_checkpoint);
-               ns->ln_time_last_checkpoint = cfs_time_current_sec();
-               ns->ln_objs_checked_phase2 += com->lc_new_checked;
-               com->lc_new_checked = 0;
-               rc = mdd_lfsck_namespace_store(env, com, false);
-               up_write(&com->lc_sem);
-               if (rc != 0)
-                       GOTO(put, rc);
-
-               lfsck->ml_time_last_checkpoint = cfs_time_current();
-               lfsck->ml_time_next_checkpoint = lfsck->ml_time_last_checkpoint +
-                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
-
-speed:
-               mdd_lfsck_control_speed(lfsck);
-               if (unlikely(!thread_is_running(thread)))
-                       GOTO(put, rc = 0);
-
-               rc = iops->next(env, di);
-       } while (rc == 0);
-
-       GOTO(put, rc);
-
-put:
-       iops->put(env, di);
-
-fini:
-       iops->fini(env, di);
-       down_write(&com->lc_sem);
-
-       ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
-                               HALF_SEC - lfsck->ml_time_last_checkpoint);
-       ns->ln_time_last_checkpoint = cfs_time_current_sec();
-       ns->ln_objs_checked_phase2 += com->lc_new_checked;
-       com->lc_new_checked = 0;
-
-       if (rc > 0) {
-               com->lc_journal = 0;
-               ns->ln_status = LS_COMPLETED;
-               if (!(bk->lb_param & LPF_DRYRUN))
-                       ns->ln_flags &=
-                       ~(LF_SCANNED_ONCE | LF_INCONSISTENT | LF_UPGRADE);
-               ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
-               ns->ln_success_count++;
-       } else if (rc == 0) {
-               if (lfsck->ml_paused)
-                       ns->ln_status = LS_PAUSED;
-               else
-                       ns->ln_status = LS_STOPPED;
-       } else {
-               ns->ln_status = LS_FAILED;
-       }
-
-       if (ns->ln_status != LS_PAUSED) {
-               spin_lock(&lfsck->ml_lock);
-               cfs_list_del_init(&com->lc_link);
-               cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_idle);
-               spin_unlock(&lfsck->ml_lock);
-       }
-
-       rc = mdd_lfsck_namespace_store(env, com, false);
-
-       up_write(&com->lc_sem);
-       return rc;
-}
-
-static struct lfsck_operations mdd_lfsck_namespace_ops = {
-       .lfsck_reset            = mdd_lfsck_namespace_reset,
-       .lfsck_fail             = mdd_lfsck_namespace_fail,
-       .lfsck_checkpoint       = mdd_lfsck_namespace_checkpoint,
-       .lfsck_prep             = mdd_lfsck_namespace_prep,
-       .lfsck_exec_oit         = mdd_lfsck_namespace_exec_oit,
-       .lfsck_exec_dir         = mdd_lfsck_namespace_exec_dir,
-       .lfsck_post             = mdd_lfsck_namespace_post,
-       .lfsck_dump             = mdd_lfsck_namespace_dump,
-       .lfsck_double_scan      = mdd_lfsck_namespace_double_scan,
-};
-
-/* LFSCK component setup/cleanup functions */
-
-static int mdd_lfsck_namespace_setup(const struct lu_env *env,
-                                    struct md_lfsck *lfsck)
-{
-       struct mdd_device       *mdd = mdd_lfsck2mdd(lfsck);
-       struct lfsck_component  *com;
-       struct lfsck_namespace  *ns;
-       struct dt_object        *obj, *root;
-       int                      rc;
-       ENTRY;
-
-       OBD_ALLOC_PTR(com);
-       if (com == NULL)
-               RETURN(-ENOMEM);
-
-       CFS_INIT_LIST_HEAD(&com->lc_link);
-       CFS_INIT_LIST_HEAD(&com->lc_link_dir);
-       init_rwsem(&com->lc_sem);
-       atomic_set(&com->lc_ref, 1);
-       com->lc_lfsck = lfsck;
-       com->lc_type = LT_NAMESPACE;
-       com->lc_ops = &mdd_lfsck_namespace_ops;
-       com->lc_file_size = sizeof(struct lfsck_namespace);
-       OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
-       if (com->lc_file_ram == NULL)
-               GOTO(out, rc = -ENOMEM);
-
-       OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
-       if (com->lc_file_disk == NULL)
-               GOTO(out, rc = -ENOMEM);
-
-       root = dt_locate(env, mdd->mdd_bottom, &mdd->mdd_local_root_fid);
-       if (unlikely(IS_ERR(root)))
-               GOTO(out, rc = PTR_ERR(root));
-
-       obj = local_index_find_or_create(env, mdd->mdd_los, root,
-                                        lfsck_namespace_name,
-                                        S_IFREG | S_IRUGO | S_IWUSR,
-                                        &dt_lfsck_features);
-       lu_object_put(env, &root->do_lu);
-       if (IS_ERR(obj))
-               GOTO(out, rc = PTR_ERR(obj));
-
-       com->lc_obj = obj;
-       rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       rc = mdd_lfsck_namespace_load(env, com);
-       if (rc > 0)
-               rc = mdd_lfsck_namespace_reset(env, com, true);
-       else if (rc == -ENODATA)
-               rc = mdd_lfsck_namespace_init(env, com);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       ns = (struct lfsck_namespace *)com->lc_file_ram;
-       switch (ns->ln_status) {
-       case LS_INIT:
-       case LS_COMPLETED:
-       case LS_FAILED:
-       case LS_STOPPED:
-               cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_idle);
-               break;
-       default:
-               CERROR("%s: unknown status: %u\n",
-                      mdd_lfsck2name(lfsck), ns->ln_status);
-               /* fall through */
-       case LS_SCANNING_PHASE1:
-       case LS_SCANNING_PHASE2:
-               /* No need to store the status to disk right now.
-                * If the system crashed before the status stored,
-                * it will be loaded back when next time. */
-               ns->ln_status = LS_CRASHED;
-               /* fall through */
-       case LS_PAUSED:
-       case LS_CRASHED:
-               cfs_list_add_tail(&com->lc_link, &lfsck->ml_list_scan);
-               cfs_list_add_tail(&com->lc_link_dir, &lfsck->ml_list_dir);
-               break;
-       }
-
-       GOTO(out, rc = 0);
-
-out:
-       if (rc != 0)
-               mdd_lfsck_component_cleanup(env, com);
-       return rc;
-}
-
-/* helper functions for framework */
-
-static int object_needs_lfsck(const struct lu_env *env, struct mdd_device *mdd,
-                             struct mdd_object *obj)
-{
-       struct lu_fid *fid   = &mdd_env_info(env)->mti_fid;
-       int            depth = 0;
-       int            rc;
-
-       LASSERT(S_ISDIR(mdd_object_type(obj)));
-
-       while (1) {
-               if (mdd_is_root(mdd, mdo2fid(obj))) {
-                       if (depth > 0)
-                               mdd_object_put(env, obj);
-                       return 1;
-               }
-
-               /* .lustre doesn't contain "real" user objects, no need lfsck */
-               if (fid_is_dot_lustre(mdo2fid(obj))) {
-                       if (depth > 0)
-                               mdd_object_put(env, obj);
-                       return 0;
-               }
-
-               mdd_read_lock(env, obj, MOR_TGT_CHILD);
-               if (unlikely(mdd_is_dead_obj(obj))) {
-                       mdd_read_unlock(env, obj);
-                       if (depth > 0)
-                               mdd_object_put(env, obj);
-                       return 0;
-               }
-
-               rc = dt_xattr_get(env, mdd_object_child(obj),
-                                 mdd_buf_get(env, NULL, 0), XATTR_NAME_LINK,
-                                 BYPASS_CAPA);
-               mdd_read_unlock(env, obj);
-               if (rc >= 0) {
-                       if (depth > 0)
-                               mdd_object_put(env, obj);
-                       return 1;
-               }
-
-               if (rc < 0 && rc != -ENODATA) {
-                       if (depth > 0)
-                               mdd_object_put(env, obj);
-                       return rc;
-               }
-
-               rc = mdd_parent_fid(env, obj, fid);
-               if (depth > 0)
-                       mdd_object_put(env, obj);
-               if (rc != 0)
-                       return rc;
-
-               if (unlikely(lu_fid_eq(fid, &mdd->mdd_local_root_fid)))
-                       return 0;
-
-               obj = mdd_object_find(env, mdd, fid);
-               if (obj == NULL)
-                       return 0;
-               else if (IS_ERR(obj))
-                       return PTR_ERR(obj);
-
-               if (!mdd_object_exists(obj)) {
-                       mdd_object_put(env, obj);
-                       return 0;
-               }
-
-               /* Currently, only client visible directory can be remote. */
-               if (mdd_object_remote(obj)) {
-                       mdd_object_put(env, obj);
-                       return 1;
-               }
-
-               depth++;
-       }
-       return 0;
-}
-
-static void mdd_lfsck_unpack_ent(struct lu_dirent *ent)
-{
-       fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
-       ent->lde_hash = le64_to_cpu(ent->lde_hash);
-       ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
-       ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
-       ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
-
-       /* Make sure the name is terminated with '0'.
-        * The data (type) after ent::lde_name maybe
-        * broken, but we do not care. */
-       ent->lde_name[ent->lde_namelen] = 0;
-}
-
-/* LFSCK wrap functions */
-
-static void mdd_lfsck_fail(const struct lu_env *env, struct md_lfsck *lfsck,
-                          bool new_checked)
-{
-       struct lfsck_component *com;
-
-       cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
-               com->lc_ops->lfsck_fail(env, com, new_checked);
-       }
-}
-
-static int mdd_lfsck_checkpoint(const struct lu_env *env,
-                               struct md_lfsck *lfsck)
-{
-       struct lfsck_component *com;
-       int                     rc;
-
-       if (likely(cfs_time_beforeq(cfs_time_current(),
-                                   lfsck->ml_time_next_checkpoint)))
-               return 0;
-
-       mdd_lfsck_pos_fill(env, lfsck, &lfsck->ml_pos_current, false);
-       cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
-               rc = com->lc_ops->lfsck_checkpoint(env, com, false);
-               if (rc != 0)
-                       return rc;;
-       }
-
-       lfsck->ml_time_last_checkpoint = cfs_time_current();
-       lfsck->ml_time_next_checkpoint = lfsck->ml_time_last_checkpoint +
-                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
-       return 0;
-}
-
-static int mdd_lfsck_prep(struct lu_env *env, struct md_lfsck *lfsck)
-{
-       struct mdd_device      *mdd     = mdd_lfsck2mdd(lfsck);
-       struct mdd_object      *obj     = NULL;
-       struct dt_object       *dt_obj;
-       struct lfsck_component *com;
-       struct lfsck_component *next;
-       struct lfsck_position  *pos     = NULL;
-       const struct dt_it_ops *iops    =
-                               &lfsck->ml_obj_oit->do_index_ops->dio_it;
-       struct dt_it           *di;
-       int                     rc;
-       ENTRY;
-
-       LASSERT(lfsck->ml_obj_dir == NULL);
-       LASSERT(lfsck->ml_di_dir == NULL);
-
-       lfsck->ml_current_oit_processed = 0;
-       cfs_list_for_each_entry_safe(com, next, &lfsck->ml_list_scan, lc_link) {
-               com->lc_new_checked = 0;
-               if (lfsck->ml_bookmark_ram.lb_param & LPF_DRYRUN)
-                       com->lc_journal = 0;
-
-               rc = com->lc_ops->lfsck_prep(env, com);
-               if (rc != 0)
-                       RETURN(rc);
-
-               if ((pos == NULL) ||
-                   (!mdd_lfsck_pos_is_zero(&com->lc_pos_start) &&
-                    mdd_lfsck_pos_is_eq(pos, &com->lc_pos_start) > 0))
-                       pos = &com->lc_pos_start;
-       }
-
-       /* Init otable-based iterator. */
-       if (pos == NULL) {
-               rc = iops->load(env, lfsck->ml_di_oit, 0);
-               if (rc > 0) {
-                       lfsck->ml_oit_over = 1;
-                       rc = 0;
-               }
-
-               GOTO(out, rc);
-       }
-
-       rc = iops->load(env, lfsck->ml_di_oit, pos->lp_oit_cookie);
-       if (rc < 0)
-               GOTO(out, rc);
-       else if (rc > 0)
-               lfsck->ml_oit_over = 1;
-
-       if (fid_is_zero(&pos->lp_dir_parent))
-               GOTO(out, rc = 0);
-
-       /* Find the directory for namespace-based traverse. */
-       obj = mdd_object_find(env, mdd, &pos->lp_dir_parent);
-       if (obj == NULL)
-               GOTO(out, rc = 0);
-       else if (IS_ERR(obj))
-               RETURN(PTR_ERR(obj));
-
-       /* XXX: Currently, skip remote object, the consistency for
-        *      remote object will be processed in LFSCK phase III. */
-       if (!mdd_object_exists(obj) || mdd_object_remote(obj) ||
-           unlikely(!S_ISDIR(mdd_object_type(obj))))
-               GOTO(out, rc = 0);
-
-       if (unlikely(mdd_is_dead_obj(obj)))
-               GOTO(out, rc = 0);
-
-       dt_obj = mdd_object_child(obj);
-       if (unlikely(!dt_try_as_dir(env, dt_obj)))
-               GOTO(out, rc = -ENOTDIR);
-
-       /* Init the namespace-based directory traverse. */
-       iops = &dt_obj->do_index_ops->dio_it;
-       di = iops->init(env, dt_obj, lfsck->ml_args_dir, BYPASS_CAPA);
-       if (IS_ERR(di))
-               GOTO(out, rc = PTR_ERR(di));
-
-       LASSERT(pos->lp_dir_cookie < MDS_DIR_END_OFF);
-
-       rc = iops->load(env, di, pos->lp_dir_cookie);
-       if ((rc == 0) || (rc > 0 && pos->lp_dir_cookie > 0))
-               rc = iops->next(env, di);
-       else if (rc > 0)
-               rc = 0;
-
-       if (rc != 0) {
-               iops->put(env, di);
-               iops->fini(env, di);
-               GOTO(out, rc);
-       }
-
-       lfsck->ml_obj_dir = dt_obj;
-       spin_lock(&lfsck->ml_lock);
-       lfsck->ml_di_dir = di;
-       spin_unlock(&lfsck->ml_lock);
-       obj = NULL;
-
-       GOTO(out, rc = 0);
-
-out:
-       if (obj != NULL)
-               mdd_object_put(env, obj);
-
-       if (rc < 0) {
-               cfs_list_for_each_entry_safe(com, next, &lfsck->ml_list_scan,
-                                            lc_link)
-                       com->lc_ops->lfsck_post(env, com, rc, true);
-
-               return rc;
-       }
-
-       rc = 0;
-       mdd_lfsck_pos_fill(env, lfsck, &lfsck->ml_pos_current, true);
-       cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
-               rc = com->lc_ops->lfsck_checkpoint(env, com, true);
-               if (rc != 0)
-                       break;
-       }
-
-       lfsck->ml_time_last_checkpoint = cfs_time_current();
-       lfsck->ml_time_next_checkpoint = lfsck->ml_time_last_checkpoint +
-                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
-       return rc;
-}
-
-static int mdd_lfsck_exec_oit(const struct lu_env *env, struct md_lfsck *lfsck,
-                             struct mdd_object *obj)
-{
-       struct lfsck_component *com;
-       struct dt_object       *dt_obj;
-       const struct dt_it_ops *iops;
-       struct dt_it           *di;
-       int                     rc;
-       ENTRY;
-
-       LASSERT(lfsck->ml_obj_dir == NULL);
-
-       cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
-               rc = com->lc_ops->lfsck_exec_oit(env, com, obj);
-               if (rc != 0)
-                       RETURN(rc);
-       }
-
-       if (!S_ISDIR(mdd_object_type(obj)) ||
-           cfs_list_empty(&lfsck->ml_list_dir))
-              RETURN(0);
-
-       rc = object_needs_lfsck(env, mdd_lfsck2mdd(lfsck), obj);
-       if (rc <= 0)
-               GOTO(out, rc);
-
-       if (unlikely(mdd_is_dead_obj(obj)))
-               GOTO(out, rc = 0);
-
-       dt_obj = mdd_object_child(obj);
-       if (unlikely(!dt_try_as_dir(env, dt_obj)))
-               GOTO(out, rc = -ENOTDIR);
-
-       iops = &dt_obj->do_index_ops->dio_it;
-       di = iops->init(env, dt_obj, lfsck->ml_args_dir, BYPASS_CAPA);
-       if (IS_ERR(di))
-               GOTO(out, rc = PTR_ERR(di));
-
-       rc = iops->load(env, di, 0);
-       if (rc == 0)
-               rc = iops->next(env, di);
-       else if (rc > 0)
-               rc = 0;
-
-       if (rc != 0) {
-               iops->put(env, di);
-               iops->fini(env, di);
-               GOTO(out, rc);
-       }
-
-       mdd_object_get(obj);
-       lfsck->ml_obj_dir = dt_obj;
-       spin_lock(&lfsck->ml_lock);
-       lfsck->ml_di_dir = di;
-       spin_unlock(&lfsck->ml_lock);
-
-       GOTO(out, rc = 0);
-
-out:
-       if (rc < 0)
-               mdd_lfsck_fail(env, lfsck, false);
-       return (rc > 0 ? 0 : rc);
-}
-
-static int mdd_lfsck_exec_dir(const struct lu_env *env, struct md_lfsck *lfsck,
-                             struct mdd_object *obj, struct lu_dirent *ent)
-{
-       struct lfsck_component *com;
-       int                     rc;
-
-       cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
-               rc = com->lc_ops->lfsck_exec_dir(env, com, obj, ent);
-               if (rc != 0)
-                       return rc;
-       }
-       return 0;
-}
-
-static int mdd_lfsck_post(const struct lu_env *env, struct md_lfsck *lfsck,
-                         int result)
-{
-       struct lfsck_component *com;
-       struct lfsck_component *next;
-       int                     rc;
-
-       mdd_lfsck_pos_fill(env, lfsck, &lfsck->ml_pos_current, false);
-       cfs_list_for_each_entry_safe(com, next, &lfsck->ml_list_scan, lc_link) {
-               rc = com->lc_ops->lfsck_post(env, com, result, false);
-               if (rc != 0)
-                       return rc;
-       }
-
-       lfsck->ml_time_last_checkpoint = cfs_time_current();
-       lfsck->ml_time_next_checkpoint = lfsck->ml_time_last_checkpoint +
-                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
-       return result;
-}
-
-static int mdd_lfsck_double_scan(const struct lu_env *env,
-                                struct md_lfsck *lfsck)
-{
-       struct lfsck_component *com;
-       struct lfsck_component *next;
-       int                     rc;
-
-       cfs_list_for_each_entry_safe(com, next, &lfsck->ml_list_double_scan,
-                                    lc_link) {
-               if (lfsck->ml_bookmark_ram.lb_param & LPF_DRYRUN)
-                       com->lc_journal = 0;
-
-               rc = com->lc_ops->lfsck_double_scan(env, com);
-               if (rc != 0)
-                       return rc;
-       }
-       return 0;
-}
-
-/* LFSCK engines */
-
-static int mdd_lfsck_dir_engine(const struct lu_env *env,
-                               struct md_lfsck *lfsck)
-{
-       struct mdd_thread_info  *info   = mdd_env_info(env);
-       struct mdd_device       *mdd    = mdd_lfsck2mdd(lfsck);
-       const struct dt_it_ops  *iops   =
-                       &lfsck->ml_obj_dir->do_index_ops->dio_it;
-       struct dt_it            *di     = lfsck->ml_di_dir;
-       struct lu_dirent        *ent    = &info->mti_ent;
-       struct lu_fid           *fid    = &info->mti_fid;
-       struct lfsck_bookmark   *bk     = &lfsck->ml_bookmark_ram;
-       struct ptlrpc_thread    *thread = &lfsck->ml_thread;
-       int                      rc;
-       ENTRY;
-
-       do {
-               struct mdd_object *child;
-
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
-                   cfs_fail_val > 0) {
-                       struct l_wait_info lwi;
-
-                       lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
-                                         NULL, NULL);
-                       l_wait_event(thread->t_ctl_waitq,
-                                    !thread_is_running(thread),
-                                    &lwi);
-               }
-
-               lfsck->ml_new_scanned++;
-               rc = iops->rec(env, di, (struct dt_rec *)ent,
-                              lfsck->ml_args_dir);
-               if (rc != 0) {
-                       mdd_lfsck_fail(env, lfsck, true);
-                       if (bk->lb_param & LPF_FAILOUT)
-                               RETURN(rc);
-                       else
-                               goto checkpoint;
-               }
-
-               mdd_lfsck_unpack_ent(ent);
-               if (ent->lde_attrs & LUDA_IGNORE)
-                       goto checkpoint;
-
-               *fid = ent->lde_fid;
-               child = mdd_object_find(env, mdd, fid);
-               if (child == NULL) {
-                       goto checkpoint;
-               } else if (IS_ERR(child)) {
-                       mdd_lfsck_fail(env, lfsck, true);
-                       if (bk->lb_param & LPF_FAILOUT)
-                               RETURN(PTR_ERR(child));
-                       else
-                               goto checkpoint;
-               }
-
-               /* XXX: Currently, skip remote object, the consistency for
-                *      remote object will be processed in LFSCK phase III. */
-               if (mdd_object_exists(child) && !mdd_object_remote(child))
-                       rc = mdd_lfsck_exec_dir(env, lfsck, child, ent);
-               mdd_object_put(env, child);
-               if (rc != 0 && bk->lb_param & LPF_FAILOUT)
-                       RETURN(rc);
-
-checkpoint:
-               rc = mdd_lfsck_checkpoint(env, lfsck);
-               if (rc != 0 && bk->lb_param & LPF_FAILOUT)
-                       RETURN(rc);
-
-               /* Rate control. */
-               mdd_lfsck_control_speed(lfsck);
-               if (unlikely(!thread_is_running(thread)))
-                       RETURN(0);
-
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
-                       spin_lock(&lfsck->ml_lock);
-                       thread_set_flags(thread, SVC_STOPPING);
-                       spin_unlock(&lfsck->ml_lock);
-                       RETURN(-EINVAL);
-               }
-
-               rc = iops->next(env, di);
-       } while (rc == 0);
-
-       if (rc > 0 && !lfsck->ml_oit_over)
-               mdd_lfsck_close_dir(env, lfsck);
-
-       RETURN(rc);
-}
-
-static int mdd_lfsck_oit_engine(const struct lu_env *env,
-                               struct md_lfsck *lfsck)
-{
-       struct mdd_thread_info  *info   = mdd_env_info(env);
-       struct mdd_device       *mdd    = mdd_lfsck2mdd(lfsck);
-       const struct dt_it_ops  *iops   =
-                               &lfsck->ml_obj_oit->do_index_ops->dio_it;
-       struct dt_it            *di     = lfsck->ml_di_oit;
-       struct lu_fid           *fid    = &info->mti_fid;
-       struct lfsck_bookmark   *bk     = &lfsck->ml_bookmark_ram;
-       struct ptlrpc_thread    *thread = &lfsck->ml_thread;
-       int                      rc;
-       ENTRY;
-
-       do {
-               struct mdd_object *target;
-
-               if (lfsck->ml_di_dir != NULL) {
-                       rc = mdd_lfsck_dir_engine(env, lfsck);
-                       if (rc <= 0)
-                               RETURN(rc);
-               }
-
-               if (unlikely(lfsck->ml_oit_over))
-                       RETURN(1);
-
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
-                   cfs_fail_val > 0) {
-                       struct l_wait_info lwi;
-
-                       lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
-                                         NULL, NULL);
-                       l_wait_event(thread->t_ctl_waitq,
-                                    !thread_is_running(thread),
-                                    &lwi);
-               }
-
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
-                       RETURN(0);
-
-               lfsck->ml_current_oit_processed = 1;
-               lfsck->ml_new_scanned++;
-               rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
-               if (rc != 0) {
-                       mdd_lfsck_fail(env, lfsck, true);
-                       if (bk->lb_param & LPF_FAILOUT)
-                               RETURN(rc);
-                       else
-                               goto checkpoint;
-               }
-
-               target = mdd_object_find(env, mdd, fid);
-               if (target == NULL) {
-                       goto checkpoint;
-               } else if (IS_ERR(target)) {
-                       mdd_lfsck_fail(env, lfsck, true);
-                       if (bk->lb_param & LPF_FAILOUT)
-                               RETURN(PTR_ERR(target));
-                       else
-                               goto checkpoint;
-               }
-
-               /* XXX: Currently, skip remote object, the consistency for
-                *      remote object will be processed in LFSCK phase III. */
-               if (mdd_object_exists(target) && !mdd_object_remote(target))
-                       rc = mdd_lfsck_exec_oit(env, lfsck, target);
-               mdd_object_put(env, target);
-               if (rc != 0 && bk->lb_param & LPF_FAILOUT)
-                       RETURN(rc);
-
-checkpoint:
-               rc = mdd_lfsck_checkpoint(env, lfsck);
-               if (rc != 0 && bk->lb_param & LPF_FAILOUT)
-                       RETURN(rc);
-
-               /* Rate control. */
-               mdd_lfsck_control_speed(lfsck);
-
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
-                       spin_lock(&lfsck->ml_lock);
-                       thread_set_flags(thread, SVC_STOPPING);
-                       spin_unlock(&lfsck->ml_lock);
-                       RETURN(-EINVAL);
-               }
-
-               rc = iops->next(env, di);
-               if (unlikely(rc > 0))
-                       lfsck->ml_oit_over = 1;
-               else if (likely(rc == 0))
-                       lfsck->ml_current_oit_processed = 0;
-
-               if (unlikely(!thread_is_running(thread)))
-                       RETURN(0);
-       } while (rc == 0 || lfsck->ml_di_dir != NULL);
-
-       RETURN(rc);
-}
-
-static int mdd_lfsck_main(void *args)
-{
-       struct lu_env            env;
-       struct md_lfsck         *lfsck    = (struct md_lfsck *)args;
-       struct ptlrpc_thread    *thread   = &lfsck->ml_thread;
-       struct dt_object        *oit_obj  = lfsck->ml_obj_oit;
-       const struct dt_it_ops  *oit_iops = &oit_obj->do_index_ops->dio_it;
-       struct dt_it            *oit_di;
-       int                      rc;
-       ENTRY;
-
-       cfs_daemonize("lfsck");
-       rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
-       if (rc != 0) {
-               CERROR("%s: LFSCK, fail to init env, rc = %d\n",
-                      mdd_lfsck2name(lfsck), rc);
-               GOTO(noenv, rc);
-       }
-
-       oit_di = oit_iops->init(&env, oit_obj, lfsck->ml_args_oit, BYPASS_CAPA);
-       if (IS_ERR(oit_di)) {
-               rc = PTR_ERR(oit_di);
-               CERROR("%s: LFSCK, fail to init iteration, rc = %d\n",
-                      mdd_lfsck2name(lfsck), rc);
-               GOTO(fini_env, rc);
-       }
-
-       spin_lock(&lfsck->ml_lock);
-       lfsck->ml_di_oit = oit_di;
-       spin_unlock(&lfsck->ml_lock);
-       rc = mdd_lfsck_prep(&env, lfsck);
-       if (rc != 0)
-               GOTO(fini_oit, rc);
-
-       CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = 0x%x, dir_flags = 0x%x, "
-              "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
-              ", pid = %d\n", lfsck->ml_args_oit, lfsck->ml_args_dir,
-              lfsck->ml_pos_current.lp_oit_cookie,
-              lfsck->ml_pos_current.lp_dir_cookie,
-              PFID(&lfsck->ml_pos_current.lp_dir_parent),
-              cfs_curproc_pid());
-
-       spin_lock(&lfsck->ml_lock);
-       thread_set_flags(thread, SVC_RUNNING);
-       spin_unlock(&lfsck->ml_lock);
-       cfs_waitq_broadcast(&thread->t_ctl_waitq);
-
-       if (!cfs_list_empty(&lfsck->ml_list_scan) ||
-           cfs_list_empty(&lfsck->ml_list_double_scan))
-               rc = mdd_lfsck_oit_engine(&env, lfsck);
-       else
-               rc = 1;
-
-       CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = 0x%x, dir_flags = 0x%x, "
-              "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
-              ", pid = %d, rc = %d\n", lfsck->ml_args_oit, lfsck->ml_args_dir,
-              lfsck->ml_pos_current.lp_oit_cookie,
-              lfsck->ml_pos_current.lp_dir_cookie,
-              PFID(&lfsck->ml_pos_current.lp_dir_parent),
-              cfs_curproc_pid(), rc);
-
-       if (lfsck->ml_paused && cfs_list_empty(&lfsck->ml_list_scan))
-               oit_iops->put(&env, oit_di);
-
-       if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
-               rc = mdd_lfsck_post(&env, lfsck, rc);
-       if (lfsck->ml_di_dir != NULL)
-               mdd_lfsck_close_dir(&env, lfsck);
-
-fini_oit:
-       spin_lock(&lfsck->ml_lock);
-       lfsck->ml_di_oit = NULL;
-       spin_unlock(&lfsck->ml_lock);
-
-       oit_iops->fini(&env, oit_di);
-       if (rc == 1) {
-               if (!cfs_list_empty(&lfsck->ml_list_double_scan))
-                       rc = mdd_lfsck_double_scan(&env, lfsck);
-               else
-                       rc = 0;
-       }
-
-       /* XXX: Purge the pinned objects in the future. */
-
-fini_env:
-       lu_env_fini(&env);
-
-noenv:
-       spin_lock(&lfsck->ml_lock);
-       thread_set_flags(thread, SVC_STOPPED);
-       cfs_waitq_broadcast(&thread->t_ctl_waitq);
-       spin_unlock(&lfsck->ml_lock);
-       return rc;
-}
-
-/* external interfaces */
-
-int mdd_lfsck_set_speed(const struct lu_env *env, struct md_lfsck *lfsck,
-                       __u32 limit)
-{
-       int rc;
-
-       mutex_lock(&lfsck->ml_mutex);
-       __mdd_lfsck_set_speed(lfsck, limit);
-       rc = mdd_lfsck_bookmark_store(env, lfsck);
-       mutex_unlock(&lfsck->ml_mutex);
-       return rc;
-}
-
-int mdd_lfsck_dump(const struct lu_env *env, struct md_lfsck *lfsck,
-                  __u16 type, char *buf, int len)
-{
-       struct lfsck_component *com;
-       int                     rc;
-
-       if (!lfsck->ml_initialized)
-               return -ENODEV;
-
-       com = mdd_lfsck_component_find(lfsck, type);
-       if (com == NULL)
-               return -ENOTSUPP;
-
-       rc = com->lc_ops->lfsck_dump(env, com, buf, len);
-       mdd_lfsck_component_put(env, com);
-       return rc;
-}
-
-int mdd_lfsck_start(const struct lu_env *env, struct md_lfsck *lfsck,
-                   struct lfsck_start *start)
-{
-       struct lfsck_bookmark  *bk     = &lfsck->ml_bookmark_ram;
-       struct ptlrpc_thread   *thread = &lfsck->ml_thread;
-       struct lfsck_component *com;
-       struct l_wait_info      lwi    = { 0 };
-       bool                    dirty  = false;
-       int                     rc     = 0;
-       __u16                   valid  = 0;
-       __u16                   flags  = 0;
-       ENTRY;
-
-       if (lfsck->ml_obj_oit == NULL)
-               RETURN(-ENOTSUPP);
-
-       /* start == NULL means auto trigger paused LFSCK. */
-       if ((start == NULL) &&
-           (cfs_list_empty(&lfsck->ml_list_scan) ||
-            OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
-               RETURN(0);
-
-       mutex_lock(&lfsck->ml_mutex);
-       spin_lock(&lfsck->ml_lock);
-       if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
-               spin_unlock(&lfsck->ml_lock);
-               mutex_unlock(&lfsck->ml_mutex);
-               RETURN(-EALREADY);
-       }
-
-       spin_unlock(&lfsck->ml_lock);
-
-       lfsck->ml_paused = 0;
-       lfsck->ml_oit_over = 0;
-       lfsck->ml_drop_dryrun = 0;
-       lfsck->ml_new_scanned = 0;
-
-       /* For auto trigger. */
-       if (start == NULL)
-               goto trigger;
-
-       start->ls_version = bk->lb_version;
-       if (start->ls_valid & LSV_SPEED_LIMIT) {
-               __mdd_lfsck_set_speed(lfsck, start->ls_speed_limit);
-               dirty = true;
-       }
-
-       if (start->ls_valid & LSV_ERROR_HANDLE) {
-               valid |= DOIV_ERROR_HANDLE;
-               if (start->ls_flags & LPF_FAILOUT)
-                       flags |= DOIF_FAILOUT;
-
-               if ((start->ls_flags & LPF_FAILOUT) &&
-                   !(bk->lb_param & LPF_FAILOUT)) {
-                       bk->lb_param |= LPF_FAILOUT;
-                       dirty = true;
-               } else if (!(start->ls_flags & LPF_FAILOUT) &&
-                          (bk->lb_param & LPF_FAILOUT)) {
-                       bk->lb_param &= ~LPF_FAILOUT;
-                       dirty = true;
-               }
-       }
-
-       if (start->ls_valid & LSV_DRYRUN) {
-               if ((start->ls_flags & LPF_DRYRUN) &&
-                   !(bk->lb_param & LPF_DRYRUN)) {
-                       bk->lb_param |= LPF_DRYRUN;
-                       dirty = true;
-               } else if (!(start->ls_flags & LPF_DRYRUN) &&
-                          (bk->lb_param & LPF_DRYRUN)) {
-                       bk->lb_param &= ~LPF_DRYRUN;
-                       lfsck->ml_drop_dryrun = 1;
-                       dirty = true;
-               }
-       }
-
-       if (dirty) {
-               rc = mdd_lfsck_bookmark_store(env, lfsck);
-               if (rc != 0)
-                       GOTO(out, rc);
-       }
-
-       if (start->ls_flags & LPF_RESET)
-               flags |= DOIF_RESET;
-
-       if (start->ls_active != 0) {
-               struct lfsck_component *next;
-               __u16 type = 1;
-
-               if (start->ls_active == LFSCK_TYPES_ALL)
-                       start->ls_active = LFSCK_TYPES_SUPPORTED;
-
-               if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
-                       start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
-                       GOTO(out, rc = -ENOTSUPP);
-               }
-
-               cfs_list_for_each_entry_safe(com, next,
-                                            &lfsck->ml_list_scan, lc_link) {
-                       if (!(com->lc_type & start->ls_active)) {
-                               rc = com->lc_ops->lfsck_post(env, com, 0,
-                                                            false);
-                               if (rc != 0)
-                                       GOTO(out, rc);
-                       }
-               }
-
-               while (start->ls_active != 0) {
-                       if (type & start->ls_active) {
-                               com = __mdd_lfsck_component_find(lfsck, type,
-                                                       &lfsck->ml_list_idle);
-                               if (com != NULL) {
-                                       /* The component status will be updated
-                                        * when its prep() is called later by
-                                        * the LFSCK main engine. */
-                                       cfs_list_del_init(&com->lc_link);
-                                       cfs_list_add_tail(&com->lc_link,
-                                                         &lfsck->ml_list_scan);
-                               }
-                               start->ls_active &= ~type;
-                       }
-                       type <<= 1;
-               }
-       }
-
-       cfs_list_for_each_entry(com, &lfsck->ml_list_scan, lc_link) {
-               start->ls_active |= com->lc_type;
-               if (flags & DOIF_RESET) {
-                       rc = com->lc_ops->lfsck_reset(env, com, false);
-                       if (rc != 0)
-                               GOTO(out, rc);
-               }
-       }
-
-trigger:
-       lfsck->ml_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
-       if (bk->lb_param & LPF_DRYRUN)
-               lfsck->ml_args_dir |= LUDA_VERIFY_DRYRUN;
-
-       if (bk->lb_param & LPF_FAILOUT) {
-               valid |= DOIV_ERROR_HANDLE;
-               flags |= DOIF_FAILOUT;
-       }
-
-       if (!cfs_list_empty(&lfsck->ml_list_scan))
-               flags |= DOIF_OUTUSED;
-
-       lfsck->ml_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
-       thread_set_flags(thread, 0);
-       rc = cfs_create_thread(mdd_lfsck_main, lfsck, 0);
-       if (rc < 0)
-               CERROR("%s: cannot start LFSCK thread, rc = %d\n",
-                      mdd_lfsck2name(lfsck), rc);
-       else
-               l_wait_event(thread->t_ctl_waitq,
-                            thread_is_running(thread) ||
-                            thread_is_stopped(thread),
-                            &lwi);
-
-       GOTO(out, rc = 0);
-
-out:
-       mutex_unlock(&lfsck->ml_mutex);
-       return (rc < 0 ? rc : 0);
-}
-
-int mdd_lfsck_stop(const struct lu_env *env, struct md_lfsck *lfsck,
-                  bool pause)
-{
-       struct ptlrpc_thread *thread = &lfsck->ml_thread;
-       struct l_wait_info    lwi    = { 0 };
-       ENTRY;
-
-       if (!lfsck->ml_initialized)
-               RETURN(0);
-
-       mutex_lock(&lfsck->ml_mutex);
-       spin_lock(&lfsck->ml_lock);
-       if (thread_is_init(thread) || thread_is_stopped(thread)) {
-               spin_unlock(&lfsck->ml_lock);
-               mutex_unlock(&lfsck->ml_mutex);
-               RETURN(-EALREADY);
-       }
-
-       if (pause)
-               lfsck->ml_paused = 1;
-       thread_set_flags(thread, SVC_STOPPING);
-       /* The LFSCK thread may be sleeping on low layer wait queue,
-        * wake it up. */
-       if (likely(lfsck->ml_di_oit != NULL))
-               lfsck->ml_obj_oit->do_index_ops->dio_it.put(env,
-                                                           lfsck->ml_di_oit);
-       spin_unlock(&lfsck->ml_lock);
-
-       cfs_waitq_broadcast(&thread->t_ctl_waitq);
-       l_wait_event(thread->t_ctl_waitq,
-                    thread_is_stopped(thread),
-                    &lwi);
-       mutex_unlock(&lfsck->ml_mutex);
-
-       RETURN(0);
-}
-
-static const struct lu_fid lfsck_it_fid = { .f_seq = FID_SEQ_LOCAL_FILE,
-                                           .f_oid = OTABLE_IT_OID,
-                                           .f_ver = 0 };
-
-int mdd_lfsck_setup(const struct lu_env *env, struct mdd_device *mdd)
-{
-       struct md_lfsck         *lfsck = &mdd->mdd_lfsck;
-       struct dt_object        *obj;
-       struct lu_fid            fid;
-       int                      rc;
-
-       ENTRY;
-
-       LASSERT(!lfsck->ml_initialized);
-
-       lfsck->ml_initialized = 1;
-       mutex_init(&lfsck->ml_mutex);
-       spin_lock_init(&lfsck->ml_lock);
-       CFS_INIT_LIST_HEAD(&lfsck->ml_list_scan);
-       CFS_INIT_LIST_HEAD(&lfsck->ml_list_dir);
-       CFS_INIT_LIST_HEAD(&lfsck->ml_list_double_scan);
-       CFS_INIT_LIST_HEAD(&lfsck->ml_list_idle);
-       cfs_waitq_init(&lfsck->ml_thread.t_ctl_waitq);
-
-       obj = dt_locate(env, mdd->mdd_bottom, &lfsck_it_fid);
-       if (IS_ERR(obj))
-               RETURN(PTR_ERR(obj));
-
-       lfsck->ml_obj_oit = obj;
-       rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
-       if (rc != 0) {
-               if (rc == -ENOTSUPP)
-                       RETURN(0);
-               GOTO(out, rc);
-       }
-
-       /* LFSCK bookmark */
-       fid_zero(&fid);
-       rc = mdd_local_file_create(env, mdd, &mdd->mdd_local_root_fid,
-                                  lfsck_bookmark_name,
-                                  S_IFREG | S_IRUGO | S_IWUSR, &fid);
-       if (rc < 0)
-               GOTO(out, rc);
-
-       obj = dt_locate(env, mdd->mdd_bottom, &fid);
-       if (IS_ERR(obj))
-               GOTO(out, rc = PTR_ERR(obj));
-
-       LASSERT(lu_object_exists(&obj->do_lu));
-       lfsck->ml_bookmark_obj = obj;
-
-       rc = mdd_lfsck_bookmark_load(env, lfsck);
-       if (rc == -ENODATA)
-               rc = mdd_lfsck_bookmark_init(env, lfsck);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       rc = mdd_lfsck_namespace_setup(env, lfsck);
-       if (rc < 0)
-               GOTO(out, rc);
-       /* XXX: LFSCK components initialization to be added here. */
-       RETURN(0);
-out:
-       lu_object_put(env, &lfsck->ml_obj_oit->do_lu);
-       lfsck->ml_obj_oit = NULL;
-       return 0;
-}
-
-void mdd_lfsck_cleanup(const struct lu_env *env, struct mdd_device *mdd)
-{
-       struct md_lfsck         *lfsck  = &mdd->mdd_lfsck;
-       struct ptlrpc_thread    *thread = &lfsck->ml_thread;
-       struct lfsck_component  *com;
-
-       if (!lfsck->ml_initialized)
-               return;
-
-       LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
-
-       if (lfsck->ml_obj_oit != NULL) {
-               lu_object_put(env, &lfsck->ml_obj_oit->do_lu);
-               lfsck->ml_obj_oit = NULL;
-       }
-
-       LASSERT(lfsck->ml_obj_dir == NULL);
-
-       if (lfsck->ml_bookmark_obj != NULL) {
-               lu_object_put(env, &lfsck->ml_bookmark_obj->do_lu);
-               lfsck->ml_bookmark_obj = NULL;
-       }
-
-       while (!cfs_list_empty(&lfsck->ml_list_scan)) {
-               com = cfs_list_entry(lfsck->ml_list_scan.next,
-                                    struct lfsck_component,
-                                    lc_link);
-               mdd_lfsck_component_cleanup(env, com);
-       }
-
-       LASSERT(cfs_list_empty(&lfsck->ml_list_dir));
-
-       while (!cfs_list_empty(&lfsck->ml_list_double_scan)) {
-               com = cfs_list_entry(lfsck->ml_list_double_scan.next,
-                                    struct lfsck_component,
-                                    lc_link);
-               mdd_lfsck_component_cleanup(env, com);
-       }
-
-       while (!cfs_list_empty(&lfsck->ml_list_idle)) {
-               com = cfs_list_entry(lfsck->ml_list_idle.next,
-                                    struct lfsck_component,
-                                    lc_link);
-               mdd_lfsck_component_cleanup(env, com);
-       }
-}
diff --git a/lustre/mdd/mdd_lfsck.h b/lustre/mdd/mdd_lfsck.h
deleted file mode 100644 (file)
index 7a86bdb..0000000
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
-
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License version 2 for more details.  A copy is
- * included in the COPYING file that accompanied this code.
-
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2013, Intel Corporation.
- */
-/*
- * lustre/mdd/mdd_lfsck.h
- *
- * Shared definitions and declarations for the LFSCK.
- *
- * Author: Fan, Yong <fan.yong@intel.com>
- */
-
-#ifndef _MDD_LFSCK_H
-# define _MDD_LFSCK_H
-
-#include <lustre/lustre_lfsck_user.h>
-
-enum lfsck_status {
-       /* The lfsck file is new created, for new MDT, upgrading from old disk,
-        * or re-creating the lfsck file manually. */
-       LS_INIT                 = 0,
-
-       /* The first-step system scanning. */
-       LS_SCANNING_PHASE1      = 1,
-
-       /* The second-step system scanning. */
-       LS_SCANNING_PHASE2      = 2,
-
-       /* The LFSCK processing has completed for all objects. */
-       LS_COMPLETED            = 3,
-
-       /* The LFSCK exited automatically for failure, will not auto restart. */
-       LS_FAILED               = 4,
-
-       /* The LFSCK is stopped manually, will not auto restart. */
-       LS_STOPPED              = 5,
-
-       /* LFSCK is paused automatically when umount,
-        * will be restarted automatically when remount. */
-       LS_PAUSED               = 6,
-
-       /* System crashed during the LFSCK,
-        * will be restarted automatically after recovery. */
-       LS_CRASHED              = 7,
-};
-
-enum lfsck_flags {
-       /* Finish to the cycle scanning. */
-       LF_SCANNED_ONCE = 0x00000001ULL,
-
-       /* There is some namespace inconsistency. */
-       LF_INCONSISTENT = 0x00000002ULL,
-
-       /* The device is upgraded from 1.8 format. */
-       LF_UPGRADE      = 0x00000004ULL,
-};
-
-struct lfsck_position {
-       /* local layer object table-based iteration position. */
-       __u64   lp_oit_cookie;
-
-       /* parent FID for directory traversal. */
-       struct lu_fid lp_dir_parent;
-
-       /* namespace-based directory traversal position. */
-       __u64   lp_dir_cookie;
-};
-
-#define LFSCK_BOOKMARK_MAGIC   0x20130C1D
-
-struct lfsck_bookmark {
-       /* Magic number to detect that this struct contains valid data. */
-       __u32   lb_magic;
-
-       /* For compatible with old versions. */
-       __u16   lb_version;
-
-       /* See 'enum lfsck_param_flags' */
-       __u16   lb_param;
-
-       /* How many items can be scanned at most per second. */
-       __u32   lb_speed_limit;
-
-       /* For 64-bits aligned. */
-       __u32   lb_padding;
-
-       /* For future using. */
-       __u64   lb_reserved[6];
-};
-
-#define LFSCK_NAMESPACE_MAGIC  0xA0629D03
-
-struct lfsck_namespace {
-       /* Magic number to detect that this struct contains valid data. */
-       __u32   ln_magic;
-
-       /* See 'enum lfsck_status'. */
-       __u32   ln_status;
-
-       /* See 'enum lfsck_flags'. */
-       __u32   ln_flags;
-
-       /* How many completed LFSCK runs on the device. */
-       __u32   ln_success_count;
-
-       /*  How long the LFSCK phase1 has run in seconds. */
-       __u32   ln_run_time_phase1;
-
-       /*  How long the LFSCK phase2 has run in seconds. */
-       __u32   ln_run_time_phase2;
-
-       /* Time for the last LFSCK completed in seconds since epoch. */
-       __u64   ln_time_last_complete;
-
-       /* Time for the latest LFSCK ran in seconds since epoch. */
-       __u64   ln_time_latest_start;
-
-       /* Time for the last LFSCK checkpoint in seconds since epoch. */
-       __u64   ln_time_last_checkpoint;
-
-       /* Position for the latest LFSCK started from. */
-       struct lfsck_position   ln_pos_latest_start;
-
-       /* Position for the last LFSCK checkpoint. */
-       struct lfsck_position   ln_pos_last_checkpoint;
-
-       /* Position for the first should be updated object. */
-       struct lfsck_position   ln_pos_first_inconsistent;
-
-       /* How many items (including dir) have been checked. */
-       __u64   ln_items_checked;
-
-       /* How many items have been repaired. */
-       __u64   ln_items_repaired;
-
-       /* How many items failed to be processed. */
-       __u64   ln_items_failed;
-
-       /* How many directories have been traversed. */
-       __u64   ln_dirs_checked;
-
-       /* How many multiple-linked objects have been checked. */
-       __u64   ln_mlinked_checked;
-
-       /* How many objects have been double scanned. */
-       __u64   ln_objs_checked_phase2;
-
-       /* How many objects have been reparied during double scan. */
-       __u64   ln_objs_repaired_phase2;
-
-       /* How many objects failed to be processed during double scan. */
-       __u64   ln_objs_failed_phase2;
-
-       /* How many objects with nlink fixed. */
-       __u64   ln_objs_nlink_repaired;
-
-       /* How many objects were lost before, but found back now. */
-       __u64   ln_objs_lost_found;
-
-       /* The latest object has been processed (failed) during double scan. */
-       struct lu_fid   ln_fid_latest_scanned_phase2;
-
-       /* For further using. 256-bytes aligned now. */
-       __u64   ln_reserved[2];
-};
-
-struct lfsck_component;
-struct mdd_object;
-
-struct lfsck_operations {
-       int (*lfsck_reset)(const struct lu_env *env,
-                          struct lfsck_component *com,
-                          bool init);
-
-       void (*lfsck_fail)(const struct lu_env *env,
-                          struct lfsck_component *com,
-                          bool new_checked);
-
-       int (*lfsck_checkpoint)(const struct lu_env *env,
-                               struct lfsck_component *com,
-                               bool init);
-
-       int (*lfsck_prep)(const struct lu_env *env,
-                         struct lfsck_component *com);
-
-       int (*lfsck_exec_oit)(const struct lu_env *env,
-                             struct lfsck_component *com,
-                             struct mdd_object *obj);
-
-       int (*lfsck_exec_dir)(const struct lu_env *env,
-                             struct lfsck_component *com,
-                             struct mdd_object *obj,
-                             struct lu_dirent *ent);
-
-       int (*lfsck_post)(const struct lu_env *env,
-                         struct lfsck_component *com,
-                         int result, bool init);
-
-       int (*lfsck_dump)(const struct lu_env *env,
-                         struct lfsck_component *com,
-                         char *buf,
-                         int len);
-
-       int (*lfsck_double_scan)(const struct lu_env *env,
-                                struct lfsck_component *com);
-};
-
-struct lfsck_component {
-       /* into md_lfsck::ml_list_(scan,double_scan,idle} */
-       cfs_list_t               lc_link;
-
-       /* into md_lfsck::ml_list_dir */
-       cfs_list_t               lc_link_dir;
-       struct rw_semaphore      lc_sem;
-       cfs_atomic_t             lc_ref;
-
-       struct lfsck_position    lc_pos_start;
-       struct md_lfsck         *lc_lfsck;
-       struct dt_object        *lc_obj;
-       struct lfsck_operations *lc_ops;
-       void                    *lc_file_ram;
-       void                    *lc_file_disk;
-       __u32                    lc_file_size;
-
-       /* How many objects have been checked since last checkpoint. */
-       __u32                    lc_new_checked;
-       unsigned int             lc_journal:1;
-       __u16                    lc_type;
-};
-
-struct md_lfsck {
-       struct mutex             ml_mutex;
-       spinlock_t               ml_lock;
-
-       /* For the components in (first) scanning via otable-based iteration. */
-       cfs_list_t               ml_list_scan;
-
-       /* For the components in scanning via directory traversal. Because
-        * directory traversal cannot guarantee all the object be scanned,
-        * so the component in the ml_list_dir must be in ml_list_scan. */
-       cfs_list_t               ml_list_dir;
-
-       /* For the components in double scanning. */
-       cfs_list_t               ml_list_double_scan;
-
-       /* For the components those are not scanning now. */
-       cfs_list_t               ml_list_idle;
-
-       struct ptlrpc_thread     ml_thread;
-
-       /* The time for last checkpoint, jiffies */
-       cfs_time_t               ml_time_last_checkpoint;
-
-       /* The time for next checkpoint, jiffies */
-       cfs_time_t               ml_time_next_checkpoint;
-
-       struct dt_object        *ml_bookmark_obj;
-       struct lfsck_bookmark    ml_bookmark_ram;
-       struct lfsck_bookmark    ml_bookmark_disk;
-       struct lfsck_position    ml_pos_current;
-
-       /* Obj for otable-based iteration */
-       struct dt_object        *ml_obj_oit;
-
-       /* Obj for directory traversal */
-       struct dt_object        *ml_obj_dir;
-
-       /* It for otable-based iteration */
-       struct dt_it            *ml_di_oit;
-
-       /* It for directory traversal */
-       struct dt_it            *ml_di_dir;
-
-       /* Arguments for low layer otable-based iteration. */
-       __u32                    ml_args_oit;
-
-       /* Arugments for namespace-based directory traversal. */
-       __u32                    ml_args_dir;
-
-       /* Schedule for every N objects. */
-       __u32                    ml_sleep_rate;
-
-       /* Sleep N jiffies for each schedule. */
-       __u32                    ml_sleep_jif;
-
-       /* How many objects have been scanned since last sleep. */
-       __u32                    ml_new_scanned;
-
-       unsigned int             ml_paused:1, /* The lfsck is paused. */
-                                ml_oit_over:1, /* oit is finished. */
-                                ml_drop_dryrun:1, /* Ever dryrun, not now. */
-                                ml_initialized:1, /* lfsck_setup is called. */
-                                ml_current_oit_processed:1;
-};
-
-enum lfsck_linkea_flags {
-       /* The linkea entries does not match the object nlinks. */
-       LLF_UNMATCH_NLINKS      = 0x01,
-
-       /* Fail to repair the multiple-linked objects during the double scan. */
-       LLF_REPAIR_FAILED       = 0x02,
-};
-
-#endif /* _MDD_LFSCK_H */
index cc3c4b9..b3259d3 100644 (file)
@@ -269,18 +269,19 @@ static int lprocfs_rd_lfsck_speed_limit(char *page, char **start, off_t off,
                                        int count, int *eof, void *data)
 {
        struct mdd_device *mdd = data;
+       int rc;
 
        LASSERT(mdd != NULL);
        *eof = 1;
-       return snprintf(page, count, "%u\n",
-                       mdd->mdd_lfsck.ml_bookmark_ram.lb_speed_limit);
+
+       rc = lfsck_get_speed(mdd->mdd_bottom, page, count);
+       return rc != 0 ? rc : count;
 }
 
 static int lprocfs_wr_lfsck_speed_limit(struct file *file, const char *buffer,
                                        unsigned long count, void *data)
 {
        struct mdd_device *mdd = data;
-       struct md_lfsck *lfsck;
        __u32 val;
        int rc;
 
@@ -289,36 +290,20 @@ static int lprocfs_wr_lfsck_speed_limit(struct file *file, const char *buffer,
        if (rc != 0)
                return rc;
 
-       lfsck = &mdd->mdd_lfsck;
-       if (val != lfsck->ml_bookmark_ram.lb_speed_limit) {
-               struct lu_env env;
-
-               rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
-               if (rc != 0)
-                       return rc;
-
-               rc = mdd_lfsck_set_speed(&env, lfsck, val);
-               lu_env_fini(&env);
-       }
+       rc = lfsck_set_speed(mdd->mdd_bottom, val);
        return rc != 0 ? rc : count;
 }
 
 static int lprocfs_rd_lfsck_namespace(char *page, char **start, off_t off,
                                      int count, int *eof, void *data)
 {
-       struct lu_env env;
        struct mdd_device *mdd = data;
        int rc;
 
        LASSERT(mdd != NULL);
        *eof = 1;
 
-       rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
-       if (rc != 0)
-               return rc;
-
-       rc = mdd_lfsck_dump(&env, &mdd->mdd_lfsck, LT_NAMESPACE, page, count);
-       lu_env_fini(&env);