Whamcloud - gitweb
LU-5511 lfsck: repair unmatched parent-child pairs
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
index 625a26c..9b759b6 100644 (file)
@@ -136,6 +136,10 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
        dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
        dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
        dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
+       dst->ln_unknown_inconsistency =
+                               le64_to_cpu(src->ln_unknown_inconsistency);
+       dst->ln_unmatched_pairs_repaired =
+                               le64_to_cpu(src->ln_unmatched_pairs_repaired);
 }
 
 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
@@ -173,6 +177,10 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
        dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
        dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
        dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
+       dst->ln_unknown_inconsistency =
+                               cpu_to_le64(src->ln_unknown_inconsistency);
+       dst->ln_unmatched_pairs_repaired =
+                               cpu_to_le64(src->ln_unmatched_pairs_repaired);
 }
 
 static void lfsck_namespace_record_failure(const struct lu_env *env,
@@ -840,6 +848,781 @@ static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
 }
 
 /**
+ * Overwrite the linkEA for the object with the given ldata.
+ *
+ * The caller should take the ldlm lock before the calling.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the dt_object to be handled
+ * \param[in] ldata    pointer to the new linkEA data
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+int lfsck_namespace_rebuild_linkea(const struct lu_env *env,
+                                  struct lfsck_component *com,
+                                  struct dt_object *obj,
+                                  struct linkea_data *ldata)
+{
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_device                *dev    = lfsck->li_bottom;
+       struct thandle                  *th     = NULL;
+       struct lu_buf                    linkea_buf;
+       int                              rc     = 0;
+       ENTRY;
+
+       LASSERT(!dt_object_remote(obj));
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
+
+       lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
+                      ldata->ld_leh->leh_len);
+       rc = dt_declare_xattr_set(env, obj, &linkea_buf,
+                                 XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, obj, 0);
+       if (unlikely(lfsck_is_dead_obj(obj)))
+               GOTO(unlock, rc = 0);
+
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+               GOTO(unlock, rc = 1);
+
+       rc = dt_xattr_set(env, obj, &linkea_buf,
+                         XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
+
+       GOTO(unlock, rc = (rc == 0 ? 1 : rc));
+
+unlock:
+       dt_write_unlock(env, obj);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+log:
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild linkEA for the "
+              "object "DFID": rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
+
+       if (rc != 0) {
+               struct lfsck_namespace *ns = com->lc_file_ram;
+
+               ns->ln_flags |= LF_INCONSISTENT;
+       }
+
+       return rc;
+}
+
+/**
+ * Update the ".." name entry for the given object.
+ *
+ * The object's ".." is corrupted, this function will update the ".." name
+ * entry with the given pfid, and the linkEA with the given ldata.
+ *
+ * The caller should take the ldlm lock before the calling.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the dt_object to be handled
+ * \param[in] pfid     the new fid for the object's ".." name entry
+ * \param[in] cname    the name for the @obj in the parent directory
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_repair_unmatched_pairs(const struct lu_env *env,
+                                                 struct lfsck_component *com,
+                                                 struct dt_object *obj,
+                                                 const struct lu_fid *pfid,
+                                                 struct lu_name *cname)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct dt_insert_rec            *rec    = &info->lti_dt_rec;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_device                *dev    = lfsck->li_bottom;
+       struct thandle                  *th     = NULL;
+       struct linkea_data               ldata  = { 0 };
+       struct lu_buf                    linkea_buf;
+       int                              rc     = 0;
+       ENTRY;
+
+       LASSERT(!dt_object_remote(obj));
+       LASSERT(S_ISDIR(lfsck_object_type(obj)));
+
+       rc = linkea_data_new(&ldata, &info->lti_big_buf);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       rc = linkea_add_buf(&ldata, cname, pfid);
+       if (rc != 0)
+               GOTO(log, rc);
+
+       lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+                      ldata.ld_leh->leh_len);
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
+
+       rc = dt_declare_delete(env, obj, (const struct dt_key *)dotdot, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rec->rec_type = S_IFDIR;
+       rec->rec_fid = pfid;
+       rc = dt_declare_insert(env, obj, (const struct dt_rec *)rec,
+                              (const struct dt_key *)dotdot, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_declare_xattr_set(env, obj, &linkea_buf,
+                                 XATTR_NAME_LINK, 0, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, obj, 0);
+       if (unlikely(lfsck_is_dead_obj(obj)))
+               GOTO(unlock, rc = 0);
+
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+               GOTO(unlock, rc = 1);
+
+       /* The old ".." name entry maybe not exist. */
+       dt_delete(env, obj, (const struct dt_key *)dotdot, th,
+                 BYPASS_CAPA);
+
+       rc = dt_insert(env, obj, (const struct dt_rec *)rec,
+                      (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
+       if (rc != 0)
+               GOTO(unlock, rc);
+
+       rc = dt_xattr_set(env, obj, &linkea_buf,
+                         XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
+
+       GOTO(unlock, rc = (rc == 0 ? 1 : rc));
+
+unlock:
+       dt_write_unlock(env, obj);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+log:
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK rebuild dotdot name entry for "
+              "the object "DFID", new parent "DFID": rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)),
+              PFID(pfid), rc);
+
+       if (rc != 0) {
+               struct lfsck_namespace *ns = com->lc_file_ram;
+
+               ns->ln_flags |= LF_INCONSISTENT;
+       }
+
+       return rc;
+}
+
+/**
+ * Handle orphan @obj during Double Scan Directory.
+ *
+ * Remove the @obj's current (invalid) linkEA entries, and insert
+ * it in the directory .lustre/lost+found/MDTxxxx/ with the name:
+ * ${FID}-${PFID}-D-${conflict_version}
+ *
+ * The caller should take the ldlm lock before the calling.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the orphan object to be handled
+ * \param[in] pfid     the new fid for the object's ".." name entry
+ * \param[in,out] lh   ldlm lock handler for the given @obj
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_dsd_orphan(const struct lu_env *env,
+                                     struct lfsck_component *com,
+                                     struct dt_object *obj,
+                                     const struct lu_fid *pfid,
+                                     struct lustre_handle *lh)
+{
+       struct lfsck_thread_info *info = lfsck_env_info(env);
+       int                       rc;
+       ENTRY;
+
+       /* Remove the unrecognized linkEA. */
+       rc = lfsck_namespace_links_remove(env, com, obj);
+       lfsck_ibits_unlock(lh, LCK_EX);
+       if (rc < 0 && rc != -ENODATA)
+               RETURN(rc);
+
+       /* The unique linkEA is invalid, even if the ".." name entry may be
+        * valid, we still cannot know via which name entry this directory
+        * will be referenced. Then handle it as pure orphan. */
+       snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
+                "-"DFID, PFID(pfid));
+       rc = lfsck_namespace_insert_orphan(env, com, obj,
+                                          info->lti_tmpbuf, "D", NULL);
+
+       RETURN(rc);
+}
+
+/**
+ * Double Scan Directory object for single linkEA entry case.
+ *
+ * The given @child has unique linkEA entry. If the linkEA entry is valid,
+ * then check whether the name is in the namespace or not, if not, add the
+ * missing name entry back to namespace. If the linkEA entry is invalid,
+ * then remove it and insert the @child in the .lustre/lost+found/MDTxxxx/
+ * as an orphan.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] child    pointer to the directory to be double scanned
+ * \param[in] pfid     the FID corresponding to the ".." entry
+ * \param[in] ldata    pointer to the linkEA data for the given @child
+ * \param[in,out] lh   ldlm lock handler for the given @child
+ * \param[out] type    to tell the caller what the inconsistency is
+ * \param[in] retry    if found inconsistency, but the caller does not hold
+ *                     ldlm lock on the @child, then set @retry as true
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+static int
+lfsck_namespace_dsd_single(const struct lu_env *env,
+                          struct lfsck_component *com,
+                          struct dt_object *child,
+                          const struct lu_fid *pfid,
+                          struct linkea_data *ldata,
+                          struct lustre_handle *lh,
+                          enum lfsck_namespace_inconsistency_type *type,
+                          bool *retry)
+{
+       struct lfsck_thread_info *info          = lfsck_env_info(env);
+       struct lu_name           *cname         = &info->lti_name;
+       const struct lu_fid      *cfid          = lfsck_dto2fid(child);
+       struct lu_fid            *tfid          = &info->lti_fid3;
+       struct lfsck_instance    *lfsck         = com->lc_lfsck;
+       struct dt_object         *parent        = NULL;
+       int                       rc            = 0;
+       ENTRY;
+
+       lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid, info->lti_key);
+       /* The unique linkEA entry with bad parent will be handled as orphan. */
+       if (!fid_is_sane(tfid)) {
+               if (!lustre_handle_is_used(lh) && retry != NULL)
+                       *retry = true;
+               else
+                       rc = lfsck_namespace_dsd_orphan(env, com, child,
+                                                       pfid, lh);
+
+               GOTO(out, rc);
+       }
+
+       parent = lfsck_object_find_bottom(env, lfsck, tfid);
+       if (IS_ERR(parent))
+               GOTO(out, rc = PTR_ERR(parent));
+
+       /* We trust the unique linkEA entry in spite of whether it matches the
+        * ".." name entry or not. Because even if the linkEA entry is wrong
+        * and the ".." name entry is right, we still cannot know via which
+        * name entry the child will be referenced, since all known entries
+        * have been verified during the first-stage scanning. */
+       if (!dt_object_exists(parent)) {
+               if (!lustre_handle_is_used(lh) && retry != NULL) {
+                       *retry = true;
+
+                       GOTO(out, rc = 0);
+               }
+
+               lfsck_ibits_unlock(lh, LCK_EX);
+               /* Create the lost parent as an orphan. */
+               rc = lfsck_namespace_create_orphan(env, com, parent);
+               if (rc >= 0)
+                       /* Add the missing name entry to the parent. */
+                       rc = lfsck_namespace_insert_normal(env, com, parent,
+                                                       child, cname->ln_name);
+
+               GOTO(out, rc);
+       }
+
+       /* The unique linkEA entry with bad parent will be handled as orphan. */
+       if (unlikely(!dt_try_as_dir(env, parent))) {
+               if (!lustre_handle_is_used(lh) && retry != NULL)
+                       *retry = true;
+               else
+                       rc = lfsck_namespace_dsd_orphan(env, com, child,
+                                                       pfid, lh);
+
+               GOTO(out, rc);
+       }
+
+       rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+                      (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
+       if (rc == -ENOENT) {
+               if (!lustre_handle_is_used(lh) && retry != NULL) {
+                       *retry = true;
+
+                       GOTO(out, rc = 0);
+               }
+
+               lfsck_ibits_unlock(lh, LCK_EX);
+               /* Add the missing name entry back to the namespace. */
+               rc = lfsck_namespace_insert_normal(env, com, parent, child,
+                                                  cname->ln_name);
+
+               GOTO(out, rc);
+       }
+
+       if (rc != 0)
+               GOTO(out, rc);
+
+       /* XXX: The name entry references another MDT-object that may be
+        *      created by the LFSCK for repairing dangling name entry.
+        *      There will be another patch for further processing. */
+       if (!lu_fid_eq(tfid, cfid)) {
+               if (!lustre_handle_is_used(lh) && retry != NULL)
+                       *retry = true;
+               else
+                       rc = lfsck_namespace_dsd_orphan(env, com, child,
+                                                       pfid, lh);
+
+               GOTO(out, rc);
+       }
+
+       /* The ".." name entry is wrong, update it. */
+       if (!lu_fid_eq(pfid, lfsck_dto2fid(parent))) {
+               if (!lustre_handle_is_used(lh) && retry != NULL) {
+                       *retry = true;
+
+                       GOTO(out, rc = 0);
+               }
+
+               *type = LNIT_UNMATCHED_PAIRS;
+               rc = lfsck_namespace_repair_unmatched_pairs(env, com, child,
+                                               lfsck_dto2fid(parent), cname);
+       }
+
+       GOTO(out, rc);
+
+out:
+       if (parent != NULL && !IS_ERR(parent))
+               lfsck_object_put(env, parent);
+
+       return rc;
+}
+
+/**
+ * Double Scan Directory object for single linkEA entry case.
+ *
+ * The given @child has multiple linkEA entries. There is at most one linkEA
+ * entry will be valid, all the others will be removed. Firstly, the function
+ * will try to find out the linkEA entry for which the name entry exists under
+ * the given parent (@pfid). If there is no linkEA entry that matches the given
+ * ".." name entry, then tries to find out the first linkEA entry that both the
+ * parent and the name entry exist to rebuild a new ".." name entry.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] child    pointer to the directory to be double scanned
+ * \param[in] pfid     the FID corresponding to the ".." entry
+ * \param[in] ldata    pointer to the linkEA data for the given @child
+ * \param[in,out] lh   ldlm lock handler for the given @child
+ * \param[out] type    to tell the caller what the inconsistency is
+ * \param[in] lpf      true if the ".." entry is under lost+found/MDTxxxx/
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+static int
+lfsck_namespace_dsd_multiple(const struct lu_env *env,
+                            struct lfsck_component *com,
+                            struct dt_object *child,
+                            const struct lu_fid *pfid,
+                            struct linkea_data *ldata,
+                            struct lustre_handle *lh,
+                            enum lfsck_namespace_inconsistency_type *type,
+                            bool lpf)
+{
+       struct lfsck_thread_info *info          = lfsck_env_info(env);
+       struct lu_name           *cname         = &info->lti_name;
+       const struct lu_fid      *cfid          = lfsck_dto2fid(child);
+       struct lu_fid            *tfid          = &info->lti_fid3;
+       struct lu_fid            *pfid2         = &info->lti_fid4;
+       struct lfsck_instance    *lfsck         = com->lc_lfsck;
+       struct dt_object         *parent        = NULL;
+       struct linkea_data        ldata_new     = { 0 };
+       int                       rc            = 0;
+       bool                      once          = true;
+       ENTRY;
+
+again:
+       while (ldata->ld_lee != NULL) {
+               lfsck_namespace_unpack_linkea_entry(ldata, cname, tfid,
+                                                   info->lti_key);
+               /* Drop repeated linkEA entries. */
+               lfsck_namespace_filter_linkea_entry(ldata, cname, tfid, true);
+               /* Drop invalid linkEA entry. */
+               if (!fid_is_sane(tfid)) {
+                       linkea_del_buf(ldata, cname);
+                       continue;
+               }
+
+               /* If current dotdot is the .lustre/lost+found/MDTxxxx/,
+                * then it is possible that: the directry object has ever
+                * been lost, but its name entry was there. In the former
+                * LFSCK run, during the first-stage scanning, the LFSCK
+                * found the dangling name entry, but it did not recreate
+                * the lost object, and when moved to the second-stage
+                * scanning, some children objects of the lost directory
+                * object were found, then the LFSCK recreated such lost
+                * directory object as an orphan.
+                *
+                * When the LFSCK runs again, if the dangling name is still
+                * there, the LFSCK should move the orphan directory object
+                * back to the normal namespace. */
+               if (!lpf && !lu_fid_eq(pfid, tfid) && once) {
+                       linkea_next_entry(ldata);
+                       continue;
+               }
+
+               parent = lfsck_object_find_bottom(env, lfsck, tfid);
+               if (IS_ERR(parent))
+                       RETURN(PTR_ERR(parent));
+
+               if (!dt_object_exists(parent)) {
+                       lfsck_object_put(env, parent);
+                       if (ldata->ld_leh->leh_reccount > 1) {
+                               /* If it is NOT the last linkEA entry, then
+                                * there is still other chance to make the
+                                * child to be visible via other parent, then
+                                * remove this linkEA entry. */
+                               linkea_del_buf(ldata, cname);
+                               continue;
+                       }
+
+                       break;
+               }
+
+               /* The linkEA entry with bad parent will be removed. */
+               if (unlikely(!dt_try_as_dir(env, parent))) {
+                       lfsck_object_put(env, parent);
+                       linkea_del_buf(ldata, cname);
+                       continue;
+               }
+
+               rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
+                              (const struct dt_key *)cname->ln_name,
+                              BYPASS_CAPA);
+               *pfid2 = *lfsck_dto2fid(parent);
+               lfsck_object_put(env, parent);
+               if (rc == -ENOENT) {
+                       linkea_next_entry(ldata);
+                       continue;
+               }
+
+               if (rc != 0)
+                       RETURN(rc);
+
+               if (lu_fid_eq(tfid, cfid)) {
+                       if (!lu_fid_eq(pfid, pfid2)) {
+                               *type = LNIT_UNMATCHED_PAIRS;
+                               rc = lfsck_namespace_repair_unmatched_pairs(env,
+                                               com, child, pfid2, cname);
+
+                               RETURN(rc);
+                       }
+
+                       /* It is the most common case that we find the
+                        * name entry corresponding to the linkEA entry
+                        * that matches the ".." name entry. */
+                       rc = linkea_data_new(&ldata_new, &info->lti_big_buf);
+                       if (rc != 0)
+                               RETURN(rc);
+
+                       rc = linkea_add_buf(&ldata_new, cname, pfid2);
+                       if (rc != 0)
+                               RETURN(rc);
+
+                       rc = lfsck_namespace_rebuild_linkea(env, com, child,
+                                                           &ldata_new);
+
+                       /* XXX: there will be other patch. */
+
+                       RETURN(rc);
+               }
+
+               /* XXX: The name entry references another MDT-object that
+                *      may be created by the LFSCK for repairing dangling
+                *      name entry. There will be another patch for further
+                *      processing. */
+               linkea_del_buf(ldata, cname);
+       }
+
+       if (ldata->ld_leh->leh_reccount == 1) {
+               rc = lfsck_namespace_dsd_single(env, com, child, pfid, ldata,
+                                               lh, type, NULL);
+
+               RETURN(rc);
+       }
+
+       /* All linkEA entries are invalid and removed, then handle the @child
+        * as an orphan.*/
+       if (ldata->ld_leh->leh_reccount == 0) {
+               rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, lh);
+
+               RETURN(rc);
+       }
+
+       linkea_first_entry(ldata);
+       /* If the dangling name entry for the orphan directory object has
+        * been remvoed, then just check whether the directory object is
+        * still under the .lustre/lost+found/MDTxxxx/ or not. */
+       if (lpf) {
+               lpf = false;
+               goto again;
+       }
+
+       /* There is no linkEA entry that matches the ".." name entry. Find
+        * the first linkEA entry that both parent and name entry exist to
+        * rebuild a new ".." name entry. */
+       if (once) {
+               once = false;
+               goto again;
+       }
+
+       RETURN(rc);
+}
+
+/**
+ * Double scan the directory object for namespace LFSCK.
+ *
+ * This function will verify the <parent, child> pairs in the namespace tree:
+ * the parent references the child via some name entry that should be in the
+ * child's linkEA entry, the child should back references the parent via its
+ * ".." name entry.
+ *
+ * The LFSCK will scan every linkEA entry in turn until find out the first
+ * matched pairs. If found, then all other linkEA entries will be dropped.
+ * If all the linkEA entries cannot match the ".." name entry, then there
+ * are serveral possible cases:
+ *
+ * 1) If there is only one linkEA entry, then trust it as long as the PFID
+ *    in the linkEA entry is valid.
+ *
+ * 2) If there are multiple linkEA entries, then try to find the linkEA
+ *    that matches the ".." name entry. If found, then all other entries
+ *    are invalid; otherwise, it is quite possible that the ".." name entry
+ *    is corrupted. Under such case, the LFSCK will rebuild the ".." name
+ *    entry according to the first valid linkEA entry (both the parent and
+ *    the name entry should exist).
+ *
+ * 3) If the directory object has no (valid) linkEA entry, then the
+ *    directory object will be handled as pure orphan and inserted
+ *    in the .lustre/lost+found/MDTxxxx/ with the name:
+ *    ${self_FID}-${PFID}-D-${conflict_version}
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] child    pointer to the directory object to be handled
+ * \param[in] flags    to indicate the specical checking on the @child
+ *
+ * \retval             positive number for repaired cases
+ * \retval             0 if nothing to be repaired
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
+                                          struct lfsck_component *com,
+                                          struct dt_object *child, __u8 flags)
+{
+       struct lfsck_thread_info *info          = lfsck_env_info(env);
+       const struct lu_fid      *cfid          = lfsck_dto2fid(child);
+       struct lu_fid            *pfid          = &info->lti_fid2;
+       struct lfsck_namespace   *ns            = com->lc_file_ram;
+       struct lfsck_instance    *lfsck         = com->lc_lfsck;
+       struct lustre_handle      lh            = { 0 };
+       struct linkea_data        ldata         = { 0 };
+       bool                      unknown       = false;
+       bool                      lpf           = false;
+       bool                      retry         = false;
+       enum lfsck_namespace_inconsistency_type type = LNIT_BAD_LINKEA;
+       int                       rc            = 0;
+       ENTRY;
+
+       LASSERT(!dt_object_remote(child));
+
+       if (!(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
+               CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the"
+                      "the namespace LFSCK, then the LFSCK cannot guarantee"
+                      "all the name entries have been verified in first-stage"
+                      "scanning. So have to skip orphan related handling for"
+                      "the directory object "DFID" with remote name entry\n",
+                      lfsck_lfsck2name(lfsck), PFID(cfid));
+
+               RETURN(0);
+       }
+
+       if (unlikely(!dt_try_as_dir(env, child)))
+               GOTO(out, rc = -ENOTDIR);
+
+       /* We only take ldlm lock on the @child when required. When the
+        * logic comes here for the first time, it is always false. */
+       if (0) {
+
+lock:
+               rc = lfsck_ibits_lock(env, lfsck, child, &lh,
+                                     MDS_INODELOCK_UPDATE |
+                                     MDS_INODELOCK_XATTR, LCK_EX);
+               if (rc != 0)
+                       GOTO(out, rc);
+       }
+
+       dt_read_lock(env, child, 0);
+       if (unlikely(lfsck_is_dead_obj(child))) {
+               dt_read_unlock(env, child);
+
+               GOTO(out, rc = 0);
+       }
+
+       rc = dt_lookup(env, child, (struct dt_rec *)pfid,
+                      (const struct dt_key *)dotdot, BYPASS_CAPA);
+       if (rc != 0) {
+               if (rc != -ENOENT && rc != -ENODATA && rc != -EINVAL) {
+                       dt_read_unlock(env, child);
+
+                       GOTO(out, rc);
+               }
+
+               if (!lustre_handle_is_used(&lh)) {
+                       dt_read_unlock(env, child);
+                       goto lock;
+               }
+
+               fid_zero(pfid);
+       } else if (lfsck->li_lpf_obj != NULL &&
+                  lu_fid_eq(pfid, lfsck_dto2fid(lfsck->li_lpf_obj))) {
+               lpf = true;
+       }
+
+       rc = lfsck_links_read(env, child, &ldata);
+       dt_read_unlock(env, child);
+       if (rc != 0) {
+               if (rc != -ENODATA && rc != -EINVAL)
+                       GOTO(out, rc);
+
+               if (!lustre_handle_is_used(&lh))
+                       goto lock;
+
+               if (rc == -EINVAL && !fid_is_zero(pfid)) {
+                       /* Remove the corrupted linkEA. */
+                       rc = lfsck_namespace_links_remove(env, com, child);
+                       if (rc == 0)
+                               /* Here, because of the crashed linkEA, we
+                                * cannot know whether there is some parent
+                                * that references the child directory via
+                                * some name entry or not. So keep it there,
+                                * when the LFSCK run next time, if there is
+                                * some parent that references this object,
+                                * then the LFSCK can rebuild the linkEA;
+                                * otherwise, this object will be handled
+                                * as orphan as above. */
+                               unknown = true;
+               } else {
+                       /* 1. If we have neither ".." nor linkEA,
+                        *    then it is an orphan.
+                        *
+                        * 2. If we only have the ".." name entry,
+                        *    but no parent references this child
+                        *    directory, then handle it as orphan. */
+                       lfsck_ibits_unlock(&lh, LCK_EX);
+                       snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
+                                "-"DFID, PFID(pfid));
+                       rc = lfsck_namespace_insert_orphan(env, com, child,
+                                               info->lti_tmpbuf, "D", NULL);
+               }
+
+               GOTO(out, rc);
+       }
+
+       linkea_first_entry(&ldata);
+       /* This is the most common case: the object has unique linkEA entry. */
+       if (ldata.ld_leh->leh_reccount == 1) {
+               rc = lfsck_namespace_dsd_single(env, com, child, pfid, &ldata,
+                                               &lh, &type, &retry);
+               if (retry) {
+                       LASSERT(!lustre_handle_is_used(&lh));
+
+                       retry = false;
+                       goto lock;
+               }
+
+               GOTO(out, rc);
+       }
+
+       if (!lustre_handle_is_used(&lh))
+               goto lock;
+
+       if (unlikely(ldata.ld_leh->leh_reccount == 0)) {
+               rc = lfsck_namespace_dsd_orphan(env, com, child, pfid, &lh);
+
+               GOTO(out, rc);
+       }
+
+       /* When we come here, the cases usually like that:
+        * 1) The directory object has a corrupted linkEA entry. During the
+        *    first-stage scanning, the LFSCK cannot know such corruption,
+        *    then it appends the right linkEA entry according to the found
+        *    name entry after the bad one.
+        *
+        * 2) The directory object has a right linkEA entry. During the
+        *    first-stage scanning, the LFSCK finds some bad name entry,
+        *    but the LFSCK cannot aware that at that time, then it adds
+        *    the bad linkEA entry for further processing. */
+       rc = lfsck_namespace_dsd_multiple(env, com, child, pfid, &ldata,
+                                         &lh, &type, lpf);
+
+       GOTO(out, rc);
+
+out:
+       lfsck_ibits_unlock(&lh, LCK_EX);
+       if (rc > 0) {
+               switch (type) {
+               case LNIT_BAD_LINKEA:
+                       ns->ln_linkea_repaired++;
+                       break;
+               case LNIT_UNMATCHED_PAIRS:
+                       ns->ln_unmatched_pairs_repaired++;
+                       break;
+               default:
+                       break;
+               }
+       }
+
+       if (unknown)
+               ns->ln_unknown_inconsistency++;
+
+       return rc;
+}
+
+/**
  * Double scan the MDT-object for namespace LFSCK.
  *
  * If the MDT-object contains invalid or repeated linkEA entries, then drop
@@ -883,6 +1666,13 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env,
                RETURN(0);
        }
 
+       if (S_ISDIR(lfsck_object_type(child))) {
+               dt_read_unlock(env, child);
+               rc = lfsck_namespace_double_scan_dir(env, com, child, flags);
+
+               RETURN(rc);
+       }
+
        rc = lfsck_links_read(env, child, &ldata);
        dt_read_unlock(env, child);
        if (rc != 0)
@@ -922,7 +1712,7 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env,
                        continue;
                }
 
-               parent = lfsck_object_find(env, lfsck, pfid);
+               parent = lfsck_object_find_bottom(env, lfsck, pfid);
                if (IS_ERR(parent))
                        GOTO(out, rc = PTR_ERR(parent));
 
@@ -947,7 +1737,7 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env,
                                if (rc > 0)
                                        repaired = true;
 
-                               /* Add the missed name entry to the parent. */
+                               /* Add the missing name entry to the parent. */
                                rc = lfsck_namespace_insert_normal(env, com,
                                                parent, child, cname->ln_name);
                                linkea_next_entry(&ldata);
@@ -1031,7 +1821,7 @@ static int lfsck_namespace_double_scan_one(const struct lu_env *env,
                        continue;
                }
 
-               /* Add the missed name entry back to the namespace. */
+               /* Add the missing name entry back to the namespace. */
                rc = lfsck_namespace_insert_normal(env, com, parent, child,
                                                   cname->ln_name);
                lfsck_object_put(env, parent);
@@ -1078,11 +1868,8 @@ out:
        }
 
        if (repaired) {
-               if (la->la_nlink > 1) {
-                       down_write(&com->lc_sem);
+               if (la->la_nlink > 1)
                        ns->ln_mul_linked_repaired++;
-                       up_write(&com->lc_sem);
-               }
 
                if (rc == 0)
                        rc = 1;
@@ -1111,6 +1898,8 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
                      "lost_found: "LPU64"\n"
                      "multiple_linked_checked: "LPU64"\n"
                      "multiple_linked_repaired: "LPU64"\n"
+                     "unknown_inconsistency: "LPU64"\n"
+                     "unmatched_pairs_repaired: "LPU64"\n"
                      "success_count: %u\n"
                      "run_time_phase1: %u seconds\n"
                      "run_time_phase2: %u seconds\n",
@@ -1127,6 +1916,8 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
                      ns->ln_objs_lost_found,
                      ns->ln_mul_linked_checked,
                      ns->ln_mul_linked_repaired,
+                     ns->ln_unknown_inconsistency,
+                     ns->ln_unmatched_pairs_repaired,
                      ns->ln_success_count,
                      time_phase1,
                      time_phase2);
@@ -1302,6 +2093,8 @@ static int lfsck_namespace_prep(const struct lu_env *env,
                        ns->ln_linkea_repaired = 0;
                        ns->ln_mul_linked_checked = 0;
                        ns->ln_mul_linked_repaired = 0;
+                       ns->ln_unknown_inconsistency = 0;
+                       ns->ln_unmatched_pairs_repaired = 0;
                        fid_zero(&ns->ln_fid_latest_scanned_phase2);
                        if (list_empty(&com->lc_link_dir))
                                list_add_tail(&com->lc_link_dir,
@@ -1897,10 +2690,18 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
                repaired = true;
        }
 
+       if (unlikely(fid_is_zero(&lnr->lnr_fid))) {
+               if (strcmp(lnr->lnr_name, dotdot) != 0)
+                       LBUG();
+               else
+                       rc = lfsck_namespace_trace_update(env, com, pfid,
+                                               LNTF_CHECK_PARENT, true);
+
+               GOTO(out, rc);
+       }
+
        if (lnr->lnr_name[0] == '.' &&
-           (lnr->lnr_namelen == 1 ||
-            (lnr->lnr_namelen == 2 && lnr->lnr_name[1] == '.') ||
-            fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
+           (lnr->lnr_namelen == 1 || fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
                GOTO(out, rc = 0);
 
        idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
@@ -1908,10 +2709,23 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
                GOTO(out, rc = idx);
 
        if (idx == lfsck_dev_idx(lfsck->li_bottom)) {
+               if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0))
+                       GOTO(out, rc = 0);
+
                dev = lfsck->li_next;
        } else {
                struct lfsck_tgt_desc *ltd;
 
+               /* Usually, some local filesystem consistency verification
+                * tools can guarantee the local namespace tree consistenct.
+                * So the LFSCK will only verify the remote directory. */
+               if (unlikely(strcmp(lnr->lnr_name, dotdot) == 0)) {
+                       rc = lfsck_namespace_trace_update(env, com, pfid,
+                                               LNTF_CHECK_PARENT, true);
+
+                       GOTO(out, rc);
+               }
+
                ltd = LTD_TGT(&lfsck->li_mdt_descs, idx);
                if (unlikely(ltd == NULL)) {
                        CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
@@ -1977,16 +2791,17 @@ again:
                        goto record;
 
                ns->ln_flags |= LF_INCONSISTENT;
-               /* For dir, if there are more than one linkea entries, or the
-                * linkea entry does not match the name entry, then remove all
-                * and add the correct one. */
-               if (S_ISDIR(lfsck_object_type(obj))) {
-                       remove = true;
-                       newdata = true;
-               } else {
-                       remove = false;
-                       newdata = false;
-               }
+               /* For sub-dir object, we cannot make sure whether the sub-dir
+                * back references the parent via ".." name entry correctly or
+                * not in the LFSCK first-stage scanning. It may be that the
+                * (remote) sub-dir ".." name entry has no parent FID after
+                * file-level backup/restore and its linkEA may be wrong.
+                * So under such case, we should replace the linkEA according
+                * to current name entry. But this needs to be done during the
+                * LFSCK second-stage scanning. The LFSCK will record the name
+                * entry for further possible using. */
+               remove = false;
+               newdata = false;
                goto nodata;
        } else if (unlikely(rc == -EINVAL)) {
                count = 1;
@@ -2004,9 +2819,7 @@ again:
 
 nodata:
                if (bk->lb_param & LPF_DRYRUN) {
-                       down_write(&com->lc_sem);
                        ns->ln_linkea_repaired++;
-                       up_write(&com->lc_sem);
                        repaired = true;
                        log = true;
                        goto record;
@@ -2040,11 +2853,12 @@ nodata:
                        GOTO(stop, rc);
 
                count = ldata.ld_leh->leh_reccount;
-               down_write(&com->lc_sem);
-               ns->ln_linkea_repaired++;
-               up_write(&com->lc_sem);
-               repaired = true;
-               log = true;
+               if (!S_ISDIR(lfsck_object_type(obj)) ||
+                   !dt_object_remote(obj)) {
+                       ns->ln_linkea_repaired++;
+                       repaired = true;
+                       log = true;
+               }
        } else if (rc == -ENOENT) {
                log = false;
                repaired = false;
@@ -2061,8 +2875,8 @@ record:
        if (rc != 0)
                GOTO(stop, rc);
 
-       if ((count == 1) &&
-           (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
+       if ((count == 1 && la->la_nlink == 1) ||
+           S_ISDIR(lfsck_object_type(obj)))
                /* Usually, it is for single linked object or dir, do nothing.*/
                GOTO(stop, rc);
 
@@ -2079,9 +2893,7 @@ record:
                lfsck_ibits_unlock(&lh, LCK_EX);
        }
 
-       down_write(&com->lc_sem);
        ns->ln_mul_linked_checked++;
-       up_write(&com->lc_sem);
        rc = lfsck_namespace_trace_update(env, com, &lnr->lnr_fid,
                                          LNTF_CHECK_LINKEA, true);