Whamcloud - gitweb
LU-3336 lfsck: regenerate lost layout EA 10/7810/21
authorFan Yong <fan.yong@intel.com>
Wed, 12 Feb 2014 19:32:41 +0000 (03:32 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 27 Feb 2014 15:27:57 +0000 (15:27 +0000)
To find out orphan OST-objects, the LFSCK on OST side maintains
two bitmaps in RAM for the OST-object accessed during the LFSCK.
After the first cycle system scanning, the LFSCK got the bitmap
for the known OST-objects, and got another bitmap for which OST
objects have been referenced by MDT-objects. Then the LFSCK can
know which OST-objects are not referenced by any MDT-object via
comparing the two bitmaps.

If the MDT-object exists, then check the layout EA. If related
layout EA is empty, then fill the layout EA slot with the given
OST-object stripe information. If the given OST-object stripe
index exceeds current layout EA size, then extend the layout EA
and fill the gap slot(s) as empty, which can be filled by others.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Ie542289b397576be457b04792e845324d6926836
Reviewed-on: http://review.whamcloud.com/7810
Tested-by: Jenkins
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/obd_support.h
lustre/lfsck/lfsck_layout.c
lustre/lod/lod_object.c
lustre/osp/osp_object.c
lustre/tests/sanity-lfsck.sh

index 4d192ca..6f4cbc5 100644 (file)
@@ -506,6 +506,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LFSCK_UNMATCHED_PAIR2 0x1612
 #define OBD_FAIL_LFSCK_BAD_OWNER       0x1613
 #define OBD_FAIL_LFSCK_MULTIPLE_REF    0x1614
+#define OBD_FAIL_LFSCK_LOST_STRIPE     0x1615
 
 #define OBD_FAIL_LFSCK_NOTIFY_NET      0x16f0
 #define OBD_FAIL_LFSCK_QUERY_NET       0x16f1
index f77ee87..2fe033e 100644 (file)
@@ -652,6 +652,15 @@ out:
        }
 }
 
+static inline bool is_dummy_lov_ost_data(struct lov_ost_data_v1 *obj)
+{
+       if (fid_is_zero(&obj->l_ost_oi.oi_fid) &&
+           obj->l_ost_gen == 0 && obj->l_ost_idx == 0)
+               return true;
+
+       return false;
+}
+
 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
                                   const struct lfsck_layout *src)
 {
@@ -1655,17 +1664,510 @@ static int lfsck_layout_trans_stop(const struct lu_env *env,
        return rc;
 }
 
+/**
+ * \retval      +1: repaired
+ * \retval       0: did nothing
+ * \retval     -ve: on error
+ */
+static int lfsck_layout_refill_lovea(const struct lu_env *env,
+                                    struct thandle *handle,
+                                    struct dt_object *parent,
+                                    struct lu_fid *cfid,
+                                    struct lu_buf *buf,
+                                    struct lov_ost_data_v1 *slot,
+                                    int fl, __u32 ost_idx)
+{
+       struct ost_id   *oi     = &lfsck_env_info(env)->lti_oi;
+       int              rc;
+
+       fid_to_ostid(cfid, oi);
+       ostid_cpu_to_le(oi, &slot->l_ost_oi);
+       slot->l_ost_gen = cpu_to_le32(0);
+       slot->l_ost_idx = cpu_to_le32(ost_idx);
+       rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV, fl, handle,
+                         BYPASS_CAPA);
+       if (rc == 0)
+               rc = 1;
+
+       return rc;
+}
+
+/**
+ * \retval      +1: repaired
+ * \retval       0: did nothing
+ * \retval     -ve: on error
+ */
+static int lfsck_layout_extend_lovea(const struct lu_env *env,
+                                    struct thandle *handle,
+                                    struct dt_object *parent,
+                                    struct lu_fid *cfid,
+                                    struct lu_buf *buf, int fl,
+                                    __u32 ost_idx, __u32 ea_off)
+{
+       struct lov_mds_md_v1    *lmm    = buf->lb_buf;
+       struct lov_ost_data_v1  *objs;
+       int                      rc;
+       ENTRY;
+
+       if (fl == LU_XATTR_CREATE) {
+               LASSERT(buf->lb_len == lov_mds_md_size(ea_off + 1,
+                                                      LOV_MAGIC_V1));
+
+               memset(lmm, 0, buf->lb_len);
+               lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
+               /* XXX: currently, we only support LOV_PATTERN_RAID0. */
+               lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
+               fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
+               lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
+               /* XXX: We cannot know the stripe size,
+                *      then use the default value (1 MB). */
+               lmm->lmm_stripe_size = cpu_to_le32(1024 * 1024);
+               lmm->lmm_layout_gen = cpu_to_le16(0);
+               objs = &(lmm->lmm_objects[ea_off]);
+       } else {
+               __u16   count = le16_to_cpu(lmm->lmm_stripe_count);
+               int     gap   = ea_off - count;
+               __u32   magic = le32_to_cpu(lmm->lmm_magic);
+
+               /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3
+                * which has been verified in lfsck_layout_verify_header()
+                * already. If some new magic introduced in the future,
+                * then layout LFSCK needs to be updated also. */
+               if (magic == LOV_MAGIC_V1) {
+                       objs = &(lmm->lmm_objects[count]);
+               } else {
+                       LASSERT(magic == LOV_MAGIC_V3);
+                       objs = &((struct lov_mds_md_v3 *)lmm)->
+                                                       lmm_objects[count];
+               }
+
+               if (gap > 0)
+                       memset(objs, 0, gap * sizeof(*objs));
+               lmm->lmm_layout_gen =
+                           cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
+               objs += gap;
+
+               LASSERT(buf->lb_len == lov_mds_md_size(ea_off + 1, magic));
+       }
+
+       lmm->lmm_stripe_count = cpu_to_le16(ea_off + 1);
+       rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
+                                      fl, ost_idx);
+
+       RETURN(rc);
+}
+
+/**
+ * \retval      +1: repaired
+ * \retval       0: did nothing
+ * \retval     -ve: on error
+ */
+static int lfsck_layout_update_pfid(const struct lu_env *env,
+                                   struct lfsck_component *com,
+                                   struct dt_object *parent,
+                                   struct lu_fid *cfid,
+                                   struct dt_device *cdev, __u32 ea_off)
+{
+       struct filter_fid       *pfid   = &lfsck_env_info(env)->lti_new_pfid;
+       struct dt_object        *child;
+       struct thandle          *handle;
+       const struct lu_fid     *tfid   = lu_object_fid(&parent->do_lu);
+       struct lu_buf           *buf;
+       int                      rc     = 0;
+       ENTRY;
+
+       child = lfsck_object_find_by_dev(env, cdev, cfid);
+       if (IS_ERR(child))
+               RETURN(PTR_ERR(child));
+
+       handle = dt_trans_create(env, cdev);
+       if (IS_ERR(handle))
+               GOTO(out, rc = PTR_ERR(handle));
+
+       pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
+       pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
+       /* In fact, the ff_parent::f_ver is not the real parent FID::f_ver,
+        * instead, it is the OST-object index in its parent MDT-object
+        * layout EA. */
+       pfid->ff_parent.f_ver = cpu_to_le32(ea_off);
+       buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
+
+       rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start(env, cdev, handle);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
+                         BYPASS_CAPA);
+
+       GOTO(stop, rc = (rc == 0 ? 1 : rc));
+
+stop:
+       dt_trans_stop(env, cdev, handle);
+
+out:
+       lu_object_put(env, &child->do_lu);
+
+       return rc;
+}
+
+/**
+ * \retval      +1: repaired
+ * \retval       0: did nothing
+ * \retval     -ve: on error
+ */
+static int lfsck_layout_recreate_parent(const struct lu_env *env,
+                                       struct lfsck_component *com,
+                                       struct lfsck_tgt_desc *ltd,
+                                       struct lu_orphan_rec *rec,
+                                       struct lu_fid *cfid,
+                                       const char *prefix,
+                                       const char *postfix,
+                                       __u32 ea_off)
+{
+       /* XXX: To be extended in other patch. */
+       return 0;
+}
+
+/**
+ * \retval      +1: repaired
+ * \retval       0: did nothing
+ * \retval     -ve: on error
+ */
+static int lfsck_layout_conflict_create(const struct lu_env *env,
+                                       struct lfsck_component *com,
+                                       struct lfsck_tgt_desc *ltd,
+                                       struct lu_orphan_rec *rec,
+                                       struct dt_object *parent,
+                                       struct lu_fid *cfid,
+                                       struct lu_buf *ea_buf,
+                                       struct lov_ost_data_v1 *slot,
+                                       __u32 ea_off, __u32 ori_len)
+{
+       /* XXX: To be extended in other patch. */
+       return 0;
+}
+
+/**
+ * \retval      +1: repaired
+ * \retval       0: did nothing
+ * \retval     -ve: on error
+ */
+static int lfsck_layout_recreate_lovea(const struct lu_env *env,
+                                      struct lfsck_component *com,
+                                      struct lfsck_tgt_desc *ltd,
+                                      struct lu_orphan_rec *rec,
+                                      struct dt_object *parent,
+                                      struct lu_fid *cfid,
+                                      __u32 ost_idx, __u32 ea_off)
+{
+       struct lfsck_thread_info *info          = lfsck_env_info(env);
+       struct lu_buf            *buf           = &info->lti_big_buf;
+       struct lu_fid            *fid           = &info->lti_fid2;
+       struct ost_id            *oi            = &info->lti_oi;
+       struct lfsck_instance    *lfsck         = com->lc_lfsck;
+       struct dt_device         *dt            = lfsck->li_bottom;
+       struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
+       struct thandle            *handle       = NULL;
+       size_t                    buflen        = buf->lb_len;
+       struct lov_mds_md_v1     *lmm;
+       struct lov_ost_data_v1   *objs;
+       struct lustre_handle      lh            = { 0 };
+       __u32                     magic;
+       int                       fl            = 0;
+       int                       rc;
+       int                       rc1;
+       int                       i;
+       __u16                     count;
+       ENTRY;
+
+       CDEBUG(D_LFSCK, "Re-create the crashed layout EA: parent "
+              DFID", child "DFID", OST-index %u, stripe-index %u\n",
+              PFID(lfsck_dto2fid(parent)), PFID(cfid), ost_idx, ea_off);
+
+       rc = lfsck_layout_lock(env, com, parent, &lh,
+                              MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       if (rc != 0)
+               RETURN(rc);
+
+again:
+       if (!(bk->lb_param & LPF_DRYRUN)) {
+               handle = dt_trans_create(env, dt);
+               if (IS_ERR(handle))
+                       GOTO(unlock_layout, rc = PTR_ERR(handle));
+
+               rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
+                                         fl, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+
+               rc = dt_trans_start_local(env, dt, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+       }
+
+       dt_write_lock(env, parent, 0);
+       rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
+       if (rc == -ERANGE) {
+               rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
+                                 BYPASS_CAPA);
+               LASSERT(rc != 0);
+
+               dt_write_unlock(env, parent);
+               if (handle != NULL) {
+                       dt_trans_stop(env, dt, handle);
+                       handle = NULL;
+               }
+
+               if (rc < 0)
+                       GOTO(unlock_layout, rc);
+
+               lu_buf_realloc(buf, rc);
+               buflen = buf->lb_len;
+               if (buf->lb_buf == NULL)
+                       GOTO(unlock_layout, rc = -ENOMEM);
+
+               fl = LU_XATTR_REPLACE;
+               goto again;
+       } else if (rc == -ENODATA || rc == 0) {
+               fl = LU_XATTR_CREATE;
+       } else if (rc < 0) {
+               GOTO(unlock_parent, rc);
+       } else if (unlikely(buf->lb_len == 0)) {
+               dt_write_unlock(env, parent);
+               if (handle != NULL) {
+                       dt_trans_stop(env, dt, handle);
+                       handle = NULL;
+               }
+
+               lu_buf_alloc(buf, rc);
+               buflen = buf->lb_len;
+               if (buf->lb_buf == NULL)
+                       GOTO(unlock_layout, rc = -ENOMEM);
+
+               fl = LU_XATTR_REPLACE;
+               goto again;
+       } else {
+               fl = LU_XATTR_REPLACE;
+       }
+
+       if (fl == LU_XATTR_CREATE) {
+               if (bk->lb_param & LPF_DRYRUN)
+                       GOTO(unlock_parent, rc = 1);
+
+               rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
+               /* If the declared is not big enough, re-try. */
+               if (buf->lb_len < rc) {
+                       dt_write_unlock(env, parent);
+                       if (handle != NULL) {
+                               dt_trans_stop(env, dt, handle);
+                               handle = NULL;
+                       }
+
+                       lu_buf_realloc(buf, rc);
+                       buflen = buf->lb_len;
+                       if (buf->lb_buf == NULL)
+                               GOTO(unlock_layout, rc = -ENOMEM);
+
+                       goto again;
+               }
+
+               buf->lb_len = rc;
+               rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
+                                              fl, ost_idx, ea_off);
+
+               GOTO(unlock_parent, rc);
+       }
+
+       lmm = buf->lb_buf;
+       rc1 = lfsck_layout_verify_header(lmm);
+       if (rc1 != 0)
+               GOTO(unlock_parent, rc = rc1);
+
+       /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
+        * been verified in lfsck_layout_verify_header() already. If some
+        * new magic introduced in the future, then layout LFSCK needs to
+        * be updated also. */
+       magic = le32_to_cpu(lmm->lmm_magic);
+       if (magic == LOV_MAGIC_V1) {
+               objs = &(lmm->lmm_objects[0]);
+       } else {
+               LASSERT(magic == LOV_MAGIC_V3);
+               objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
+       }
+
+       count = le16_to_cpu(lmm->lmm_stripe_count);
+       if (count == 0)
+               GOTO(unlock_parent, rc = -EINVAL);
+       LASSERT(count > 0);
+
+       /* Exceed the current end of MDT-object layout EA. Then extend it. */
+       if (count <= ea_off) {
+               if (bk->lb_param & LPF_DRYRUN)
+                       GOTO(unlock_parent, rc = 1);
+
+               rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
+               /* If the declared is not big enough, re-try. */
+               if (buf->lb_len < rc) {
+                       dt_write_unlock(env, parent);
+                       if (handle != NULL) {
+                               dt_trans_stop(env, dt, handle);
+                               handle = NULL;
+                       }
+
+                       lu_buf_realloc(buf, rc);
+                       buflen = buf->lb_len;
+                       if (buf->lb_buf == NULL)
+                               GOTO(unlock_layout, rc = -ENOMEM);
+
+                       goto again;
+               }
+
+               buf->lb_len = rc;
+               rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
+                                              fl, ost_idx, ea_off);
+               GOTO(unlock_parent, rc);
+       }
+
+       LASSERTF(rc > 0, "invalid rc = %d\n", rc);
+
+       buf->lb_len = rc;
+       for (i = 0; i < count; i++, objs++) {
+               /* The MDT-object was created via lfsck_layout_recover_create()
+                * by others before, and we fill the dummy layout EA. */
+               if (is_dummy_lov_ost_data(objs)) {
+                       if (i != ea_off)
+                               continue;
+
+                       if (bk->lb_param & LPF_DRYRUN)
+                               GOTO(unlock_parent, rc = 1);
+
+                       lmm->lmm_layout_gen =
+                           cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
+                       rc = lfsck_layout_refill_lovea(env, handle, parent,
+                                                      cfid, buf, objs, fl,
+                                                      ost_idx);
+                       GOTO(unlock_parent, rc);
+               }
+
+               ostid_le_to_cpu(&objs->l_ost_oi, oi);
+               ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
+               /* It should be rare case, the slot is there, but the LFSCK
+                * does not handle it during the first-phase cycle scanning. */
+               if (unlikely(lu_fid_eq(fid, cfid))) {
+                       if (i == ea_off) {
+                               GOTO(unlock_parent, rc = 0);
+                       } else {
+                               /* Rare case that the OST-object index
+                                * does not match the parent MDT-object
+                                * layout EA. We trust the later one. */
+                               if (bk->lb_param & LPF_DRYRUN)
+                                       GOTO(unlock_parent, rc = 1);
+
+                               dt_write_unlock(env, parent);
+                               if (handle != NULL)
+                                       dt_trans_stop(env, dt, handle);
+                               lfsck_layout_unlock(&lh);
+                               buf->lb_len = buflen;
+                               rc = lfsck_layout_update_pfid(env, com, parent,
+                                                       cfid, ltd->ltd_tgt, i);
+
+                               RETURN(rc);
+                       }
+               }
+       }
+
+       /* The MDT-object exists, but related layout EA slot is occupied
+        * by others. */
+       if (bk->lb_param & LPF_DRYRUN)
+               GOTO(unlock_parent, rc = 1);
+
+       dt_write_unlock(env, parent);
+       if (handle != NULL)
+               dt_trans_stop(env, dt, handle);
+       lfsck_layout_unlock(&lh);
+       if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
+               objs = &(lmm->lmm_objects[ea_off]);
+       else
+               objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
+       rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
+                                         buf, objs, ea_off, buflen);
+
+       RETURN(rc);
+
+unlock_parent:
+       dt_write_unlock(env, parent);
+
+stop:
+       if (handle != NULL)
+               dt_trans_stop(env, dt, handle);
+
+unlock_layout:
+       lfsck_layout_unlock(&lh);
+       buf->lb_len = buflen;
+
+       return rc;
+}
+
 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
                                        struct lfsck_component *com,
                                        struct lfsck_tgt_desc *ltd,
                                        struct lu_orphan_rec *rec,
                                        struct lu_fid *cfid)
 {
-       struct lfsck_layout             *lo     = com->lc_file_ram;
-       int                              rc     = 0;
+       struct lfsck_layout     *lo     = com->lc_file_ram;
+       struct lu_fid           *pfid   = &rec->lor_fid;
+       struct dt_object        *parent = NULL;
+       __u32                    ea_off = pfid->f_ver;
+       int                      rc     = 0;
+       ENTRY;
 
-       /* XXX: To be extended in other patch. */
+       if (!fid_is_sane(cfid))
+               GOTO(out, rc = -EINVAL);
 
+       if (fid_is_zero(pfid)) {
+               rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
+                                                 "N-", "", ea_off);
+               GOTO(out, rc);
+       }
+
+       pfid->f_ver = 0;
+       if (!fid_is_sane(pfid))
+               GOTO(out, rc = -EINVAL);
+
+       parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
+       if (IS_ERR(parent))
+               GOTO(out, rc = PTR_ERR(parent));
+
+       if (unlikely(dt_object_remote(parent) != 0))
+               GOTO(put, rc = -EXDEV);
+
+       if (dt_object_exists(parent) == 0) {
+               lu_object_put(env, &parent->do_lu);
+               rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
+                                                 "R-", "", ea_off);
+               GOTO(out, rc);
+       }
+
+       if (!S_ISREG(lu_object_attr(&parent->do_lu)))
+               GOTO(put, rc = -EISDIR);
+
+       rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
+                                        ltd->ltd_index, ea_off);
+
+       GOTO(put, rc);
+
+put:
+       if (rc <= 0)
+               lu_object_put(env, &parent->do_lu);
+       else
+               /* The layout EA is changed, need to be reloaded next time. */
+               lu_object_put_nocache(env, &parent->do_lu);
+
+out:
        down_write(&com->lc_sem);
        com->lc_new_scanned++;
        com->lc_new_checked++;
@@ -2222,6 +2724,9 @@ static int lfsck_layout_check_parent(const struct lu_env *env,
                struct lu_fid           *tfid   = &info->lti_fid2;
                struct ost_id           *oi     = &info->lti_oi;
 
+               if (is_dummy_lov_ost_data(objs))
+                       continue;
+
                ostid_le_to_cpu(&objs->l_ost_oi, oi);
                ostid_to_fid(tfid, oi, le32_to_cpu(objs->l_ost_idx));
                if (lu_fid_eq(cfid, tfid)) {
@@ -3194,6 +3699,9 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
                                        le32_to_cpu(objs->l_ost_idx);
                bool                     wakeup = false;
 
+               if (is_dummy_lov_ost_data(objs))
+                       continue;
+
                l_wait_event(mthread->t_ctl_waitq,
                             bk->lb_async_windows == 0 ||
                             llmd->llmd_prefetched < bk->lb_async_windows ||
@@ -4622,7 +5130,7 @@ static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
                GOTO(out, rc = -ENODEV);
 
        if (dev->dd_record_fid_accessed) {
-               /* The first iteratino against the rbtree, scan the whole rbtree
+               /* The first iteration against the rbtree, scan the whole rbtree
                 * to remove the nodes which do NOT need to be handled. */
                write_lock(&llsd->llsd_rb_lock);
                if (dev->dd_record_fid_accessed) {
index 90f2da6..e4583d8 100644 (file)
@@ -359,6 +359,11 @@ static int lod_declare_attr_set(const struct lu_env *env,
                }
        }
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
+           dt_object_exists(next) != 0 &&
+           dt_object_remote(next) == 0)
+               dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
+
        RETURN(rc);
 }
 
@@ -431,6 +436,11 @@ static int lod_attr_set(const struct lu_env *env,
                }
        }
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
+           dt_object_exists(next) != 0 &&
+           dt_object_remote(next) == 0)
+               dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
+
        RETURN(rc);
 }
 
index fae592f..f705813 100644 (file)
@@ -1478,7 +1478,9 @@ static int osp_index_try(const struct lu_env *env,
                         struct dt_object *dt,
                         const struct dt_index_features *feat)
 {
-       if (fid_is_last_id(lu_object_fid(&dt->do_lu))) {
+       const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
+
+       if (fid_is_last_id(fid) && fid_is_idif(fid)) {
                dt->do_index_ops = &osp_orphan_index_ops;
 
                return 0;
index b59bf55..e75e49d 100644 (file)
@@ -43,7 +43,7 @@ check_and_setup_lustre
        ALWAYS_EXCEPT="$ALWAYS_EXCEPT 2c"
 
 [[ $(lustre_version_code ost1) -lt $(version_code 2.5.55) ]] &&
-       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14 15 16 17"
+       ALWAYS_EXCEPT="$ALWAYS_EXCEPT 11 12 13 14 15 16 17 18"
 
 build_test_filter
 
@@ -1568,6 +1568,113 @@ test_17() {
 }
 run_test 17 "LFSCK can repair multiple references"
 
+test_18a() {
+       [ $MDSCOUNT -lt 2 ] &&
+               skip "We need at least 2 MDSes for test_18a" && exit 0
+
+       [ $OSTCOUNT -lt 2 ] &&
+               skip "We need at least 2 OSTs for test_18a" && exit 0
+
+       echo "#####"
+       echo "The target MDT-object is there, but related stripe information"
+       echo "is lost or partly lost. The LFSCK should regenerate the missed"
+       echo "layout EA entries."
+       echo "#####"
+
+       echo "stopall"
+       stopall > /dev/null
+       echo "formatall"
+       formatall > /dev/null
+       echo "setupall"
+       setupall > /dev/null
+
+       mkdir -p $DIR/$tdir
+       $LFS mkdir -i 0 $DIR/$tdir/a1
+       $LFS mkdir -i 1 $DIR/$tdir/a2
+       $LFS setstripe -c 1 -i 0 -s 1M $DIR/$tdir/a1
+       $LFS setstripe -c 2 -i 1 -s 1M $DIR/$tdir/a2
+       dd if=/dev/zero of=$DIR/$tdir/a1/f1 bs=1M count=2
+       dd if=/dev/zero of=$DIR/$tdir/a2/f2 bs=1M count=2
+
+       local saved_size=$(ls -il $DIR/$tdir/a1/f1 | awk '{ print $6 }')
+
+       $LFS path2fid $DIR/$tdir/a1/f1
+       $LFS getstripe $DIR/$tdir/a1/f1
+       $LFS path2fid $DIR/$tdir/a2/f2
+       $LFS getstripe $DIR/$tdir/a2/f2
+       sync
+       cancel_lru_locks osc
+
+       echo "Inject failure, to make the MDT-object lost its layout EA"
+       #define OBD_FAIL_LFSCK_LOST_STRIPE 0x1615
+       do_facet mds1 $LCTL set_param fail_loc=0x1615
+       chown 1.1 $DIR/$tdir/a1/f1
+       do_facet mds2 $LCTL set_param fail_loc=0x1615
+       chown 1.1 $DIR/$tdir/a2/f2
+       sync
+       sleep 2
+       do_facet mds1 $LCTL set_param fail_loc=0
+       do_facet mds2 $LCTL set_param fail_loc=0
+
+       echo "stopall to cleanup object cache"
+       stopall > /dev/null
+       echo "setupall"
+       setupall > /dev/null
+
+       echo "The file size should be incorrect since layout EA is lost"
+       local cur_size=$(ls -il $DIR/$tdir/a1/f1 | awk '{ print $6 }')
+       [ "$cur_size" != "$saved_size" ] ||
+               error "(1) Expect incorrect file1 size"
+
+       cur_size=$(ls -il $DIR/$tdir/a2/f2 | awk '{ print $6 }')
+       [ "$cur_size" != "$saved_size" ] ||
+               error "(2) Expect incorrect file2 size"
+
+       echo "Trigger layout LFSCK on all devices to find out orphan OST-object"
+       $START_LAYOUT -o || error "(3) Fail to start LFSCK for layout!"
+
+       for k in $(seq $MDSCOUNT); do
+               # The LFSCK status query internal is 30 seconds. For the case
+               # of some LFSCK_NOTIFY RPCs failure/lost, we will wait enough
+               # time to guarantee the status sync up.
+               wait_update_facet mds${k} "$LCTL get_param -n \
+                       mdd.$(facet_svc mds${k}).lfsck_layout |
+                       awk '/^status/ { print \\\$2 }'" "completed" 32 ||
+                       error "(4) MDS${k} is not the expected 'completed'"
+       done
+
+       for k in $(seq $OSTCOUNT); do
+               local cur_status=$(do_facet ost${k} $LCTL get_param -n \
+                               obdfilter.$(facet_svc ost${k}).lfsck_layout |
+                               awk '/^status/ { print $2 }')
+               [ "$cur_status" == "completed" ] ||
+               error "(5) OST${k} Expect 'completed', but got '$cur_status'"
+       done
+
+       for k in 1 2; do
+               local repaired=$(do_facet mds${k} $LCTL get_param -n \
+                                mdd.$(facet_svc mds${k}).lfsck_layout |
+                                awk '/^repaired_orphan/ { print $2 }')
+               [ $repaired -eq ${k} ] ||
+               error "(6) Expect ${k} fixed on mds${k}, but got: $repaired"
+       done
+
+       $LFS path2fid $DIR/$tdir/a1/f1
+       $LFS getstripe $DIR/$tdir/a1/f1
+       $LFS path2fid $DIR/$tdir/a2/f2
+       $LFS getstripe $DIR/$tdir/a2/f2
+
+       echo "The file size should be correct after layout LFSCK scanning"
+       cur_size=$(ls -il $DIR/$tdir/a1/f1 | awk '{ print $6 }')
+       [ "$cur_size" == "$saved_size" ] ||
+               error "(7) Expect file1 size $saved_size, but got $cur_size"
+
+       cur_size=$(ls -il $DIR/$tdir/a2/f2 | awk '{ print $6 }')
+       [ "$cur_size" == "$saved_size" ] ||
+               error "(8) Expect file2 size $saved_size, but got $cur_size"
+}
+run_test 18a "Find out orphan OST-object and repair it (1)"
+
 $LCTL set_param debug=-lfsck > /dev/null || true
 
 # restore MDS/OST size