Whamcloud - gitweb
LU-4788 lfsck: enable verification for remote object 17/11317/25
author Fan Yong <fan.yong@intel.com>
Fri, 1 Aug 2014 01:00:31 +0000 (09:00 +0800)
committer Oleg Drokin <oleg.drokin@intel.com>
Wed, 24 Sep 2014 03:14:49 +0000 (03:14 +0000)
Based on the LFSCK 1.5 framework, enable namespace LFSCK
scanning for remote objects.

During the first-stage scanning, if an object contains a remote
linkEA entry or multiple linkEA entries, or claims to be multiply
linked, then it will be recorded in the namespace LFSCK tracing
file for double scanning.

Some cleanup for the namespace LFSCK tracing file (lfsck_namespace)
and other code cleanup.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Ibc87ae9a5c6b7f67a9215140cf2cb89640bce0a9
Reviewed-on: http://review.whamcloud.com/11317
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
14 files changed:
lustre/include/lustre/lustre_lfsck_user.h
lustre/include/obd_support.h
lustre/lfsck/lfsck_bookmark.c
lustre/lfsck/lfsck_engine.c
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c
lustre/mdd/mdd_dir.c
lustre/ptlrpc/wiretest.c
lustre/tests/sanity-lfsck.sh
lustre/utils/lustre_lfsck.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index e116e07..9e40222 100644 (file)
@@ -50,8 +50,8 @@ enum lfsck_param_flags {
        /* Broadcast the command to other MDTs. Only valid on the sponsor MDT */
        LPF_BROADCAST           = 0x0010,
 
-       /* Handle orphan objects. */
-       LPF_ORPHAN              = 0x0020,
+       /* Handle orphan OST-objects. */
+       LPF_OST_ORPHAN          = 0x0020,
 
        /* Create OST-object for dangling LOV EA. */
        LPF_CREATE_OSTOBJ       = 0x0040,
@@ -64,9 +64,6 @@ enum lfsck_type {
        /* For MDT-OST (layout, object) consistency check/repair. */
        LFSCK_TYPE_LAYOUT       = 0x0001,
 
-       /* For MDT-MDT (remote object) consistency check/repair. */
-       LFSCK_TYPE_DNE          = 0x0002,
-
        /* For MDT (FID-in-dirent, linkEA) consistency check/repair. */
        LFSCK_TYPE_NAMESPACE    = 0x0004,
        LFSCK_TYPES_SUPPORTED   = (LFSCK_TYPE_SCRUB | LFSCK_TYPE_LAYOUT |
index 5412ecb..a18ade4 100644 (file)
@@ -516,6 +516,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LFSCK_LOST_SPEOBJ     0x161a
 #define OBD_FAIL_LFSCK_DELAY5          0x161b
 #define OBD_FAIL_LFSCK_BAD_NETWORK     0x161c
+#define OBD_FAIL_LFSCK_NO_LINKEA       0x161d
 
 #define OBD_FAIL_LFSCK_NOTIFY_NET      0x16f0
 #define OBD_FAIL_LFSCK_QUERY_NET       0x16f1
index 0036522..ffaa73d 100644 (file)
@@ -219,8 +219,8 @@ int lfsck_set_param(const struct lu_env *env, struct lfsck_instance *lfsck,
                        dirty = true;
                }
 
-               if (bk->lb_param & LPF_ORPHAN) {
-                       bk->lb_param &= ~LPF_ORPHAN;
+               if (bk->lb_param & LPF_OST_ORPHAN) {
+                       bk->lb_param &= ~LPF_OST_ORPHAN;
                        dirty = true;
                }
 
@@ -279,13 +279,13 @@ int lfsck_set_param(const struct lu_env *env, struct lfsck_instance *lfsck,
                        }
                }
 
-               if ((bk->lb_param & LPF_ORPHAN) &&
-                   !(start->ls_flags & LPF_ORPHAN)) {
-                       bk->lb_param &= ~LPF_ORPHAN;
+               if ((bk->lb_param & LPF_OST_ORPHAN) &&
+                   !(start->ls_flags & LPF_OST_ORPHAN)) {
+                       bk->lb_param &= ~LPF_OST_ORPHAN;
                        dirty = true;
-               } else if (!(bk->lb_param & LPF_ORPHAN) &&
-                          (start->ls_flags & LPF_ORPHAN)) {
-                       bk->lb_param |= LPF_ORPHAN;
+               } else if (!(bk->lb_param & LPF_OST_ORPHAN) &&
+                          (start->ls_flags & LPF_OST_ORPHAN)) {
+                       bk->lb_param |= LPF_OST_ORPHAN;
                        dirty = true;
                }
 
index 028c584..8831e3e 100644 (file)
@@ -175,93 +175,127 @@ static int lfsck_parent_fid(const struct lu_env *env, struct dt_object *obj,
                         (const struct dt_key *)"..", BYPASS_CAPA);
 }
 
+/**
+ * Check whether the directory needs to be scanned.
+ *
+ * 1) If we are not doing namespace LFSCK, or the given @obj is not a
+ *    directory, then @obj does not need to be scanned. Otherwise,
+ * 2) Global /ROOT needs to be scanned, backend root needs not to be scanned.
+ * 3) If the @obj is neither IGIF nor normal FID (including .lustre and its
+ *    sub-directories that have been scanned when the LFSCK engine start),
+ *    then needs not to be scanned.
+ * 4) If it is a remote object, then scanning the object will be done on the
+ *    MDT on which the object really resides.
+ * 5) If the local object has normal FID, then needs to be scanned. Otherwise,
+ * 6) If the object has linkEA, then needs to be scanned. Otherwise,
+ * 7) If none of the previous conditions are true, we need to check the parent
+ *    directories whether this subdirectory is in a tree that should be scanned.
+ *    Set the parent as current @obj, repeat 2)-7).
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] lfsck    pointer to the lfsck instance
+ * \param[in] obj      pointer to the object to be checked
+ *
+ * \retval             positive number if the directory needs to be scanned
+ * \retval             0 if the directory needs NOT to be scanned
+ * \retval             negative error number on failure
+ */
 static int lfsck_needs_scan_dir(const struct lu_env *env,
                                struct lfsck_instance *lfsck,
                                struct dt_object *obj)
 {
-       struct lu_fid *fid   = &lfsck_env_info(env)->lti_fid;
-       int            depth = 0;
-       int            rc;
+       struct lfsck_thread_info *info    = lfsck_env_info(env);
+       struct lu_fid            *fid     = &info->lti_fid;
+       struct lu_seq_range      *range   = &info->lti_range;
+       struct dt_device         *dev     = lfsck->li_bottom;
+       struct seq_server_site   *ss      = lu_site2seq(dev->dd_lu_dev.ld_site);
+       __u32                     idx     = lfsck_dev_idx(dev);
+       int                       depth   = 0;
+       int                       rc      = 0;
 
        if (list_empty(&lfsck->li_list_dir) || !S_ISDIR(lfsck_object_type(obj)))
-               RETURN(0);
+               return 0;
+
+       LASSERT(ss != NULL);
 
+       *fid = *lfsck_dto2fid(obj);
        while (1) {
-               /* XXX: Currently, we do not scan the "/REMOTE_PARENT_DIR",
-                *      which is the agent directory to manage the objects
-                *      which name entries reside on remote MDTs. Related
-                *      consistency verification will be processed in LFSCK
-                *      phase III. */
-               if (lu_fid_eq(lfsck_dto2fid(obj), &lfsck->li_global_root_fid)) {
-                       if (depth > 0)
-                               lfsck_object_put(env, obj);
+               /* Global /ROOT is visible. */
+               if (unlikely(lu_fid_eq(fid, &lfsck->li_global_root_fid)))
                        return 1;
-               }
 
-               /* No need to check .lustre and its children. */
-               if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj)))) {
-                       if (depth > 0)
-                               lfsck_object_put(env, obj);
+               /* Backend root is invisible. */
+               if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
                        return 0;
+
+               if (!fid_is_norm(fid) && !fid_is_igif(fid))
+                       return 0;
+
+               fld_range_set_mdt(range);
+               rc = fld_local_lookup(env, ss->ss_server_fld,
+                                     fid_seq(fid), range);
+               if (rc != 0 || range->lsr_index != idx) {
+                       /* Current FID should NOT be for the input parameter
+                        * @obj, because the lfsck_master_oit_engine() has
+                        * filtered out agent object. So current FID is for
+                        * the ancestor of the original input parameter @obj.
+                        * So the ancestor is a remote directory. The input
+                        * parameter @obj is local directory, and should be
+                        * scanned under such case. */
+                       LASSERT(depth > 0);
+
+                       return 1;
+               }
+
+               /* normal FID on this target (locally) must be for the
+                * client-side visible object. */
+               if (fid_is_norm(fid))
+                       return 1;
+
+               if (obj == NULL) {
+                       obj = lfsck_object_find(env, lfsck, fid);
+                       if (IS_ERR(obj))
+                               return PTR_ERR(obj);
+
+                       depth++;
+                       if (!dt_object_exists(obj))
+                               GOTO(out, rc = 0);
                }
 
                dt_read_lock(env, obj, MOR_TGT_CHILD);
                if (unlikely(lfsck_is_dead_obj(obj))) {
                        dt_read_unlock(env, obj);
-                       if (depth > 0)
-                               lfsck_object_put(env, obj);
-                       return 0;
+
+                       GOTO(out, rc = 0);
                }
 
                rc = dt_xattr_get(env, obj,
                                  lfsck_buf_get(env, NULL, 0), XATTR_NAME_LINK,
                                  BYPASS_CAPA);
                dt_read_unlock(env, obj);
-               if (rc >= 0) {
-                       if (depth > 0)
-                               lfsck_object_put(env, obj);
-                       return 1;
-               }
+               if (rc >= 0)
+                       GOTO(out, rc = 1);
 
-               if (rc < 0 && rc != -ENODATA) {
-                       if (depth > 0)
-                               lfsck_object_put(env, obj);
-                       return rc;
-               }
+               if (rc < 0 && rc != -ENODATA)
+                       GOTO(out, rc);
 
                rc = lfsck_parent_fid(env, obj, fid);
                if (depth > 0)
                        lfsck_object_put(env, obj);
+
+               obj = NULL;
                if (rc != 0)
                        return rc;
 
-               if (unlikely(lu_fid_eq(fid, &lfsck->li_local_root_fid)))
+               if (!fid_is_sane(fid))
                        return 0;
+       }
 
-               obj = lfsck_object_find(env, lfsck, fid);
-               if (IS_ERR(obj))
-                       return PTR_ERR(obj);
-
-               if (!dt_object_exists(obj)) {
-                       lfsck_object_put(env, obj);
-                       return 0;
-               }
-
-               if (dt_object_remote(obj)) {
-                       /* .lustre/lost+found/MDTxxx can be remote directory. */
-                       if (fid_seq_is_dot(fid_seq(lfsck_dto2fid(obj))))
-                               rc = 0;
-                       else
-                               /* Other remote directory should be client
-                                * visible and need to be checked. */
-                               rc = 1;
-                       lfsck_object_put(env, obj);
-                       return rc;
-               }
+out:
+       if (depth > 0 && obj != NULL)
+               lfsck_object_put(env, obj);
 
-               depth++;
-       }
-       return 0;
+       return rc;
 }
 
 /* LFSCK wrap functions */
@@ -319,9 +353,6 @@ static int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
        lfsck->li_current_oit_processed = 0;
        list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
                com->lc_new_checked = 0;
-               if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
-                       com->lc_journal = 0;
-
                rc = com->lc_ops->lfsck_prep(env, com, lsp);
                if (rc != 0)
                        GOTO(out, rc);
@@ -357,8 +388,8 @@ static int lfsck_prep(const struct lu_env *env, struct lfsck_instance *lfsck,
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));
 
-       /* XXX: Currently, skip remote object, the consistency for
-        *      remote object will be processed in LFSCK phase III. */
+       /* Remote directory will be scanned by the LFSCK instance
+        * on the MDT where the remote object really resides. */
        if (!dt_object_exists(obj) || dt_object_remote(obj) ||
            unlikely(!S_ISDIR(lfsck_object_type(obj))))
                GOTO(out, rc = 0);
@@ -524,9 +555,6 @@ static int lfsck_double_scan(const struct lu_env *env,
        int                     rc1 = 0;
 
        list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
-               if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
-                       com->lc_journal = 0;
-
                rc = com->lc_ops->lfsck_double_scan(env, com);
                if (rc != 0)
                        rc1 = rc;
@@ -603,6 +631,15 @@ static int lfsck_master_dir_engine(const struct lu_env *env,
                        l_wait_event(thread->t_ctl_waitq,
                                     !thread_is_running(thread),
                                     &lwi);
+
+                       if (unlikely(!thread_is_running(thread))) {
+                               CDEBUG(D_LFSCK, "%s: scan dir exit for engine "
+                                      "stop, parent "DFID", cookie "LPX64"\n",
+                                      lfsck_lfsck2name(lfsck),
+                                      PFID(lfsck_dto2fid(dir)),
+                                      lfsck->li_cookie_dir);
+                               RETURN(0);
+                       }
                }
 
                lfsck->li_new_scanned++;
@@ -666,21 +703,47 @@ checkpoint:
        RETURN(rc);
 }
 
+/**
+ * Object-table based iteration engine.
+ *
+ * Object-table based iteration is the basic linear engine to scan all the
+ * objects on current device in turn. For each object, it calls all the
+ * registered LFSCK component(s)' API to perform related consistency
+ * verification.
+ *
+ * It flushes related LFSCK tracing files to disk via making checkpoint
+ * periodically. Then if the server crashed or the LFSCK is paused, the
+ * LFSCK can resume from the latest checkpoint.
+ *
+ * It also controls the whole LFSCK speed via lfsck_control_speed() to
+ * keep the server from becoming overloaded.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] lfsck    pointer to the lfsck instance
+ *
+ * \retval             positive number if all objects have been scanned
+ * \retval             0 if the iteration is stopped or paused
+ * \retval             negative error number on failure
+ */
 static int lfsck_master_oit_engine(const struct lu_env *env,
                                   struct lfsck_instance *lfsck)
 {
-       struct lfsck_thread_info        *info   = lfsck_env_info(env);
-       const struct dt_it_ops          *iops   =
+       struct lfsck_thread_info *info  = lfsck_env_info(env);
+       const struct dt_it_ops   *iops  =
                                &lfsck->li_obj_oit->do_index_ops->dio_it;
-       struct dt_it                    *di     = lfsck->li_di_oit;
-       struct lu_fid                   *fid    = &info->lti_fid;
-       struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
-       struct ptlrpc_thread            *thread = &lfsck->li_thread;
-       __u32                            idx    =
-                               lfsck_dev_idx(lfsck->li_bottom);
-       int                              rc;
+       struct dt_it             *di    = lfsck->li_di_oit;
+       struct lu_fid            *fid   = &info->lti_fid;
+       struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
+       struct ptlrpc_thread     *thread = &lfsck->li_thread;
+       struct dt_device         *dev   = lfsck->li_bottom;
+       struct seq_server_site   *ss    = lu_site2seq(dev->dd_lu_dev.ld_site);
+       __u32                    idx    = lfsck_dev_idx(dev);
+       int                      rc;
        ENTRY;
 
+       if (unlikely(ss == NULL))
+               RETURN(-EIO);
+
        do {
                struct dt_object *target;
                bool              update_lma = false;
@@ -703,6 +766,14 @@ static int lfsck_master_oit_engine(const struct lu_env *env,
                        l_wait_event(thread->t_ctl_waitq,
                                     !thread_is_running(thread),
                                     &lwi);
+
+                       if (unlikely(!thread_is_running(thread))) {
+                               CDEBUG(D_LFSCK, "%s: OIT scan exit for engine "
+                                      "stop, cookie "LPU64"\n",
+                                      lfsck_lfsck2name(lfsck),
+                                      iops->store(env, di));
+                               RETURN(0);
+                       }
                }
 
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
@@ -736,11 +807,30 @@ static int lfsck_master_oit_engine(const struct lu_env *env,
                                update_lma = true;
                        }
                } else if (!fid_is_norm(fid) && !fid_is_igif(fid) &&
-                          !fid_is_last_id(fid) && !fid_is_root(fid) &&
-                          !fid_seq_is_dot(fid_seq(fid))) {
+                          !fid_is_last_id(fid) &&
+                          !lu_fid_eq(fid, &lfsck->li_global_root_fid)) {
+
                        /* If the FID/object is only used locally and invisible
-                        * to external nodes, then LFSCK will not handle it. */
+                        * to external nodes, then LFSCK will not handle it.
+                        *
+                        * dot_lustre sequence has been handled specially. */
                        goto checkpoint;
+               } else {
+                       struct lu_seq_range *range = &info->lti_range;
+
+                       if (lfsck->li_master)
+                               fld_range_set_mdt(range);
+                       else
+                               fld_range_set_ost(range);
+                       rc = fld_local_lookup(env, ss->ss_server_fld,
+                                             fid_seq(fid), range);
+                       if (rc != 0 || range->lsr_index != idx) {
+                               /* Remote object will be handled by the LFSCK
+                                * instance on the MDT where the remote object
+                                * really resides. */
+                               rc = 0;
+                               goto checkpoint;
+                       }
                }
 
                target = lfsck_object_find(env, lfsck, fid);
@@ -756,9 +846,7 @@ static int lfsck_master_oit_engine(const struct lu_env *env,
                                goto checkpoint;
                }
 
-               /* XXX: Currently, skip remote object, the consistency for
-                *      remote object will be processed in LFSCK phase III. */
-               if (dt_object_exists(target) && !dt_object_remote(target)) {
+               if (dt_object_exists(target)) {
                        if (update_lma) {
                                rc = lfsck_update_lma(env, lfsck, target);
                                if (rc != 0)
index 4bba08a..ec4e0eb 100644 (file)
@@ -108,6 +108,12 @@ struct lfsck_bookmark {
        __u64   lb_reserved[2];
 };
 
+enum lfsck_namespace_trace_flags {
+       LNTF_CHECK_LINKEA       = 0x01,
+       LNTF_CHECK_PARENT       = 0x02,
+       LNTF_ALL                = 0xff
+};
+
 struct lfsck_namespace {
        /* Magic number to detect that this struct contains valid data. */
        __u32   ln_magic;
@@ -157,9 +163,6 @@ struct lfsck_namespace {
        /* How many directories have been traversed. */
        __u64   ln_dirs_checked;
 
-       /* How many multiple-linked objects have been checked. */
-       __u64   ln_mlinked_checked;
-
        /* How many objects have been double scanned. */
        __u64   ln_objs_checked_phase2;
 
@@ -183,6 +186,15 @@ struct lfsck_namespace {
 
        /* How many linkEA entries have been repaired. */
        __u64   ln_linkea_repaired;
+
+       /* How many multiple-linked objects have been checked. */
+       __u64   ln_mul_linked_checked;
+
+       /* How many multiple-linked objects have been repaired. */
+       __u64   ln_mul_linked_repaired;
+
+       /* For further using. 256-bytes aligned now. */
+       __u64   ln_reserved[31];
 };
 
 enum lfsck_layout_inconsistency_type {
@@ -411,7 +423,6 @@ struct lfsck_component {
        /* How many objects have been scanned since last sleep. */
        __u32                    lc_new_scanned;
 
-       unsigned int             lc_journal:1;
        __u16                    lc_type;
 };
 
@@ -512,14 +523,6 @@ struct lfsck_instance {
                                  li_start_unplug:1;
 };
 
-enum lfsck_linkea_flags {
-       /* The linkea entries does not match the object nlinks. */
-       LLF_UNMATCH_NLINKS      = 0x01,
-
-       /* Fail to repair the multiple-linked objects during the double scan. */
-       LLF_REPAIR_FAILED       = 0x02,
-};
-
 struct lfsck_async_interpret_args {
        struct lfsck_component          *laia_com;
        struct lfsck_tgt_descs          *laia_ltds;
@@ -608,9 +611,11 @@ struct lfsck_assistant_data {
 #define LFSCK_TMPBUF_LEN       64
 
 struct lfsck_thread_info {
+       struct lu_name          lti_name_const;
        struct lu_name          lti_name;
        struct lu_buf           lti_buf;
        struct lu_buf           lti_linkea_buf;
+       struct lu_buf           lti_linkea_buf2;
        struct lu_buf           lti_big_buf;
        struct lu_fid           lti_fid;
        struct lu_fid           lti_fid2;
@@ -644,6 +649,7 @@ struct lfsck_thread_info {
        struct lov_user_md      lti_lum;
        struct dt_insert_rec    lti_dt_rec;
        struct lu_object_conf   lti_conf;
+       struct lu_seq_range     lti_range;
 };
 
 /* lfsck_lib.c */
@@ -653,6 +659,9 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
                     struct dt_object *obj, struct lustre_handle *lh,
                     __u64 bits, ldlm_mode_t mode);
 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode);
+int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
+                             struct lfsck_instance *lfsck,
+                             const struct lu_fid *fid);
 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck);
 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck);
 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
@@ -712,6 +721,12 @@ int lfsck_set_param(const struct lu_env *env, struct lfsck_instance *lfsck,
                    struct lfsck_start *start, bool reset);
 
 /* lfsck_namespace.c */
+int lfsck_namespace_trace_update(const struct lu_env *env,
+                                struct lfsck_component *com,
+                                const struct lu_fid *fid,
+                                const __u8 flags, bool add);
+int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
+                      struct linkea_data *ldata);
 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
                        struct dt_object *obj, const struct lu_name *cname,
                        const struct lu_fid *pfid);
@@ -755,7 +770,7 @@ lfsck_name_get_const(const struct lu_env *env, const void *area, ssize_t len)
 {
        struct lu_name *lname;
 
-       lname = &lfsck_env_info(env)->lti_name;
+       lname = &lfsck_env_info(env)->lti_name_const;
        lname->ln_name = area;
        lname->ln_namelen = len;
        return lname;
@@ -1006,4 +1021,26 @@ static inline void lfsck_lad_set_bitmap(const struct lu_env *env,
        lad->lad_incomplete = 1;
 }
 
+static inline int lfsck_links_read(const struct lu_env *env,
+                                  struct dt_object *obj,
+                                  struct linkea_data *ldata)
+{
+       ldata->ld_buf =
+               lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
+                                      PAGE_CACHE_SIZE);
+
+       return __lfsck_links_read(env, obj, ldata);
+}
+
+static inline int lfsck_links_read2(const struct lu_env *env,
+                                   struct dt_object *obj,
+                                   struct linkea_data *ldata)
+{
+       ldata->ld_buf =
+               lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf2,
+                                      PAGE_CACHE_SIZE);
+
+       return __lfsck_links_read(env, obj, ldata);
+}
+
 #endif /* _LFSCK_INTERNAL_H */
index 884d0e1..bbb2ce9 100644 (file)
@@ -291,9 +291,9 @@ lfsck_layout_assistant_sync_failures_interpret(const struct lu_env *env,
  * fixing for the fake orphan.
  *
  * To avoid above trouble, when layout LFSCK finishes the first-stage scanning,
- * it will scan the bitmap for the ever failed OTs, and notify them that it has
- * ever missed some OST-object verification and should skip orphan handling for
- * all MDTs that are in layout LFSCK.
+ * it will scan the bitmap for the ever failed OSTs, and notify them that they
+ * have ever missed some OST-object verification and should skip the handling
+ * for orphan OST-objects on all MDTs that are in the layout LFSCK.
  *
  * \param[in] env      pointer to the thread context
  * \param[in] com      pointer to the lfsck component
@@ -821,7 +821,6 @@ static int lfsck_layout_load_bitmap(const struct lu_env *env,
        struct dt_object                *obj    = com->lc_obj;
        struct lfsck_assistant_data     *lad    = com->lc_data;
        struct lfsck_layout             *lo     = com->lc_file_ram;
-       const struct dt_body_operations *dbo    = obj->do_body_ops;
        cfs_bitmap_t                    *bitmap = lad->lad_bitmap;
        loff_t                           pos    = com->lc_file_size;
        ssize_t                          size;
@@ -862,9 +861,7 @@ static int lfsck_layout_load_bitmap(const struct lu_env *env,
        }
 
        size = (lo->ll_bitmap_size + 7) >> 3;
-       rc = dbo->dbo_read(env, obj,
-                          lfsck_buf_get(env, bitmap->data, size), &pos,
-                          BYPASS_CAPA);
+       rc = dt_read(env, obj, lfsck_buf_get(env, bitmap->data, size), &pos);
        if (rc == 0) {
                RETURN(-ENOENT);
        } else if (rc != size) {
@@ -904,14 +901,12 @@ static int lfsck_layout_load(const struct lu_env *env,
                             struct lfsck_component *com)
 {
        struct lfsck_layout             *lo     = com->lc_file_ram;
-       const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
        ssize_t                          size   = com->lc_file_size;
        loff_t                           pos    = 0;
        int                              rc;
 
-       rc = dbo->dbo_read(env, com->lc_obj,
-                          lfsck_buf_get(env, com->lc_file_disk, size), &pos,
-                          BYPASS_CAPA);
+       rc = dt_read(env, com->lc_obj,
+                    lfsck_buf_get(env, com->lc_file_disk, size), &pos);
        if (rc == 0) {
                return -ENOENT;
        } else if (rc < 0) {
@@ -1044,14 +1039,14 @@ static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
                             struct dt_object *obj, const struct lu_fid *fid)
 {
        struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
-       struct lu_seq_range      range  = { 0 };
+       struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
        struct lustre_mdt_attrs *lma;
        int                      rc;
 
-       fld_range_set_any(&range);
-       rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
+       fld_range_set_any(range);
+       rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
        if (rc == 0) {
-               if (fld_range_is_ost(&range))
+               if (fld_range_is_ost(range))
                        return 1;
 
                return 0;
@@ -1403,7 +1398,6 @@ static int lfsck_layout_double_scan_result(const struct lu_env *env,
        lo->ll_objs_checked_phase2 += com->lc_new_checked;
 
        if (rc > 0) {
-               com->lc_journal = 0;
                if (lo->ll_flags & LF_INCOMPLETE) {
                        lo->ll_status = LS_PARTIAL;
                } else {
@@ -2133,7 +2127,6 @@ static int lfsck_layout_conflict_create(const struct lu_env *env,
        struct lfsck_thread_info *info          = lfsck_env_info(env);
        struct lu_fid            *cfid2         = &info->lti_fid2;
        struct ost_id            *oi            = &info->lti_oi;
-       char                     *infix         = info->lti_tmpbuf;
        struct lov_mds_md_v1     *lmm           = ea_buf->lb_buf;
        struct dt_device         *dev           = com->lc_lfsck->li_bottom;
        struct thandle           *th            = NULL;
@@ -2165,10 +2158,11 @@ static int lfsck_layout_conflict_create(const struct lu_env *env,
                lfsck_ibits_unlock(&lh, LCK_EX);
 
                fid_zero(&rec->lor_fid);
-               snprintf(infix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
-                        PFID(lu_object_fid(&parent->do_lu)), ea_off);
+               snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
+                        "-"DFID"-%x", PFID(lu_object_fid(&parent->do_lu)),
+                        ea_off);
                rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
-                                                 infix, "C", ea_off);
+                                               info->lti_tmpbuf, "C", ea_off);
 
                RETURN(rc);
        }
@@ -2582,8 +2576,7 @@ static int lfsck_layout_scan_orphan(const struct lu_env *env,
               "scanning for OST%04x\n",
               lfsck_lfsck2name(lfsck), ltd->ltd_index);
 
-       if (lad->lad_incomplete &&
-           cfs_bitmap_check(lad->lad_bitmap, ltd->ltd_index)) {
+       if (cfs_bitmap_check(lad->lad_bitmap, ltd->ltd_index)) {
                CDEBUG(D_LFSCK, "%s: layout LFSCK assistant skip the orphan "
                       "scanning for OST%04x\n",
                       lfsck_lfsck2name(lfsck), ltd->ltd_index);
@@ -3352,6 +3345,9 @@ static int lfsck_layout_assistant_handler_p2(const struct lu_env *env,
        int                              rc     = 0;
        ENTRY;
 
+       CDEBUG(D_LFSCK, "%s: layout LFSCK phase2 scan start\n",
+              lfsck_lfsck2name(lfsck));
+
        spin_lock(&ltds->ltd_lock);
        while (!list_empty(&lad->lad_ost_phase2_list)) {
                ltd = list_entry(lad->lad_ost_phase2_list.next,
@@ -3377,6 +3373,9 @@ static int lfsck_layout_assistant_handler_p2(const struct lu_env *env,
                rc = 0;
        spin_unlock(&ltds->ltd_lock);
 
+       CDEBUG(D_LFSCK, "%s: layout LFSCK phase2 scan stop: rc = %d\n",
+              lfsck_lfsck2name(lfsck), rc);
+
        RETURN(rc);
 }
 
@@ -3733,22 +3732,22 @@ static int lfsck_layout_slave_check_pairs(const struct lu_env *env,
        struct obd_export        *exp    = NULL;
        struct ptlrpc_request    *req    = NULL;
        struct lfsck_request     *lr;
-       struct lu_seq_range       range  = { 0 };
+       struct lu_seq_range      *range  = &lfsck_env_info(env)->lti_range;
        int                       rc     = 0;
        ENTRY;
 
        if (unlikely(fid_is_idif(pfid)))
                RETURN(1);
 
-       fld_range_set_any(&range);
-       rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), &range);
+       fld_range_set_any(range);
+       rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), range);
        if (rc != 0)
                RETURN(rc == -ENOENT ? 1 : rc);
 
-       if (unlikely(!fld_range_is_mdt(&range)))
+       if (unlikely(!fld_range_is_mdt(range)))
                RETURN(1);
 
-       exp = lustre_find_lwp_by_index(obd->obd_name, range.lsr_index);
+       exp = lustre_find_lwp_by_index(obd->obd_name, range->lsr_index);
        if (unlikely(exp == NULL))
                RETURN(1);
 
@@ -3979,7 +3978,7 @@ static int lfsck_layout_prep(const struct lu_env *env,
        if (lo->ll_status == LS_COMPLETED ||
            lo->ll_status == LS_PARTIAL ||
            /* To handle orphan, must scan from the beginning. */
-           (start != NULL && start->ls_flags & LPF_ORPHAN)) {
+           (start != NULL && start->ls_flags & LPF_OST_ORPHAN)) {
                int rc;
 
                rc = lfsck_layout_reset(env, com, false);
@@ -4060,7 +4059,7 @@ static int lfsck_layout_slave_prep(const struct lu_env *env,
                return 0;
 
        rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
-       if (rc == 0 && start != NULL && start->ls_flags & LPF_ORPHAN) {
+       if (rc == 0 && start != NULL && start->ls_flags & LPF_OST_ORPHAN) {
                LASSERT(!llsd->llsd_rbtree_valid);
 
                write_lock(&llsd->llsd_rb_lock);
@@ -5275,7 +5274,7 @@ static int lfsck_layout_slave_join(const struct lu_env *env,
        int                               rc    = 0;
        ENTRY;
 
-       if (start == NULL || !(start->ls_flags & LPF_ORPHAN))
+       if (start == NULL || !(start->ls_flags & LPF_OST_ORPHAN))
                RETURN(0);
 
        if (!lsp->lsp_index_valid)
@@ -5507,7 +5506,7 @@ static int lfsck_fid_match_idx(const struct lu_env *env,
 {
        struct seq_server_site  *ss;
        struct lu_server_fld    *sf;
-       struct lu_seq_range      range  = { 0 };
+       struct lu_seq_range     *range = &lfsck_env_info(env)->lti_range;
        int                      rc;
 
        /* All abnormal cases will be returned to MDT0. */
@@ -5525,15 +5524,15 @@ static int lfsck_fid_match_idx(const struct lu_env *env,
        sf = ss->ss_server_fld;
        LASSERT(sf != NULL);
 
-       fld_range_set_any(&range);
-       rc = fld_server_lookup(env, sf, fid_seq(fid), &range);
+       fld_range_set_any(range);
+       rc = fld_server_lookup(env, sf, fid_seq(fid), range);
        if (rc != 0)
                return rc;
 
-       if (!fld_range_is_mdt(&range))
+       if (!fld_range_is_mdt(range))
                return -EINVAL;
 
-       if (range.lsr_index == idx)
+       if (range->lsr_index == idx)
                return 1;
 
        return 0;
index e864119..cf6ce46 100644 (file)
@@ -53,6 +53,7 @@ static void lfsck_key_fini(const struct lu_context *ctx,
        struct lfsck_thread_info *info = data;
 
        lu_buf_free(&info->lti_linkea_buf);
+       lu_buf_free(&info->lti_linkea_buf2);
        lu_buf_free(&info->lti_big_buf);
        OBD_FREE_PTR(info);
 }
@@ -421,6 +422,23 @@ void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
        }
 }
 
+int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
+                             struct lfsck_instance *lfsck,
+                             const struct lu_fid *fid)
+{
+       struct seq_server_site  *ss     =
+                       lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
+       struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
+       int                      rc;
+
+       fld_range_set_mdt(range);
+       rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
+       if (rc == 0)
+               rc = range->lsr_index;
+
+       return rc;
+}
+
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 static const char dotlustre[] = ".lustre";
@@ -449,7 +467,7 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
        ENTRY;
 
        rc = linkea_data_new(&ldata,
-                            &lfsck_env_info(env)->lti_linkea_buf);
+                            &lfsck_env_info(env)->lti_linkea_buf2);
        if (rc != 0)
                RETURN(rc);
 
@@ -593,7 +611,7 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
        ENTRY;
 
        rc = linkea_data_new(&ldata,
-                            &lfsck_env_info(env)->lti_linkea_buf);
+                            &lfsck_env_info(env)->lti_linkea_buf2);
        if (rc != 0)
                RETURN(rc);
 
index 83e9223..625a26c 100644 (file)
@@ -124,7 +124,6 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
        dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
        dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
        dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
-       dst->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
        dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
        dst->ln_objs_repaired_phase2 =
                                le64_to_cpu(src->ln_objs_repaired_phase2);
@@ -135,6 +134,8 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
                      &src->ln_fid_latest_scanned_phase2);
        dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
        dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
+       dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
+       dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
 }
 
 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
@@ -160,7 +161,6 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
        dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
        dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
        dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
-       dst->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
        dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
        dst->ln_objs_repaired_phase2 =
                                cpu_to_le64(src->ln_objs_repaired_phase2);
@@ -171,6 +171,8 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
                      &src->ln_fid_latest_scanned_phase2);
        dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
        dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
+       dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
+       dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
 }
 
 static void lfsck_namespace_record_failure(const struct lu_env *env,
@@ -290,114 +292,112 @@ static int lfsck_namespace_init(const struct lu_env *env,
        return rc;
 }
 
-static int lfsck_namespace_lookup(const struct lu_env *env,
-                                 struct lfsck_component *com,
-                                 const struct lu_fid *fid, __u8 *flags)
-{
-       struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
-       int            rc;
-
-       fid_cpu_to_be(key, fid);
-       rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
-                      (const struct dt_key *)key, BYPASS_CAPA);
-       return rc;
-}
-
-static int lfsck_namespace_delete(const struct lu_env *env,
-                                 struct lfsck_component *com,
-                                 const struct lu_fid *fid)
+/**
+ * Update the namespace LFSCK tracing file for the given @fid
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] fid      the FID whose flags are updated in the tracing file
+ * \param[in] flags    the flag bits to add or remove (must be non-zero)
+ * \param[in] add      true to add the flags, false to remove them
+ *
+ * \retval             0 on success or if there is nothing to be done
+ * \retval             negative error number on failure
+ */
+int lfsck_namespace_trace_update(const struct lu_env *env,
+                                struct lfsck_component *com,
+                                const struct lu_fid *fid,
+                                const __u8 flags, bool add)
 {
        struct lfsck_instance   *lfsck  = com->lc_lfsck;
-       struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
-       struct thandle          *handle;
        struct dt_object        *obj    = com->lc_obj;
-       int                      rc;
+       struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
+       struct dt_device        *dev    = lfsck->li_bottom;
+       struct thandle          *th     = NULL;
+       int                      rc     = 0;
+       __u8                     old    = 0;
+       __u8                     new    = 0;
        ENTRY;
 
-       handle = dt_trans_create(env, lfsck->li_bottom);
-       if (IS_ERR(handle))
-               RETURN(PTR_ERR(handle));
-
-       rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
-       if (rc != 0)
-               GOTO(out, rc);
+       LASSERT(flags != 0);
 
+       down_write(&com->lc_sem);
        fid_cpu_to_be(key, fid);
-       rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
-                      BYPASS_CAPA);
-
-       GOTO(out, rc);
+       rc = dt_lookup(env, obj, (struct dt_rec *)&old,
+                      (const struct dt_key *)key, BYPASS_CAPA);
+       if (rc == -ENOENT) {
+               if (!add)
+                       GOTO(unlock, rc = 0);
 
-out:
-       dt_trans_stop(env, lfsck->li_bottom, handle);
-       return rc;
-}
+               old = 0;
+               new = flags;
+       } else if (rc == 0) {
+               if (add) {
+                       if ((old & flags) == flags)
+                               GOTO(unlock, rc = 0);
 
-static int lfsck_namespace_update(const struct lu_env *env,
-                                 struct lfsck_component *com,
-                                 const struct lu_fid *fid,
-                                 __u8 flags, bool force)
-{
-       struct lfsck_instance   *lfsck  = com->lc_lfsck;
-       struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
-       struct thandle          *handle;
-       struct dt_object        *obj    = com->lc_obj;
-       int                      rc;
-       bool                     exist  = false;
-       __u8                     tf;
-       ENTRY;
+                       new = old | flags;
+               } else {
+                       if ((old & flags) == 0)
+                               GOTO(unlock, rc = 0);
 
-       rc = lfsck_namespace_lookup(env, com, fid, &tf);
-       if (rc != 0 && rc != -ENOENT)
-               RETURN(rc);
+                       new = old & ~flags;
+               }
+       } else {
+               GOTO(log, rc);
+       }
 
-       if (rc == 0) {
-               if (!force || flags == tf)
-                       RETURN(0);
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
 
-               exist = true;
-               handle = dt_trans_create(env, lfsck->li_bottom);
-               if (IS_ERR(handle))
-                       RETURN(PTR_ERR(handle));
+       if (old != 0) {
+               rc = dt_declare_delete(env, obj,
+                                      (const struct dt_key *)key, th);
+               if (rc != 0)
+                       GOTO(log, rc);
+       }
 
-               rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
-                                      handle);
+       if (new != 0) {
+               rc = dt_declare_insert(env, obj,
+                                      (const struct dt_rec *)&new,
+                                      (const struct dt_key *)key, th);
                if (rc != 0)
-                       GOTO(out, rc);
-       } else {
-               handle = dt_trans_create(env, lfsck->li_bottom);
-               if (IS_ERR(handle))
-                       RETURN(PTR_ERR(handle));
+                       GOTO(log, rc);
        }
 
-       rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
-                              (const struct dt_key *)fid, handle);
+       rc = dt_trans_start_local(env, dev, th);
        if (rc != 0)
-               GOTO(out, rc);
+               GOTO(log, rc);
 
-       rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
-       if (rc != 0)
-               GOTO(out, rc);
+       if (old != 0) {
+               rc = dt_delete(env, obj, (const struct dt_key *)key,
+                              th, BYPASS_CAPA);
+               if (rc != 0)
+                       GOTO(log, rc);
+       }
 
-       fid_cpu_to_be(key, fid);
-       if (exist) {
-               rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
-                              BYPASS_CAPA);
+       if (new != 0) {
+               rc = dt_insert(env, obj, (const struct dt_rec *)&new,
+                              (const struct dt_key *)key, th, BYPASS_CAPA, 1);
                if (rc != 0)
-                       GOTO(out, rc);
+                       GOTO(log, rc);
        }
 
-       rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
-                      (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
+       GOTO(log, rc);
 
-       GOTO(out, rc);
+log:
+       if (th != NULL && !IS_ERR(th))
+               dt_trans_stop(env, dev, th);
+
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
+              "tracing file, flags %x, old %x, new %x: rc = %d\n",
+              lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
+              (__u32)flags, (__u32)old, (__u32)new, rc);
+
+unlock:
+       up_write(&com->lc_sem);
 
-out:
-       dt_trans_stop(env, lfsck->li_bottom, handle);
        return rc;
 }
 
@@ -444,42 +444,104 @@ static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
        return rc;
 }
 
-static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
-                           struct linkea_data *ldata)
+int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
+                      struct linkea_data *ldata)
 {
        int rc;
 
-       ldata->ld_buf =
-               lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
-                                      PAGE_CACHE_SIZE);
        if (ldata->ld_buf->lb_buf == NULL)
                return -ENOMEM;
 
        if (!dt_object_exists(obj))
-               return -ENODATA;
+               return -ENOENT;
 
        rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
        if (rc == -ERANGE) {
                /* Buf was too small, figure out what we need. */
-               lu_buf_free(ldata->ld_buf);
-               rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
+               rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK,
                                  BYPASS_CAPA);
-               if (rc < 0)
+               if (rc <= 0)
                        return rc;
 
-               ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
+               lu_buf_realloc(ldata->ld_buf, rc);
                if (ldata->ld_buf->lb_buf == NULL)
                        return -ENOMEM;
 
                rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
                                  BYPASS_CAPA);
        }
-       if (rc < 0)
-               return rc;
 
-       linkea_init(ldata);
+       if (rc > 0)
+               rc = linkea_init(ldata);
 
-       return 0;
+       return rc;
+}
+
+/**
+ * Remove linkEA for the given object.
+ *
+ * The caller should take the ldlm lock before the calling.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the dt_object to be handled
+ *
+ * \retval             0 for repaired cases
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_links_remove(const struct lu_env *env,
+                                       struct lfsck_component *com,
+                                       struct dt_object *obj)
+{
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_device                *dev    = lfsck->li_bottom;
+       struct thandle                  *th     = NULL;
+       int                              rc     = 0;
+       ENTRY;
+
+       LASSERT(dt_object_remote(obj) == 0);
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
+
+       rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, obj, 0);
+       if (unlikely(lfsck_is_dead_obj(obj)))
+               GOTO(unlock, rc = -ENOENT);
+
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+               GOTO(unlock, rc = 0);
+
+       rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA);
+
+       GOTO(unlock, rc);
+
+unlock:
+       dt_write_unlock(env, obj);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+log:
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
+              "for the object "DFID": rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
+
+       if (rc == 0) {
+               struct lfsck_namespace *ns = com->lc_file_ram;
+
+               ns->ln_flags |= LF_INCONSISTENT;
+       }
+
+       return rc;
 }
 
 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
@@ -493,19 +555,27 @@ static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
                            BYPASS_CAPA);
 }
 
-/**
- * \retval ve: removed entries
- */
-static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
-                                    struct linkea_data *ldata,
-                                    struct lu_name *cname,
-                                    struct lu_fid *pfid)
+static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
+                                               struct lu_name *cname,
+                                               struct lu_fid *pfid,
+                                               char *buf)
+{
+       linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
+       /* Copy the name so that it is guaranteed to be NUL-terminated. */
+       memcpy(buf, cname->ln_name, cname->ln_namelen);
+       buf[cname->ln_namelen] = 0;
+       cname->ln_name = buf;
+}
+
+static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
+                                              struct lu_name *cname,
+                                              struct lu_fid *pfid,
+                                              bool remove)
 {
        struct link_ea_entry    *oldlee;
        int                      oldlen;
-       int                      removed = 0;
+       int                      repeated = 0;
 
-       linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
        oldlee = ldata->ld_lee;
        oldlen = ldata->ld_reclen;
        linkea_next_entry(ldata);
@@ -514,203 +584,554 @@ static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
                                   ldata->ld_lee->lee_reclen[1];
                if (unlikely(ldata->ld_reclen == oldlen &&
                             memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
+                       repeated++;
+                       if (!remove)
+                               break;
+
                        linkea_del_buf(ldata, cname);
-                       removed++;
                } else {
                        linkea_next_entry(ldata);
                }
        }
        ldata->ld_lee = oldlee;
        ldata->ld_reclen = oldlen;
-       return removed;
+
+       return repeated;
+}
+
+static int lfsck_namespace_insert_orphan(const struct lu_env *env,
+                                        struct lfsck_component *com,
+                                        struct dt_object *orphan,
+                                        const char *infix, const char *type,
+                                        int *count)
+{
+       /* XXX: TBD */
+       return 0;
+}
+
+static int lfsck_namespace_insert_normal(const struct lu_env *env,
+                                        struct lfsck_component *com,
+                                        struct dt_object *parent,
+                                        struct dt_object *child,
+                                        const char *name)
+{
+       /* XXX: TBD */
+       return 0;
+}
+
+static int lfsck_namespace_create_orphan(const struct lu_env *env,
+                                        struct lfsck_component *com,
+                                        struct dt_object *orphan)
+{
+       /* XXX: TBD */
+       return 0;
 }
 
 /**
- * \retval +ve repaired
- * \retval 0   no need to repair
- * \retval -ve error cases
+ * Remove the specified entry from the linkEA.
+ *
+ * Locate the linkEA entry with the given @cname and @pfid, then
+ * remove either this entry or the other entries that duplicate
+ * it, depending on @next.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the dt_object to be handled
+ * \param[in,out] ldata pointer to the buffer that holds the linkEA
+ * \param[in] cname    the name for the child in the parent directory
+ * \param[in] pfid     the parent directory's FID for the linkEA
+ * \param[in] next     if true, then remove the first found linkEA
+ *                     entry, and move the ldata->ld_lee to next entry
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
  */
-static int lfsck_namespace_double_scan_one(const struct lu_env *env,
-                                          struct lfsck_component *com,
-                                          struct dt_object *child, __u8 flags)
+static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
+                                        struct lfsck_component *com,
+                                        struct dt_object *obj,
+                                        struct linkea_data *ldata,
+                                        struct lu_name *cname,
+                                        struct lu_fid *pfid,
+                                        bool next)
 {
-       struct lfsck_thread_info *info    = lfsck_env_info(env);
-       struct lu_attr           *la      = &info->lti_la;
-       struct lu_name           *cname   = &info->lti_name;
-       struct lu_fid            *pfid    = &info->lti_fid;
-       struct lu_fid            *cfid    = &info->lti_fid2;
-       struct lfsck_instance   *lfsck    = com->lc_lfsck;
-       struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
-       struct lfsck_namespace  *ns       = com->lc_file_ram;
-       struct linkea_data       ldata    = { 0 };
-       struct thandle          *handle   = NULL;
-       bool                     locked   = false;
-       bool                     update   = false;
-       int                      rc;
+       struct lfsck_instance           *lfsck     = com->lc_lfsck;
+       struct dt_device                *dev       = lfsck->li_bottom;
+       struct lfsck_bookmark           *bk        = &lfsck->li_bookmark_ram;
+       struct thandle                  *th        = NULL;
+       struct lustre_handle             lh        = { 0 };
+       struct linkea_data               ldata_new = { 0 };
+       struct lu_buf                    linkea_buf;
+       int                              rc        = 0;
        ENTRY;
 
-       if (com->lc_journal) {
+       rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
+                             MDS_INODELOCK_UPDATE |
+                             MDS_INODELOCK_XATTR, LCK_EX);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       if (next)
+               linkea_del_buf(ldata, cname);
+       else
+               lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
+                                                   true);
+       lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
+                      ldata->ld_leh->leh_len);
 
 again:
-               LASSERT(!locked);
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(unlock1, rc = PTR_ERR(th));
 
-               update = false;
-               com->lc_journal = 1;
-               handle = dt_trans_create(env, lfsck->li_next);
-               if (IS_ERR(handle))
-                       RETURN(rc = PTR_ERR(handle));
+       rc = dt_declare_xattr_set(env, obj, &linkea_buf,
+                                 XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
 
-               rc = dt_declare_xattr_set(env, child,
-                       lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
-                       XATTR_NAME_LINK, 0, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
 
-               rc = dt_trans_start(env, lfsck->li_next, handle);
-               if (rc != 0)
-                       GOTO(stop, rc);
+       dt_write_lock(env, obj, 0);
+       if (unlikely(lfsck_is_dead_obj(obj)))
+               GOTO(unlock2, rc = -ENOENT);
+
+       rc = lfsck_links_read2(env, obj, &ldata_new);
+       if (rc != 0)
+               GOTO(unlock2, rc);
+
+       /* The specified linkEA entry has been removed by race. */
+       rc = linkea_links_find(&ldata_new, cname, pfid);
+       if (rc != 0)
+               GOTO(unlock2, rc = 0);
+
+       if (bk->lb_param & LPF_DRYRUN)
+               GOTO(unlock2, rc = 1);
+
+       if (next)
+               linkea_del_buf(&ldata_new, cname);
+       else
+               lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
+                                                   true);
 
-               dt_write_lock(env, child, MOR_TGT_CHILD);
-               locked = true;
+       if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) {
+               dt_write_unlock(env, obj);
+               dt_trans_stop(env, dev, th);
+               lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
+                              ldata_new.ld_leh->leh_len);
+               goto again;
        }
 
-       if (unlikely(lfsck_is_dead_obj(child)))
-               GOTO(stop, rc = 0);
+       lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
+                      ldata_new.ld_leh->leh_len);
+       rc = dt_xattr_set(env, obj, &linkea_buf,
+                         XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
+
+       GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
+
+unlock2:
+       dt_write_unlock(env, obj);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+unlock1:
+       lfsck_ibits_unlock(&lh, LCK_EX);
+
+log:
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
+              "for the object: "DFID", parent "DFID", name %.*s\n",
+              lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
+              PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
+              cname->ln_name);
 
-       rc = dt_attr_get(env, child, la, BYPASS_CAPA);
-       if (rc == 0)
-               rc = lfsck_links_read(env, child, &ldata);
        if (rc != 0) {
-               if ((bk->lb_param & LPF_DRYRUN) &&
-                   (rc == -EINVAL || rc == -ENODATA))
-                       rc = 1;
+               struct lfsck_namespace *ns = com->lc_file_ram;
 
-               GOTO(stop, rc);
+               ns->ln_flags |= LF_INCONSISTENT;
+       }
+
+       return rc;
+}
+
+/**
+ * Conditionally remove the specified entry from the linkEA.
+ *
+ * Take the parent lock first, then look up the name entry: if it exists
+ * and references @child, do nothing; otherwise (entry missing, another
+ * FID, or dead parent) call lfsck_namespace_shrink_linkea() to remove it.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] parent   pointer to the parent directory
+ * \param[in] child    pointer to the child object that holds the linkEA
+ * \param[in,out] ldata pointer to the buffer that holds the linkEA
+ * \param[in] cname    the name for the child in the parent directory
+ * \param[in] pfid     the parent directory's FID for the linkEA
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
+                                             struct lfsck_component *com,
+                                             struct dt_object *parent,
+                                             struct dt_object *child,
+                                             struct linkea_data *ldata,
+                                             struct lu_name *cname,
+                                             struct lu_fid *pfid)
+{
+       struct lu_fid           *cfid   = &lfsck_env_info(env)->lti_fid3;
+       struct lustre_handle     lh     = { 0 };
+       int                      rc;
+       ENTRY;
+
+       rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
+                             MDS_INODELOCK_UPDATE, LCK_EX);
+       if (rc != 0)
+               RETURN(rc);
+
+       dt_read_lock(env, parent, 0);
+       if (unlikely(lfsck_is_dead_obj(parent))) {
+               dt_read_unlock(env, parent);
+               lfsck_ibits_unlock(&lh, LCK_EX);
+               rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
+                                                  cname, pfid, true);
+
+               RETURN(rc);
+       }
+
+       rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
+                      (const struct dt_key *)cname->ln_name,
+                      BYPASS_CAPA);
+       dt_read_unlock(env, parent);
+
+       /* It is safe to release the ldlm lock here, because by this point
+        * we already have all the information needed to decide whether the
+        * linkEA entry is valid. It does not matter if others add a new
+        * linkEA entry after the ldlm lock is released. If another thread
+        * has removed the specified linkEA entry in a race, that is fine,
+        * because the subsequent lfsck_namespace_shrink_linkea() handles
+        * that case. */
+       lfsck_ibits_unlock(&lh, LCK_EX);
+       if (rc == -ENOENT) {
+               rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
+                                                  cname, pfid, true);
+
+               RETURN(rc);
+       }
+
+       if (rc != 0)
+               RETURN(rc);
+
+       /* The LFSCK has just observed an intermediate state of a
+        * cross-MDT create operation. That is normal. */
+       if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
+               linkea_next_entry(ldata);
+
+               RETURN(0);
        }
 
+       rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
+                                          pfid, true);
+
+       RETURN(rc);
+}
+
+/**
+ * Double scan the MDT-object for namespace LFSCK.
+ *
+ * If the MDT-object contains invalid or repeated linkEA entries, then drop
+ * those entries from the linkEA; if the linkEA becomes empty or the object
+ * has no linkEA, then it is an orphan and will be added into the directory
+ * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
+ * the remote parent; if the name entry corresponding to some linkEA entry
+ * is lost, then add the name entry back to the namespace.
+ *
+ * Every parent object found via lfsck_object_find() while walking the linkEA
+ * entries is released with lfsck_object_put() before the walk moves on to the
+ * next entry or jumps to the "out" label.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] child    pointer to the dt_object to be handled
+ * \param[in] flags    some hints to indicate how the @child should be handled
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_double_scan_one(const struct lu_env *env,
+                                          struct lfsck_component *com,
+                                          struct dt_object *child, __u8 flags)
+{
+       struct lfsck_thread_info *info     = lfsck_env_info(env);
+       struct lu_attr           *la       = &info->lti_la;
+       struct lu_name           *cname    = &info->lti_name;
+       struct lu_fid            *pfid     = &info->lti_fid;
+       struct lu_fid            *cfid     = &info->lti_fid2;
+       struct lfsck_instance    *lfsck    = com->lc_lfsck;
+       struct lfsck_namespace   *ns       = com->lc_file_ram;
+       struct dt_object         *parent   = NULL;
+       struct linkea_data        ldata    = { 0 };
+       bool                      repaired = false;
+       int                       count    = 0;
+       int                       rc;
+       ENTRY;
+
+       dt_read_lock(env, child, 0);
+       if (unlikely(lfsck_is_dead_obj(child))) {
+               dt_read_unlock(env, child);
+
+               RETURN(0);
+       }
+
+       rc = lfsck_links_read(env, child, &ldata);
+       dt_read_unlock(env, child);
+       if (rc != 0)
+               GOTO(out, rc);
+
        linkea_first_entry(&ldata);
        while (ldata.ld_lee != NULL) {
-               struct dt_object *parent = NULL;
+               lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
+                                                   info->lti_key);
+               rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
+                                                        false);
+               /* Found repeated linkEA entries */
+               if (rc > 0) {
+                       rc = lfsck_namespace_shrink_linkea(env, com, child,
+                                               &ldata, cname, pfid, false);
+                       if (rc < 0)
+                               GOTO(out, rc);
 
-               rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
-               if (rc > 0)
-                       update = true;
+                       if (rc == 0)
+                               continue;
+
+                       repaired = true;
+
+                       /* fall through */
+               }
+
+               /* Invalid PFID in the linkEA entry. */
+               if (!fid_is_sane(pfid)) {
+                       rc = lfsck_namespace_shrink_linkea(env, com, child,
+                                               &ldata, cname, pfid, true);
+                       if (rc < 0)
+                               GOTO(out, rc);
 
-               if (!fid_is_sane(pfid))
-                       goto shrink;
+                       if (rc > 0)
+                               repaired = true;
+
+                       continue;
+               }
 
                parent = lfsck_object_find(env, lfsck, pfid);
                if (IS_ERR(parent))
-                       GOTO(stop, rc = PTR_ERR(parent));
+                       GOTO(out, rc = PTR_ERR(parent));
+
+               if (!dt_object_exists(parent)) {
+                       if (ldata.ld_leh->leh_reccount > 1) {
+                               /* If it is NOT the last linkEA entry, then
+                                * there is still other chance to make the
+                                * child to be visible via other parent, then
+                                * remove this linkEA entry. */
+                               rc = lfsck_namespace_shrink_linkea(env, com,
+                                       child, &ldata, cname, pfid, true);
+                       } else {
+                               /* Create the lost parent as an orphan. */
+                               rc = lfsck_namespace_create_orphan(env, com,
+                                                                  parent);
+                               if (rc < 0) {
+                                       lfsck_object_put(env, parent);
+
+                                       GOTO(out, rc);
+                               }
 
-               if (!dt_object_exists(parent))
-                       goto shrink;
+                               if (rc > 0)
+                                       repaired = true;
+
+                               /* Add the missed name entry to the parent. */
+                               rc = lfsck_namespace_insert_normal(env, com,
+                                               parent, child, cname->ln_name);
+                               linkea_next_entry(&ldata);
+                       }
 
-               /* XXX: Currently, skip remote object, the consistency for
-                *      remote object will be processed in LFSCK phase III. */
-               if (dt_object_remote(parent)) {
                        lfsck_object_put(env, parent);
-                       linkea_next_entry(&ldata);
+                       if (rc < 0)
+                               GOTO(out, rc);
+
+                       if (rc > 0)
+                               repaired = true;
+
                        continue;
                }
 
-               if (unlikely(!dt_try_as_dir(env, parent)))
-                       goto shrink;
+               /* The linkEA entry with bad parent will be removed. */
+               if (unlikely(!dt_try_as_dir(env, parent))) {
+                       lfsck_object_put(env, parent);
+                       rc = lfsck_namespace_shrink_linkea(env, com, child,
+                                               &ldata, cname, pfid, true);
+                       if (rc < 0)
+                               GOTO(out, rc);
+
+                       if (rc > 0)
+                               repaired = true;
+
+                       continue;
+               }
 
-               /* To guarantee the 'name' is terminated with '0'. */
-               memcpy(info->lti_key, cname->ln_name, cname->ln_namelen);
-               info->lti_key[cname->ln_namelen] = 0;
-               cname->ln_name = info->lti_key;
                rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
                               (const struct dt_key *)cname->ln_name,
                               BYPASS_CAPA);
                if (rc != 0 && rc != -ENOENT) {
                        lfsck_object_put(env, parent);
-                       GOTO(stop, rc);
+
+                       GOTO(out, rc);
                }
 
                if (rc == 0) {
+                       lfsck_object_put(env, parent);
                        if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
-                               lfsck_object_put(env, parent);
+                               /* It is the most common case that we
+                                * find the name entry corresponding
+                                * to the linkEA entry. */
                                linkea_next_entry(&ldata);
-                               continue;
+                       } else {
+                               /* XXX: The name entry references another
+                                *      MDT-object that may be created by
+                                *      the LFSCK for repairing dangling
+                                *      name entry. There will be another
+                                *      patch for further processing. */
+                               rc = lfsck_namespace_shrink_linkea(env, com,
+                                       child, &ldata, cname, pfid, true);
+                               if (rc < 0)
+                                       GOTO(out, rc);
+
+                               if (rc > 0)
+                                       repaired = true;
                        }
 
-                       goto shrink;
+                       continue;
                }
 
+               rc = dt_attr_get(env, child, la, BYPASS_CAPA);
+               if (rc != 0) {
+                       /* The parent reference is still held on this
+                        * path, release it before leaving the loop. */
+                       lfsck_object_put(env, parent);
+
+                       GOTO(out, rc);
+               }
+
                /* If there is no name entry in the parent dir and the object
                 * link count is less than the linkea entries count, then the
                 * linkea entry should be removed. */
-               if (ldata.ld_leh->leh_reccount > la->la_nlink)
-                       goto shrink;
-
-               /* XXX: For the case of there is a linkea entry, but without
-                *      name entry pointing to the object and its hard links
-                *      count is not less than the object name entries count,
-                *      then seems we should add the 'missed' name entry back
-                *      to namespace, but before LFSCK phase III finished, we
-                *      do not know whether the object has some inconsistency
-                *      on other MDTs. So now, do NOT add the name entry back
-                *      to the namespace, but keep the linkEA entry. LU-2914 */
+               if (ldata.ld_leh->leh_reccount > la->la_nlink) {
+                       rc = lfsck_namespace_shrink_linkea_cond(env, com,
+                                       parent, child, &ldata, cname, pfid);
+                       lfsck_object_put(env, parent);
+                       if (rc < 0)
+                               GOTO(out, rc);
+
+                       if (rc > 0)
+                               repaired = true;
+
+                       continue;
+               }
+
+               /* Add the missed name entry back to the namespace. */
+               rc = lfsck_namespace_insert_normal(env, com, parent, child,
+                                                  cname->ln_name);
                lfsck_object_put(env, parent);
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               if (rc > 0)
+                       repaired = true;
+
                linkea_next_entry(&ldata);
-               continue;
+       }
 
-shrink:
-               if (parent != NULL)
-                       lfsck_object_put(env, parent);
-               if (bk->lb_param & LPF_DRYRUN)
-                       RETURN(1);
+       GOTO(out, rc = 0);
+
+out:
+       if (rc < 0 && rc != -ENODATA)
+               return rc;
 
-               CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
-                     "for the object: "DFID", parent "DFID", name %.*s\n",
-                     lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)),
-                     PFID(pfid), cname->ln_namelen, cname->ln_name);
+       if (rc == 0) {
+               LASSERT(ldata.ld_leh != NULL);
 
-               linkea_del_buf(&ldata, cname);
-               update = true;
+               count = ldata.ld_leh->leh_reccount;
        }
 
-       if (update) {
-               if (!com->lc_journal) {
-                       com->lc_journal = 1;
-                       goto again;
-               }
+       if (count == 0) {
+               /* If the child becomes orphan, then insert it into
+                * the global .lustre/lost+found/MDTxxxx directory. */
+               rc = lfsck_namespace_insert_orphan(env, com, child, "", "O",
+                                                  &count);
+               if (rc < 0)
+                       return rc;
 
-               rc = lfsck_links_write(env, child, &ldata, handle);
+               if (rc > 0)
+                       repaired = true;
        }
 
-       GOTO(stop, rc);
+       rc = dt_attr_get(env, child, la, BYPASS_CAPA);
+       if (rc != 0)
+               return rc;
 
-stop:
-       if (locked) {
-       /* XXX: For the case linkea entries count does not match the object hard
-        *      links count, we cannot update the later one simply. Before LFSCK
-        *      phase III finished, we cannot know whether there are some remote
-        *      name entries to be repaired or not. LU-2914 */
-               if (rc == 0 && !lfsck_is_dead_obj(child) &&
-                   ldata.ld_leh != NULL &&
-                   ldata.ld_leh->leh_reccount != la->la_nlink)
-                       CDEBUG(D_LFSCK, "%s: the object "DFID" linkEA entry "
-                              "count %u may not match its hardlink count %u\n",
-                              lfsck_lfsck2name(lfsck), PFID(cfid),
-                              ldata.ld_leh->leh_reccount, la->la_nlink);
-
-               dt_write_unlock(env, child);
+       if (la->la_nlink != count) {
+               /* XXX: there will be other patch(es) for MDT-object
+                *      hard links verification. */
        }
 
-       if (handle != NULL)
-               dt_trans_stop(env, lfsck->li_next, handle);
+       if (repaired) {
+               if (la->la_nlink > 1) {
+                       down_write(&com->lc_sem);
+                       ns->ln_mul_linked_repaired++;
+                       up_write(&com->lc_sem);
+               }
 
-       if (rc == 0 && update) {
-               ns->ln_objs_nlink_repaired++;
-               rc = 1;
+               if (rc == 0)
+                       rc = 1;
        }
 
        return rc;
 }
 
+/**
+ * Print the common namespace LFSCK statistics into the given seq_file.
+ *
+ * The checked counters and the run time of each phase are supplied by the
+ * caller; all the other printed values are read from @ns.
+ *
+ * \param[in] m                        pointer to the seq_file to print into
+ * \param[in] ns               pointer to the namespace LFSCK counters
+ * \param[in] checked_phase1   value printed as "checked_phase1"
+ * \param[in] checked_phase2   value printed as "checked_phase2"
+ * \param[in] time_phase1      value printed as "run_time_phase1" (seconds)
+ * \param[in] time_phase2      value printed as "run_time_phase2" (seconds)
+ */
+static void lfsck_namespace_dump_statistics(struct seq_file *m,
+                                           struct lfsck_namespace *ns,
+                                           __u64 checked_phase1,
+                                           __u64 checked_phase2,
+                                           __u32 time_phase1,
+                                           __u32 time_phase2)
+{
+       seq_printf(m, "checked_phase1: "LPU64"\n"
+                     "checked_phase2: "LPU64"\n"
+                     "updated_phase1: "LPU64"\n"
+                     "updated_phase2: "LPU64"\n"
+                     "failed_phase1: "LPU64"\n"
+                     "failed_phase2: "LPU64"\n"
+                     "directories: "LPU64"\n"
+                     "dirent_repaired: "LPU64"\n"
+                     "linkea_repaired: "LPU64"\n"
+                     "nlinks_repaired: "LPU64"\n"
+                     "lost_found: "LPU64"\n"
+                     "multiple_linked_checked: "LPU64"\n"
+                     "multiple_linked_repaired: "LPU64"\n"
+                     "success_count: %u\n"
+                     "run_time_phase1: %u seconds\n"
+                     "run_time_phase2: %u seconds\n",
+                     checked_phase1,
+                     checked_phase2,
+                     ns->ln_items_repaired,
+                     ns->ln_objs_repaired_phase2,
+                     ns->ln_items_failed,
+                     ns->ln_objs_failed_phase2,
+                     ns->ln_dirs_checked,
+                     ns->ln_dirent_repaired,
+                     ns->ln_linkea_repaired,
+                     ns->ln_objs_nlink_repaired,
+                     ns->ln_objs_lost_found,
+                     ns->ln_mul_linked_checked,
+                     ns->ln_mul_linked_repaired,
+                     ns->ln_success_count,
+                     time_phase1,
+                     time_phase2);
+}
+
 /* namespace APIs */
 
 static int lfsck_namespace_reset(const struct lu_env *env,
@@ -872,12 +1293,15 @@ static int lfsck_namespace_prep(const struct lu_env *env,
                        ns->ln_items_repaired = 0;
                        ns->ln_items_failed = 0;
                        ns->ln_dirs_checked = 0;
-                       ns->ln_mlinked_checked = 0;
                        ns->ln_objs_checked_phase2 = 0;
                        ns->ln_objs_repaired_phase2 = 0;
                        ns->ln_objs_failed_phase2 = 0;
                        ns->ln_objs_nlink_repaired = 0;
                        ns->ln_objs_lost_found = 0;
+                       ns->ln_dirent_repaired = 0;
+                       ns->ln_linkea_repaired = 0;
+                       ns->ln_mul_linked_checked = 0;
+                       ns->ln_mul_linked_repaired = 0;
                        fid_zero(&ns->ln_fid_latest_scanned_phase2);
                        if (list_empty(&com->lc_link_dir))
                                list_add_tail(&com->lc_link_dir,
@@ -915,12 +1339,114 @@ static int lfsck_namespace_exec_oit(const struct lu_env *env,
                                    struct lfsck_component *com,
                                    struct dt_object *obj)
 {
+       struct lfsck_thread_info *info  = lfsck_env_info(env);
+       struct lfsck_namespace   *ns    = com->lc_file_ram;
+       struct lfsck_instance    *lfsck = com->lc_lfsck;
+       const struct lu_fid      *fid   = lfsck_dto2fid(obj);
+       struct lu_attr           *la    = &info->lti_la;
+       struct lu_fid            *pfid  = &info->lti_fid2;
+       struct lu_name           *cname = &info->lti_name;
+       struct lu_seq_range      *range = &info->lti_range;
+       struct dt_device         *dev   = lfsck->li_bottom;
+       struct seq_server_site   *ss    =
+                               lu_site2seq(dev->dd_lu_dev.ld_site);
+       struct linkea_data        ldata = { 0 };
+       __u32                     idx   = lfsck_dev_idx(dev);
+       int                       rc;
+       ENTRY;
+
+       rc = lfsck_links_read(env, obj, &ldata);
+       if (rc == -ENOENT)
+               GOTO(out, rc = 0);
+
+       /* -EINVAL means crashed linkEA, should be verified. */
+       if (rc == -EINVAL) {
+               rc = lfsck_namespace_trace_update(env, com, fid,
+                                                 LNTF_CHECK_LINKEA, true);
+               if (rc == 0) {
+                       struct lustre_handle lh = { 0 };
+
+                       rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
+                                             MDS_INODELOCK_UPDATE |
+                                             MDS_INODELOCK_XATTR, LCK_EX);
+                       if (rc == 0) {
+                               rc = lfsck_namespace_links_remove(env, com,
+                                                                 obj);
+                               lfsck_ibits_unlock(&lh, LCK_EX);
+                       }
+               }
+
+               GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
+       }
+
+       /* A zero-linkEA object may be an orphan, but it may also be the
+        * result of an upgrade. Currently, we cannot record it for double
+        * scan, because that may make the LFSCK tracing file too large. */
+       if (rc == -ENODATA) {
+               if (S_ISDIR(lfsck_object_type(obj)))
+                       GOTO(out, rc = 0);
+
+               rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               if (la->la_nlink > 1)
+                       rc = lfsck_namespace_trace_update(env, com, fid,
+                                               LNTF_CHECK_LINKEA, true);
+
+               GOTO(out, rc);
+       }
+
+       if (rc != 0)
+               GOTO(out, rc);
+
+       /* Record multiple-linked object. */
+       if (ldata.ld_leh->leh_reccount > 1) {
+               rc = lfsck_namespace_trace_update(env, com, fid,
+                                                 LNTF_CHECK_LINKEA, true);
+
+               GOTO(out, rc);
+       }
+
+       linkea_first_entry(&ldata);
+       linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
+       if (!fid_is_sane(pfid)) {
+               rc = lfsck_namespace_trace_update(env, com, fid,
+                                                 LNTF_CHECK_PARENT, true);
+       } else {
+               fld_range_set_mdt(range);
+               rc = fld_local_lookup(env, ss->ss_server_fld,
+                                     fid_seq(pfid), range);
+               if ((rc == -ENOENT) ||
+                   (rc == 0 && range->lsr_index != idx)) {
+                       rc = lfsck_namespace_trace_update(env, com, fid,
+                                               LNTF_CHECK_LINKEA, true);
+               } else {
+                       if (S_ISDIR(lfsck_object_type(obj)))
+                               GOTO(out, rc = 0);
+
+                       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       if (la->la_nlink > 1)
+                               rc = lfsck_namespace_trace_update(env, com,
+                                               fid, LNTF_CHECK_LINKEA, true);
+               }
+       }
+
+       GOTO(out, rc);
+
+out:
        down_write(&com->lc_sem);
        com->lc_new_checked++;
        if (S_ISDIR(lfsck_object_type(obj)))
-               ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
+               ns->ln_dirs_checked++;
+       if (rc != 0)
+               lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
        up_write(&com->lc_sem);
-       return 0;
+
+       return rc;
 }
 
 static int lfsck_namespace_exec_dir(const struct lu_env *env,
@@ -1086,40 +1612,14 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
                        do_div(new_checked, duration);
                if (rtime != 0)
                        do_div(speed, rtime);
-               seq_printf(m, "checked_phase1: "LPU64"\n"
-                             "checked_phase2: "LPU64"\n"
-                             "updated_phase1: "LPU64"\n"
-                             "updated_phase2: "LPU64"\n"
-                             "failed_phase1: "LPU64"\n"
-                             "failed_phase2: "LPU64"\n"
-                             "directories: "LPU64"\n"
-                             "multi_linked_files: "LPU64"\n"
-                             "dirent_repaired: "LPU64"\n"
-                             "linkea_repaired: "LPU64"\n"
-                             "nlinks_repaired: "LPU64"\n"
-                             "lost_found: "LPU64"\n"
-                             "success_count: %u\n"
-                             "run_time_phase1: %u seconds\n"
-                             "run_time_phase2: %u seconds\n"
-                             "average_speed_phase1: "LPU64" items/sec\n"
+               lfsck_namespace_dump_statistics(m, ns, checked,
+                                               ns->ln_objs_checked_phase2,
+                                               rtime, ns->ln_run_time_phase2);
+
+               seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
                              "average_speed_phase2: N/A\n"
                              "real_time_speed_phase1: "LPU64" items/sec\n"
                              "real_time_speed_phase2: N/A\n",
-                             checked,
-                             ns->ln_objs_checked_phase2,
-                             ns->ln_items_repaired,
-                             ns->ln_objs_repaired_phase2,
-                             ns->ln_items_failed,
-                             ns->ln_objs_failed_phase2,
-                             ns->ln_dirs_checked,
-                             ns->ln_mlinked_checked,
-                             ns->ln_dirent_repaired,
-                             ns->ln_linkea_repaired,
-                             ns->ln_objs_nlink_repaired,
-                             ns->ln_objs_lost_found,
-                             ns->ln_success_count,
-                             rtime,
-                             ns->ln_run_time_phase2,
                              speed,
                              new_checked);
 
@@ -1168,41 +1668,15 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
                        do_div(speed1, ns->ln_run_time_phase1);
                if (rtime != 0)
                        do_div(speed2, rtime);
-               seq_printf(m, "checked_phase1: "LPU64"\n"
-                             "checked_phase2: "LPU64"\n"
-                             "updated_phase1: "LPU64"\n"
-                             "updated_phase2: "LPU64"\n"
-                             "failed_phase1: "LPU64"\n"
-                             "failed_phase2: "LPU64"\n"
-                             "directories: "LPU64"\n"
-                             "multi_linked_files: "LPU64"\n"
-                             "dirent_repaired: "LPU64"\n"
-                             "linkea_repaired: "LPU64"\n"
-                             "nlinks_repaired: "LPU64"\n"
-                             "lost_found: "LPU64"\n"
-                             "success_count: %u\n"
-                             "run_time_phase1: %u seconds\n"
-                             "run_time_phase2: %u seconds\n"
-                             "average_speed_phase1: "LPU64" items/sec\n"
+               lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
+                                               checked,
+                                               ns->ln_run_time_phase1, rtime);
+
+               seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
                              "average_speed_phase2: "LPU64" objs/sec\n"
                              "real_time_speed_phase1: N/A\n"
                              "real_time_speed_phase2: "LPU64" objs/sec\n"
                              "current_position: "DFID"\n",
-                             ns->ln_items_checked,
-                             checked,
-                             ns->ln_items_repaired,
-                             ns->ln_objs_repaired_phase2,
-                             ns->ln_items_failed,
-                             ns->ln_objs_failed_phase2,
-                             ns->ln_dirs_checked,
-                             ns->ln_mlinked_checked,
-                             ns->ln_dirent_repaired,
-                             ns->ln_linkea_repaired,
-                             ns->ln_objs_nlink_repaired,
-                             ns->ln_objs_lost_found,
-                             ns->ln_success_count,
-                             ns->ln_run_time_phase1,
-                             rtime,
                              speed1,
                              speed2,
                              new_checked,
@@ -1215,41 +1689,16 @@ lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
                        do_div(speed1, ns->ln_run_time_phase1);
                if (ns->ln_run_time_phase2 != 0)
                        do_div(speed2, ns->ln_run_time_phase2);
-               seq_printf(m, "checked_phase1: "LPU64"\n"
-                             "checked_phase2: "LPU64"\n"
-                             "updated_phase1: "LPU64"\n"
-                             "updated_phase2: "LPU64"\n"
-                             "failed_phase1: "LPU64"\n"
-                             "failed_phase2: "LPU64"\n"
-                             "directories: "LPU64"\n"
-                             "multi_linked_files: "LPU64"\n"
-                             "dirent_repaired: "LPU64"\n"
-                             "linkea_repaired: "LPU64"\n"
-                             "nlinks_repaired: "LPU64"\n"
-                             "lost_found: "LPU64"\n"
-                             "success_count: %u\n"
-                             "run_time_phase1: %u seconds\n"
-                             "run_time_phase2: %u seconds\n"
-                             "average_speed_phase1: "LPU64" items/sec\n"
+               lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
+                                               ns->ln_objs_checked_phase2,
+                                               ns->ln_run_time_phase1,
+                                               ns->ln_run_time_phase2);
+
+               seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
                              "average_speed_phase2: "LPU64" objs/sec\n"
                              "real_time_speed_phase1: N/A\n"
                              "real_time_speed_phase2: N/A\n"
                              "current_position: N/A\n",
-                             ns->ln_items_checked,
-                             ns->ln_objs_checked_phase2,
-                             ns->ln_items_repaired,
-                             ns->ln_objs_repaired_phase2,
-                             ns->ln_items_failed,
-                             ns->ln_objs_failed_phase2,
-                             ns->ln_dirs_checked,
-                             ns->ln_mlinked_checked,
-                             ns->ln_dirent_repaired,
-                             ns->ln_linkea_repaired,
-                             ns->ln_objs_nlink_repaired,
-                             ns->ln_objs_lost_found,
-                             ns->ln_success_count,
-                             ns->ln_run_time_phase1,
-                             ns->ln_run_time_phase2,
                              speed1,
                              speed2);
        }
@@ -1426,11 +1875,14 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
        struct dt_object           *dir      = lnr->lnr_obj;
        struct dt_object           *obj      = NULL;
        const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
+       struct dt_device           *dev;
+       struct lustre_handle        lh       = { 0 };
        bool                        repaired = false;
-       bool                        locked   = false;
+       bool                        dtlocked = false;
        bool                        remove;
        bool                        newdata;
        bool                        log      = false;
+       int                         idx;
        int                         count    = 0;
        int                         rc;
        ENTRY;
@@ -1451,7 +1903,29 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
             fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
                GOTO(out, rc = 0);
 
-       obj = lfsck_object_find(env, lfsck, &lnr->lnr_fid);
+       idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
+       if (idx < 0)
+               GOTO(out, rc = idx);
+
+       if (idx == lfsck_dev_idx(lfsck->li_bottom)) {
+               dev = lfsck->li_next;
+       } else {
+               struct lfsck_tgt_desc *ltd;
+
+               ltd = LTD_TGT(&lfsck->li_mdt_descs, idx);
+               if (unlikely(ltd == NULL)) {
+                       CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
+                              "did not join the namespace LFSCK\n",
+                              lfsck_lfsck2name(lfsck), idx);
+                       ns->ln_flags |= LF_INCOMPLETE;
+
+                       GOTO(out, rc = -ENODEV);
+               }
+
+               dev = ltd->ltd_tgt;
+       }
+
+       obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
@@ -1465,14 +1939,16 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
        }
 
        cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
-       if (!(bk->lb_param & LPF_DRYRUN) &&
-           (com->lc_journal || repaired)) {
+       if (!(bk->lb_param & LPF_DRYRUN) && repaired) {
 
 again:
-               LASSERT(!locked);
+               rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
+                                     MDS_INODELOCK_UPDATE |
+                                     MDS_INODELOCK_XATTR, LCK_EX);
+               if (rc != 0)
+                       GOTO(out, rc);
 
-               com->lc_journal = 1;
-               handle = dt_trans_create(env, lfsck->li_next);
+               handle = dt_trans_create(env, dev);
                if (IS_ERR(handle))
                        GOTO(out, rc = PTR_ERR(handle));
 
@@ -1480,12 +1956,12 @@ again:
                if (rc != 0)
                        GOTO(stop, rc);
 
-               rc = dt_trans_start(env, lfsck->li_next, handle);
+               rc = dt_trans_start(env, dev, handle);
                if (rc != 0)
                        GOTO(stop, rc);
 
-               dt_write_lock(env, obj, MOR_TGT_CHILD);
-               locked = true;
+               dt_write_lock(env, obj, 0);
+               dtlocked = true;
        }
 
        rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
@@ -1528,13 +2004,15 @@ again:
 
 nodata:
                if (bk->lb_param & LPF_DRYRUN) {
+                       down_write(&com->lc_sem);
                        ns->ln_linkea_repaired++;
+                       up_write(&com->lc_sem);
                        repaired = true;
                        log = true;
                        goto record;
                }
 
-               if (!com->lc_journal)
+               if (!lustre_handle_is_used(&lh))
                        goto again;
 
                if (remove) {
@@ -1562,9 +2040,16 @@ nodata:
                        GOTO(stop, rc);
 
                count = ldata.ld_leh->leh_reccount;
+               down_write(&com->lc_sem);
                ns->ln_linkea_repaired++;
+               up_write(&com->lc_sem);
                repaired = true;
                log = true;
+       } else if (rc == -ENOENT) {
+               log = false;
+               repaired = false;
+
+               GOTO(stop, rc = 0);
        } else {
                GOTO(stop, rc);
        }
@@ -1586,26 +2071,31 @@ record:
                LASSERT(dt_write_locked(env, obj));
 
                dt_write_unlock(env, obj);
-               locked = false;
+               dtlocked = false;
 
-               dt_trans_stop(env, lfsck->li_next, handle);
+               dt_trans_stop(env, dev, handle);
                handle = NULL;
+
+               lfsck_ibits_unlock(&lh, LCK_EX);
        }
 
-       ns->ln_mlinked_checked++;
-       rc = lfsck_namespace_update(env, com, &lnr->lnr_fid,
-                       count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
+       down_write(&com->lc_sem);
+       ns->ln_mul_linked_checked++;
+       up_write(&com->lc_sem);
+       rc = lfsck_namespace_trace_update(env, com, &lnr->lnr_fid,
+                                         LNTF_CHECK_LINKEA, true);
 
        GOTO(out, rc);
 
 stop:
-       if (locked)
+       if (dtlocked)
                dt_write_unlock(env, obj);
 
-       if (handle != NULL)
-               dt_trans_stop(env, lfsck->li_next, handle);
+       if (handle != NULL && !IS_ERR(handle))
+               dt_trans_stop(env, dev, handle);
 
 out:
+       lfsck_ibits_unlock(&lh, LCK_EX);
        down_write(&com->lc_sem);
        if (rc < 0) {
                CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
@@ -1633,8 +2123,6 @@ out:
                                lfsck_pos_fill(env, lfsck,
                                               &ns->ln_pos_first_inconsistent,
                                               false);
-               } else {
-                       com->lc_journal = 0;
                }
                rc = 0;
        }
@@ -1659,7 +2147,7 @@ static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
        struct dt_key           *key;
        struct lu_fid            fid;
        int                      rc;
-       __u8                     flags = 0;
+       __u8                     flags  = 0;
        ENTRY;
 
        CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
@@ -1695,29 +2183,38 @@ static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
                        l_wait_event(thread->t_ctl_waitq,
                                     !thread_is_running(thread),
                                     &lwi);
+
+                       if (unlikely(!thread_is_running(thread)))
+                               GOTO(put, rc = 0);
                }
 
                key = iops->key(env, di);
                fid_be_to_cpu(&fid, (const struct lu_fid *)key);
+               if (!fid_is_sane(&fid)) {
+                       rc = 0;
+                       goto checkpoint;
+               }
+
                target = lfsck_object_find(env, lfsck, &fid);
-               down_write(&com->lc_sem);
                if (IS_ERR(target)) {
                        rc = PTR_ERR(target);
                        goto checkpoint;
                }
 
-               /* XXX: Currently, skip remote object, the consistency for
-                *      remote object will be processed in LFSCK phase III. */
-               if (dt_object_exists(target) && !dt_object_remote(target)) {
+               if (dt_object_exists(target)) {
                        rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
-                       if (rc == 0)
+                       if (rc == 0) {
                                rc = lfsck_namespace_double_scan_one(env, com,
                                                                target, flags);
+                               if (rc == -ENOENT)
+                                       rc = 0;
+                       }
                }
 
                lfsck_object_put(env, target);
 
 checkpoint:
+               down_write(&com->lc_sem);
                com->lc_new_checked++;
                com->lc_new_scanned++;
                ns->ln_fid_latest_scanned_phase2 = fid;
@@ -1727,13 +2224,6 @@ checkpoint:
                        ns->ln_objs_failed_phase2++;
                up_write(&com->lc_sem);
 
-               if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
-                       lfsck_namespace_delete(env, com, &fid);
-               } else if (rc < 0) {
-                       flags |= LLF_REPAIR_FAILED;
-                       lfsck_namespace_update(env, com, &fid, flags, true);
-               }
-
                if (rc < 0 && bk->lb_param & LPF_FAILOUT)
                        GOTO(put, rc);
 
@@ -1772,6 +2262,10 @@ put:
 
 fini:
        iops->fini(env, di);
+
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop: rc = %d\n",
+              lfsck_lfsck2name(lfsck), rc);
+
        return rc;
 }
 
@@ -1808,7 +2302,6 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env,
        com->lc_new_checked = 0;
 
        if (rc > 0) {
-               com->lc_journal = 0;
                if (ns->ln_flags & LF_INCOMPLETE)
                        ns->ln_status = LS_PARTIAL;
                else
@@ -1909,7 +2402,7 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
        rc = dt_declare_xattr_set(env, obj, &linkea_buf,
                                  XATTR_NAME_LINK, fl, th);
        if (rc != 0)
-               GOTO(stop, rc = PTR_ERR(th));
+               GOTO(stop, rc);
 
        rc = dt_trans_start_local(env, dev, th);
        if (rc != 0)
index 8d9b52c..1577c54 100644 (file)
@@ -1185,6 +1185,10 @@ int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
 {
        const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
                                                     ldata->ld_leh->leh_len);
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
+               return 0;
+
        return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
                             mdd_object_capa(env, mdd_obj));
 }
index cdf317a..bb1fd78 100644 (file)
@@ -4713,8 +4713,6 @@ void lustre_assert_wire_constants(void)
                (unsigned)LFSCK_TYPE_SCRUB);
        LASSERTF(LFSCK_TYPE_LAYOUT == 0x00000001UL, "found 0x%.8xUL\n",
                (unsigned)LFSCK_TYPE_LAYOUT);
-       LASSERTF(LFSCK_TYPE_DNE == 0x00000002UL, "found 0x%.8xUL\n",
-               (unsigned)LFSCK_TYPE_DNE);
        LASSERTF(LFSCK_TYPE_NAMESPACE == 0x00000004UL, "found 0x%.8xUL\n",
                (unsigned)LFSCK_TYPE_NAMESPACE);
        LASSERTF(LE_LASTID_REBUILDING == 1, "found %lld\n",
index 3ab412f..4bb9e36 100644 (file)
@@ -43,6 +43,9 @@ setupall
 [[ $(lustre_version_code ost1) -lt $(version_code 2.5.55) ]] &&
        ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14 15 16 17 18 19 20 21"
 
+[[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.6.50) ]] &&
+       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2d 3"
+
 build_test_filter
 
 $LCTL set_param debug=+lfsck > /dev/null || true
@@ -261,7 +264,7 @@ test_2a() {
        # for interop with old server
        [ -z "$repaired" ] &&
                repaired=$($SHOW_NAMESPACE |
-                        awk '/^updated_phase1/ { print $2 }')
+                        awk '/^updated_phase2/ { print $2 }')
 
        [ $repaired -eq 1 ] ||
                error "(5) Fail to repair crashed linkEA: $repaired"
@@ -348,6 +351,83 @@ test_2c()
 }
 run_test 2c "LFSCK can find out and remove repeated linkEA entry"
 
+test_2d()
+{
+       lfsck_prep 1 1
+
+       #define OBD_FAIL_LFSCK_NO_LINKEA        0x161d
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x161d
+       touch $DIR/$tdir/dummy
+
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+       umount_client $MOUNT
+       $START_NAMESPACE -r || error "(3) Fail to start LFSCK for namespace!"
+       wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+               mdd.${MDT_DEV}.lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               $SHOW_NAMESPACE
+               error "(4) unexpected status"
+       }
+
+       local repaired=$($SHOW_NAMESPACE |
+                        awk '/^linkea_repaired/ { print $2 }')
+       [ $repaired -eq 1 ] ||
+               error "(5) Fail to repair crashed linkEA: $repaired"
+
+       mount_client $MOUNT || error "(6) Fail to start client!"
+
+       stat $DIR/$tdir/dummy | grep "Links: 1" > /dev/null ||
+               error "(7) Fail to stat $DIR/$tdir/dummy"
+
+       local dummyfid=$($LFS path2fid $DIR/$tdir/dummy)
+       local dummyname=$($LFS fid2path $DIR $dummyfid)
+       [ "$dummyname" == "$DIR/$tdir/dummy" ] ||
+               error "(8) Fail to repair linkEA: $dummyfid $dummyname"
+}
+run_test 2d "LFSCK can recover the missed linkEA entry"
+
+test_3()
+{
+       lfsck_prep 4 4
+
+       mkdir $DIR/$tdir/dummy || error "(1) Fail to mkdir"
+       ln $DIR/$tdir/d0/f0 $DIR/$tdir/dummy/f0 || error "(2) Fail to hardlink"
+       ln $DIR/$tdir/d0/f1 $DIR/$tdir/dummy/f1 || error "(3) Fail to hardlink"
+
+       $LFS mkdir -i 0 $DIR/$tdir/edir || error "(4) Fail to mkdir"
+       touch $DIR/$tdir/edir/f0 || error "(5) Fail to touch"
+       touch $DIR/$tdir/edir/f1 || error "(6) Fail to touch"
+
+       #define OBD_FAIL_LFSCK_LINKEA_CRASH     0x1603
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1603
+       ln $DIR/$tdir/edir/f0 $DIR/$tdir/edir/w0 || error "(7) Fail to hardlink"
+
+       #define OBD_FAIL_LFSCK_LINKEA_MORE      0x1604
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1604
+       ln $DIR/$tdir/edir/f1 $DIR/$tdir/edir/w1 || error "(8) Fail to hardlink"
+
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+
+       $START_NAMESPACE -r || error "(9) Fail to start LFSCK for namespace!"
+       wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+               mdd.${MDT_DEV}.lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               $SHOW_NAMESPACE
+               error "(10) unexpected status"
+       }
+
+       local checked=$($SHOW_NAMESPACE |
+                       awk '/^checked_phase2/ { print $2 }')
+       [ $checked -ge 4 ] ||
+               error "(11) Fail to check multiple-linked object: $checked"
+
+       local repaired=$($SHOW_NAMESPACE |
+                        awk '/^multiple_linked_repaired/ { print $2 }')
+       [ $repaired -ge 2 ] ||
+               error "(12) Fail to repair multiple-linked object: $repaired"
+}
+run_test 3 "LFSCK can verify multiple-linked objects"
+
 test_4()
 {
        [ $(facet_fstype $SINGLEMDS) != ldiskfs ] &&
@@ -1842,13 +1922,22 @@ test_18c() {
                        error "(5) Expect 0 fixed on mds2, but got: $repaired"
        fi
 
+       ls -ail $MOUNT/.lustre/lost+found/
+
        echo "There should NOT be some stub under .lustre/lost+found/MDT0001/"
-       ls -ail $MOUNT/.lustre/lost+found/MDT0001/*-N-0 &&
-               error "(6) .lustre/lost+found/MDT0001/ should be empty"
+       if [ -d $MOUNT/.lustre/lost+found/MDT0001 ]; then
+               cname=$(find $MOUNT/.lustre/lost+found/MDT0001/ -name *-N-*)
+               [ -z "$cname" ] ||
+                       error "(6) .lustre/lost+found/MDT0001/ should be empty"
+       fi
 
        echo "There should be some stub under .lustre/lost+found/MDT0000/"
-       ls -ail $MOUNT/.lustre/lost+found/MDT0000/*-N-0 ||
-               error "(7) .lustre/lost+found/MDT0000/ should not be empty"
+       [ -d $MOUNT/.lustre/lost+found/MDT0000 ] ||
+               error "(7) $MOUNT/.lustre/lost+found/MDT0000/ should be there"
+
+       cname=$(find $MOUNT/.lustre/lost+found/MDT0000/ -name *-N-*)
+       [ ! -z "$cname" ] ||
+               error "(8) .lustre/lost+found/MDT0000/ should not be empty"
 }
 run_test 18c "Find out orphan OST-object and repair it (3)"
 
@@ -2036,14 +2125,17 @@ test_18e() {
                error "(6) Expect 1 orphan has been fixed, but got: $repaired"
 
        echo "There should be stub file under .lustre/lost+found/MDT0000/"
-       local cname=$(ls $MOUNT/.lustre/lost+found/MDT0000/*-C-0)
-       [ ! -z $name ] ||
-               error "(7) .lustre/lost+found/MDT0000/ should not be empty"
+       [ -d $MOUNT/.lustre/lost+found/MDT0000 ] ||
+               error "(7) $MOUNT/.lustre/lost+found/MDT0000/ should be there"
+
+       cname=$(find $MOUNT/.lustre/lost+found/MDT0000/ -name *-C-*)
+       [ ! -z "$cname" ] ||
+               error "(8) .lustre/lost+found/MDT0000/ should not be empty"
 
        echo "The stub file should keep the original f2 data"
        cur_size=$(ls -il $cname | awk '{ print $6 }')
        [ "$cur_size" == "$saved_size" ] ||
-               error "(8) Expect file2 size $saved_size, but got $cur_size"
+               error "(9) Expect file2 size $saved_size, but got $cur_size"
 
        cat $cname
        $LFS path2fid $cname
index bbaed41..f339379 100644 (file)
@@ -75,7 +75,6 @@ struct lfsck_type_name {
 static struct lfsck_type_name lfsck_types_names[] = {
        { "scrub",      LFSCK_TYPE_SCRUB },
        { "layout",     LFSCK_TYPE_LAYOUT },
-/*     { "dne",        LFSCK_TYPE_DNE }, */
        { "namespace",  LFSCK_TYPE_NAMESPACE },
        { "default",    LFSCK_TYPES_DEF },
        { "all",        LFSCK_TYPES_SUPPORTED },
@@ -112,7 +111,7 @@ static void usage_start(void)
                "-e: error handle mode (default 'continue', or 'abort')\n"
                "-h: this help message\n"
                "-n: check with no modification (default 'off', or 'on')\n"
-               "-o: repair orphan objects\n"
+               "-o: repair orphan OST-objects\n"
                "-r: reset scanning to the start of the device\n"
                "-s: maximum items to be scanned per second "
                    "(default '%d' = no limit)\n"
@@ -221,7 +220,7 @@ int jt_lfsck_start(int argc, char **argv)
                        break;
                case 'o':
                        start.ls_flags |= LPF_ALL_TGT | LPF_BROADCAST |
-                                         LPF_ORPHAN;
+                                         LPF_OST_ORPHAN;
                        break;
                case 'r':
                        start.ls_flags |= LPF_RESET;
index c2e11a4..5bea5f4 100644 (file)
@@ -2135,7 +2135,6 @@ static void check_lfsck_request(void)
 
        CHECK_VALUE_X(LFSCK_TYPE_SCRUB);
        CHECK_VALUE_X(LFSCK_TYPE_LAYOUT);
-       CHECK_VALUE_X(LFSCK_TYPE_DNE);
        CHECK_VALUE_X(LFSCK_TYPE_NAMESPACE);
 
        CHECK_VALUE(LE_LASTID_REBUILDING);
index aeb88ae..006bdf6 100644 (file)
@@ -4725,8 +4725,6 @@ void lustre_assert_wire_constants(void)
                (unsigned)LFSCK_TYPE_SCRUB);
        LASSERTF(LFSCK_TYPE_LAYOUT == 0x00000001UL, "found 0x%.8xUL\n",
                (unsigned)LFSCK_TYPE_LAYOUT);
-       LASSERTF(LFSCK_TYPE_DNE == 0x00000002UL, "found 0x%.8xUL\n",
-               (unsigned)LFSCK_TYPE_DNE);
        LASSERTF(LFSCK_TYPE_NAMESPACE == 0x00000004UL, "found 0x%.8xUL\n",
                (unsigned)LFSCK_TYPE_NAMESPACE);
        LASSERTF(LE_LASTID_REBUILDING == 1, "found %lld\n",