From cf121b16685fe2a271b1b3c5e97eabcfe01aac8a Mon Sep 17 00:00:00 2001 From: Mikhail Pershin Date: Mon, 3 Oct 2022 18:35:25 +0300 Subject: [PATCH] LU-16203 llog: skip bad records in llog This patch is further development of idea to skip bad (corrupted) llogs data. If llog has fixed-size records then it is possible to skip one record but not rest of llog block. Patch also fixes the skipping to the next chunk: - make sure to skip to the next block for partial chunk or it causes the same block re-read. - handle index == 0 as goal for the llog_next_block() as expected exclusion and just return requested block - set new index after block was skipped to the first one in block - don't create fake padding record in llog_osd_next_block() as the caller can handle it and would know about - restore test_8 functionality to check corruption handling Fixes: ec4194e4e78c ("LU-11591 llog: add synchronization for the last record") Signed-off-by: Mikhail Pershin Change-Id: I6f88269e8626269268352f8bfd6d7950de438f3a Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/48776 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Alex Zhuravlev Reviewed-by: Oleg Drokin --- lustre/obdclass/llog.c | 114 ++++++++++++++++++++---------------- lustre/obdclass/llog_osd.c | 28 ++++----- lustre/obdclass/llog_test.c | 140 +++++++++++++++++++++++++++++++++----------- 3 files changed, 181 insertions(+), 101 deletions(-) diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index dc59322..f1b3424 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -464,27 +464,26 @@ out: } EXPORT_SYMBOL(llog_init_handle); +#define LLOG_ERROR_REC(lgh, rec, format, a...) \ + CERROR("%s: "DFID" rec type=%x idx=%u len=%u, " format "\n" , \ + loghandle2name(lgh), PLOGID(&lgh->lgh_id), (rec)->lrh_type, \ + (rec)->lrh_index, (rec)->lrh_len, ##a) + int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec) { int chunk_size = llh->lgh_hdr->llh_hdr.lrh_len; - if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) { - CERROR("%s: record is too large: %d > %d\n", - loghandle2name(llh), rec->lrh_len, chunk_size); - return -EINVAL; - } - if (rec->lrh_index >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) { - CERROR("%s: index is too high: %d\n", - loghandle2name(llh), rec->lrh_index); - return -EINVAL; - } - if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC) { - CERROR("%s: magic %x is bad\n", - loghandle2name(llh), rec->lrh_type); - return -EINVAL; - } + if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC) + LLOG_ERROR_REC(llh, rec, "magic is bad"); + else if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) + LLOG_ERROR_REC(llh, rec, "bad record len, chunk size is %d", + chunk_size); + else if (rec->lrh_index >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) + LLOG_ERROR_REC(llh, rec, "index is too high"); + else + return 0; - return 0; + return -EINVAL; } EXPORT_SYMBOL(llog_verify_record); @@ -499,19 +498,18 @@ static inline bool llog_is_index_skipable(int idx, struct llog_log_hdr *llh, static int llog_process_thread(void *arg) { - struct llog_process_info *lpi = arg; - struct llog_handle *loghandle = lpi->lpi_loghandle; - struct llog_log_hdr *llh = loghandle->lgh_hdr; - struct llog_process_cat_data *cd = lpi->lpi_catdata; - struct llog_thread_info *lti; - char *buf; - size_t chunk_size; - __u64 cur_offset; - int rc = 0, index = 1, last_index; - int saved_index = 0; - int last_called_index = 0; - bool repeated = false; - bool refresh_idx = false; + struct llog_process_info *lpi = arg; + struct llog_handle *loghandle = lpi->lpi_loghandle; + struct llog_log_hdr *llh = loghandle->lgh_hdr; + struct llog_process_cat_data *cd = lpi->lpi_catdata; + struct llog_thread_info *lti; + char *buf; + size_t chunk_size; + __u64 cur_offset; + int rc = 0, index = 1, last_index; + int saved_index = 0; + int last_called_index = 0; + bool repeated = false; ENTRY; @@ -545,8 +543,8 @@ static int llog_process_thread(void *arg) struct llog_rec_hdr *rec; off_t chunk_offset = 0; unsigned int buf_offset = 0; - int lh_last_idx; - int synced_idx = 0; + int lh_last_idx; + int synced_idx = 0; /* skip records not set in bitmap */ while (index <= last_index && @@ -579,6 +577,9 @@ repeat: if (repeated && (chunk_offset + buf_offset) == cur_offset && (rc == -EBADR || rc == -EIO)) GOTO(out, rc = 0); + /* EOF while trying to skip to the next chunk */ + if (!index && rc == -EBADR) + GOTO(out, rc = 0); if (rc != 0) GOTO(out, rc); @@ -608,6 +609,15 @@ repeat: CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n", rec->lrh_type, rec->lrh_index); + /* start with first rec if block was skipped */ + if (!index) { + CDEBUG(D_OTHER, + "%s: skipping to the index %u\n", + loghandle2name(loghandle), + rec->lrh_index); + index = rec->lrh_index; + } + if (index == (synced_idx + 1) && synced_idx == LLOG_HDR_TAIL(llh)->lrt_index) GOTO(out, rc = 0); @@ -635,13 +645,15 @@ repeat: * it turns to * lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index * This exception is working for catalog only. + * The last check is for the partial chunk boundary, + * if it is reached then try to re-read for possible + * new records once. */ - if ((index == lh_last_idx && synced_idx != index) || (index == (lh_last_idx + 1) && lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index) || - (rec->lrh_index == 0 && !repeated)) { - + (((char *)rec - buf >= cur_offset - chunk_offset) && + !repeated)) { /* save offset inside buffer for the re-read */ buf_offset = (char *)rec - (char *)buf; cur_offset = chunk_offset; @@ -654,26 +666,28 @@ repeat: up_read(&loghandle->lgh_last_sem); CDEBUG(D_OTHER, "synced_idx: %d\n", synced_idx); goto repeat; - } - repeated = false; rc = llog_verify_record(loghandle, rec); if (rc) { - CERROR("%s: invalid record in llog "DFID - " record for index %d/%d: rc = %d\n", - loghandle2name(loghandle), - PLOGID(&loghandle->lgh_id), - rec->lrh_index, index, rc); + CDEBUG(D_OTHER, "invalid record at index %d\n", + index); /* - * the block seem to be corrupted, let's try - * with the next one. reset rc to go to the - * next chunk. + * for fixed-sized llogs we can skip one record + * by using llh_size from llog header. + * Otherwise skip the next llog chunk. */ - refresh_idx = true; + rc = 0; + if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { + rec->lrh_len = llh->llh_size; + goto next_rec; + } + /* make sure that is always next block */ + cur_offset = chunk_offset + chunk_size; + /* no goal to find, just next block to read */ index = 0; - GOTO(repeat, rc = 0); + break; } if (rec->lrh_index < index) { @@ -686,10 +700,9 @@ repeat: /* the record itself looks good, but we met a * gap which can be result of old bugs, just * keep going */ - CERROR("%s: "DFID" index %u, expected %u\n", - loghandle2name(loghandle), - PLOGID(&loghandle->lgh_id), - rec->lrh_index, index); + LLOG_ERROR_REC(loghandle, rec, + "gap in index, expected %u", + index); index = rec->lrh_index; } @@ -759,6 +772,7 @@ repeat: loghandle->lgh_hdr->llh_count == 1) GOTO(out, rc = 0); } +next_rec: /* exit if the last index is reached */ if (index >= last_index) GOTO(out, rc = 0); diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 4838e2a..92f47a8 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -917,7 +917,7 @@ static int llog_osd_next_block(const struct lu_env *env, __u32 chunk_size; int last_idx = *cur_idx; __u64 last_offset = *cur_offset; - bool force_mini_rec = false; + bool force_mini_rec = !next_idx; ENTRY; @@ -969,9 +969,13 @@ static int llog_osd_next_block(const struct lu_env *env, rc = dt_read(env, o, &lgi->lgi_buf, cur_offset); if (rc < 0) { - if (rc == -EBADR && !force_mini_rec) - goto retry; - + if (rc == -EBADR) { + /* no goal is valid case */ + if (!next_idx) + GOTO(out, rc); + if (!force_mini_rec) + goto retry; + } CERROR("%s: can't read llog block from log "DFID " offset %llu: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, @@ -1008,21 +1012,9 @@ static int llog_osd_next_block(const struct lu_env *env, tail = (struct llog_rec_tail *)((char *)buf + rc - sizeof(struct llog_rec_tail)); - if (llog_verify_record(loghandle, rec)) { - /* - * the block seems corrupted. make a pad record so the - * caller can skip the block and try with the next one - */ - rec->lrh_len = rc; - rec->lrh_index = next_idx; - rec->lrh_type = LLOG_PAD_MAGIC; - - tail = rec_tail(rec); - tail->lrt_len = rc; - tail->lrt_index = next_idx; - + /* caller handles bad records if any */ + if (llog_verify_record(loghandle, rec)) GOTO(out, rc = 0); - } /* get the last record in block */ last_rec = (struct llog_rec_hdr *)((char *)buf + rc - diff --git a/lustre/obdclass/llog_test.c b/lustre/obdclass/llog_test.c index e6d3634..1a7cbde 100644 --- a/lustre/obdclass/llog_test.c +++ b/lustre/obdclass/llog_test.c @@ -1186,23 +1186,90 @@ static int test_8_cb(const struct lu_env *env, struct llog_handle *llh, return 0; } +static int llog_zeroes(const struct lu_env *env, struct dt_object *o, + __u64 start, __u64 end) +{ + struct lu_attr la; + struct thandle *th; + struct dt_device *d; + char *buf; + loff_t pos = start; + struct lu_buf lb; + int rc; + + ENTRY; + + OBD_ALLOC(buf, end - start); + if (!buf) + RETURN(-ENOMEM); + + LASSERT(o); + d = lu2dt_dev(o->do_lu.lo_dev); + LASSERT(d); + + rc = dt_attr_get(env, o, &la); + if (rc) + GOTO(free, rc); + + CDEBUG(D_OTHER, "llog size %llu\n", la.la_size); + rc = sizeof(struct llog_log_hdr) + sizeof(struct llog_mini_rec); + if (la.la_size < end) { + CERROR("llog size %llu is small for punch at [%llu;%llu]\n", + la.la_size, start, end); + GOTO(free, rc = 0); + } + + th = dt_trans_create(env, d); + if (IS_ERR(th)) + GOTO(free, rc = PTR_ERR(th)); + + lb.lb_buf = buf; + lb.lb_len = end - start; + rc = dt_declare_write(env, o, &lb, pos, th); + if (rc) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, d, th); + if (rc) + GOTO(stop, rc); + + rc = dt_write(env, o, &lb, &pos, th); + if (rc) + GOTO(stop, rc); +stop: + dt_trans_stop(env, d, th); +free: + OBD_FREE(buf, end - start); + + RETURN(rc); +} + +struct llog_test8_rec { + struct llog_rec_hdr ltr_hdr; + __u64 padding[29]; + struct llog_rec_tail ltr_tail; +} __attribute__((packed)); + static int llog_test_8(const struct lu_env *env, struct obd_device *obd) { struct llog_handle *llh = NULL; char name[10]; int rc, rc2, i; int orig_counter; - struct llog_mini_rec lmr; + struct llog_test8_rec ltr; struct llog_ctxt *ctxt; struct dt_object *obj = NULL; + int plain_pos; + int reclen = sizeof(ltr); ENTRY; ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT); LASSERT(ctxt); - lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE; - lmr.lmr_hdr.lrh_type = LLOG_OP_MAGIC; + /* simulate generic llog records with 256-bytes size */ + ltr.ltr_hdr.lrh_len = ltr.ltr_tail.lrt_len = reclen; + ltr.ltr_hdr.lrh_type = LLOG_OP_MAGIC; CWARN("8a: fill the first plain llog\n"); rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS); @@ -1220,24 +1287,28 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd) plain_counter = 0; rc = llog_cat_process(env, llh, test_8_cb, "foobar", 0, 0); if (rc != 0) { - CERROR("5a: process with test_8_cb failed: %d\n", rc); + CERROR("8a: process with test_8_cb failed: %d\n", rc); GOTO(out, rc); } orig_counter = plain_counter; - for (i = 0; i < 100; i++) { - rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL); + for (i = 0; i < 20; i++) { + rc = llog_cat_add(env, llh, <r.ltr_hdr, NULL); if (rc) { CERROR("5a: add record failed\n"); GOTO(out, rc); } } + CWARN("8b: first llog "DFID"\n", + PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu))); - /* grab the current plain llog, we'll corrupt it later */ - obj = llh->u.chd.chd_current_log->lgh_obj; - LASSERT(obj); - lu_object_get(&obj->do_lu); - CWARN("8a: pin llog "DFID"\n", PFID(lu_object_fid(&obj->do_lu))); + /* get llog index in catalog to clear it later */ + plain_pos = (llh->lgh_last_idx - 1) * sizeof(struct llog_logid_rec); + /* destroy plain llog to don't leave it orphaned */ + list_del_init(&llh->u.chd.chd_current_log->u.phd.phd_entry); + llog_destroy(env, llh->u.chd.chd_current_log); + llog_close(env, llh->u.chd.chd_current_log); + llh->u.chd.chd_current_log = NULL; rc2 = llog_cat_close(env, llh); if (rc2) { @@ -1261,14 +1332,22 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd) } for (i = 0; i < 100; i++) { - rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL); + rc = llog_cat_add(env, llh, <r.ltr_hdr, NULL); if (rc) { CERROR("8b: add record failed\n"); GOTO(out, rc); } } - CWARN("8b: second llog "DFID"\n", - PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu))); + /* grab the current plain llog, we'll corrupt it later */ + obj = llh->u.chd.chd_current_log->lgh_obj; + LASSERT(obj); + lu_object_get(&obj->do_lu); + CWARN("8b: pin llog "DFID"\n", PFID(lu_object_fid(&obj->do_lu))); + + /* must lost all 20 records */ + CWARN("8b: clean first llog record in catalog\n"); + llog_zeroes(env, llh->lgh_obj, 8192 + plain_pos, + 8192 + plain_pos + sizeof(struct llog_logid_rec)); rc2 = llog_cat_close(env, llh); if (rc2) { @@ -1278,10 +1357,12 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd) GOTO(out_put, rc); } - /* Here was 8c: drop two records from the first plain llog - * llog_truncate was bad idea cause it creates a wrong state, - * lgh_last_idx is wrong and two records belongs to zeroed buffer - */ + /* lost 28 records, from 5 to 32 in block */ + CWARN("8c: corrupt first chunk in the middle\n"); + llog_zeroes(env, obj, 8192 + reclen * 4, 8192 + reclen * 10); + /* lost whole chunk - 32 records */ + CWARN("8c: corrupt second chunk at start\n"); + llog_zeroes(env, obj, 16384, 16384 + reclen); CWARN("8d: count survived records\n"); rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS); @@ -1303,9 +1384,12 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd) GOTO(out, rc); } - if (orig_counter + 200 != plain_counter) { + /* if llog processing skips bad data as expected then 80 + * records from 120 should be lost + */ + if (orig_counter + 120 - 80 != plain_counter) { CERROR("found %d records (expected %d)\n", plain_counter, - orig_counter + 200); + orig_counter + 120 - 80); rc = -EIO; } @@ -1444,7 +1528,7 @@ static int llog_test_process_thread(void *arg) NULL, lpi->lpi_cbdata, 1, 0, true); complete(&lpi->lpi_completion); - + msleep(MSEC_PER_SEC / 2); lpi->lpi_rc = rc; if (rc) CWARN("10h: Error during catalog processing %d\n", rc); @@ -2193,7 +2277,6 @@ static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg) struct llog_ctxt *ctxt; struct dt_object *o; struct lu_env env; - struct lu_context test_session; int rc; ENTRY; @@ -2220,13 +2303,6 @@ static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg) if (rc) RETURN(rc); - rc = lu_context_init(&test_session, LCT_SERVER_SESSION); - if (rc) - GOTO(cleanup_env, rc); - test_session.lc_thread = (struct ptlrpc_thread *)current; - lu_context_enter(&test_session); - env.le_ses = &test_session; - CWARN("Setup llog-test device over %s device\n", lustre_cfg_string(lcfg, 1)); @@ -2236,7 +2312,7 @@ static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg) rc = llog_setup(&env, tgt, &tgt->obd_olg, LLOG_TEST_ORIG_CTXT, tgt, &llog_osd_ops); if (rc) - GOTO(cleanup_session, rc); + GOTO(cleanup_env, rc); /* use MGS llog dir for tests */ ctxt = llog_get_context(tgt, LLOG_CONFIG_ORIG_CTXT); @@ -2254,9 +2330,7 @@ static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg) rc = llog_run_tests(&env, tgt); if (rc) llog_test_cleanup(obd); -cleanup_session: - lu_context_exit(&test_session); - lu_context_fini(&test_session); + cleanup_env: lu_env_fini(&env); RETURN(rc); -- 1.8.3.1