Whamcloud - gitweb
LU-5395 lfsck: misc patch to prevent lfsck hung 04/11304/12
authorFan Yong <fan.yong@intel.com>
Sun, 13 Jul 2014 12:08:23 +0000 (20:08 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 5 Sep 2014 00:23:44 +0000 (00:23 +0000)
1) When the LFSCK rebuilt the crashed LAST_ID files, it will
   notify the MDS to sync lastid information via disconnecting
   the connection. OFD should hold the export reference before
   disconnecting to allow to send RPC reply message.

2) When the layout LFSCK scans on the OST, it needs to handle
   the IDIF objects specially (use fid_idif_id() to get the
   OST object ID) to avoid to regard the LAST_ID file as
   corrupted by wrong.

3) The LFSCK should check the ostid_to_fid() return value for
   corrupted OSTID and/or index.

4) If the LAST_ID file is not crashed, then do not update the
   LAST_ID file.

5) NOT change the lu_buf::lb_len once the lu_buf::lb_buf is
   allocated to prevent accessing released or non-allocated
   RAM space by wrong.

6) Other small fixes and code cleanup.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I84726ddcf0b8fa6b334163fb13d9bae273033d20
Reviewed-on: http://review.whamcloud.com/11304
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre/lustre_idl.h
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c
lustre/ofd/ofd_dev.c

index 3507a37..016e665 100644 (file)
@@ -778,9 +778,9 @@ static inline int ostid_to_fid(struct lu_fid *fid, const struct ost_id *ostid,
                 * been in production for years.  This can handle create rates
                 * of 1M objects/s/OST for 9 years, or combinations thereof. */
                if (oid >= IDIF_MAX_OID) {
-                        CERROR("bad MDT0 id, "DOSTID" ost_idx:%u\n",
-                               POSTID(ostid), ost_idx);
-                        return -EBADF;
+                       CERROR("bad MDT0 id(1), "DOSTID" ost_idx:%u\n",
+                              POSTID(ostid), ost_idx);
+                       return -EBADF;
                }
                fid->f_seq = fid_idif_seq(oid, ost_idx);
                /* truncate to 32 bits by assignment */
@@ -794,7 +794,7 @@ static inline int ostid_to_fid(struct lu_fid *fid, const struct ost_id *ostid,
                 * OST objects into the FID namespace.  In both cases, we just
                 * pass the FID through, no conversion needed. */
                if (ostid->oi_fid.f_ver != 0) {
-                       CERROR("bad MDT0 id, "DOSTID" ost_idx:%u\n",
+                       CERROR("bad MDT0 id(2), "DOSTID" ost_idx:%u\n",
                                POSTID(ostid), ost_idx);
                        return -EBADF;
                }
@@ -929,7 +929,7 @@ static inline int lu_fid_cmp(const struct lu_fid *f0,
 static inline void ostid_cpu_to_le(const struct ost_id *src_oi,
                                   struct ost_id *dst_oi)
 {
-       if (fid_seq_is_mdt0(ostid_seq(src_oi))) {
+       if (fid_seq_is_mdt0(src_oi->oi.oi_seq)) {
                dst_oi->oi.oi_id = cpu_to_le64(src_oi->oi.oi_id);
                dst_oi->oi.oi_seq = cpu_to_le64(src_oi->oi.oi_seq);
        } else {
@@ -940,7 +940,7 @@ static inline void ostid_cpu_to_le(const struct ost_id *src_oi,
 static inline void ostid_le_to_cpu(const struct ost_id *src_oi,
                                   struct ost_id *dst_oi)
 {
-       if (fid_seq_is_mdt0(ostid_seq(src_oi))) {
+       if (fid_seq_is_mdt0(src_oi->oi.oi_seq)) {
                dst_oi->oi.oi_id = le64_to_cpu(src_oi->oi.oi_id);
                dst_oi->oi.oi_seq = le64_to_cpu(src_oi->oi.oi_seq);
        } else {
index b79941c..a4f3bf8 100644 (file)
@@ -663,6 +663,13 @@ lfsck_name_get_const(const struct lu_env *env, const void *area, ssize_t len)
        return lname;
 }
 
+static inline void
+lfsck_buf_init(struct lu_buf *buf, void *area, ssize_t len)
+{
+       buf->lb_buf = area;
+       buf->lb_len = len;
+}
+
 static inline struct lu_buf *
 lfsck_buf_get(const struct lu_env *env, void *area, ssize_t len)
 {
index 6222b59..10101ab 100644 (file)
@@ -302,8 +302,7 @@ static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
 }
 
 static int lfsck_layout_get_lovea(const struct lu_env *env,
-                                 struct dt_object *obj,
-                                 struct lu_buf *buf, ssize_t *buflen)
+                                 struct dt_object *obj, struct lu_buf *buf)
 {
        int rc;
 
@@ -316,9 +315,6 @@ again:
                        return rc;
 
                lu_buf_realloc(buf, rc);
-               if (buflen != NULL)
-                       *buflen = buf->lb_len;
-
                if (buf->lb_buf == NULL)
                        return -ENOMEM;
 
@@ -333,9 +329,6 @@ again:
 
        if (unlikely(buf->lb_buf == NULL)) {
                lu_buf_alloc(buf, rc);
-               if (buflen != NULL)
-                       *buflen = buf->lb_len;
-
                if (buf->lb_buf == NULL)
                        return -ENOMEM;
 
@@ -358,7 +351,7 @@ static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
                struct ost_id   oi;
                int             rc;
 
-               lmm_oi_cpu_to_le(&oi, &lmm->lmm_oi);
+               lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
                if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
                        rc = -EOPNOTSUPP;
                else
@@ -376,7 +369,7 @@ static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
        if (lov_pattern(pattern) != LOV_PATTERN_RAID0) {
                struct ost_id oi;
 
-               lmm_oi_cpu_to_le(&oi, &lmm->lmm_oi);
+               lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
                CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u on "DOSTID"\n",
                       pattern, POSTID(&oi));
 
@@ -939,7 +932,7 @@ lfsck_layout_lastid_create(const struct lu_env *env,
                GOTO(stop, rc);
 
        dt_write_lock(env, obj, 0);
-       if (likely(!dt_object_exists(obj))) {
+       if (likely(dt_object_exists(obj) == 0)) {
                rc = dt_create(env, obj, la, NULL, dof, th);
                if (rc == 0)
                        rc = dt_record_write(env, obj,
@@ -990,6 +983,12 @@ lfsck_layout_lastid_reload(const struct lu_env *env,
                        lfsck->li_out_notify(env, lfsck->li_out_notify_data,
                                             LE_LASTID_REBUILDING);
                        lo->ll_flags |= LF_CRASHED_LASTID;
+
+                       CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
+                              "LAST_ID file (1) for the sequence "LPX64
+                              ", old value "LPU64", known value "LPU64"\n",
+                              lfsck_lfsck2name(lfsck), lls->lls_seq,
+                              lastid, lls->lls_lastid);
                }
        } else if (lastid >= lls->lls_lastid) {
                lls->lls_lastid = lastid;
@@ -1016,27 +1015,8 @@ lfsck_layout_lastid_store(const struct lu_env *env,
        list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
                loff_t pos = 0;
 
-               /* XXX: Add the code back if we really found related
-                *      inconsistent cases in the future. */
-#if 0
-               if (!lls->lls_dirty) {
-                       /* In OFD, before the pre-creation, the LAST_ID
-                        * file will be updated firstly, which may hide
-                        * some potential crashed cases. For example:
-                        *
-                        * The old obj1's ID is higher than old LAST_ID
-                        * but lower than the new LAST_ID, but the LFSCK
-                        * have not touch the obj1 until the OFD updated
-                        * the LAST_ID. So the LFSCK does not regard it
-                        * as crashed case. But when OFD does not create
-                        * successfully, it will set the LAST_ID as the
-                        * real created objects' ID, then LFSCK needs to
-                        * found related inconsistency. */
-                       rc = lfsck_layout_lastid_reload(env, com, lls);
-                       if (likely(!lls->lls_dirty))
-                               continue;
-               }
-#endif
+               if (!lls->lls_dirty)
+                       continue;
 
                CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
                       "<seq> "LPX64" as <oid> "LPU64"\n",
@@ -1110,7 +1090,7 @@ lfsck_layout_lastid_load(const struct lu_env *env,
                RETURN(PTR_ERR(obj));
 
        /* LAST_ID crashed, to be rebuilt */
-       if (!dt_object_exists(obj)) {
+       if (dt_object_exists(obj) == 0) {
                if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
                        LASSERT(lfsck->li_out_notify != NULL);
 
@@ -1118,6 +1098,10 @@ lfsck_layout_lastid_load(const struct lu_env *env,
                                             LE_LASTID_REBUILDING);
                        lo->ll_flags |= LF_CRASHED_LASTID;
 
+                       CDEBUG(D_LFSCK, "%s: layout LFSCK cannot find the "
+                              "LAST_ID file for sequence "LPX64"\n",
+                              lfsck_lfsck2name(lfsck), lls->lls_seq);
+
                        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
                            cfs_fail_val > 0) {
                                struct l_wait_info lwi = LWI_TIMEOUT(
@@ -1148,6 +1132,11 @@ lfsck_layout_lastid_load(const struct lu_env *env,
                        lfsck->li_out_notify(env, lfsck->li_out_notify_data,
                                             LE_LASTID_REBUILDING);
                        lo->ll_flags |= LF_CRASHED_LASTID;
+
+                       CDEBUG(D_LFSCK, "%s: layout LFSCK finds invalid "
+                              "LAST_ID file for the sequence "LPX64
+                              ": rc = %d\n",
+                              lfsck_lfsck2name(lfsck), lls->lls_seq, rc);
                }
 
                lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
@@ -1745,7 +1734,13 @@ static int lfsck_layout_refill_lovea(const struct lu_env *env,
 {
        struct ost_id           *oi     = &lfsck_env_info(env)->lti_oi;
        struct lov_mds_md_v1    *lmm    = buf->lb_buf;
+       struct lu_buf            ea_buf;
        int                      rc;
+       __u32                    magic;
+       __u16                    count;
+
+       magic = le32_to_cpu(lmm->lmm_magic);
+       count = le16_to_cpu(lmm->lmm_stripe_count);
 
        fid_to_ostid(cfid, oi);
        ostid_cpu_to_le(oi, &slot->l_ost_oi);
@@ -1755,10 +1750,8 @@ static int lfsck_layout_refill_lovea(const struct lu_env *env,
        if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) {
                struct lov_ost_data_v1 *objs;
                int                     i;
-               __u16                   count;
 
-               count = le16_to_cpu(lmm->lmm_stripe_count);
-               if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
+               if (magic == LOV_MAGIC_V1)
                        objs = &lmm->lmm_objects[0];
                else
                        objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
@@ -1773,7 +1766,8 @@ static int lfsck_layout_refill_lovea(const struct lu_env *env,
                        lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE);
        }
 
-       rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV, fl, handle,
+       lfsck_buf_init(&ea_buf, lmm, lov_mds_md_size(count, magic));
+       rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV, fl, handle,
                          BYPASS_CAPA);
        if (rc == 0)
                rc = 1;
@@ -1805,7 +1799,7 @@ static int lfsck_layout_extend_lovea(const struct lu_env *env,
                __u32 pattern = LOV_PATTERN_RAID0;
 
                count = ea_off + 1;
-               LASSERT(buf->lb_len == lov_mds_md_size(count, LOV_MAGIC_V1));
+               LASSERT(buf->lb_len >= lov_mds_md_size(count, LOV_MAGIC_V1));
 
                if (ea_off != 0 || reset) {
                        pattern |= LOV_PATTERN_F_HOLE;
@@ -1838,7 +1832,7 @@ static int lfsck_layout_extend_lovea(const struct lu_env *env,
                gap = ea_off - count;
                if (gap >= 0)
                        count = ea_off + 1;
-               LASSERT(buf->lb_len == lov_mds_md_size(count, magic));
+               LASSERT(buf->lb_len >= lov_mds_md_size(count, magic));
 
                if (gap > 0) {
                        memset(objs, 0, gap * sizeof(*objs));
@@ -1988,13 +1982,14 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
        struct dt_object                *pobj   = NULL;
        struct dt_object                *cobj   = NULL;
        struct thandle                  *th     = NULL;
-       struct lu_buf                   *pbuf   = NULL;
+       struct lu_buf                    pbuf   = { 0 };
        struct lu_buf                   *ea_buf = &info->lti_big_buf;
+       struct lu_buf                    lov_buf;
        struct lustre_handle             lh     = { 0 };
        struct linkea_data               ldata  = { 0 };
        struct lu_buf                    linkea_buf;
        const struct lu_name            *pname;
-       int                              buflen = ea_buf->lb_len;
+       int                              size   = 0;
        int                              idx    = 0;
        int                              rc     = 0;
        ENTRY;
@@ -2019,7 +2014,7 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
                 * real parent MDT-object's FID::f_ver, instead it is the
                 * OST-object index in its parent MDT-object's layout EA. */
                ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
-               pbuf = lfsck_buf_get(env, ff, sizeof(struct filter_fid));
+               lfsck_buf_init(&pbuf, ff, sizeof(struct filter_fid));
                cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
                if (IS_ERR(cobj))
                        GOTO(log, rc = PTR_ERR(cobj));
@@ -2060,14 +2055,11 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
        memset(dof, 0, sizeof(*dof));
        dof->dof_type = dt_mode_to_dft(S_IFREG);
 
-       rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
-       if (buflen < rc) {
-               lu_buf_realloc(ea_buf, rc);
-               buflen = ea_buf->lb_len;
+       size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
+       if (ea_buf->lb_len < size) {
+               lu_buf_realloc(ea_buf, size);
                if (ea_buf->lb_buf == NULL)
                        GOTO(put, rc = -ENOMEM);
-       } else {
-               ea_buf->lb_len = rc;
        }
 
        /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
@@ -2090,7 +2082,8 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
         * If other subsequent modifications failed, then next LFSCK scanning
         * will process the OST-object as orphan again with known parent FID. */
        if (cobj != NULL) {
-               rc = dt_declare_xattr_set(env, cobj, pbuf, XATTR_NAME_FID, 0, th);
+               rc = dt_declare_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID,
+                                         0, th);
                if (rc != 0)
                        GOTO(stop, rc);
        }
@@ -2101,7 +2094,8 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
                GOTO(stop, rc);
 
        /* 3a. Add layout EA for the MDT-object. */
-       rc = dt_declare_xattr_set(env, pobj, ea_buf, XATTR_NAME_LOV,
+       lfsck_buf_init(&lov_buf, ea_buf->lb_buf, size);
+       rc = dt_declare_xattr_set(env, pobj, &lov_buf, XATTR_NAME_LOV,
                                  LU_XATTR_CREATE, th);
        if (rc != 0)
                GOTO(stop, rc);
@@ -2116,8 +2110,8 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
                GOTO(stop, rc);
 
        /* 5a. insert linkEA for parent. */
-       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
-       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+       lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+                      ldata.ld_leh->leh_len);
        rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
                                  XATTR_NAME_LINK, 0, th);
        if (rc != 0)
@@ -2129,7 +2123,7 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
 
        /* 1b. Update OST-object's parent information remotely. */
        if (cobj != NULL) {
-               rc = dt_xattr_set(env, cobj, pbuf, XATTR_NAME_FID, 0, th,
+               rc = dt_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID, 0, th,
                                  BYPASS_CAPA);
                if (rc != 0)
                        GOTO(stop, rc);
@@ -2141,7 +2135,7 @@ static int lfsck_layout_recreate_parent(const struct lu_env *env,
        if (rc == 0)
                /* 3b. Add layout EA for the MDT-object. */
                rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid,
-                                              ea_buf, LU_XATTR_CREATE,
+                                              &lov_buf, LU_XATTR_CREATE,
                                               ltd->ltd_index, ea_off, false);
        dt_write_unlock(env, pobj);
        if (rc < 0)
@@ -2170,7 +2164,6 @@ put:
                lu_object_put(env, &cobj->do_lu);
        if (pobj != NULL && !IS_ERR(pobj))
                lu_object_put(env, &pobj->do_lu);
-       ea_buf->lb_len = buflen;
 
 log:
        if (rc < 0)
@@ -2261,7 +2254,8 @@ static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
                RETURN(PTR_ERR(obj));
 
        dt_read_lock(env, obj, 0);
-       if (dt_object_exists(obj) == 0) {
+       if (dt_object_exists(obj) == 0 ||
+           lfsck_is_dead_obj(obj)) {
                dt_read_unlock(env, obj);
 
                GOTO(put, rc = -ENOENT);
@@ -2362,7 +2356,7 @@ static int lfsck_layout_conflict_create(const struct lu_env *env,
                                        struct lu_fid *cfid,
                                        struct lu_buf *ea_buf,
                                        struct lov_ost_data_v1 *slot,
-                                       __u32 ea_off, __u32 ori_len)
+                                       __u32 ea_off)
 {
        struct lfsck_thread_info *info          = lfsck_env_info(env);
        struct lu_fid            *cfid2         = &info->lti_fid2;
@@ -2377,7 +2371,9 @@ static int lfsck_layout_conflict_create(const struct lu_env *env,
        ENTRY;
 
        ostid_le_to_cpu(&slot->l_ost_oi, oi);
-       ostid_to_fid(cfid2, oi, ost_idx2);
+       rc = ostid_to_fid(cfid2, oi, ost_idx2);
+       if (rc != 0)
+               GOTO(out, rc);
 
        /* Hold layout lock on the parent to prevent others to access. */
        rc = lfsck_layout_lock(env, com, parent, &lh,
@@ -2394,7 +2390,6 @@ static int lfsck_layout_conflict_create(const struct lu_env *env,
        if (rc == -ETXTBSY) {
                /* No need the layout lock on the original parent. */
                lfsck_layout_unlock(&lh);
-               ea_buf->lb_len = ori_len;
 
                fid_zero(&rec->lor_fid);
                snprintf(infix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
@@ -2436,8 +2431,6 @@ unlock:
        lfsck_layout_unlock(&lh);
 
 out:
-       ea_buf->lb_len = ori_len;
-
        CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
               "OST-object "DFID" on the OST %x with the orphan "DFID" on "
               "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
@@ -2469,7 +2462,6 @@ static int lfsck_layout_recreate_lovea(const struct lu_env *env,
        struct dt_device         *dt            = lfsck->li_bottom;
        struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
        struct thandle            *handle       = NULL;
-       size_t                    buflen        = buf->lb_len;
        size_t                    lovea_size;
        struct lov_mds_md_v1     *lmm;
        struct lov_ost_data_v1   *objs;
@@ -2512,7 +2504,6 @@ again:
        lovea_size = rc;
        if (buf->lb_len < lovea_size) {
                lu_buf_realloc(buf, lovea_size);
-               buflen = buf->lb_len;
                if (buf->lb_buf == NULL)
                        GOTO(unlock_layout, rc = -ENOMEM);
        }
@@ -2563,7 +2554,6 @@ again:
 
                LASSERT(buf->lb_len >= lovea_size);
 
-               buf->lb_len = lovea_size;
                rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
                                               buf, fl, ost_idx, ea_off, false);
 
@@ -2580,8 +2570,6 @@ again:
 
                LASSERT(buf->lb_len >= lovea_size);
 
-               buf->lb_len = lovea_size;
-               memset(lmm, 0, buf->lb_len);
                rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
                                               buf, fl, ost_idx, ea_off, true);
 
@@ -2620,7 +2608,7 @@ again:
                        rc = lovea_size;
                        goto again;
                }
-               buf->lb_len = lovea_size;
+
                rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
                                               buf, fl, ost_idx, ea_off, false);
 
@@ -2629,7 +2617,6 @@ again:
 
        LASSERTF(rc > 0, "invalid rc = %d\n", rc);
 
-       buf->lb_len = lovea_size;
        for (i = 0; i < count; i++, objs++) {
                /* The MDT-object was created via lfsck_layout_recover_create()
                 * by others before, and we fill the dummy layout EA. */
@@ -2656,7 +2643,17 @@ again:
                }
 
                ostid_le_to_cpu(&objs->l_ost_oi, oi);
-               ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
+               rc = ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
+               if (rc != 0) {
+                       CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
+                              "invalid layout EA at the slot %d, index %u\n",
+                              lfsck_lfsck2name(lfsck),
+                              PFID(lfsck_dto2fid(parent)), i,
+                              le32_to_cpu(objs->l_ost_idx));
+
+                       GOTO(unlock_parent, rc);
+               }
+
                /* It should be rare case, the slot is there, but the LFSCK
                 * does not handle it during the first-phase cycle scanning. */
                if (unlikely(lu_fid_eq(fid, cfid))) {
@@ -2673,7 +2670,6 @@ again:
                                if (handle != NULL)
                                        dt_trans_stop(env, dt, handle);
                                lfsck_layout_unlock(&lh);
-                               buf->lb_len = buflen;
                                rc = lfsck_layout_update_pfid(env, com, parent,
                                                        cfid, ltd->ltd_tgt, i);
 
@@ -2704,7 +2700,7 @@ again:
        else
                objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
        rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
-                                         buf, objs, ea_off, buflen);
+                                         buf, objs, ea_off);
 
        RETURN(rc);
 
@@ -2718,7 +2714,6 @@ stop:
 
 unlock_layout:
        lfsck_layout_unlock(&lh);
-       buf->lb_len = buflen;
 
        return rc;
 }
@@ -2815,7 +2810,10 @@ static int lfsck_layout_scan_orphan(const struct lu_env *env,
 
        ostid_set_seq(oi, FID_SEQ_IDIF);
        ostid_set_id(oi, 0);
-       ostid_to_fid(fid, oi, ltd->ltd_index);
+       rc = ostid_to_fid(fid, oi, ltd->ltd_index);
+       if (rc != 0)
+               GOTO(log, rc);
+
        obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
        if (unlikely(IS_ERR(obj)))
                GOTO(log, rc = PTR_ERR(obj));
@@ -3121,6 +3119,7 @@ static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
        struct lov_mds_md_v1            *lmm;
        struct lov_ost_data_v1          *objs;
        struct lustre_handle             lh     = { 0 };
+       struct lu_buf                    ea_buf;
        __u32                            magic;
        int                              rc;
        ENTRY;
@@ -3195,7 +3194,10 @@ static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
        ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
        objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
        objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
-       rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV,
+       lfsck_buf_init(&ea_buf, lmm,
+                      lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count),
+                                      magic));
+       rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV,
                          LU_XATTR_REPLACE, handle, BYPASS_CAPA);
 
        GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
@@ -3342,15 +3344,20 @@ static int lfsck_layout_check_parent(const struct lu_env *env,
        if (IS_ERR(tobj))
                RETURN(PTR_ERR(tobj));
 
-       if (!dt_object_exists(tobj))
+       dt_read_lock(env, tobj, 0);
+       if (dt_object_exists(tobj) == 0 ||
+           lfsck_is_dead_obj(tobj))
+               GOTO(out, rc = LLIT_UNMATCHED_PAIR);
+
+       if (!S_ISREG(lfsck_object_type(tobj)))
                GOTO(out, rc = LLIT_UNMATCHED_PAIR);
 
        /* Load the tobj's layout EA, in spite of it is a local MDT-object or
         * remote one on another MDT. Then check whether the given OST-object
         * is in such layout. If yes, it is multiple referenced, otherwise it
         * is unmatched referenced case. */
-       rc = lfsck_layout_get_lovea(env, tobj, buf, NULL);
-       if (rc == 0)
+       rc = lfsck_layout_get_lovea(env, tobj, buf);
+       if (rc == 0 || rc == -ENOENT)
                GOTO(out, rc = LLIT_UNMATCHED_PAIR);
 
        if (rc < 0)
@@ -3369,12 +3376,23 @@ static int lfsck_layout_check_parent(const struct lu_env *env,
        for (i = 0; i < count; i++, objs++) {
                struct lu_fid           *tfid   = &info->lti_fid2;
                struct ost_id           *oi     = &info->lti_oi;
+               __u32                    idx2;
 
                if (lovea_slot_is_dummy(objs))
                        continue;
 
                ostid_le_to_cpu(&objs->l_ost_oi, oi);
-               ostid_to_fid(tfid, oi, le32_to_cpu(objs->l_ost_idx));
+               idx2 = le32_to_cpu(objs->l_ost_idx);
+               rc = ostid_to_fid(tfid, oi, idx2);
+               if (rc != 0) {
+                       CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
+                              "invalid layout EA at the slot %d, index %u\n",
+                              lfsck_lfsck2name(com->lc_lfsck),
+                              PFID(pfid), i, idx2);
+
+                       GOTO(out, rc = LLIT_UNMATCHED_PAIR);
+               }
+
                if (lu_fid_eq(cfid, tfid)) {
                        *lov_ea = *buf;
 
@@ -3385,6 +3403,7 @@ static int lfsck_layout_check_parent(const struct lu_env *env,
        GOTO(out, rc = LLIT_UNMATCHED_PAIR);
 
 out:
+       dt_read_unlock(env, tobj);
        lfsck_object_put(env, tobj);
 
        return rc;
@@ -3398,7 +3417,7 @@ static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
        struct lfsck_thread_info             *info   = lfsck_env_info(env);
        struct filter_fid_old                *pea    = &info->lti_old_pfid;
        struct lu_fid                        *pfid   = &info->lti_fid;
-       struct lu_buf                        *buf    = NULL;
+       struct lu_buf                         buf    = { 0 };
        struct dt_object                     *parent = llr->llr_parent->llo_obj;
        struct dt_object                     *child  = llr->llr_child;
        struct lu_attr                       *pla    = &info->lti_la;
@@ -3430,8 +3449,8 @@ static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
        if (rc != 0)
                GOTO(out, rc);
 
-       buf = lfsck_buf_get(env, pea, sizeof(struct filter_fid_old));
-       rc= dt_xattr_get(env, child, buf, XATTR_NAME_FID, BYPASS_CAPA);
+       lfsck_buf_init(&buf, pea, sizeof(struct filter_fid_old));
+       rc = dt_xattr_get(env, child, &buf, XATTR_NAME_FID, BYPASS_CAPA);
        if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
                     rc != sizeof(struct filter_fid))) {
                type = LLIT_UNMATCHED_PAIR;
@@ -3454,7 +3473,7 @@ static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
 
        rc = lfsck_layout_check_parent(env, com, parent, pfid,
                                       lu_object_fid(&child->do_lu),
-                                      pla, cla, llr, buf, idx);
+                                      pla, cla, llr, &buf, idx);
        if (rc > 0) {
                type = rc;
                goto repair;
@@ -3486,7 +3505,7 @@ repair:
                break;
        case LLIT_MULTIPLE_REFERENCED:
                rc = lfsck_layout_repair_multiple_references(env, com, llr,
-                                                            pla, buf);
+                                                            pla, &buf);
                break;
        case LLIT_INCONSISTENT_OWNER:
                rc = lfsck_layout_repair_owner(env, com, llr, pla);
@@ -4111,10 +4130,14 @@ static int lfsck_layout_master_check_pairs(const struct lu_env *env,
                RETURN(PTR_ERR(obj));
 
        dt_read_lock(env, obj, 0);
-       if (unlikely(!dt_object_exists(obj)))
+       if (unlikely(dt_object_exists(obj) == 0 ||
+                    lfsck_is_dead_obj(obj)))
                GOTO(unlock, rc = -ENOENT);
 
-       rc = lfsck_layout_get_lovea(env, obj, buf, NULL);
+       if (!S_ISREG(lfsck_object_type(obj)))
+               GOTO(unlock, rc = -ENODATA);
+
+       rc = lfsck_layout_get_lovea(env, obj, buf);
        if (rc < 0)
                GOTO(unlock, rc);
 
@@ -4256,7 +4279,8 @@ static int lfsck_layout_slave_repair_pfid(const struct lu_env *env,
        fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2);
        buf = lfsck_buf_get(env, ff, sizeof(*ff));
        dt_write_lock(env, obj, 0);
-       if (unlikely(!dt_object_exists(obj)))
+       if (unlikely(dt_object_exists(obj) == 0 ||
+                    lfsck_is_dead_obj(obj)))
                GOTO(unlock, rc = 0);
 
        th = dt_trans_create(env, dev);
@@ -4594,7 +4618,7 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
        struct ptlrpc_thread            *mthread = &lfsck->li_thread;
        struct ptlrpc_thread            *athread = &llmd->llmd_thread;
                struct l_wait_info       lwi     = { 0 };
-       struct lu_buf                   *buf;
+       struct lu_buf                    buf;
        int                              rc      = 0;
        int                              i;
        __u32                            magic;
@@ -4602,8 +4626,8 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
        __u16                            gen;
        ENTRY;
 
-       buf = lfsck_buf_get(env, &info->lti_old_pfid,
-                           sizeof(struct filter_fid_old));
+       lfsck_buf_init(&buf, &info->lti_old_pfid,
+                      sizeof(struct filter_fid_old));
        count = le16_to_cpu(lmm->lmm_stripe_count);
        gen = le16_to_cpu(lmm->lmm_layout_gen);
        /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
@@ -4624,8 +4648,7 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
                struct lfsck_layout_req *llr;
                struct lfsck_tgt_desc   *tgt    = NULL;
                struct dt_object        *cobj   = NULL;
-               __u32                    index  =
-                                       le32_to_cpu(objs->l_ost_idx);
+               __u32                    index;
                bool                     wakeup = false;
 
                if (unlikely(lovea_slot_is_dummy(objs)))
@@ -4643,7 +4666,15 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
                        GOTO(out, rc = 0);
 
                ostid_le_to_cpu(&objs->l_ost_oi, oi);
-               ostid_to_fid(fid, oi, index);
+               index = le32_to_cpu(objs->l_ost_idx);
+               rc = ostid_to_fid(fid, oi, index);
+               if (rc != 0) {
+                       CDEBUG(D_LFSCK, "%s: get invalid layout EA for "DFID
+                              ": "DOSTID", idx:%u\n", lfsck_lfsck2name(lfsck),
+                              PFID(lfsck_dto2fid(parent)), POSTID(oi), index);
+                       goto next;
+               }
+
                tgt = lfsck_tgt_get(ltds, index);
                if (unlikely(tgt == NULL)) {
                        CDEBUG(D_LFSCK, "%s: cannot talk with OST %x which "
@@ -4663,7 +4694,7 @@ static int lfsck_layout_scan_stripes(const struct lu_env *env,
                if (rc != 0)
                        goto next;
 
-               rc = dt_declare_xattr_get(env, cobj, buf, XATTR_NAME_FID,
+               rc = dt_declare_xattr_get(env, cobj, &buf, XATTR_NAME_FID,
                                          BYPASS_CAPA);
                if (rc != 0)
                        goto next;
@@ -4749,8 +4780,9 @@ static int lfsck_layout_master_exec_oit(const struct lu_env *env,
        struct lov_mds_md_v1            *lmm    = NULL;
        struct dt_device                *dev    = lfsck->li_bottom;
        struct lustre_handle             lh     = { 0 };
-       ssize_t                          buflen = buf->lb_len;
+       struct lu_buf                    ea_buf = { 0 };
        int                              rc     = 0;
+       int                              size   = 0;
        bool                             locked = false;
        bool                             stripe = false;
        bool                             bad_oi = false;
@@ -4768,11 +4800,15 @@ static int lfsck_layout_master_exec_oit(const struct lu_env *env,
        locked = true;
 
 again:
-       rc = lfsck_layout_get_lovea(env, obj, buf, &buflen);
+       if (dt_object_exists(obj) == 0 ||
+           lfsck_is_dead_obj(obj))
+               GOTO(out, rc = 0);
+
+       rc = lfsck_layout_get_lovea(env, obj, buf);
        if (rc <= 0)
                GOTO(out, rc);
 
-       buf->lb_len = rc;
+       size = rc;
        lmm = buf->lb_buf;
        rc = lfsck_layout_verify_header(lmm);
        /* If the LOV EA crashed, then it is possible to be rebuilt later
@@ -4785,6 +4821,7 @@ again:
 
        /* Inconsistent lmm_oi, should be repaired. */
        bad_oi = true;
+       lmm->lmm_oi = *oi;
 
        if (bk->lb_param & LPF_DRYRUN) {
                down_write(&com->lc_sem);
@@ -4797,7 +4834,6 @@ again:
        if (!lustre_handle_is_used(&lh)) {
                dt_read_unlock(env, obj);
                locked = false;
-               buf->lb_len = buflen;
                rc = lfsck_layout_lock(env, com, obj, &lh,
                                       MDS_INODELOCK_LAYOUT |
                                       MDS_INODELOCK_XATTR);
@@ -4808,7 +4844,8 @@ again:
                if (IS_ERR(handle))
                        GOTO(out, rc = PTR_ERR(handle));
 
-               rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
+               lfsck_buf_init(&ea_buf, lmm, size);
+               rc = dt_declare_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
                                          LU_XATTR_REPLACE, handle);
                if (rc != 0)
                        GOTO(out, rc);
@@ -4823,8 +4860,7 @@ again:
                goto again;
        }
 
-       lmm->lmm_oi = *oi;
-       rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LOV,
+       rc = dt_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
                          LU_XATTR_REPLACE, handle, BYPASS_CAPA);
        if (rc != 0)
                GOTO(out, rc);
@@ -4863,7 +4899,6 @@ out:
                        lfsck_layout_record_failure(env, lfsck, lo);
                up_write(&com->lc_sem);
        }
-       buf->lb_len = buflen;
 
        return rc;
 }
@@ -4931,7 +4966,11 @@ static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
        if (unlikely(fid_is_last_id(fid)))
                GOTO(unlock, rc = 0);
 
-       oid = fid_oid(fid);
+       if (fid_is_idif(fid))
+               oid = fid_idif_id(fid_seq(fid), fid_oid(fid), fid_ver(fid));
+       else
+               oid = fid_oid(fid);
+
        if (oid > lls->lls_lastid_known)
                lls->lls_lastid_known = oid;
 
@@ -4939,12 +4978,17 @@ static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
                if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
                        /* OFD may create new objects during LFSCK scanning. */
                        rc = lfsck_layout_lastid_reload(env, com, lls);
-                       if (unlikely(rc != 0))
+                       if (unlikely(rc != 0)) {
                                CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
                                      "reload LAST_ID for "LPX64": rc = %d\n",
                                      lfsck_lfsck2name(com->lc_lfsck),
                                      lls->lls_seq, rc);
-                       if (oid <= lls->lls_lastid)
+
+                               GOTO(unlock, rc);
+                       }
+
+                       if (oid <= lls->lls_lastid ||
+                           lo->ll_flags & LF_CRASHED_LASTID)
                                GOTO(unlock, rc = 0);
 
                        LASSERT(lfsck->li_out_notify != NULL);
@@ -4952,6 +4996,12 @@ static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
                        lfsck->li_out_notify(env, lfsck->li_out_notify_data,
                                             LE_LASTID_REBUILDING);
                        lo->ll_flags |= LF_CRASHED_LASTID;
+
+                       CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
+                              "LAST_ID file (2) for the sequence "LPX64
+                              ", old value "LPU64", known value "LPU64"\n",
+                              lfsck_lfsck2name(lfsck), lls->lls_seq,
+                              lls->lls_lastid, oid);
                }
 
                lls->lls_lastid = oid;
@@ -5072,6 +5122,10 @@ static int lfsck_layout_slave_post(const struct lu_env *env,
                if (lo->ll_flags & LF_CRASHED_LASTID) {
                        done = true;
                        lo->ll_flags &= ~LF_CRASHED_LASTID;
+
+                       CDEBUG(D_LFSCK, "%s: layout LFSCK has rebuilt "
+                              "crashed LAST_ID files successfully\n",
+                              lfsck_lfsck2name(lfsck));
                }
                lo->ll_flags &= ~LF_UPGRADE;
                list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
@@ -6308,7 +6362,8 @@ again1:
        }
 
        dt_read_lock(env, obj, 0);
-       if (!dt_object_exists(obj)) {
+       if (dt_object_exists(obj) == 0 ||
+           lfsck_is_dead_obj(obj)) {
                dt_read_unlock(env, obj);
                lfsck_object_put(env, obj);
                pos++;
index 9f4047d..145a84c 100644 (file)
@@ -402,8 +402,8 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
                GOTO(stop, rc);
 
        /* 3a. insert linkEA for child */
-       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
-       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+       lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+                      ldata.ld_leh->leh_len);
        rc = dt_declare_xattr_set(env, child, &linkea_buf,
                                  XATTR_NAME_LINK, 0, th);
        if (rc != 0)
@@ -569,8 +569,8 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
                GOTO(stop, rc);
 
        /* 3a. insert linkEA for child */
-       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
-       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+       lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+                      ldata.ld_leh->leh_len);
        rc = dt_declare_xattr_set(env, child, &linkea_buf,
                                  XATTR_NAME_LINK, 0, th);
        if (rc != 0)
index da21ef3..92ae407 100644 (file)
@@ -1650,8 +1650,8 @@ int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
        if (rc != 0)
                RETURN(rc);
 
-       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
-       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+       lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
+                      ldata.ld_leh->leh_len);
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
index b854cc7..9511275 100644 (file)
@@ -426,6 +426,8 @@ static int ofd_lfsck_out_notify(const struct lu_env *env, void *data,
                ofd->ofd_lastid_rebuilding = 0;
                ofd->ofd_lastid_gen++;
                up_write(&ofd->ofd_lastid_rwsem);
+               CWARN("%s: Rebuilt crashed LAST_ID files successfully.\n",
+                     obd->obd_name);
                break;
        }
        default:
@@ -1331,7 +1333,8 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
        } else {
                if (unlikely(exp->exp_filter_data.fed_lastid_gen !=
                             ofd->ofd_lastid_gen)) {
-                       ofd_obd_disconnect(exp);
+                       /* Keep the export ref so we can send the reply. */
+                       ofd_obd_disconnect(class_export_get(exp));
                        GOTO(out_nolock, rc = -ENOTCONN);
                }