Whamcloud - gitweb
LU-8288 lfsck: handle dangling LOV EA reference 62/21562/17
authorFan Yong <fan.yong@intel.com>
Sat, 10 Sep 2016 11:30:42 +0000 (19:30 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 18 Jan 2017 18:59:12 +0000 (18:59 +0000)
Originally, the layout LFSCK logic for handling a dangling LOV EA
reference was as follows:

During the first-phase scanning, if the layout LFSCK finds that
some LOV EA entry references an OST-object that does not exist,
then it repairs the inconsistency based on the LFSCK start
parameter "-c" option. If the "-c" option is specified, the layout
LFSCK assumes the OST-object is lost and creates the lost
OST-object with the FID that is stored in the LOV EA slot.
But such a repair may be incorrect, because the LOV EA itself may
be corrupted, so that the reference stored in it is invalid while
the real OST-object still exists on the OST. When it moves to the
second-stage scanning, the layout LFSCK will find the orphan
OST-object that claims to be one of the MDT-object's stripes. And
if someone has already modified the newly created OST-object before
the orphan is found, then the layout LFSCK cannot recover the
original data.

To avoid the above trouble, the patch introduces a new start option:
"--delay-create-ostobj", or "-d" for short. It allows
the layout LFSCK to postpone creating the "lost" OST-object
until all the orphan OST-objects have been handled. It records the
dangling references in newly introduced layout LFSCK trace
files on disk during the layout LFSCK first-stage scanning,
then traverses those trace files after all orphan OST-objects
have been handled in the second-stage scanning. The side-effect of
this option is that as long as one OST does not join the layout
LFSCK or fails to complete the scanning, repairing the dangling
LOV EA will be skipped. For a large system with a lot of OSTs,
such a condition may be a bit strict. The default value is 'off'.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Ic222e1ad20c8011aa5f41cc43171d017ab5c464f
Reviewed-on: https://review.whamcloud.com/21562
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
12 files changed:
lustre/doc/lctl-lfsck-start.8
lustre/include/dt_object.h
lustre/include/lustre/lustre_lfsck_user.h
lustre/lfsck/lfsck_bookmark.c
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c
lustre/obdclass/dt_object.c
lustre/tests/sanity-lfsck.sh
lustre/utils/lctl.c
lustre/utils/lustre_lfsck.c

index b2aa2d7..3ca3ce4 100644 (file)
@@ -3,7 +3,8 @@
 .br
 .B lctl lfsck_start \fR[-M | --device [MDT,OST]_device]
      \fR[-A | --all] [-c | --create-ostobj [on | off]]
-     \fR[-C | --create-mttobj [on | off]]
+     \fR[-C | --create-mdtobj [on | off]]
+     \fR[-d | --delay-create-ostobj [on | off]]
      \fR[-e | --error <continue | abort>] [-h | --help]
      \fR[-n | --dryrun [on | off]] [-o | --orphan]
      \fR[-r | --reset] [-s | --speed speed_limit]
@@ -38,6 +39,16 @@ Under default mode, when the LFSCK find dangling name entry, it will report
 the inconsistency but will not repair it.  If 'on' is given, then LFSCK will
 re-create the missed MDT-object.
 .TP
+.B  -d, --delay-create-ostobj [on | off]
+Delay creating the lost OST-object for dangling LOV EA until orphan OST-objects
+are handled: 'off' (default) or 'on'. If both "--create-ostobj" and the delay option
+are 'on', then the LFSCK will NOT create the OST-object to repair dangling LOV
+EA unless all the OST-objects have been handled. It can avoid repairing dangling
+LOV EA incorrectly because of LOV EA corruption. The side-effect is that as long
+as one OST does not join the layout LFSCK or fails to complete the scanning, then
+repairing dangling LOV EA will be skipped. For a large system with a lot of OSTs,
+such a condition may be a bit strict. The default value is 'off'.
+.TP
 .B  -e, --error <error_handle>
 With error_handle as 'abort' then if the repair of a file is not possible, then
 LFSCK will save the current position stop with an error.  Otherwise the default
index 1a4fdec..cbd3b92 100644 (file)
@@ -342,8 +342,9 @@ enum dt_index_flags {
  */
 extern const struct dt_index_features dt_directory_features;
 extern const struct dt_index_features dt_otable_features;
-extern const struct dt_index_features dt_lfsck_orphan_features;
-extern const struct dt_index_features dt_lfsck_features;
+extern const struct dt_index_features dt_lfsck_layout_orphan_features;
+extern const struct dt_index_features dt_lfsck_layout_dangling_features;
+extern const struct dt_index_features dt_lfsck_namespace_features;
 
 /* index features supported by the accounting objects */
 extern const struct dt_index_features dt_acct_features;
index 477beab..a02f65f 100644 (file)
@@ -158,6 +158,9 @@ enum lfsck_param_flags {
 
        /* Do not return until the LFSCK not running. */
        LPF_WAIT                = 0x0100,
+
+       /* Delay to create OST-object for dangling LOV EA. */
+       LPF_DELAY_CREATE_OSTOBJ = 0x0200,
 };
 
 enum lfsck_type {
@@ -191,6 +194,7 @@ enum lfsck_start_valid {
        LSV_ASYNC_WINDOWS       = 0x00000008,
        LSV_CREATE_OSTOBJ       = 0x00000010,
        LSV_CREATE_MDTOBJ       = 0x00000020,
+       LSV_DELAY_CREATE_OSTOBJ = 0x00000040,
 };
 
 /* Arguments for starting lfsck. */
index 79ab91c..4663328 100644 (file)
@@ -225,6 +225,11 @@ int lfsck_set_param(const struct lu_env *env, struct lfsck_instance *lfsck,
                        dirty = true;
                }
 
+               if (bk->lb_param & LPF_DELAY_CREATE_OSTOBJ) {
+                       bk->lb_param &= ~LPF_DELAY_CREATE_OSTOBJ;
+                       dirty = true;
+               }
+
                if (bk->lb_param & LPF_FAILOUT) {
                        bk->lb_param &= ~LPF_FAILOUT;
                        dirty = true;
@@ -282,6 +287,18 @@ int lfsck_set_param(const struct lu_env *env, struct lfsck_instance *lfsck,
                        }
                }
 
+               if ((start->ls_valid & LSV_DELAY_CREATE_OSTOBJ) || reset) {
+                       if ((bk->lb_param & LPF_DELAY_CREATE_OSTOBJ) &&
+                           !(start->ls_valid & LSV_DELAY_CREATE_OSTOBJ)) {
+                               bk->lb_param &= ~LPF_DELAY_CREATE_OSTOBJ;
+                               dirty = true;
+                       } else if (!(bk->lb_param & LPF_DELAY_CREATE_OSTOBJ) &&
+                                  start->ls_flags & LPF_DELAY_CREATE_OSTOBJ) {
+                               bk->lb_param |= LPF_DELAY_CREATE_OSTOBJ;
+                               dirty = true;
+                       }
+               }
+
                if ((start->ls_valid & LSV_ERROR_HANDLE) || reset) {
                        if ((bk->lb_param & LPF_FAILOUT) &&
                            !(start->ls_valid & LSV_ERROR_HANDLE)) {
index dd89553..42dfea6 100644 (file)
@@ -357,7 +357,11 @@ struct lfsck_layout {
 
        /* For further using. 256-bytes aligned now. */
        __u32   ll_reserved_1;
-       __u64   ll_reserved_2[11];
+
+       /* The latest object has been processed (failed) during double scan. */
+       struct lu_fid   ll_fid_latest_scanned_phase2;
+
+       __u64   ll_reserved_2[9];
 
        /* The OST targets bitmap to record the OSTs that contain
         * non-verified OST-objects. */
@@ -866,6 +870,7 @@ struct lfsck_thread_info {
        struct lu_fid           lti_fid;
        struct lu_fid           lti_fid2;
        struct lu_fid           lti_fid3;
+       struct lu_fid           lti_fid4;
        struct lu_attr          lti_la;
        struct lu_attr          lti_la2;
        struct ost_id           lti_oi;
@@ -968,6 +973,16 @@ int lfsck_double_scan_generic(const struct lu_env *env,
                              struct lfsck_component *com, int status);
 void lfsck_quit_generic(const struct lu_env *env,
                        struct lfsck_component *com);
+int lfsck_load_one_trace_file(const struct lu_env *env,
+                             struct lfsck_component *com,
+                             struct dt_object *parent,
+                             struct dt_object **child,
+                             const struct dt_index_features *ft,
+                             const char *name, bool reset);
+int lfsck_load_sub_trace_files(const struct lu_env *env,
+                              struct lfsck_component *com,
+                              const struct dt_index_features *ft,
+                              const char *prefix, bool reset);
 
 /* lfsck_engine.c */
 int lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie, __u16 *type);
index 05ee675..1df9822 100644 (file)
@@ -716,6 +716,8 @@ static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
                                le64_to_cpu(src->ll_objs_repaired[i]);
        des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
        des->ll_bitmap_size = le32_to_cpu(src->ll_bitmap_size);
+       fid_le_to_cpu(&des->ll_fid_latest_scanned_phase2,
+                     &src->ll_fid_latest_scanned_phase2);
 }
 
 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
@@ -746,6 +748,8 @@ static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
                                cpu_to_le64(src->ll_objs_repaired[i]);
        des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
        des->ll_bitmap_size = cpu_to_le32(src->ll_bitmap_size);
+       fid_cpu_to_le(&des->ll_fid_latest_scanned_phase2,
+                     &src->ll_fid_latest_scanned_phase2);
 }
 
 /**
@@ -965,6 +969,9 @@ static int lfsck_layout_init(const struct lu_env *env,
        lo->ll_status = LS_INIT;
        down_write(&com->lc_sem);
        rc = lfsck_layout_store(env, com);
+       if (rc == 0 && com->lc_lfsck->li_master)
+               rc = lfsck_load_sub_trace_files(env, com,
+                       &dt_lfsck_layout_dangling_features, LFSCK_LAYOUT, true);
        up_write(&com->lc_sem);
 
        return rc;
@@ -1394,6 +1401,114 @@ static int lfsck_layout_trans_stop(const struct lu_env *env,
        return rc == 0 ? 1 : rc;
 }
 
+static int lfsck_layout_ins_dangling_rec(const struct lu_env *env,
+                                        struct lfsck_component *com,
+                                        const struct lu_fid *pfid,
+                                        const struct lu_fid *cfid,
+                                        __u32 ea_off, __u32 ost_idx)
+{
+       struct lu_fid *key = &lfsck_env_info(env)->lti_fid3;
+       struct lu_fid *rec = &lfsck_env_info(env)->lti_fid4;
+       struct dt_device *dev;
+       struct dt_object *obj;
+       struct thandle *th = NULL;
+       int idx;
+       int rc = 0;
+       ENTRY;
+
+       idx = lfsck_sub_trace_file_fid2idx(pfid);
+       obj = com->lc_sub_trace_objs[idx].lsto_obj;
+       dev = lfsck_obj2dev(obj);
+       fid_cpu_to_be(key, pfid);
+       key->f_ver = cpu_to_be32(ea_off);
+       fid_cpu_to_be(rec, cfid);
+       rec->f_ver = cpu_to_be32(ost_idx);
+
+       mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(unlock, rc = PTR_ERR(th));
+
+       rc = dt_declare_insert(env, obj,
+                              (const struct dt_rec *)rec,
+                              (const struct dt_key *)key, th);
+       if (rc)
+               GOTO(unlock, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(unlock, rc);
+
+       rc = dt_insert(env, obj, (const struct dt_rec *)rec,
+                      (const struct dt_key *)key, th, 1);
+
+       GOTO(unlock, rc);
+
+unlock:
+       if (th != NULL && !IS_ERR(th))
+               dt_trans_stop(env, dev, th);
+
+       mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
+
+       CDEBUG(D_LFSCK, "%s: insert the paris "DFID" => "DFID", ea_off = %u, "
+              "ost_idx = %u, into the trace file for further dangling check: "
+              "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
+              PFID(pfid), PFID(cfid), ea_off, ost_idx, rc);
+
+       return rc;
+}
+
+static int lfsck_layout_del_dangling_rec(const struct lu_env *env,
+                                        struct lfsck_component *com,
+                                        const struct lu_fid *fid,
+                                        __u32 ea_off)
+{
+       struct lu_fid *key = &lfsck_env_info(env)->lti_fid3;
+       struct dt_device *dev;
+       struct dt_object *obj;
+       struct thandle *th = NULL;
+       int idx;
+       int rc = 0;
+       ENTRY;
+
+       idx = lfsck_sub_trace_file_fid2idx(fid);
+       obj = com->lc_sub_trace_objs[idx].lsto_obj;
+       dev = lfsck_obj2dev(obj);
+       fid_cpu_to_be(key, fid);
+       key->f_ver = cpu_to_be32(ea_off);
+
+       mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(unlock, rc = PTR_ERR(th));
+
+       rc = dt_declare_delete(env, obj, (const struct dt_key *)key, th);
+       if (rc)
+               GOTO(unlock, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(unlock, rc);
+
+       rc = dt_delete(env, obj, (const struct dt_key *)key, th);
+
+       GOTO(unlock, rc);
+
+unlock:
+       if (th != NULL && !IS_ERR(th))
+               dt_trans_stop(env, dev, th);
+
+       mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
+
+       CDEBUG(D_LFSCK, "%s: delete the dangling record for "DFID
+              ", ea_off = %u from the trace file: rc = %d\n",
+              lfsck_lfsck2name(com->lc_lfsck), PFID(fid), ea_off, rc);
+
+       return rc;
+}
+
 /**
  * Get the system default stripe size.
  *
@@ -2482,6 +2597,12 @@ static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
        if (!S_ISREG(lu_object_attr(&parent->do_lu)))
                GOTO(put, rc = -EISDIR);
 
+       /* The orphan OST-object claims to be the parent's stripe, then
+        * related dangling record in the trace file is meaningless. */
+       rc = lfsck_layout_del_dangling_rec(env, com, pfid, ea_off);
+       if (rc != 0 && rc != -ENOENT)
+               GOTO(put, rc);
+
        rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
                                         ltd->ltd_index, ea_off);
 
@@ -2547,7 +2668,8 @@ static int lfsck_layout_scan_orphan(const struct lu_env *env,
        if (unlikely(IS_ERR(obj)))
                GOTO(log, rc = PTR_ERR(obj));
 
-       rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
+       rc = obj->do_ops->do_index_try(env, obj,
+                                      &dt_lfsck_layout_orphan_features);
        if (rc != 0)
                GOTO(put, rc);
 
@@ -2614,43 +2736,53 @@ log:
        return rc > 0 ? 0 : rc;
 }
 
-/* For the MDT-object with dangling reference, we need to repare the
- * inconsistency according to the LFSCK sponsor's requirement:
+/**
+ * Repair the MDT-object with dangling LOV EA reference.
+ *
+ * we need to repair the inconsistency according to the users' requirement:
  *
  * 1) Keep the inconsistency there and report the inconsistency case,
  *    then give the chance to the application to find related issues,
  *    and the users can make the decision about how to handle it with
  *    more human knownledge. (by default)
  *
- * 2) Re-create the missing OST-object with the FID/owner information. */
-static int lfsck_layout_repair_dangling(const struct lu_env *env,
-                                       struct lfsck_component *com,
-                                       struct dt_object *parent,
-                                       struct lfsck_layout_req *llr,
-                                       struct lu_attr *la)
+ * 2) Re-create the missing OST-object with the FID/owner information.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      the layout LFSCK component
+ * \param[in] parent   the MDT-object with dangling LOV EA reference
+ * \param[in] child    the OST-object to be created
+ * \param[in] ea_off   the offset of the OST-object in the LOV EA
+ * \param[in] ost_idx  the index of OST on which the OST-object resides
+ *
+ * \retval             +1 for repair successfully
+ * \retval             0 for did nothing
+ * \retval             negative error number on failure
+ */
+static int __lfsck_layout_repair_dangling(const struct lu_env *env,
+                                         struct lfsck_component *com,
+                                         struct dt_object *parent,
+                                         struct dt_object *child,
+                                         __u32 ea_off, __u32 ost_idx, bool log)
 {
-       struct lfsck_thread_info        *info   = lfsck_env_info(env);
-       struct filter_fid               *pfid   = &info->lti_new_pfid;
-       struct dt_object_format         *dof    = &info->lti_dof;
-       struct dt_object                *child  = llr->llr_child;
-       struct dt_device                *dev    = lfsck_obj2dev(child);
-       const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
-       struct thandle                  *handle;
-       struct lu_buf                   *buf;
-       struct lustre_handle             lh     = { 0 };
-       int                              rc;
-       bool                             create;
+       struct lfsck_thread_info *info = lfsck_env_info(env);
+       struct filter_fid *ff = &info->lti_new_pfid;
+       struct dt_object_format *dof = &info->lti_dof;
+       struct lu_attr *la = &info->lti_la;
+       struct lfsck_instance *lfsck = com->lc_lfsck;
+       struct dt_device *dev = lfsck_obj2dev(child);
+       const struct lu_fid *pfid = lfsck_dto2fid(parent);
+       const struct lu_fid *cfid = lfsck_dto2fid(child);
+       struct thandle *handle;
+       struct lu_buf *buf;
+       struct lustre_handle lh = { 0 };
+       int rc;
        ENTRY;
 
-       if (com->lc_lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ)
-               create = true;
-       else
-               create = false;
-
-       if (!create)
+       if (!(lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ))
                GOTO(log, rc = 1);
 
-       rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
+       rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
                              MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
                              LCK_EX);
        if (rc != 0)
@@ -2665,13 +2797,13 @@ static int lfsck_layout_repair_dangling(const struct lu_env *env,
        la->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
                       LA_ATIME | LA_MTIME | LA_CTIME;
        memset(dof, 0, sizeof(*dof));
-       pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
-       pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
+       ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
+       ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
        /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
         * MDT-object's FID::f_ver, instead it is the OST-object index in its
         * parent MDT-object's layout EA. */
-       pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
-       buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
+       ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
+       buf = lfsck_buf_get(env, ff, sizeof(struct filter_fid));
 
        handle = dt_trans_create(env, dev);
        if (IS_ERR(handle))
@@ -2692,7 +2824,53 @@ static int lfsck_layout_repair_dangling(const struct lu_env *env,
 
        dt_read_lock(env, parent, 0);
        if (unlikely(lfsck_is_dead_obj(parent)))
-               GOTO(unlock2, rc = 1);
+               GOTO(unlock2, rc = 0);
+
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DELAY_CREATE_OSTOBJ) {
+               struct ost_id *oi = &info->lti_oi;
+               struct lu_fid *tfid = &info->lti_fid2;
+               struct lu_buf *lovea = &info->lti_big_buf;
+               struct lov_mds_md_v1 *lmm;
+               struct lov_ost_data_v1 *objs;
+               __u32 magic;
+               int count;
+               int idx2;
+
+               rc = lfsck_layout_get_lovea(env, parent, lovea);
+               if (rc <= 0)
+                       GOTO(unlock2, rc);
+
+               lmm = lovea->lb_buf;
+               rc = lfsck_layout_verify_header(lmm);
+               if (unlikely(rc != 0))
+                       GOTO(unlock2, rc);
+
+               count = le16_to_cpu(lmm->lmm_stripe_count);
+               /* Someone changed the LOV EA, do nothing. */
+               if (count <= ea_off)
+                       GOTO(unlock2, rc = 0);
+
+               /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which
+                * has been verified in lfsck_layout_verify_header() already.
+                * If some new magic introduced in the future, then the layout
+                * LFSCK needs to be updated also. */
+               magic = le32_to_cpu(lmm->lmm_magic);
+               if (magic == LOV_MAGIC_V1) {
+                       objs = &lmm->lmm_objects[ea_off];
+               } else {
+                       LASSERT(magic == LOV_MAGIC_V3);
+
+                       objs = &((struct lov_mds_md_v3 *)lmm)->\
+                                                       lmm_objects[ea_off];
+               }
+
+               ostid_le_to_cpu(&objs->l_ost_oi, oi);
+               idx2 = le32_to_cpu(objs->l_ost_idx);
+               rc = ostid_to_fid(tfid, oi, idx2);
+               /* Someone changed the LOV EA, do nothing. */
+               if (rc != 0 || !lu_fid_eq(tfid, cfid))
+                       GOTO(unlock2, rc);
+       }
 
        rc = dt_create(env, child, la, NULL, dof, handle);
        if (rc != 0)
@@ -2713,15 +2891,89 @@ unlock1:
        lfsck_ibits_unlock(&lh, LCK_EX);
 
 log:
+       if (rc != 0 && log)
+               CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found "
+                      "dangling reference for: parent "DFID", child "
+                      DFID", ea_off %u, ost_idx %u, %s: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
+                      ea_off, ost_idx,
+                      (lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ) ?
+                               "Create the lost OST-object as required" :
+                               "Keep the MDT-object there by default", rc);
+
+       return rc;
+}
+
+/**
+ * Repair the MDT-object with dangling LOV EA reference.
+ *
+ * Prepare parameters and call __lfsck_layout_repair_dangling()
+ * to repair the dangling LOV EA reference.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      the layout LFSCK component
+ * \param[in] pfid     the MDT-object's FID
+ * \param[in] cfid     the FID for the OST-object to be created
+ * \param[in] ea_off   the offset of the OST-object in the LOV EA
+ * \param[in] ost_idx  the index of OST on which the OST-object resides
+ *
+ * \retval             +1 for repair successfully
+ * \retval             0 for did nothing
+ * \retval             negative error number on failure
+ */
+static int lfsck_layout_repair_dangling(const struct lu_env *env,
+                                       struct lfsck_component *com,
+                                       const struct lu_fid *pfid,
+                                       const struct lu_fid *cfid,
+                                       __u32 ea_off, __u32 ost_idx)
+{
+       struct lfsck_instance *lfsck = com->lc_lfsck;
+       struct dt_object *parent = NULL;
+       struct dt_object *child = NULL;
+       struct lfsck_tgt_desc *ltd;
+       int rc;
+       ENTRY;
+
+       parent = lfsck_object_find_bottom(env, lfsck, pfid);
+       if (IS_ERR(parent))
+               GOTO(log, rc = PTR_ERR(parent));
+
+       /* The MDT-object has been removed. */
+       if (dt_object_exists(parent) == 0)
+               GOTO(log, rc = 0);
+
+       ltd = lfsck_ltd2tgt(&lfsck->li_ost_descs, ost_idx);
+       if (unlikely(ltd == NULL))
+               GOTO(log, rc = -ENODEV);
+
+       child = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
+       if (IS_ERR(child))
+               GOTO(log, rc = PTR_ERR(child));
+
+       /* The OST-object has been created. */
+       if (unlikely(dt_object_exists(child) != 0))
+               GOTO(log, rc = 0);
+
+       rc = __lfsck_layout_repair_dangling(env, com, parent, child,
+                                           ea_off, ost_idx, false);
+
+       GOTO(log, rc);
+
+log:
+       if (child != NULL && !IS_ERR(child))
+               lfsck_object_put(env, child);
+
+       if (parent != NULL && !IS_ERR(parent))
+               lfsck_object_put(env, parent);
+
        if (rc != 0)
                CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found "
-                      "dangling reference for: parent "DFID", child "DFID
-                      ", OST-index %u, stripe-index %u, owner %u/%u. %s: "
-                      "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
-                      PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
-                      llr->llr_ost_idx, llr->llr_lov_idx,
-                      la->la_uid, la->la_gid,
-                      create ? "Create the lost OST-object as required" :
+                      "dangling reference for: parent "DFID", child "
+                      DFID", ea_off %u, ost_idx %u, %s: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
+                      ea_off, ost_idx,
+                      (lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ) ?
+                               "Create the lost OST-object as required" :
                                "Keep the MDT-object there by default", rc);
 
        return rc;
@@ -3328,7 +3580,14 @@ repair:
 
        switch (type) {
        case LLIT_DANGLING:
-               rc = lfsck_layout_repair_dangling(env, com, parent, llr, pla);
+               if (bk->lb_param & LPF_DELAY_CREATE_OSTOBJ)
+                       rc = lfsck_layout_ins_dangling_rec(env, com,
+                               lfsck_dto2fid(parent), lfsck_dto2fid(child),
+                               llr->llr_lov_idx, llr->llr_ost_idx);
+               else
+                       rc = __lfsck_layout_repair_dangling(env, com, parent,
+                                       llr->llr_child, llr->llr_lov_idx,
+                                       llr->llr_ost_idx, true);
                break;
        case LLIT_UNMATCHED_PAIR:
                rc = lfsck_layout_repair_unmatched_pair(env, com, parent,
@@ -3369,7 +3628,8 @@ out:
                } else {
                        lfsck_layout_record_failure(env, lfsck, lo);
                }
-       } else if (rc > 0) {
+       } else if (rc > 0 && (type != LLIT_DANGLING ||
+                             !(bk->lb_param & LPF_DELAY_CREATE_OSTOBJ))) {
                LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
                         "unknown type = %d\n", type);
 
@@ -3388,6 +3648,134 @@ out:
        return rc;
 }
 
+static int
+lfsck_layout_double_scan_one_trace_file(const struct lu_env *env,
+                                       struct lfsck_component *com,
+                                       struct dt_object *obj, bool first)
+{
+       struct lfsck_instance *lfsck = com->lc_lfsck;
+       struct ptlrpc_thread *thread = &lfsck->li_thread;
+       struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
+       struct lfsck_layout *lo = com->lc_file_ram;
+       const struct dt_it_ops *iops = &obj->do_index_ops->dio_it;
+       struct dt_it *di;
+       struct dt_key *key;
+       struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid3;
+       struct lu_fid *cfid = &lfsck_env_info(env)->lti_fid4;
+       __u32 ea_off;
+       __u32 ost_idx;
+       int rc;
+       ENTRY;
+
+       di = iops->init(env, obj, 0);
+       if (IS_ERR(di))
+               RETURN(PTR_ERR(di));
+
+       if (first)
+               fid_cpu_to_be(pfid, &lo->ll_fid_latest_scanned_phase2);
+       else
+               fid_zero(pfid);
+       rc = iops->get(env, di, (const struct dt_key *)pfid);
+       if (rc < 0)
+               GOTO(fini, rc);
+
+       if (first) {
+               /* The start one either has been processed or does not exist,
+                * skip it. */
+               rc = iops->next(env, di);
+               if (rc != 0)
+                       GOTO(put, rc);
+       }
+
+       do {
+               if (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val) &&
+                   unlikely(!thread_is_running(thread)))
+                       GOTO(put, rc = 0);
+
+               key = iops->key(env, di);
+               if (IS_ERR(key)) {
+                       rc = PTR_ERR(key);
+                       if (rc == -ENOENT)
+                               GOTO(put, rc = 1);
+
+                       goto checkpoint;
+               }
+
+               fid_be_to_cpu(pfid, (const struct lu_fid *)key);
+               ea_off = pfid->f_ver;
+               pfid->f_ver = 0;
+               if (!fid_is_sane(pfid)) {
+                       rc = 0;
+                       goto checkpoint;
+               }
+
+               rc = iops->rec(env, di, (struct dt_rec *)cfid, 0);
+               if (rc == 0) {
+                       fid_be_to_cpu(cfid, cfid);
+                       ost_idx = cfid->f_ver;
+                       cfid->f_ver = 0;
+                       if (!fid_is_sane(cfid)) {
+                               rc = 0;
+                               goto checkpoint;
+                       }
+
+                       rc = lfsck_layout_repair_dangling(env, com, pfid, cfid,
+                                                         ea_off, ost_idx);
+               }
+
+checkpoint:
+               down_write(&com->lc_sem);
+               com->lc_new_checked++;
+               com->lc_new_scanned++;
+               if (rc >= 0)
+                       lo->ll_fid_latest_scanned_phase2 = *pfid;
+
+               if (rc > 0)
+                       lo->ll_objs_repaired[LLIT_DANGLING - 1]++;
+               else if (rc < 0)
+                       lo->ll_objs_failed_phase2++;
+               up_write(&com->lc_sem);
+
+               if (rc < 0 && bk->lb_param & LPF_FAILOUT)
+                       GOTO(put, rc);
+
+               if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
+                                             cfs_time_current())) &&
+                   com->lc_new_checked != 0) {
+                       down_write(&com->lc_sem);
+                       lo->ll_run_time_phase2 +=
+                               cfs_duration_sec(cfs_time_current() +
+                               HALF_SEC - com->lc_time_last_checkpoint);
+                       lo->ll_time_last_checkpoint = cfs_time_current_sec();
+                       lo->ll_objs_checked_phase2 += com->lc_new_checked;
+                       com->lc_new_checked = 0;
+                       lfsck_layout_store(env, com);
+                       up_write(&com->lc_sem);
+
+                       com->lc_time_last_checkpoint = cfs_time_current();
+                       com->lc_time_next_checkpoint =
+                               com->lc_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+               }
+
+               lfsck_control_speed_by_self(com);
+               if (unlikely(!thread_is_running(thread)))
+                       GOTO(put, rc = 0);
+
+               rc = iops->next(env, di);
+       } while (rc == 0);
+
+       GOTO(put, rc);
+
+put:
+       iops->put(env, di);
+
+fini:
+       iops->fini(env, di);
+
+       return rc;
+}
+
 static int lfsck_layout_assistant_handler_p2(const struct lu_env *env,
                                             struct lfsck_component *com)
 {
@@ -3408,7 +3796,7 @@ static int lfsck_layout_assistant_handler_p2(const struct lu_env *env,
                                 struct lfsck_tgt_desc,
                                 ltd_layout_phase_list);
                list_del_init(&ltd->ltd_layout_phase_list);
-               if (bk->lb_param & LPF_ALL_TGT) {
+               if (bk->lb_param & LPF_OST_ORPHAN) {
                        spin_unlock(&ltds->ltd_lock);
                        rc = lfsck_layout_scan_orphan(env, com, ltd);
                        if (rc != 0 && bk->lb_param & LPF_FAILOUT)
@@ -3427,6 +3815,29 @@ static int lfsck_layout_assistant_handler_p2(const struct lu_env *env,
                rc = 0;
        spin_unlock(&ltds->ltd_lock);
 
+       if (rc == 1 && bk->lb_param & LPF_OST_ORPHAN) {
+               struct lfsck_layout *lo = com->lc_file_ram;
+               int i;
+
+               com->lc_new_checked = 0;
+               com->lc_new_scanned = 0;
+               com->lc_time_last_checkpoint = cfs_time_current();
+               com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
+                               cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
+
+               i = lfsck_sub_trace_file_fid2idx(
+                               &lo->ll_fid_latest_scanned_phase2);
+               rc = lfsck_layout_double_scan_one_trace_file(env, com,
+                               com->lc_sub_trace_objs[i].lsto_obj, true);
+               while (rc > 0 && ++i < LFSCK_STF_COUNT)
+                       rc = lfsck_layout_double_scan_one_trace_file(env, com,
+                               com->lc_sub_trace_objs[i].lsto_obj, false);
+
+               CDEBUG(D_LFSCK, "%s: layout LFSCK phase2 scan dangling stop "
+                      "at the No. %d trace file: rc = %d\n",
+                      lfsck_lfsck2name(lfsck), i, rc);
+       }
+
        CDEBUG(D_LFSCK, "%s: layout LFSCK phase2 scan stop: rc = %d\n",
               lfsck_lfsck2name(lfsck), rc);
 
@@ -3922,6 +4333,9 @@ static int lfsck_layout_reset(const struct lu_env *env,
        }
 
        rc = lfsck_layout_store(env, com);
+       if (rc == 0 && com->lc_lfsck->li_master)
+               rc = lfsck_load_sub_trace_files(env, com,
+                       &dt_lfsck_layout_dangling_features, LFSCK_LAYOUT, true);
        up_write(&com->lc_sem);
 
        CDEBUG(D_LFSCK, "%s: layout LFSCK reset: rc = %d\n",
@@ -5564,6 +5978,7 @@ int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
        struct lfsck_layout     *lo;
        struct dt_object        *root = NULL;
        struct dt_object        *obj;
+       int                      i;
        int                      rc;
        ENTRY;
 
@@ -5584,6 +5999,9 @@ int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
                                LFSCK_LAYOUT);
                if (com->lc_data == NULL)
                        GOTO(out, rc = -ENOMEM);
+
+               for (i = 0; i < LFSCK_STF_COUNT; i++)
+                       mutex_init(&com->lc_sub_trace_objs[i].lsto_mutex);
        } else {
                struct lfsck_layout_slave_data *llsd;
 
@@ -5627,6 +6045,10 @@ int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
                rc = lfsck_layout_reset(env, com, true);
        else if (rc == -ENOENT)
                rc = lfsck_layout_init(env, com);
+       else if (lfsck->li_master)
+               rc = lfsck_load_sub_trace_files(env, com,
+                               &dt_lfsck_layout_dangling_features,
+                               LFSCK_LAYOUT, false);
 
        if (rc != 0)
                GOTO(out, rc);
index 17c2dd5..98f7e24 100644 (file)
@@ -86,6 +86,8 @@ const char *lfsck_param_names[] = {
        "orphan",
        "create_ostobj",
        "create_mdtobj",
+       NULL,
+       "delay_create_ostobj",
        NULL
 };
 
@@ -2626,6 +2628,96 @@ void lfsck_quit_generic(const struct lu_env *env,
                     &lwi);
 }
 
+/**
+ * Load one LFSCK trace (index) file, optionally re-creating it.
+ *
+ * If \a *child is already loaded and \a reset is false, nothing is done.
+ * If \a reset is true, an already loaded index is kept only when iterating
+ * it shows that it is empty; otherwise its name entry is unlinked from
+ * \a parent, the cached reference is dropped and a new index is created.
+ * If \a *child is NULL the index is looked up (or created) by \a name.
+ *
+ * \param[in] env      execution environment
+ * \param[in] com      LFSCK component the trace file belongs to
+ * \param[in] parent   directory object under which \a name lives
+ * \param[in,out] child        cached trace file object: set on a successful
+ *                     load, cleared once the old file has been unlinked
+ * \param[in] ft       index features the trace file has to support
+ * \param[in] name     name of the trace file under \a parent
+ * \param[in] reset    true to discard an existing non-empty trace file
+ *
+ * \retval             0 for success
+ * \retval             error code returned by the failing unlink, lookup,
+ *                     create or do_index_try() operation
+ */
+int lfsck_load_one_trace_file(const struct lu_env *env,
+                             struct lfsck_component *com,
+                             struct dt_object *parent,
+                             struct dt_object **child,
+                             const struct dt_index_features *ft,
+                             const char *name, bool reset)
+{
+       struct lfsck_instance *lfsck = com->lc_lfsck;
+       struct dt_object *obj;
+       int rc;
+       ENTRY;
+
+       if (*child != NULL) {
+               struct dt_it *it;
+               const struct dt_it_ops *iops;
+               struct lu_fid *fid = &lfsck_env_info(env)->lti_fid3;
+
+               if (!reset)
+                       RETURN(0);
+
+               obj = *child;
+               rc = obj->do_ops->do_index_try(env, obj, ft);
+               if (rc)
+                       /* unlink by force */
+                       goto unlink;
+
+               iops = &obj->do_index_ops->dio_it;
+               it = iops->init(env, obj, 0);
+               if (IS_ERR(it))
+                       /* unlink by force */
+                       goto unlink;
+
+               /* Position at the zero FID, then probe for any record. */
+               fid_zero(fid);
+               rc = iops->get(env, it, (const struct dt_key *)fid);
+               if (rc >= 0) {
+                       rc = iops->next(env, it);
+                       iops->put(env, it);
+               }
+               iops->fini(env, it);
+               if (rc > 0)
+                       /* "rc > 0" means the index file is empty. */
+                       RETURN(0);
+
+unlink:
+               /* The old index is not empty, or it cannot be examined:
+                * remove it first, then create a new one below. */
+               rc = local_object_unlink(env, lfsck->li_bottom, parent, name);
+
+               CDEBUG(D_LFSCK, "%s: unlink lfsck sub trace file %s: rc = %d\n",
+                      lfsck_lfsck2name(com->lc_lfsck), name, rc);
+
+               if (rc)
+                       RETURN(rc);
+
+               lfsck_object_put(env, *child);
+               *child = NULL;
+       }
+
+       obj = local_index_find_or_create(env, lfsck->li_los, parent, name,
+                                        S_IFREG | S_IRUGO | S_IWUSR, ft);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
+
+       rc = obj->do_ops->do_index_try(env, obj, ft);
+       if (rc)
+               /* Nobody will remember this object, so do not leak the
+                * reference taken by the lookup above. */
+               lfsck_object_put(env, obj);
+       else
+               *child = obj;
+
+       RETURN(rc);
+}
+
+/**
+ * Load every sub trace file of the given LFSCK component.
+ *
+ * The files are named "<prefix>_NN" (NN is the two-digit slot index) and
+ * live under the LFSCK directory; slot i is cached in
+ * com->lc_sub_trace_objs[i].lsto_obj. Loading stops at the first failure.
+ *
+ * \param[in] env      execution environment
+ * \param[in] com      LFSCK component owning the sub trace files
+ * \param[in] ft       index features passed to each load
+ * \param[in] prefix   name prefix shared by all sub trace files
+ * \param[in] reset    forwarded to lfsck_load_one_trace_file()
+ *
+ * \retval             0 if all slots were loaded
+ * \retval             the result of the first load that failed
+ */
+int lfsck_load_sub_trace_files(const struct lu_env *env,
+                              struct lfsck_component *com,
+                              const struct dt_index_features *ft,
+                              const char *prefix, bool reset)
+{
+       char *buf = lfsck_env_info(env)->lti_key;
+       struct lfsck_sub_trace_obj *slot = &com->lc_sub_trace_objs[0];
+       int idx;
+       int rc = 0;
+
+       for (idx = 0; idx < LFSCK_STF_COUNT; idx++, slot++) {
+               snprintf(buf, NAME_MAX, "%s_%02d", prefix, idx);
+               rc = lfsck_load_one_trace_file(env, com,
+                                              com->lc_lfsck->li_lfsck_dir,
+                                              &slot->lsto_obj, ft, buf,
+                                              reset);
+               if (rc != 0)
+                       break;
+       }
+
+       return rc;
+}
+
 /* external interfaces */
 
 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
index c43c64d..4384bb8 100644 (file)
@@ -440,71 +440,6 @@ log:
        return rc;
 }
 
-static struct dt_object *
-lfsck_namespace_load_one_trace_file(const struct lu_env *env,
-                                   struct lfsck_component *com,
-                                   struct dt_object *parent,
-                                   const char *name, bool reset)
-{
-       struct lfsck_instance   *lfsck = com->lc_lfsck;
-       struct dt_object        *obj;
-       int                      rc;
-
-       if (reset) {
-               rc = local_object_unlink(env, lfsck->li_bottom, parent, name);
-               if (rc != 0 && rc != -ENOENT)
-                       return ERR_PTR(rc);
-       }
-
-       obj = local_index_find_or_create(env, lfsck->li_los, parent, name,
-                                        S_IFREG | S_IRUGO | S_IWUSR,
-                                        &dt_lfsck_features);
-
-       return obj;
-}
-
-static int lfsck_namespace_load_sub_trace_files(const struct lu_env *env,
-                                               struct lfsck_component *com,
-                                               bool reset)
-{
-       char                            *name = lfsck_env_info(env)->lti_key;
-       struct lfsck_sub_trace_obj      *lsto;
-       struct dt_object                *obj;
-       int                              rc;
-       int                              i;
-
-       for (i = 0, lsto = &com->lc_sub_trace_objs[0];
-            i < LFSCK_STF_COUNT; i++, lsto++) {
-               snprintf(name, NAME_MAX, "%s_%02d", LFSCK_NAMESPACE, i);
-               mutex_lock(&lsto->lsto_mutex);
-               if (lsto->lsto_obj != NULL) {
-                       if (!reset) {
-                               mutex_unlock(&lsto->lsto_mutex);
-                               continue;
-                       }
-
-                       lfsck_object_put(env, lsto->lsto_obj);
-                       lsto->lsto_obj = NULL;
-               }
-
-               obj = lfsck_namespace_load_one_trace_file(env, com,
-                               com->lc_lfsck->li_lfsck_dir, name, reset);
-               LASSERT(obj != NULL);
-               if (IS_ERR(obj)) {
-                       rc = PTR_ERR(obj);
-               } else {
-                       lsto->lsto_obj = obj;
-                       rc = obj->do_ops->do_index_try(env, obj,
-                                                      &dt_lfsck_features);
-               }
-               mutex_unlock(&lsto->lsto_mutex);
-               if (rc != 0)
-                       return rc;
-       }
-
-       return 0;
-}
-
 static int lfsck_namespace_init(const struct lu_env *env,
                                struct lfsck_component *com)
 {
@@ -517,9 +452,10 @@ static int lfsck_namespace_init(const struct lu_env *env,
        ns->ln_time_latest_reset = cfs_time_current_sec();
        down_write(&com->lc_sem);
        rc = lfsck_namespace_store(env, com);
-       up_write(&com->lc_sem);
        if (rc == 0)
-               rc = lfsck_namespace_load_sub_trace_files(env, com, true);
+               rc = lfsck_load_sub_trace_files(env, com,
+                       &dt_lfsck_namespace_features, LFSCK_NAMESPACE, true);
+       up_write(&com->lc_sem);
 
        return rc;
 }
@@ -3946,7 +3882,6 @@ static int lfsck_namespace_reset(const struct lu_env *env,
        struct lfsck_namespace          *ns     = com->lc_file_ram;
        struct lfsck_assistant_data     *lad    = com->lc_data;
        struct dt_object                *root;
-       struct dt_object                *dto;
        int                              rc;
        ENTRY;
 
@@ -3972,15 +3907,14 @@ static int lfsck_namespace_reset(const struct lu_env *env,
        ns->ln_status = LS_INIT;
        ns->ln_time_latest_reset = cfs_time_current_sec();
 
-       lfsck_object_put(env, com->lc_obj);
-       com->lc_obj = NULL;
-       dto = lfsck_namespace_load_one_trace_file(env, com, root,
-                                                 LFSCK_NAMESPACE, true);
-       if (IS_ERR(dto))
-               GOTO(out, rc = PTR_ERR(dto));
+       rc = lfsck_load_one_trace_file(env, com, root, &com->lc_obj,
+                                      &dt_lfsck_namespace_features,
+                                      LFSCK_NAMESPACE, true);
+       if (rc)
+               GOTO(out, rc);
 
-       com->lc_obj = dto;
-       rc = lfsck_namespace_load_sub_trace_files(env, com, true);
+       rc = lfsck_load_sub_trace_files(env, com, &dt_lfsck_namespace_features,
+                                       LFSCK_NAMESPACE, true);
        if (rc != 0)
                GOTO(out, rc);
 
@@ -6198,8 +6132,9 @@ checkpoint:
                down_write(&com->lc_sem);
                com->lc_new_checked++;
                com->lc_new_scanned++;
-               if (rc >= 0 && fid_is_sane(&fid))
+               if (rc >= 0)
                        ns->ln_fid_latest_scanned_phase2 = fid;
+
                if (rc > 0)
                        ns->ln_objs_repaired_phase2++;
                else if (rc < 0)
@@ -6219,10 +6154,8 @@ checkpoint:
                        ns->ln_time_last_checkpoint = cfs_time_current_sec();
                        ns->ln_objs_checked_phase2 += com->lc_new_checked;
                        com->lc_new_checked = 0;
-                       rc = lfsck_namespace_store(env, com);
+                       lfsck_namespace_store(env, com);
                        up_write(&com->lc_sem);
-                       if (rc != 0)
-                               GOTO(put, rc);
 
                        com->lc_time_last_checkpoint = cfs_time_current();
                        com->lc_time_next_checkpoint =
@@ -6706,7 +6639,7 @@ int lfsck_namespace_setup(const struct lu_env *env,
        obj = local_index_find_or_create(env, lfsck->li_los, root,
                                         LFSCK_NAMESPACE,
                                         S_IFREG | S_IRUGO | S_IWUSR,
-                                        &dt_lfsck_features);
+                                        &dt_lfsck_namespace_features);
        if (IS_ERR(obj))
                GOTO(out, rc = PTR_ERR(obj));
 
@@ -6717,7 +6650,8 @@ int lfsck_namespace_setup(const struct lu_env *env,
        else if (rc < 0)
                rc = lfsck_namespace_reset(env, com, true);
        else
-               rc = lfsck_namespace_load_sub_trace_files(env, com, false);
+               rc = lfsck_load_sub_trace_files(env, com,
+                       &dt_lfsck_namespace_features, LFSCK_NAMESPACE, false);
        if (rc != 0)
                GOTO(out, rc);
 
index afbfefe..f5f338b 100644 (file)
@@ -578,8 +578,8 @@ EXPORT_SYMBOL(dt_directory_features);
 const struct dt_index_features dt_otable_features;
 EXPORT_SYMBOL(dt_otable_features);
 
-/* lfsck orphan */
-const struct dt_index_features dt_lfsck_orphan_features = {
+/* lfsck layout orphan */
+const struct dt_index_features dt_lfsck_layout_orphan_features = {
        .dif_flags              = 0,
        .dif_keysize_min        = sizeof(struct lu_fid),
        .dif_keysize_max        = sizeof(struct lu_fid),
@@ -587,10 +587,21 @@ const struct dt_index_features dt_lfsck_orphan_features = {
        .dif_recsize_max        = sizeof(struct lu_orphan_rec),
        .dif_ptrsize            = 4
 };
-EXPORT_SYMBOL(dt_lfsck_orphan_features);
+EXPORT_SYMBOL(dt_lfsck_layout_orphan_features);
 
-/* lfsck */
-const struct dt_index_features dt_lfsck_features = {
+/* lfsck layout dangling
+ *
+ * Index features for the layout LFSCK trace files that record dangling
+ * LOV EA references. Key and record are both fixed-size: exactly
+ * sizeof(struct lu_fid) bytes each (min == max). DT_IND_UPDATE is set.
+ *
+ * NOTE(review): presumably the key is the MDT-object FID and the record
+ * the FID of the referenced OST-object - confirm against the code that
+ * inserts the records.
+ */
+const struct dt_index_features dt_lfsck_layout_dangling_features = {
+       .dif_flags              = DT_IND_UPDATE,
+       .dif_keysize_min        = sizeof(struct lu_fid),
+       .dif_keysize_max        = sizeof(struct lu_fid),
+       .dif_recsize_min        = sizeof(struct lu_fid),
+       .dif_recsize_max        = sizeof(struct lu_fid),
+       .dif_ptrsize            = 4
+};
+EXPORT_SYMBOL(dt_lfsck_layout_dangling_features);
+
+/* lfsck namespace */
+const struct dt_index_features dt_lfsck_namespace_features = {
        .dif_flags              = DT_IND_UPDATE,
        .dif_keysize_min        = sizeof(struct lu_fid),
        .dif_keysize_max        = sizeof(struct lu_fid),
@@ -598,7 +609,7 @@ const struct dt_index_features dt_lfsck_features = {
        .dif_recsize_max        = sizeof(__u8),
        .dif_ptrsize            = 4
 };
-EXPORT_SYMBOL(dt_lfsck_features);
+EXPORT_SYMBOL(dt_lfsck_namespace_features);
 
 /* accounting indexes */
 const struct dt_index_features dt_acct_features = {
@@ -664,7 +675,7 @@ static inline const struct dt_index_features *dt_index_feat_select(__u64 seq,
                        return ERR_PTR(-ENOENT);
                return &dt_quota_slv_features;
        } else if (seq == FID_SEQ_LAYOUT_RBTREE){
-               return &dt_lfsck_orphan_features;
+               return &dt_lfsck_layout_orphan_features;
        } else if (seq >= FID_SEQ_NORMAL) {
                /* object is part of the namespace, verify that it is a
                 * directory */
index 1796dfb..24d8e4d 100644 (file)
@@ -1512,10 +1512,11 @@ test_13() {
 }
 run_test 13 "LFSCK can repair crashed lmm_oi"
 
-test_14() {
+test_14a() {
        echo "#####"
        echo "The OST-object referenced by the MDT-object should be there;"
        echo "otherwise, the LFSCK should re-create the missing OST-object."
+       echo "without '--delay-create-ostobj' option."
        echo "#####"
 
        check_mount_and_prep
@@ -1586,7 +1587,74 @@ test_14() {
 
        stop_full_debug_logging
 }
-run_test 14 "LFSCK can repair MDT-object with dangling reference"
+run_test 14a "LFSCK can repair MDT-object with dangling LOV EA reference (1)"
+
+# test_14b: dangling LOV EA reference handling with '--delay-create-ostobj'.
+#
+# Files are created while fail_loc 0x1610 is set on ost1, then the layout
+# LFSCK is run twice:
+#  1) with "-r -o -d" (no "-c"): repaired_dangling must reach 32, but
+#     stat on the guard file is still expected to fail;
+#  2) with "-r -o -c -d": stat on the guard file must report Size 0 and
+#     repaired_dangling must again be at least 32.
+test_14b() {
+       echo "#####"
+       echo "The OST-object referenced by the MDT-object should be there;"
+       echo "otherwise, the LFSCK should re-create the missing OST-object."
+       echo "with '--delay-create-ostobj' option."
+       echo "#####"
+
+       check_mount_and_prep
+       $LFS setstripe -c 1 -i 0 $DIR/$tdir
+
+       echo "Inject failure stub to simulate dangling referenced MDT-object"
+       #define OBD_FAIL_LFSCK_DANGLING 0x1610
+       do_facet ost1 $LCTL set_param fail_loc=0x1610
+       local count=$(precreated_ost_obj_count 0 0)
+
+       createmany -o $DIR/$tdir/f $((count + 31))
+       touch $DIR/$tdir/guard
+       do_facet ost1 $LCTL set_param fail_loc=0
+
+       start_full_debug_logging
+
+       # exhaust other pre-created dangling cases
+       count=$(precreated_ost_obj_count 0 0)
+       createmany -o $DIR/$tdir/a $count ||
+               error "(0) Fail to create $count files."
+
+       echo "'ls' should fail because of dangling referenced MDT-object"
+       ls -ail $DIR/$tdir > /dev/null 2>&1 && error "(1) ls should fail."
+
+       echo "Trigger layout LFSCK to find out dangling reference"
+       # First pass: reset (-r), orphan handling (-o), delayed create (-d),
+       # but without -c, so the lost OST-objects are not re-created.
+       $START_LAYOUT -r -o -d || error "(2) Fail to start LFSCK for layout!"
+
+       wait_all_targets_blocked layout completed 3
+
+       local repaired=$($SHOW_LAYOUT |
+                        awk '/^repaired_dangling/ { print $2 }')
+       [ $repaired -ge 32 ] ||
+               error "(4) Fail to repair dangling reference: $repaired"
+
+       echo "'stat' should fail because of not repair dangling by default"
+       stat $DIR/$tdir/guard > /dev/null 2>&1 && error "(5) stat should fail"
+
+       echo "Trigger layout LFSCK to repair dangling reference"
+       # Second pass: same options plus -c to create the lost OST-objects.
+       $START_LAYOUT -r -o -c -d || error "(6) Fail to start LFSCK for layout!"
+
+       wait_all_targets_blocked layout completed 7
+
+       # Some async LFSCK updates may still be in flight, so poll until
+       # the repair of the target becomes visible on the client. LU-4970.
+
+       echo "'stat' should success after layout LFSCK repairing"
+       wait_update_facet client "stat $DIR/$tdir/guard |
+               awk '/Size/ { print \\\$2 }'" "0" 32 || {
+               stat $DIR/$tdir/guard
+               $SHOW_LAYOUT
+               error "(8) unexpected size"
+       }
+
+       repaired=$($SHOW_LAYOUT |
+                        awk '/^repaired_dangling/ { print $2 }')
+       [ $repaired -ge 32 ] ||
+               error "(9) Fail to repair dangling reference: $repaired"
+
+       stop_full_debug_logging
+}
+run_test 14b "LFSCK can repair MDT-object with dangling LOV EA reference (2)"
 
 test_15a() {
        echo "#####"
@@ -2148,11 +2216,9 @@ run_test 18c "Find out orphan OST-object and repair it (3)"
 
 test_18d() {
        echo "#####"
-       echo "The target MDT-object layout EA slot is occpuied by some new"
-       echo "created OST-object when repair dangling reference case. Such"
-       echo "conflict OST-object has never been modified. Then when found"
-       echo "the orphan OST-object, LFSCK will replace it with the orphan"
-       echo "OST-object."
+       echo "The target MDT-object layout EA is corrupted, but the right"
+       echo "OST-object is still alive as orphan. The layout LFSCK will"
+       echo "not create new OST-object to occupy such slot."
        echo "#####"
 
        check_mount_and_prep
@@ -2192,7 +2258,7 @@ test_18d() {
                error "(1) Expect incorrect file2 size"
 
        echo "Trigger layout LFSCK on all devices to find out orphan OST-object"
-       $START_LAYOUT -r -o -c || error "(2) Fail to start LFSCK for layout!"
+       $START_LAYOUT -r -o -c -d || error "(2) Fail to start LFSCK for layout!"
 
        for k in $(seq $MDSCOUNT); do
                # The LFSCK status query internal is 30 seconds. For the case
@@ -2218,10 +2284,16 @@ test_18d() {
        [ $repaired -eq 1 ] ||
                error "(5) Expect 1 orphan has been fixed, but got: $repaired"
 
+       repaired=$(do_facet $SINGLEMDS $LCTL get_param -n \
+                  mdd.$(facet_svc $SINGLEMDS).lfsck_layout |
+                  awk '/^repaired_dangling/ { print $2 }')
+       [ $repaired -eq 0 ] ||
+               error "(6) Expect 0 dangling has been fixed, but got: $repaired"
+
        echo "The file size should be correct after layout LFSCK scanning"
        cur_size=$(ls -il $DIR/$tdir/a1/f2 | awk '{ print $6 }')
        [ "$cur_size" == "$saved_size" ] ||
-               error "(6) Expect file2 size $saved_size, but got $cur_size"
+               error "(7) Expect file2 size $saved_size, but got $cur_size"
 
        echo "The LFSCK should find back the original data."
        cat $DIR/$tdir/a1/f2
index f81ad85..409638f 100644 (file)
@@ -399,6 +399,7 @@ command_t cmdlist[] = {
         "usage: lfsck_start [-M | --device [MDT,OST]_device]\n"
         "                   [-A | --all] [-c | --create-ostobj [on | off]]\n"
         "                   [-C | --create-mdtobj [on | off]]\n"
+        "                   [-d | --delay-create-ostobj [on | off]]\n"
         "                   [-e | --error {continue | abort}] [-h | --help]\n"
         "                   [-n | --dryrun [on | off]] [-o | --orphan]\n"
         "                   [-r | --reset] [-s | --speed speed_limit]\n"
index 3795a17..2c0f369 100644 (file)
@@ -55,6 +55,8 @@ static struct option long_opt_start[] = {
        {"create-ostobj",       optional_argument, 0, 'c'},
        {"create_mdtobj",       optional_argument, 0, 'C'},
        {"create-mdtobj",       optional_argument, 0, 'C'},
+       {"delay_create_ostobj", optional_argument, 0, 'd'},
+       {"delay-create-ostobj", optional_argument, 0, 'd'},
        {"error",               required_argument, 0, 'e'},
        {"help",                no_argument,       0, 'h'},
        {"dryrun",              optional_argument, 0, 'n'},
@@ -126,6 +128,7 @@ static void usage_start(void)
                "lfsck_start [-M | --device {MDT,OST}_device]\n"
                "            [-A | --all] [-c | --create_ostobj [on | off]]\n"
                "            [-C | --create_mdtobj [on | off]]\n"
+               "            [-d | --delay_create_ostobj [on | off]]\n"
                "            [-e | --error {continue | abort}] [-h | --help]\n"
                "            [-n | --dryrun [on | off]] [-o | --orphan]\n"
                "            [-r | --reset] [-s | --speed ops_per_sec_limit]\n"
@@ -139,6 +142,8 @@ static void usage_start(void)
                    "(default 'off', or 'on')\n"
                "-C: create the lost MDT-object for dangling name entry "
                    "(default 'off', or 'on')\n"
+               "-d: delay create the lost OST-object for dangling LOV EA "
+                   "until orphan OST-objects handled (default 'off', or 'on')\n"
                "-e: error handle mode (default 'continue', or 'abort')\n"
                "-h: this help message\n"
                "-n: check with no modification (default 'off', or 'on')\n"
@@ -269,7 +274,7 @@ int jt_lfsck_start(int argc, char **argv)
        char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
        char device[MAX_OBD_NAME];
        struct lfsck_start start;
-       char *optstring = "Ac::C::e:hM:n::ors:t:w:";
+       char *optstring = "Ac::C::d::e:hM:n::ors:t:w:";
        int opt, index, rc, val, i;
 
        memset(&data, 0, sizeof(data));
@@ -313,6 +318,19 @@ int jt_lfsck_start(int argc, char **argv)
                        }
                        start.ls_valid |= LSV_CREATE_MDTOBJ;
                        break;
+               case 'd':
+                       /* '-d' takes an optional, attached argument ("d::"
+                        * in optstring): no argument or "on" enables the
+                        * delayed creation, "off" leaves the flag clear.
+                        * LSV_DELAY_CREATE_OSTOBJ is set in both accepted
+                        * cases so that the choice is marked as valid. */
+                       if (optarg == NULL || strcmp(optarg, "on") == 0) {
+                               start.ls_flags |= LPF_DELAY_CREATE_OSTOBJ;
+                       } else if (strcmp(optarg, "off") != 0) {
+                               /* Report the option that was actually
+                                * given: this is '-d', not '-c'. */
+                               fprintf(stderr, "invalid switch: -d '%s'. "
+                                       "valid switches are:\n"
+                                       "empty ('on'), or 'off' without space. "
+                                       "For example:\n"
+                                       "'-d', '-don', '-doff'\n", optarg);
+                               return -EINVAL;
+                       }
+                       start.ls_valid |= LSV_DELAY_CREATE_OSTOBJ;
+                       break;
                case 'e':
                        if (strcmp(optarg, "abort") == 0) {
                                start.ls_flags |= LPF_FAILOUT;