Whamcloud - gitweb
LU-5506 lfsck: skip orphan MDT-object handling for failed MDTs 44/11444/23
authorFan Yong <fan.yong@intel.com>
Fri, 8 Aug 2014 19:16:14 +0000 (03:16 +0800)
committerAndreas Dilger <andreas.dilger@intel.com>
Wed, 22 Oct 2014 05:05:26 +0000 (05:05 +0000)
The namespace LFSCK will record the failed MDTs in the LFSCK tracing
file (lfsck_namespace) during the first-stage scanning, then when it
moves to the second-stage scanning, it can know which (the failed)
MDT(s) contain the MDT-objects that may have un-verified name entries
on other MDTs. Then the LFSCK will skip the orphan MDT-objects
handling on those ever failed MDTs. But other MDTs can be handled as
normal case without affected by the failed MDTs.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Ic53d2fa24f81fb918d130dbc27694d3b68dc19d6
Reviewed-on: http://review.whamcloud.com/11444
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
lustre/include/lustre/lustre_idl.h
lustre/lfsck/lfsck_engine.c
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_namespace.c
lustre/osp/osp_object.c
lustre/tests/sanity-lfsck.sh

index a51610d..d3dc0b0 100644 (file)
@@ -1711,6 +1711,7 @@ static inline void lmm_oi_cpu_to_le(struct ost_id *dst_oi,
 #define XATTR_NAME_SOM         "trusted.som"
 #define XATTR_NAME_HSM         "trusted.hsm"
 #define XATTR_NAME_LFSCK_NAMESPACE "trusted.lfsck_namespace"
 #define XATTR_NAME_SOM         "trusted.som"
 #define XATTR_NAME_HSM         "trusted.hsm"
 #define XATTR_NAME_LFSCK_NAMESPACE "trusted.lfsck_namespace"
+#define XATTR_NAME_LFSCK_BITMAP "trusted.lfsck_bitmap"
 #define XATTR_NAME_MAX_LEN     32 /* increase this, if there is longer name. */
 
 struct lov_mds_md_v3 {            /* LOV EA mds/wire data (little-endian) */
 #define XATTR_NAME_MAX_LEN     32 /* increase this, if there is longer name. */
 
 struct lov_mds_md_v3 {            /* LOV EA mds/wire data (little-endian) */
index 34277e2..4d232d9 100644 (file)
@@ -1374,11 +1374,14 @@ again:
                                list = &ltd->ltd_layout_list;
                                gen = &ltd->ltd_layout_gen;
                        } else {
                                list = &ltd->ltd_layout_list;
                                gen = &ltd->ltd_layout_gen;
                        } else {
+                               struct lfsck_namespace *ns = com->lc_file_ram;
+
                                ltd = list_entry(lad->lad_mdt_list.next,
                                                 struct lfsck_tgt_desc,
                                                 ltd_namespace_list);
                                list = &ltd->ltd_namespace_list;
                                gen = &ltd->ltd_namespace_gen;
                                ltd = list_entry(lad->lad_mdt_list.next,
                                                 struct lfsck_tgt_desc,
                                                 ltd_namespace_list);
                                list = &ltd->ltd_namespace_list;
                                gen = &ltd->ltd_namespace_gen;
+                               lr->lr_flags2 = ns->ln_flags & ~LF_INCOMPLETE;
                        }
 
                        if (*gen == lad->lad_touch_gen)
                        }
 
                        if (*gen == lad->lad_touch_gen)
@@ -1386,6 +1389,9 @@ again:
 
                        *gen = lad->lad_touch_gen;
                        list_move_tail(list, &lad->lad_mdt_list);
 
                        *gen = lad->lad_touch_gen;
                        list_move_tail(list, &lad->lad_mdt_list);
+                       if (ltd->ltd_namespace_failed)
+                               continue;
+
                        atomic_inc(&ltd->ltd_ref);
                        laia->laia_ltd = ltd;
                        spin_unlock(&ltds->ltd_lock);
                        atomic_inc(&ltd->ltd_ref);
                        laia->laia_ltd = ltd;
                        spin_unlock(&ltds->ltd_lock);
index a00f451..b32b147 100644 (file)
@@ -219,8 +219,13 @@ struct lfsck_namespace {
        /* How many lost name entries have been re-inserted. */
        __u64   ln_lost_dirent_repaired;
 
        /* How many lost name entries have been re-inserted. */
        __u64   ln_lost_dirent_repaired;
 
+       /* The size of MDT targets bitmap with nbits. Such bitmap records
+        * the MDTs that contain non-verified MDT-objects. */
+       __u32   ln_bitmap_size;
+
+       __u32   ln_reserved_1;
        /* For further using. 256-bytes aligned now. */
        /* For further using. 256-bytes aligned now. */
-       __u64   ln_reserved[26];
+       __u64   ln_reserved[25];
 };
 
 enum lfsck_layout_inconsistency_type {
 };
 
 enum lfsck_layout_inconsistency_type {
@@ -382,7 +387,8 @@ struct lfsck_tgt_desc {
        __u32              ltd_namespace_gen;
        unsigned int       ltd_dead:1,
                           ltd_layout_done:1,
        __u32              ltd_namespace_gen;
        unsigned int       ltd_dead:1,
                           ltd_layout_done:1,
-                          ltd_namespace_done:1;
+                          ltd_namespace_done:1,
+                          ltd_namespace_failed:1;
 };
 
 struct lfsck_tgt_desc_idx {
 };
 
 struct lfsck_tgt_desc_idx {
index b5e5d80..f91bdcd 100644 (file)
@@ -131,6 +131,7 @@ static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
        dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired);
        dst->ln_lost_dirent_repaired =
                                le64_to_cpu(src->ln_lost_dirent_repaired);
        dst->ln_bad_type_repaired = le64_to_cpu(src->ln_bad_type_repaired);
        dst->ln_lost_dirent_repaired =
                                le64_to_cpu(src->ln_lost_dirent_repaired);
+       dst->ln_bitmap_size = le32_to_cpu(src->ln_bitmap_size);
 }
 
 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
 }
 
 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
@@ -176,6 +177,7 @@ static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
        dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired);
        dst->ln_lost_dirent_repaired =
                                cpu_to_le64(src->ln_lost_dirent_repaired);
        dst->ln_bad_type_repaired = cpu_to_le64(src->ln_bad_type_repaired);
        dst->ln_lost_dirent_repaired =
                                cpu_to_le64(src->ln_lost_dirent_repaired);
+       dst->ln_bitmap_size = cpu_to_le32(src->ln_bitmap_size);
 }
 
 static void lfsck_namespace_record_failure(const struct lu_env *env,
 }
 
 static void lfsck_namespace_record_failure(const struct lu_env *env,
@@ -200,6 +202,81 @@ static void lfsck_namespace_record_failure(const struct lu_env *env,
 }
 
 /**
 }
 
 /**
+ * Load the MDT bitmap from the lfsck_namespace tracing file.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ *
+ * \retval             positive number for data corruption
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_load_bitmap(const struct lu_env *env,
+                                      struct lfsck_component *com)
+{
+       struct dt_object                *obj    = com->lc_obj;
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct lfsck_namespace          *ns     = com->lc_file_ram;
+       cfs_bitmap_t                    *bitmap = lad->lad_bitmap;
+       ssize_t                          size;
+       __u32                            nbits;
+       int                              rc;
+       ENTRY;
+
+       if (com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size >
+           ns->ln_bitmap_size)
+               nbits = com->lc_lfsck->li_mdt_descs.ltd_tgts_bitmap->size;
+       else
+               nbits = ns->ln_bitmap_size;
+
+       if (unlikely(nbits < BITS_PER_LONG))
+               nbits = BITS_PER_LONG;
+
+       if (nbits > bitmap->size) {
+               __u32 new_bits = bitmap->size;
+               cfs_bitmap_t *new_bitmap;
+
+               while (new_bits < nbits)
+                       new_bits <<= 1;
+
+               new_bitmap = CFS_ALLOCATE_BITMAP(new_bits);
+               if (new_bitmap == NULL)
+                       RETURN(-ENOMEM);
+
+               lad->lad_bitmap = new_bitmap;
+               CFS_FREE_BITMAP(bitmap);
+               bitmap = new_bitmap;
+       }
+
+       if (ns->ln_bitmap_size == 0) {
+               lad->lad_incomplete = 0;
+               CFS_RESET_BITMAP(bitmap);
+
+               RETURN(0);
+       }
+
+       size = (ns->ln_bitmap_size + 7) >> 3;
+       rc = dt_xattr_get(env, obj,
+                         lfsck_buf_get(env, bitmap->data, size),
+                         XATTR_NAME_LFSCK_BITMAP, BYPASS_CAPA);
+       if (rc == -ERANGE || rc == -ENODATA || rc == 0)
+               RETURN(1);
+
+       if (rc < 0)
+               RETURN(rc);
+
+       if (rc != size)
+               RETURN(rc);
+
+       if (cfs_bitmap_check_empty(bitmap))
+               lad->lad_incomplete = 0;
+       else
+               lad->lad_incomplete = 1;
+
+       RETURN(0);
+}
+
+/**
  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
  * \retval 0: succeed.
  * \retval -ve: failed cases.
  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
  * \retval 0: succeed.
  * \retval -ve: failed cases.
@@ -237,17 +314,30 @@ static int lfsck_namespace_load(const struct lu_env *env,
 }
 
 static int lfsck_namespace_store(const struct lu_env *env,
 }
 
 static int lfsck_namespace_store(const struct lu_env *env,
-                                struct lfsck_component *com, bool init)
+                                struct lfsck_component *com)
 {
 {
-       struct dt_object        *obj    = com->lc_obj;
-       struct lfsck_instance   *lfsck  = com->lc_lfsck;
-       struct thandle          *handle;
-       int                      len    = com->lc_file_size;
-       int                      rc;
+       struct dt_object                *obj    = com->lc_obj;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_namespace          *ns     = com->lc_file_ram;
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       cfs_bitmap_t                    *bitmap = NULL;
+       struct thandle                  *handle;
+       __u32                            nbits  = 0;
+       int                              len    = com->lc_file_size;
+       int                              rc;
        ENTRY;
 
        ENTRY;
 
+       if (lad != NULL) {
+               bitmap = lad->lad_bitmap;
+               nbits = bitmap->size;
+
+               LASSERT(nbits > 0);
+               LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits);
+       }
+
+       ns->ln_bitmap_size = nbits;
        lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
        lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
-                                 (struct lfsck_namespace *)com->lc_file_ram);
+                                 ns);
        handle = dt_trans_create(env, lfsck->li_bottom);
        if (IS_ERR(handle))
                GOTO(log, rc = PTR_ERR(handle));
        handle = dt_trans_create(env, lfsck->li_bottom);
        if (IS_ERR(handle))
                GOTO(log, rc = PTR_ERR(handle));
@@ -258,15 +348,26 @@ static int lfsck_namespace_store(const struct lu_env *env,
        if (rc != 0)
                GOTO(out, rc);
 
        if (rc != 0)
                GOTO(out, rc);
 
+       if (bitmap != NULL) {
+               rc = dt_declare_xattr_set(env, obj,
+                               lfsck_buf_get(env, bitmap->data, nbits >> 3),
+                               XATTR_NAME_LFSCK_BITMAP, 0, handle);
+               if (rc != 0)
+                       GOTO(out, rc);
+       }
+
        rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
        if (rc != 0)
                GOTO(out, rc);
 
        rc = dt_xattr_set(env, obj,
                          lfsck_buf_get(env, com->lc_file_disk, len),
        rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
        if (rc != 0)
                GOTO(out, rc);
 
        rc = dt_xattr_set(env, obj,
                          lfsck_buf_get(env, com->lc_file_disk, len),
-                         XATTR_NAME_LFSCK_NAMESPACE,
-                         init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
-                         handle, BYPASS_CAPA);
+                         XATTR_NAME_LFSCK_NAMESPACE, 0, handle, BYPASS_CAPA);
+       if (rc == 0 && bitmap != NULL)
+               rc = dt_xattr_set(env, obj,
+                                 lfsck_buf_get(env, bitmap->data, nbits >> 3),
+                                 XATTR_NAME_LFSCK_BITMAP, 0, handle,
+                                 BYPASS_CAPA);
 
        GOTO(out, rc);
 
 
        GOTO(out, rc);
 
@@ -290,7 +391,7 @@ static int lfsck_namespace_init(const struct lu_env *env,
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
        down_write(&com->lc_sem);
        ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
        ns->ln_status = LS_INIT;
        down_write(&com->lc_sem);
-       rc = lfsck_namespace_store(env, com, true);
+       rc = lfsck_namespace_store(env, com);
        up_write(&com->lc_sem);
        return rc;
 }
        up_write(&com->lc_sem);
        return rc;
 }
@@ -2068,6 +2169,7 @@ lfsck_namespace_dsd_orphan(const struct lu_env *env,
                           enum lfsck_namespace_inconsistency_type *type)
 {
        struct lfsck_thread_info *info = lfsck_env_info(env);
                           enum lfsck_namespace_inconsistency_type *type)
 {
        struct lfsck_thread_info *info = lfsck_env_info(env);
+       struct lfsck_namespace   *ns   = com->lc_file_ram;
        int                       rc;
        ENTRY;
 
        int                       rc;
        ENTRY;
 
@@ -2078,6 +2180,17 @@ lfsck_namespace_dsd_orphan(const struct lu_env *env,
                RETURN(rc);
 
        *type = LNIT_MUL_REF;
                RETURN(rc);
 
        *type = LNIT_MUL_REF;
+
+       /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT has
+        * ever tried to verify some remote MDT-object that resides on this
+        * MDT, but this MDT failed to respond such request. So means there
+        * may be some remote name entry on other MDT that references this
+        * object with another name, so we cannot know whether this linkEA
+        * is valid or not. So keep it there and maybe resolved when next
+        * LFSCK run. */
+       if (ns->ln_flags & LF_INCOMPLETE)
+               RETURN(0);
+
        /* The unique linkEA is invalid, even if the ".." name entry may be
         * valid, we still cannot know via which name entry this directory
         * will be referenced. Then handle it as pure orphan. */
        /* The unique linkEA is invalid, even if the ".." name entry may be
         * valid, we still cannot know via which name entry this directory
         * will be referenced. Then handle it as pure orphan. */
@@ -2126,6 +2239,7 @@ lfsck_namespace_dsd_single(const struct lu_env *env,
        struct lu_name           *cname         = &info->lti_name;
        const struct lu_fid      *cfid          = lfsck_dto2fid(child);
        struct lu_fid            *tfid          = &info->lti_fid3;
        struct lu_name           *cname         = &info->lti_name;
        const struct lu_fid      *cfid          = lfsck_dto2fid(child);
        struct lu_fid            *tfid          = &info->lti_fid3;
+       struct lfsck_namespace   *ns            = com->lc_file_ram;
        struct lfsck_instance    *lfsck         = com->lc_lfsck;
        struct dt_object         *parent        = NULL;
        int                       rc            = 0;
        struct lfsck_instance    *lfsck         = com->lc_lfsck;
        struct dt_object         *parent        = NULL;
        int                       rc            = 0;
@@ -2153,6 +2267,16 @@ lfsck_namespace_dsd_single(const struct lu_env *env,
         * name entry the child will be referenced, since all known entries
         * have been verified during the first-stage scanning. */
        if (!dt_object_exists(parent)) {
         * name entry the child will be referenced, since all known entries
         * have been verified during the first-stage scanning. */
        if (!dt_object_exists(parent)) {
+               /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT
+                * has ever tried to verify some remote MDT-object that resides
+                * on this MDT, but this MDT failed to respond such request. So
+                * means there may be some remote name entry on other MDT that
+                * references this object with another name, so we cannot know
+                * whether this linkEA is valid or not. So keep it there and
+                * maybe resolved when next LFSCK run. */
+               if (ns->ln_flags & LF_INCOMPLETE)
+                       GOTO(out, rc = 0);
+
                if (!lustre_handle_is_used(lh) && retry != NULL) {
                        *retry = true;
 
                if (!lustre_handle_is_used(lh) && retry != NULL) {
                        *retry = true;
 
@@ -2208,6 +2332,16 @@ lost_parent:
        rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
                       (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
        if (rc == -ENOENT) {
        rc = dt_lookup(env, parent, (struct dt_rec *)tfid,
                       (const struct dt_key *)cname->ln_name, BYPASS_CAPA);
        if (rc == -ENOENT) {
+               /* If the LFSCK is marked as LF_INCOMPLETE, then means some MDT
+                * has ever tried to verify some remote MDT-object that resides
+                * on this MDT, but this MDT failed to respond such request. So
+                * means there may be some remote name entry on other MDT that
+                * references this object with another name, so we cannot know
+                * whether this linkEA is valid or not. So keep it there and
+                * maybe resolved when next LFSCK run. */
+               if (ns->ln_flags & LF_INCOMPLETE)
+                       GOTO(out, rc = 0);
+
                if (!lustre_handle_is_used(lh) && retry != NULL) {
                        *retry = true;
 
                if (!lustre_handle_is_used(lh) && retry != NULL) {
                        *retry = true;
 
@@ -2596,7 +2730,8 @@ static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
 
        LASSERT(!dt_object_remote(child));
 
 
        LASSERT(!dt_object_remote(child));
 
-       if (!(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
+       if (flags & (LNTF_CHECK_LINKEA | LNTF_CHECK_PARENT) &&
+           !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
                CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the"
                       "the namespace LFSCK, then the LFSCK cannot guarantee"
                       "all the name entries have been verified in first-stage"
                CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the"
                       "the namespace LFSCK, then the LFSCK cannot guarantee"
                       "all the name entries have been verified in first-stage"
@@ -2681,6 +2816,20 @@ lock:
                         *    directory, then handle it as orphan. */
                        lfsck_ibits_unlock(&lh, LCK_EX);
                        type = LNIT_MUL_REF;
                         *    directory, then handle it as orphan. */
                        lfsck_ibits_unlock(&lh, LCK_EX);
                        type = LNIT_MUL_REF;
+
+                       /* If the LFSCK is marked as LF_INCOMPLETE,
+                        * then means some MDT has ever tried to
+                        * verify some remote MDT-object that resides
+                        * on this MDT, but this MDT failed to respond
+                        * such request. So means there may be some
+                        * remote name entry on other MDT that
+                        * references this object with another name,
+                        * so we cannot know whether this linkEA is
+                        * valid or not. So keep it there and maybe
+                        * resolved when next LFSCK run. */
+                       if (ns->ln_flags & LF_INCOMPLETE)
+                               GOTO(out, rc = 0);
+
                        snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
                                 "-"DFID, PFID(pfid));
                        rc = lfsck_namespace_insert_orphan(env, com, child,
                        snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
                                 "-"DFID, PFID(pfid));
                        rc = lfsck_namespace_insert_orphan(env, com, child,
@@ -2859,6 +3008,22 @@ lost_parent:
                                rc = lfsck_namespace_shrink_linkea(env, com,
                                        child, &ldata, cname, pfid, true);
                        } else {
                                rc = lfsck_namespace_shrink_linkea(env, com,
                                        child, &ldata, cname, pfid, true);
                        } else {
+                               /* If the LFSCK is marked as LF_INCOMPLETE,
+                                * then means some MDT has ever tried to
+                                * verify some remote MDT-object that resides
+                                * on this MDT, but this MDT failed to respond
+                                * such request. So means there may be some
+                                * remote name entry on other MDT that
+                                * references this object with another name,
+                                * so we cannot know whether this linkEA is
+                                * valid or not. So keep it there and maybe
+                                * resolved when next LFSCK run. */
+                               if (ns->ln_flags & LF_INCOMPLETE) {
+                                       lfsck_object_put(env, parent);
+
+                                       GOTO(out, rc = 0);
+                               }
+
                                /* Create the lost parent as an orphan. */
                                rc = lfsck_namespace_create_orphan(env, com,
                                                                   parent);
                                /* Create the lost parent as an orphan. */
                                rc = lfsck_namespace_create_orphan(env, com,
                                                                   parent);
@@ -2979,6 +3144,19 @@ lost_parent:
                        continue;
                }
 
                        continue;
                }
 
+               /* If the LFSCK is marked as LF_INCOMPLETE, then means some
+                * MDT has ever tried to verify some remote MDT-object that
+                * resides on this MDT, but this MDT failed to respond such
+                * request. So means there may be some remote name entry on
+                * other MDT that references this object with another name,
+                * so we cannot know whether this linkEA is valid or not.
+                * So keep it there and maybe resolved when next LFSCK run. */
+               if (ns->ln_flags & LF_INCOMPLETE) {
+                       lfsck_object_put(env, parent);
+
+                       GOTO(out, rc = 0);
+               }
+
                /* Add the missing name entry back to the namespace. */
                rc = lfsck_namespace_insert_normal(env, com, parent, child,
                                                   cname->ln_name);
                /* Add the missing name entry back to the namespace. */
                rc = lfsck_namespace_insert_normal(env, com, parent, child,
                                                   cname->ln_name);
@@ -3023,7 +3201,14 @@ out:
                count = ldata.ld_leh->leh_reccount;
        }
 
                count = ldata.ld_leh->leh_reccount;
        }
 
-       if (count == 0) {
+       /* If the LFSCK is marked as LF_INCOMPLETE, then means some
+        * MDT has ever tried to verify some remote MDT-object that
+        * resides on this MDT, but this MDT failed to respond such
+        * request. So means there may be some remote name entry on
+        * other MDT that references this object with another name,
+        * so we cannot know whether this linkEA is valid or not.
+        * So keep it there and maybe resolved when next LFSCK run. */
+       if (count == 0 && !(ns->ln_flags & LF_INCOMPLETE)) {
                /* If the child becomes orphan, then insert it into
                 * the global .lustre/lost+found/MDTxxxx directory. */
                rc = lfsck_namespace_insert_orphan(env, com, child, "", "O",
                /* If the child becomes orphan, then insert it into
                 * the global .lustre/lost+found/MDTxxxx directory. */
                rc = lfsck_namespace_insert_orphan(env, com, child, "", "O",
@@ -3113,11 +3298,12 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
 static int lfsck_namespace_reset(const struct lu_env *env,
                                 struct lfsck_component *com, bool init)
 {
 static int lfsck_namespace_reset(const struct lu_env *env,
                                 struct lfsck_component *com, bool init)
 {
-       struct lfsck_instance   *lfsck = com->lc_lfsck;
-       struct lfsck_namespace  *ns    = com->lc_file_ram;
-       struct dt_object        *root;
-       struct dt_object        *dto;
-       int                      rc;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_namespace          *ns     = com->lc_file_ram;
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct dt_object                *root;
+       struct dt_object                *dto;
+       int                              rc;
        ENTRY;
 
        root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
        ENTRY;
 
        root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
@@ -3160,7 +3346,10 @@ static int lfsck_namespace_reset(const struct lu_env *env,
        if (rc != 0)
                GOTO(out, rc);
 
        if (rc != 0)
                GOTO(out, rc);
 
-       rc = lfsck_namespace_store(env, com, true);
+       lad->lad_incomplete = 0;
+       CFS_RESET_BITMAP(lad->lad_bitmap);
+
+       rc = lfsck_namespace_store(env, com);
 
        GOTO(out, rc);
 
 
        GOTO(out, rc);
 
@@ -3213,7 +3402,7 @@ static int lfsck_namespace_checkpoint(const struct lu_env *env,
                com->lc_new_checked = 0;
        }
 
                com->lc_new_checked = 0;
        }
 
-       rc = lfsck_namespace_store(env, com, false);
+       rc = lfsck_namespace_store(env, com);
        up_write(&com->lc_sem);
 
 log:
        up_write(&com->lc_sem);
 
 log:
@@ -3235,17 +3424,18 @@ static int lfsck_namespace_prep(const struct lu_env *env,
        struct lfsck_position   *pos    = &com->lc_pos_start;
        int                      rc;
 
        struct lfsck_position   *pos    = &com->lc_pos_start;
        int                      rc;
 
-       if (ns->ln_status == LS_COMPLETED) {
+       rc = lfsck_namespace_load_bitmap(env, com);
+       if (rc > 0 || (rc == 0 && ns->ln_status == LS_COMPLETED)) {
                rc = lfsck_namespace_reset(env, com, false);
                if (rc == 0)
                        rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
                rc = lfsck_namespace_reset(env, com, false);
                if (rc == 0)
                        rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
+       }
 
 
-               if (rc != 0) {
-                       CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
-                              "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
+       if (rc != 0) {
+               CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), rc);
 
 
-                       return rc;
-               }
+               return rc;
        }
 
        down_write(&com->lc_sem);
        }
 
        down_write(&com->lc_sem);
@@ -3513,7 +3703,7 @@ static int lfsck_namespace_post(const struct lu_env *env,
                com->lc_new_checked = 0;
        }
 
                com->lc_new_checked = 0;
        }
 
-       rc = lfsck_namespace_store(env, com, false);
+       rc = lfsck_namespace_store(env, com);
        up_write(&com->lc_sem);
 
        CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
        up_write(&com->lc_sem);
 
        CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
@@ -3785,8 +3975,8 @@ out_create:
        }
 
        CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
        }
 
        CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
-              "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event,
-              lr->lr_index, lr->lr_status);
+              "status %d, flags %x\n", lfsck_lfsck2name(lfsck), lr->lr_event,
+              lr->lr_index, lr->lr_status, lr->lr_flags2);
 
        spin_lock(&ltds->ltd_lock);
        ltd = LTD_TGT(ltds, lr->lr_index);
 
        spin_lock(&ltds->ltd_lock);
        ltd = LTD_TGT(ltds, lr->lr_index);
@@ -3811,6 +4001,9 @@ out_create:
                        break;
                }
 
                        break;
                }
 
+               if (lr->lr_flags2 & LF_INCOMPLETE)
+                       ns->ln_flags |= LF_INCOMPLETE;
+
                if (list_empty(&ltd->ltd_namespace_list))
                        list_add_tail(&ltd->ltd_namespace_list,
                                      &lad->lad_mdt_list);
                if (list_empty(&ltd->ltd_namespace_list))
                        list_add_tail(&ltd->ltd_namespace_list,
                                      &lad->lad_mdt_list);
@@ -4095,14 +4288,14 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
        struct dt_object           *dir      = lnr->lnr_obj;
        struct dt_object           *obj      = NULL;
        const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
        struct dt_object           *dir      = lnr->lnr_obj;
        struct dt_object           *obj      = NULL;
        const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
-       struct dt_device           *dev;
+       struct dt_device           *dev      = NULL;
        struct lustre_handle        lh       = { 0 };
        bool                        repaired = false;
        bool                        dtlocked = false;
        bool                        remove;
        bool                        newdata;
        bool                        log      = false;
        struct lustre_handle        lh       = { 0 };
        bool                        repaired = false;
        bool                        dtlocked = false;
        bool                        remove;
        bool                        newdata;
        bool                        log      = false;
-       int                         idx;
+       int                         idx      = 0;
        int                         count    = 0;
        int                         rc;
        enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
        int                         count    = 0;
        int                         rc;
        enum lfsck_namespace_inconsistency_type type = LNIT_NONE;
@@ -4159,7 +4352,7 @@ static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
                        CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
                               "did not join the namespace LFSCK\n",
                               lfsck_lfsck2name(lfsck), idx);
                        CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
                               "did not join the namespace LFSCK\n",
                               lfsck_lfsck2name(lfsck), idx);
-                       ns->ln_flags |= LF_INCOMPLETE;
+                       lfsck_lad_set_bitmap(env, com, idx);
 
                        GOTO(out, rc = -ENODEV);
                }
 
                        GOTO(out, rc = -ENODEV);
                }
@@ -4410,6 +4603,12 @@ out:
                       lnr->lnr_namelen, lnr->lnr_name, rc);
 
                lfsck_namespace_record_failure(env, lfsck, ns);
                       lnr->lnr_namelen, lnr->lnr_name, rc);
 
                lfsck_namespace_record_failure(env, lfsck, ns);
+               if ((rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -EREMCHG ||
+                    rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
+                    rc == -EHOSTUNREACH || rc == -EINPROGRESS) &&
+                   dev != NULL && dev != lfsck->li_next)
+                       lfsck_lad_set_bitmap(env, com, idx);
+
                if (!(bk->lb_param & LPF_FAILOUT))
                        rc = 0;
        } else {
                if (!(bk->lb_param & LPF_FAILOUT))
                        rc = 0;
        } else {
@@ -4451,6 +4650,7 @@ out:
 
        if (obj != NULL && !IS_ERR(obj))
                lfsck_object_put(env, obj);
 
        if (obj != NULL && !IS_ERR(obj))
                lfsck_object_put(env, obj);
+
        return rc;
 }
 
        return rc;
 }
 
@@ -4558,7 +4758,7 @@ checkpoint:
                        ns->ln_time_last_checkpoint = cfs_time_current_sec();
                        ns->ln_objs_checked_phase2 += com->lc_new_checked;
                        com->lc_new_checked = 0;
                        ns->ln_time_last_checkpoint = cfs_time_current_sec();
                        ns->ln_objs_checked_phase2 += com->lc_new_checked;
                        com->lc_new_checked = 0;
-                       rc = lfsck_namespace_store(env, com, false);
+                       rc = lfsck_namespace_store(env, com);
                        up_write(&com->lc_sem);
                        if (rc != 0)
                                GOTO(put, rc);
                        up_write(&com->lc_sem);
                        if (rc != 0)
                                GOTO(put, rc);
@@ -4639,17 +4839,108 @@ static int lfsck_namespace_double_scan_result(const struct lu_env *env,
                ns->ln_status = LS_FAILED;
        }
 
                ns->ln_status = LS_FAILED;
        }
 
-       rc = lfsck_namespace_store(env, com, false);
+       rc = lfsck_namespace_store(env, com);
        up_write(&com->lc_sem);
 
        return rc;
 }
 
        up_write(&com->lc_sem);
 
        return rc;
 }
 
+static int
+lfsck_namespace_assistant_sync_failures_interpret(const struct lu_env *env,
+                                                 struct ptlrpc_request *req,
+                                                 void *args, int rc)
+{
+       return 0;
+}
+
+/**
+ * Notify remote LFSCK instances about former failures.
+ *
+ * The local LFSCK instance has recorded which MDTs have ever failed to respond
+ * some LFSCK verification requests (maybe because of network issues or the MDT
+ * itself trouble). During the respond gap the MDT may missed some name entries
+ * verification, then the MDT cannot know whether related MDT-objects have been
+ * referenced by related name entries or not, then in the second-stage scanning,
+ * these MDT-objects will be regarded as orphan, if the MDT-object contains bad
+ * linkEA for back reference, then it will misguide the LFSCK to generate wrong
+ * name entry for repairing the orphan.
+ *
+ * To avoid above trouble, when layout LFSCK finishes the first-stage scanning,
+ * it will scan the bitmap for the ever failed MDTs, and notify them that they
+ * have ever missed some name entries verification and should skip the handling
+ * for orphan MDT-objects.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] lr       pointer to the lfsck request
+ */
 static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
                                                    struct lfsck_component *com,
                                                    struct lfsck_request *lr)
 {
 static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
                                                    struct lfsck_component *com,
                                                    struct lfsck_request *lr)
 {
-       /* XXX: TBD */
+       struct lfsck_async_interpret_args *laia  =
+                               &lfsck_env_info(env)->lti_laia2;
+       struct lfsck_assistant_data       *lad   = com->lc_data;
+       struct lfsck_namespace            *ns    = com->lc_file_ram;
+       struct lfsck_instance             *lfsck = com->lc_lfsck;
+       struct lfsck_tgt_descs            *ltds  = &lfsck->li_mdt_descs;
+       struct lfsck_tgt_desc             *ltd;
+       struct ptlrpc_request_set         *set;
+       int                                rc    = 0;
+       ENTRY;
+
+       set = ptlrpc_prep_set();
+       if (set == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       lr->lr_flags2 = ns->ln_flags | LF_INCOMPLETE;
+       memset(laia, 0, sizeof(*laia));
+       lad->lad_touch_gen++;
+
+       spin_lock(&ltds->ltd_lock);
+       while (!list_empty(&lad->lad_mdt_list)) {
+               ltd = list_entry(lad->lad_mdt_list.next,
+                                struct lfsck_tgt_desc,
+                                ltd_namespace_list);
+               if (ltd->ltd_namespace_gen == lad->lad_touch_gen)
+                       break;
+
+               ltd->ltd_namespace_gen = lad->lad_touch_gen;
+               list_move_tail(&ltd->ltd_namespace_list,
+                              &lad->lad_mdt_list);
+               if (!lad->lad_incomplete ||
+                   !cfs_bitmap_check(lad->lad_bitmap, ltd->ltd_index)) {
+                       ltd->ltd_namespace_failed = 0;
+                       continue;
+               }
+
+               ltd->ltd_namespace_failed = 1;
+               spin_unlock(&ltds->ltd_lock);
+               rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
+                       lfsck_namespace_assistant_sync_failures_interpret,
+                       laia, LFSCK_NOTIFY);
+               if (rc != 0)
+                       CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail "
+                              "to sync failure with MDT %x: rc = %d\n",
+                              lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
+
+               spin_lock(&ltds->ltd_lock);
+       }
+       spin_unlock(&ltds->ltd_lock);
+
+       rc = ptlrpc_set_wait(set);
+       ptlrpc_set_destroy(set);
+
+       GOTO(out, rc);
+
+out:
+       if (rc != 0)
+               CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail "
+                      "to sync failure with MDTs, and related MDTs "
+                      "may handle orphan un-properly: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), rc);
+
+       EXIT;
 }
 
 struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
 }
 
 struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
index 4a76f10..2835bbb 100644 (file)
@@ -959,9 +959,15 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
        LASSERT(name != NULL);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NETWORK) &&
        LASSERT(name != NULL);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NETWORK) &&
-           osp->opd_index == cfs_fail_val &&
-           osp_dev2node(osp) == cfs_fail_val)
-               RETURN(-ENOTCONN);
+           osp->opd_index == cfs_fail_val) {
+               if (is_ost_obj(&dt->do_lu)) {
+                       if (osp_dev2node(osp) == cfs_fail_val)
+                               RETURN(-ENOTCONN);
+               } else {
+                       if (strcmp(name, XATTR_NAME_LINK) == 0)
+                               RETURN(-ENOTCONN);
+               }
+       }
 
        if (unlikely(obj->opo_non_exist))
                RETURN(-ENOENT);
 
        if (unlikely(obj->opo_non_exist))
                RETURN(-ENOENT);
index 602b13c..c2bb00d 100644 (file)
@@ -46,7 +46,7 @@ setupall
        ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14 15 16 17 18 19 20 21"
 
 [[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.6.50) ]] &&
        ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14 15 16 17 18 19 20 21"
 
 [[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.6.50) ]] &&
-       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2d 2e 3 22 23 24 25 26 27"
+       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2d 2e 3 22 23 24 25 26 27 28"
 
 build_test_filter
 
 
 build_test_filter
 
@@ -3365,6 +3365,109 @@ test_27b() {
 }
 run_test 27b "LFSCK can recreate the lost remote parent directory as orphan"
 
 }
 run_test 27b "LFSCK can recreate the lost remote parent directory as orphan"
 
+test_28() {
+       [ $MDSCOUNT -lt 2 ] &&
+               skip "The test needs at least 2 MDTs" && return
+
+       echo "#####"
+       echo "The target name entry is lost. The LFSCK should insert the"
+       echo "orphan MDT-object under .lustre/lost+found/MDTxxxx. But if"
+       echo "the MDT (on which the orphan MDT-object resides) has ever"
+       echo "failed to respond some name entry verification durin the"
+       echo "first stage-scanning, then the LFSCK should skip to handle"
+       echo "orphan MDT-object on this MDT. But other MDTs should not"
+       echo "be affected."
+       echo "#####"
+
+       check_mount_and_prep
+       $LFS mkdir -i 0 $DIR/$tdir/d1
+       $LFS mkdir -i 1 $DIR/$tdir/d1/a1
+       $LFS mkdir -i 1 $DIR/$tdir/d1/a2
+
+       $LFS mkdir -i 1 $DIR/$tdir/d2
+       $LFS mkdir -i 0 $DIR/$tdir/d2/a1
+       $LFS mkdir -i 0 $DIR/$tdir/d2/a2
+
+       echo "Inject failure stub on MDT0 to simulate the case that"
+       echo "d1/a1's name entry will be removed, but the d1/a1's object"
+       echo "and its linkEA are kept in the system. And the case that"
+       echo "d2/a2's name entry will be removed, but the d2/a2's object"
+       echo "and its linkEA are kept in the system."
+
+       #define OBD_FAIL_LFSCK_NO_NAMEENTRY     0x1624
+       do_facet mds1 $LCTL set_param fail_loc=0x1624
+       do_facet mds2 $LCTL set_param fail_loc=0x1624
+       rmdir $DIR/$tdir/d1/a1 || error "(1) Fail to rmdir $DIR/$tdir/d1/a1"
+       rmdir $DIR/$tdir/d2/a2 || error "(2) Fail to rmdir $DIR/$tdir/d2/a2"
+       do_facet mds1 $LCTL set_param fail_loc=0
+       do_facet mds2 $LCTL set_param fail_loc=0
+
+       cancel_lru_locks mdc
+       cancel_lru_locks osc
+
+       echo "Inject failure, to simulate the MDT0 fail to handle"
+       echo "MDT1 LFSCK request during the first-stage scanning."
+       #define OBD_FAIL_LFSCK_BAD_NETWORK      0x161c
+       do_facet mds2 $LCTL set_param fail_loc=0x161c fail_val=0
+
+       echo "Trigger namespace LFSCK on all devices to find out orphan object"
+       $START_NAMESPACE -r -A ||
+               error "(3) Fail to start LFSCK for namespace"
+
+       wait_update_facet mds1 "$LCTL get_param -n \
+               mdd.$(facet_svc mds1).lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "partial" 32 || {
+               error "(4) mds1 is not the expected 'partial'"
+       }
+
+       wait_update_facet mds2 "$LCTL get_param -n \
+               mdd.$(facet_svc mds2).lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               error "(5) mds2 is not the expected 'completed'"
+       }
+
+       do_facet mds2 $LCTL set_param fail_loc=0 fail_val=0
+
+       local repaired=$(do_facet mds1 $LCTL get_param -n \
+                        mdd.$(facet_svc mds1).lfsck_namespace |
+                        awk '/^lost_dirent_repaired/ { print $2 }')
+       [ $repaired -eq 0 ] ||
+               error "(6) Expect 0 fixed on mds1, but got: $repaired"
+
+       repaired=$(do_facet mds2 $LCTL get_param -n \
+                  mdd.$(facet_svc mds2).lfsck_namespace |
+                  awk '/^lost_dirent_repaired/ { print $2 }')
+       [ $repaired -eq 1 ] ||
+               error "(7) Expect 1 fixed on mds2, but got: $repaired"
+
+       echo "Trigger namespace LFSCK on all devices again to cleanup"
+       $START_NAMESPACE -r -A ||
+               error "(8) Fail to start LFSCK for namespace"
+
+       for k in $(seq $MDSCOUNT); do
+               # The LFSCK status query internal is 30 seconds. For the case
+               # of some LFSCK_NOTIFY RPCs failure/lost, we will wait enough
+               # time to guarantee the status sync up.
+               wait_update_facet mds${k} "$LCTL get_param -n \
+                       mdd.$(facet_svc mds${k}).lfsck_namespace |
+                       awk '/^status/ { print \\\$2 }'" "completed" 32 ||
+                       error "(9) MDS${k} is not the expected 'completed'"
+       done
+
+       local repaired=$(do_facet mds1 $LCTL get_param -n \
+                        mdd.$(facet_svc mds1).lfsck_namespace |
+                        awk '/^lost_dirent_repaired/ { print $2 }')
+       [ $repaired -eq 1 ] ||
+               error "(10) Expect 1 fixed on mds1, but got: $repaired"
+
+       repaired=$(do_facet mds2 $LCTL get_param -n \
+                  mdd.$(facet_svc mds2).lfsck_namespace |
+                  awk '/^lost_dirent_repaired/ { print $2 }')
+       [ $repaired -eq 0 ] ||
+               error "(11) Expect 0 fixed on mds2, but got: $repaired"
+}
+run_test 28 "Skip the failed MDT(s) when handle orphan MDT-objects"
+
 $LCTL set_param debug=-lfsck > /dev/null || true
 
 # restore MDS/OST size
 $LCTL set_param debug=-lfsck > /dev/null || true
 
 # restore MDS/OST size