Whamcloud - gitweb
LU-16203 llog: skip bad records in llog 76/48776/6
authorMikhail Pershin <mpershin@whamcloud.com>
Mon, 3 Oct 2022 15:35:25 +0000 (18:35 +0300)
committerOleg Drokin <green@whamcloud.com>
Tue, 8 Nov 2022 08:50:37 +0000 (08:50 +0000)
This patch is further development of idea to skip bad
(corrupted) llogs data. If llog has fixed-size records
then it is possible to skip one record but not rest of
llog block.

Patch also fixes the skipping to the next chunk:
 - make sure to skip to the next block for partial chunk
   or it causes the same block re-read.
 - handle index == 0 as goal for the llog_next_block() as
   expected exclusion and just return requested block
 - set new index after block was skipped to the first one
   in block
 - don't create fake padding record in llog_osd_next_block()
   as the caller can handle it and would know about
 - restore test_8 functionality to check corruption handling

Fixes: ec4194e4e78c ("LU-11591 llog: add synchronization for the last record")
Signed-off-by: Mikhail Pershin <mpershin@whamcloud.com>
Change-Id: I6f88269e8626269268352f8bfd6d7950de438f3a
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/48776
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/obdclass/llog.c
lustre/obdclass/llog_osd.c
lustre/obdclass/llog_test.c

index dc59322..f1b3424 100644 (file)
@@ -464,27 +464,26 @@ out:
 }
 EXPORT_SYMBOL(llog_init_handle);
 
+#define LLOG_ERROR_REC(lgh, rec, format, a...) \
+       CERROR("%s: "DFID" rec type=%x idx=%u len=%u, " format "\n" , \
+              loghandle2name(lgh), PLOGID(&lgh->lgh_id), (rec)->lrh_type, \
+              (rec)->lrh_index, (rec)->lrh_len, ##a)
+
 int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec)
 {
        int chunk_size = llh->lgh_hdr->llh_hdr.lrh_len;
 
-       if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
-               CERROR("%s: record is too large: %d > %d\n",
-                      loghandle2name(llh), rec->lrh_len, chunk_size);
-               return -EINVAL;
-       }
-       if (rec->lrh_index >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) {
-               CERROR("%s: index is too high: %d\n",
-                      loghandle2name(llh), rec->lrh_index);
-               return -EINVAL;
-       }
-       if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC) {
-               CERROR("%s: magic %x is bad\n",
-                      loghandle2name(llh), rec->lrh_type);
-               return -EINVAL;
-       }
+       if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC)
+               LLOG_ERROR_REC(llh, rec, "magic is bad");
+       else if (rec->lrh_len == 0 || rec->lrh_len > chunk_size)
+               LLOG_ERROR_REC(llh, rec, "bad record len, chunk size is %d",
+                              chunk_size);
+       else if (rec->lrh_index >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr))
+               LLOG_ERROR_REC(llh, rec, "index is too high");
+       else
+               return 0;
 
-       return 0;
+       return -EINVAL;
 }
 EXPORT_SYMBOL(llog_verify_record);
 
@@ -499,19 +498,18 @@ static inline bool llog_is_index_skipable(int idx, struct llog_log_hdr *llh,
 
 static int llog_process_thread(void *arg)
 {
-       struct llog_process_info        *lpi = arg;
-       struct llog_handle              *loghandle = lpi->lpi_loghandle;
-       struct llog_log_hdr             *llh = loghandle->lgh_hdr;
-       struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
-       struct llog_thread_info         *lti;
-       char                            *buf;
-       size_t                           chunk_size;
-       __u64                            cur_offset;
-       int                              rc = 0, index = 1, last_index;
-       int                              saved_index = 0;
-       int                              last_called_index = 0;
-       bool                             repeated = false;
-       bool                            refresh_idx = false;
+       struct llog_process_info *lpi = arg;
+       struct llog_handle *loghandle = lpi->lpi_loghandle;
+       struct llog_log_hdr *llh = loghandle->lgh_hdr;
+       struct llog_process_cat_data *cd  = lpi->lpi_catdata;
+       struct llog_thread_info *lti;
+       char *buf;
+       size_t chunk_size;
+       __u64 cur_offset;
+       int rc = 0, index = 1, last_index;
+       int saved_index = 0;
+       int last_called_index = 0;
+       bool repeated = false;
 
        ENTRY;
 
@@ -545,8 +543,8 @@ static int llog_process_thread(void *arg)
                struct llog_rec_hdr *rec;
                off_t chunk_offset = 0;
                unsigned int buf_offset = 0;
-               int     lh_last_idx;
-               int     synced_idx = 0;
+               int lh_last_idx;
+               int synced_idx = 0;
 
                /* skip records not set in bitmap */
                while (index <= last_index &&
@@ -579,6 +577,9 @@ repeat:
                if (repeated && (chunk_offset + buf_offset) == cur_offset &&
                    (rc == -EBADR || rc == -EIO))
                        GOTO(out, rc = 0);
+               /* EOF while trying to skip to the next chunk */
+               if (!index && rc == -EBADR)
+                       GOTO(out, rc = 0);
                if (rc != 0)
                        GOTO(out, rc);
 
@@ -608,6 +609,15 @@ repeat:
                        CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
                               rec->lrh_type, rec->lrh_index);
 
+                       /* start with first rec if block was skipped */
+                       if (!index) {
+                               CDEBUG(D_OTHER,
+                                      "%s: skipping to the index %u\n",
+                                      loghandle2name(loghandle),
+                                      rec->lrh_index);
+                               index = rec->lrh_index;
+                       }
+
                        if (index == (synced_idx + 1) &&
                            synced_idx == LLOG_HDR_TAIL(llh)->lrt_index)
                                GOTO(out, rc = 0);
@@ -635,13 +645,15 @@ repeat:
                         * it turns to
                         * lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index
                         * This exception is working for catalog only.
+                        * The last check is for the partial chunk boundary,
+                        * if it is reached then try to re-read for possible
+                        * new records once.
                         */
-
                        if ((index == lh_last_idx && synced_idx != index) ||
                            (index == (lh_last_idx + 1) &&
                             lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index) ||
-                           (rec->lrh_index == 0 && !repeated)) {
-
+                           (((char *)rec - buf >= cur_offset - chunk_offset) &&
+                           !repeated)) {
                                /* save offset inside buffer for the re-read */
                                buf_offset = (char *)rec - (char *)buf;
                                cur_offset = chunk_offset;
@@ -654,26 +666,28 @@ repeat:
                                up_read(&loghandle->lgh_last_sem);
                                CDEBUG(D_OTHER, "synced_idx: %d\n", synced_idx);
                                goto repeat;
-
                        }
-
                        repeated = false;
 
                        rc = llog_verify_record(loghandle, rec);
                        if (rc) {
-                               CERROR("%s: invalid record in llog "DFID
-                                      " record for index %d/%d: rc = %d\n",
-                                      loghandle2name(loghandle),
-                                      PLOGID(&loghandle->lgh_id),
-                                      rec->lrh_index, index, rc);
+                               CDEBUG(D_OTHER, "invalid record at index %d\n",
+                                      index);
                                /*
-                                * the block seem to be corrupted, let's try
-                                * with the next one. reset rc to go to the
-                                * next chunk.
+                                * for fixed-sized llogs we can skip one record
+                                * by using llh_size from llog header.
+                                * Otherwise skip the next llog chunk.
                                 */
-                               refresh_idx = true;
+                               rc = 0;
+                               if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
+                                       rec->lrh_len = llh->llh_size;
+                                       goto next_rec;
+                               }
+                               /* make sure that is always next block */
+                               cur_offset = chunk_offset + chunk_size;
+                               /* no goal to find, just next block to read */
                                index = 0;
-                               GOTO(repeat, rc = 0);
+                               break;
                        }
 
                        if (rec->lrh_index < index) {
@@ -686,10 +700,9 @@ repeat:
                                /* the record itself looks good, but we met a
                                 * gap which can be result of old bugs, just
                                 * keep going */
-                               CERROR("%s: "DFID" index %u, expected %u\n",
-                                      loghandle2name(loghandle),
-                                      PLOGID(&loghandle->lgh_id),
-                                      rec->lrh_index, index);
+                               LLOG_ERROR_REC(loghandle, rec,
+                                              "gap in index, expected %u",
+                                              index);
                                index = rec->lrh_index;
                        }
 
@@ -759,6 +772,7 @@ repeat:
                                    loghandle->lgh_hdr->llh_count == 1)
                                        GOTO(out, rc = 0);
                        }
+next_rec:
                        /* exit if the last index is reached */
                        if (index >= last_index)
                                GOTO(out, rc = 0);
index 4838e2a..92f47a8 100644 (file)
@@ -917,7 +917,7 @@ static int llog_osd_next_block(const struct lu_env *env,
        __u32                   chunk_size;
        int last_idx = *cur_idx;
        __u64 last_offset = *cur_offset;
-       bool force_mini_rec = false;
+       bool force_mini_rec = !next_idx;
 
        ENTRY;
 
@@ -969,9 +969,13 @@ static int llog_osd_next_block(const struct lu_env *env,
 
                rc = dt_read(env, o, &lgi->lgi_buf, cur_offset);
                if (rc < 0) {
-                       if (rc == -EBADR && !force_mini_rec)
-                               goto retry;
-
+                       if (rc == -EBADR) {
+                               /* no goal is valid case */
+                               if (!next_idx)
+                                       GOTO(out, rc);
+                               if (!force_mini_rec)
+                                       goto retry;
+                       }
                        CERROR("%s: can't read llog block from log "DFID
                               " offset %llu: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
@@ -1008,21 +1012,9 @@ static int llog_osd_next_block(const struct lu_env *env,
                tail = (struct llog_rec_tail *)((char *)buf + rc -
                                                sizeof(struct llog_rec_tail));
 
-               if (llog_verify_record(loghandle, rec)) {
-                       /*
-                        * the block seems corrupted. make a pad record so the
-                        * caller can skip the block and try with the next one
-                        */
-                       rec->lrh_len = rc;
-                       rec->lrh_index = next_idx;
-                       rec->lrh_type = LLOG_PAD_MAGIC;
-
-                       tail = rec_tail(rec);
-                       tail->lrt_len = rc;
-                       tail->lrt_index = next_idx;
-
+               /* caller handles bad records if any */
+               if (llog_verify_record(loghandle, rec))
                        GOTO(out, rc = 0);
-               }
 
                /* get the last record in block */
                last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
index e6d3634..1a7cbde 100644 (file)
@@ -1186,23 +1186,90 @@ static int test_8_cb(const struct lu_env *env, struct llog_handle *llh,
        return 0;
 }
 
+static int llog_zeroes(const struct lu_env *env, struct dt_object *o,
+                     __u64 start, __u64 end)
+{
+       struct lu_attr la;
+       struct thandle *th;
+       struct dt_device *d;
+       char *buf;
+       loff_t pos = start;
+       struct lu_buf lb;
+       int rc;
+
+       ENTRY;
+
+       OBD_ALLOC(buf, end - start);
+       if (!buf)
+               RETURN(-ENOMEM);
+
+       LASSERT(o);
+       d = lu2dt_dev(o->do_lu.lo_dev);
+       LASSERT(d);
+
+       rc = dt_attr_get(env, o, &la);
+       if (rc)
+               GOTO(free, rc);
+
+       CDEBUG(D_OTHER, "llog size %llu\n", la.la_size);
+       rc = sizeof(struct llog_log_hdr) + sizeof(struct llog_mini_rec);
+       if (la.la_size < end) {
+               CERROR("llog size %llu is small for punch at [%llu;%llu]\n",
+                      la.la_size, start, end);
+               GOTO(free, rc = 0);
+       }
+
+       th = dt_trans_create(env, d);
+       if (IS_ERR(th))
+               GOTO(free, rc = PTR_ERR(th));
+
+       lb.lb_buf = buf;
+       lb.lb_len = end - start;
+       rc = dt_declare_write(env, o, &lb, pos, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, d, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_write(env, o, &lb, &pos, th);
+       if (rc)
+               GOTO(stop, rc);
+stop:
+       dt_trans_stop(env, d, th);
+free:
+       OBD_FREE(buf, end - start);
+
+       RETURN(rc);
+}
+
+struct llog_test8_rec {
+       struct llog_rec_hdr ltr_hdr;
+       __u64 padding[29];
+       struct llog_rec_tail ltr_tail;
+} __attribute__((packed));
+
 static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
 {
        struct llog_handle *llh = NULL;
        char name[10];
        int rc, rc2, i;
        int orig_counter;
-       struct llog_mini_rec lmr;
+       struct llog_test8_rec ltr;
        struct llog_ctxt *ctxt;
        struct dt_object *obj = NULL;
+       int plain_pos;
+       int reclen = sizeof(ltr);
 
        ENTRY;
 
        ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
        LASSERT(ctxt);
 
-       lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
-       lmr.lmr_hdr.lrh_type = LLOG_OP_MAGIC;
+       /* simulate generic llog records with 256-bytes size */
+       ltr.ltr_hdr.lrh_len = ltr.ltr_tail.lrt_len = reclen;
+       ltr.ltr_hdr.lrh_type = LLOG_OP_MAGIC;
 
        CWARN("8a: fill the first plain llog\n");
        rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
@@ -1220,24 +1287,28 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
        plain_counter = 0;
        rc = llog_cat_process(env, llh, test_8_cb, "foobar", 0, 0);
        if (rc != 0) {
-               CERROR("5a: process with test_8_cb failed: %d\n", rc);
+               CERROR("8a: process with test_8_cb failed: %d\n", rc);
                GOTO(out, rc);
        }
        orig_counter = plain_counter;
 
-       for (i = 0; i < 100; i++) {
-               rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
+       for (i = 0; i < 20; i++) {
+               rc = llog_cat_add(env, llh, &ltr.ltr_hdr, NULL);
                if (rc) {
                        CERROR("5a: add record failed\n");
                        GOTO(out, rc);
                }
        }
+       CWARN("8b: first llog "DFID"\n",
+             PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu)));
 
-       /* grab the current plain llog, we'll corrupt it later */
-       obj = llh->u.chd.chd_current_log->lgh_obj;
-       LASSERT(obj);
-       lu_object_get(&obj->do_lu);
-       CWARN("8a: pin llog "DFID"\n", PFID(lu_object_fid(&obj->do_lu)));
+       /* get llog index in catalog to clear it later */
+       plain_pos = (llh->lgh_last_idx - 1) * sizeof(struct llog_logid_rec);
+       /* destroy plain llog to don't leave it orphaned */
+       list_del_init(&llh->u.chd.chd_current_log->u.phd.phd_entry);
+       llog_destroy(env, llh->u.chd.chd_current_log);
+       llog_close(env, llh->u.chd.chd_current_log);
+       llh->u.chd.chd_current_log = NULL;
 
        rc2 = llog_cat_close(env, llh);
        if (rc2) {
@@ -1261,14 +1332,22 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
        }
 
        for (i = 0; i < 100; i++) {
-               rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
+               rc = llog_cat_add(env, llh, &ltr.ltr_hdr, NULL);
                if (rc) {
                        CERROR("8b: add record failed\n");
                        GOTO(out, rc);
                }
        }
-       CWARN("8b: second llog "DFID"\n",
-             PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu)));
+       /* grab the current plain llog, we'll corrupt it later */
+       obj = llh->u.chd.chd_current_log->lgh_obj;
+       LASSERT(obj);
+       lu_object_get(&obj->do_lu);
+       CWARN("8b: pin llog "DFID"\n", PFID(lu_object_fid(&obj->do_lu)));
+
+       /* must lost all 20 records */
+       CWARN("8b: clean first llog record in catalog\n");
+       llog_zeroes(env, llh->lgh_obj, 8192 + plain_pos,
+                   8192 + plain_pos + sizeof(struct llog_logid_rec));
 
        rc2 = llog_cat_close(env, llh);
        if (rc2) {
@@ -1278,10 +1357,12 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
                GOTO(out_put, rc);
        }
 
-       /* Here was 8c: drop two records from the first plain llog
-        * llog_truncate was bad idea cause it creates a wrong state,
-        * lgh_last_idx is wrong and two records belongs to zeroed buffer
-        */
+       /* lost 28 records, from 5 to 32 in block */
+       CWARN("8c: corrupt first chunk in the middle\n");
+       llog_zeroes(env, obj, 8192 + reclen * 4, 8192 + reclen * 10);
+       /* lost whole chunk - 32 records */
+       CWARN("8c: corrupt second chunk at start\n");
+       llog_zeroes(env, obj, 16384, 16384 + reclen);
 
        CWARN("8d: count survived records\n");
        rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
@@ -1303,9 +1384,12 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
                GOTO(out, rc);
        }
 
-       if (orig_counter + 200 != plain_counter) {
+       /* if llog processing skips bad data as expected then 80
+        * records from 120 should be lost
+        */
+       if (orig_counter + 120 - 80 != plain_counter) {
                CERROR("found %d records (expected %d)\n", plain_counter,
-                      orig_counter + 200);
+                      orig_counter + 120 - 80);
                rc = -EIO;
        }
 
@@ -1444,7 +1528,7 @@ static int llog_test_process_thread(void *arg)
                                      NULL, lpi->lpi_cbdata, 1, 0, true);
 
        complete(&lpi->lpi_completion);
-
+       msleep(MSEC_PER_SEC / 2);
        lpi->lpi_rc = rc;
        if (rc)
                CWARN("10h: Error during catalog processing %d\n", rc);
@@ -2193,7 +2277,6 @@ static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        struct llog_ctxt *ctxt;
        struct dt_object *o;
        struct lu_env env;
-       struct lu_context test_session;
        int rc;
 
        ENTRY;
@@ -2220,13 +2303,6 @@ static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        if (rc)
                RETURN(rc);
 
-       rc = lu_context_init(&test_session, LCT_SERVER_SESSION);
-       if (rc)
-               GOTO(cleanup_env, rc);
-       test_session.lc_thread = (struct ptlrpc_thread *)current;
-       lu_context_enter(&test_session);
-       env.le_ses = &test_session;
-
        CWARN("Setup llog-test device over %s device\n",
              lustre_cfg_string(lcfg, 1));
 
@@ -2236,7 +2312,7 @@ static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        rc = llog_setup(&env, tgt, &tgt->obd_olg, LLOG_TEST_ORIG_CTXT, tgt,
                        &llog_osd_ops);
        if (rc)
-               GOTO(cleanup_session, rc);
+               GOTO(cleanup_env, rc);
 
        /* use MGS llog dir for tests */
        ctxt = llog_get_context(tgt, LLOG_CONFIG_ORIG_CTXT);
@@ -2254,9 +2330,7 @@ static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        rc = llog_run_tests(&env, tgt);
        if (rc)
                llog_test_cleanup(obd);
-cleanup_session:
-       lu_context_exit(&test_session);
-       lu_context_fini(&test_session);
+
 cleanup_env:
        lu_env_fini(&env);
        RETURN(rc);