From f61148f758b30de91684c58053e4dad6159f8858 Mon Sep 17 00:00:00 2001 From: wang di Date: Mon, 24 Aug 2015 10:28:03 -0700 Subject: [PATCH] LU-7050 llog: record the minimum record size Remember the minimum record size in the llog header, so that llog_skip_over can skip the records properly. The current implementation uses LLOG_MIN_REC_SIZE, only 24 bytes, which is too small for update records (usually more than 1000 bytes), and causes update recovery to read a lot of useless update records from other MDTs. The minimum record size will be recorded in llh_size, which is only used by fixed-size record llogs now, so this patch also adds another flag, LLOG_F_IS_FIXSIZE, to indicate a fixed-size record llog. Signed-off-by: wang di Change-Id: Ia62684d1fb744e3aca74107f22683b2ee63a2d16 Reviewed-on: http://review.whamcloud.com/16103 Tested-by: Jenkins Reviewed-by: Mike Pershin Reviewed-by: James Simmons Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/lustre/lustre_idl.h | 5 ++ lustre/obdclass/llog.c | 1 + lustre/obdclass/llog_osd.c | 95 +++++++++++++++++++++++++++++++------- lustre/obdclass/llog_test.c | 6 ++- 4 files changed, 88 insertions(+), 19 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 589b2c6..fccdc1b 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -3308,7 +3308,12 @@ enum llog_flag { LLOG_F_IS_CAT = 0x2, LLOG_F_IS_PLAIN = 0x4, LLOG_F_EXT_JOBID = 0x8, + LLOG_F_IS_FIXSIZE = 0x10, + /* Note: Flags covered by LLOG_F_EXT_MASK will be inherited from + * catlog to plain log, so do not add LLOG_F_IS_FIXSIZE here, + * because the catlog record is usually fixed size, but its plain + * log record can be variable */ LLOG_F_EXT_MASK = LLOG_F_EXT_JOBID, }; diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index 7bdd90f..6d1e07c 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -379,6 +379,7 @@ int llog_init_handle(const struct lu_env *env, struct 
llog_handle *handle, LASSERT(list_empty(&handle->u.chd.chd_head)); INIT_LIST_HEAD(&handle->u.chd.chd_head); llh->llh_size = sizeof(struct llog_logid_rec); + llh->llh_flags |= LLOG_F_IS_FIXSIZE; } else if (!(flags & LLOG_F_IS_PLAIN)) { CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n", handle->lgh_ctxt->loc_obd->obd_name, diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index bc9951b..7913294 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -478,8 +478,9 @@ static int llog_osd_write_rec(const struct lu_env *env, "len:%u offset %llu\n", POSTID(&loghandle->lgh_id.lgl_oi), idx, rec->lrh_len, (long long)lgi->lgi_off); - } else if (llh->llh_size > 0) { - if (llh->llh_size != rec->lrh_len) { + } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { + if (llh->llh_size == 0 || + llh->llh_size != rec->lrh_len) { CERROR("%s: wrong record size, llh_size is %u" " but record size is %u\n", o->do_lu.lo_dev->ld_obd->obd_name, @@ -564,6 +565,16 @@ static int llog_osd_write_rec(const struct lu_env *env, } llh->llh_count++; + if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { + LASSERT(llh->llh_size == reclen); + } else { + /* Update the minimum size of the llog record */ + if (llh->llh_size == 0) + llh->llh_size = reclen; + else if (reclen < llh->llh_size) + llh->llh_size = reclen; + } + if (lgi->lgi_attr.la_size == 0) { lgi->lgi_off = 0; lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; @@ -582,9 +593,9 @@ static int llog_osd_write_rec(const struct lu_env *env, * the RPC (1MB limit), if we write 8K for each operation, which * will cost a lot space, and keep us adding more updates to one * update log.*/ - lgi->lgi_off = offsetof(typeof(*llh), llh_count); - lgi->lgi_buf.lb_len = sizeof(llh->llh_count); - lgi->lgi_buf.lb_buf = &llh->llh_count; + lgi->lgi_off = 0; + lgi->lgi_buf.lb_len = llh->llh_bitmap_offset; + lgi->lgi_buf.lb_buf = &llh->llh_hdr; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc != 0) GOTO(out_unlock, rc); @@ 
-660,16 +671,38 @@ out: * that we are not far enough along the log (because the * actual records are larger than minimum size) we just skip * some more records. + * + * Note: in llog_process_thread, it will use bitmap offset as + * the index to locate the record, which also includes some pad + * records, whose record size is very small, and it also does not + * consider pad record when recording minimum record size (otherwise + * min_record size might be too small), so in some rare cases, + * it might skip too many records for @goal, see llog_osd_next_block(). + * + * When force_mini_rec is true, it means we have to use LLOG_MIN_REC_SIZE + * as the min record size to skip over, usually because in the previous + * try, it skipped too many records, see llog_osd_next(prev)_block(). */ -static inline void llog_skip_over(struct llog_log_hdr *llh, __u64 *off, - int curr, int goal, __u32 chunk_size) +static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off, + int curr, int goal, __u32 chunk_size, + bool force_mini_rec) { + struct llog_log_hdr *llh = lgh->lgh_hdr; + + /* Goal should not be bigger than the record count */ + if (goal > lgh->lgh_last_idx) + goal = lgh->lgh_last_idx; + if (goal > curr) { - if (llh->llh_size == 0) { - /* variable size records */ - *off = *off + (goal - curr - 1) * LLOG_MIN_REC_SIZE; - } else { + if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { *off = chunk_size + (goal - 1) * llh->llh_size; + } else { + __u64 min_rec_size = LLOG_MIN_REC_SIZE; + + if (llh->llh_size > 0 && !force_mini_rec) + min_rec_size = llh->llh_size; + + *off = *off + (goal - curr - 1) * min_rec_size; } } /* always align with lower chunk boundary*/ @@ -730,6 +763,9 @@ static int llog_osd_next_block(const struct lu_env *env, struct dt_device *dt; int rc; __u32 chunk_size; + int last_idx = *cur_idx; + __u64 last_offset = *cur_offset; + bool force_mini_rec = false; ENTRY; @@ -760,8 +796,8 @@ static int llog_osd_next_block(const struct lu_env *env, struct llog_rec_hdr *rec, 
*last_rec; struct llog_rec_tail *tail; - llog_skip_over(loghandle->lgh_hdr, cur_offset, *cur_idx, - next_idx, chunk_size); + llog_skip_over(loghandle, cur_offset, *cur_idx, + next_idx, chunk_size, force_mini_rec); /* read up to next llog chunk_size block */ lgi->lgi_buf.lb_len = chunk_size - @@ -770,6 +806,9 @@ static int llog_osd_next_block(const struct lu_env *env, rc = dt_read(env, o, &lgi->lgi_buf, cur_offset); if (rc < 0) { + if (rc == -EBADR && !force_mini_rec) + goto retry; + CERROR("%s: can't read llog block from log "DFID " offset "LPU64": rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, @@ -784,10 +823,16 @@ static int llog_osd_next_block(const struct lu_env *env, memset(buf + rc, 0, len - rc); } - if (rc == 0) /* end of file, nothing to do */ + if (rc == 0) { /* end of file, nothing to do */ + if (!force_mini_rec) + goto retry; GOTO(out, rc); + } if (rc < sizeof(*tail)) { + if (!force_mini_rec) + goto retry; + CERROR("%s: invalid llog block at log id "DOSTID"/%u " "offset "LPU64"\n", o->do_lu.lo_dev->ld_obd->obd_name, @@ -821,12 +866,18 @@ static int llog_osd_next_block(const struct lu_env *env, loghandle->lgh_id.lgl_ogen, *cur_offset); GOTO(out, rc = -EINVAL); } - if (tail->lrt_index < next_idx) + if (tail->lrt_index < next_idx) { + last_idx = *cur_idx; + last_offset = *cur_offset; continue; + } /* sanity check that the start of the new buffer is no farther * than the record that we wanted. This shouldn't happen. */ if (rec->lrh_index > next_idx) { + if (!force_mini_rec && next_idx > last_idx) + goto retry; + CERROR("%s: missed desired record? 
%u > %u\n", o->do_lu.lo_dev->ld_obd->obd_name, rec->lrh_index, next_idx); @@ -839,6 +890,14 @@ static int llog_osd_next_block(const struct lu_env *env, CLF_VERSION | CLF_RENAME); GOTO(out, rc = 0); + +retry: + /* Note: because there are some pad records in the + * llog, llog_skip_over() might skip too many + * records, so let's retry with the minimum record size */ + force_mini_rec = true; + *cur_offset = last_offset; + *cur_idx = last_idx; } GOTO(out, rc = -EIO); out: @@ -890,9 +949,11 @@ static int llog_osd_prev_block(const struct lu_env *env, dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); + /* Let's only use mini record size for previous block read + * for now XXX */ cur_offset = chunk_size; - llog_skip_over(loghandle->lgh_hdr, &cur_offset, 0, prev_idx, - chunk_size); + llog_skip_over(loghandle, &cur_offset, 0, prev_idx, + chunk_size, true); rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) diff --git a/lustre/obdclass/llog_test.c b/lustre/obdclass/llog_test.c index 259c83b..aff9204 100644 --- a/lustre/obdclass/llog_test.c +++ b/lustre/obdclass/llog_test.c @@ -236,7 +236,8 @@ static int test3_check_n_add_cb(const struct lu_env *env, int *last_rec = data; int rc; - if (lgh->lgh_hdr->llh_size > 0) { + if (lgh->lgh_hdr->llh_flags & LLOG_F_IS_FIXSIZE) { + LASSERT(lgh->lgh_hdr->llh_size > 0); if (lgh->lgh_cur_offset != lgh->lgh_hdr->llh_hdr.lrh_len + (start_idx + records - 1) * lgh->lgh_hdr->llh_size) @@ -346,6 +347,7 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd, hdr->lrh_len = sizeof(struct llog_gen_rec); hdr->lrh_type = LLOG_GEN_REC; llh->lgh_hdr->llh_size = sizeof(struct llog_gen_rec); + llh->lgh_hdr->llh_flags |= LLOG_F_IS_FIXSIZE; /* Fill the llog with 64-bytes records, use 1023 records, * so last chunk will be partially full. 
Don't change this @@ -410,7 +412,7 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd, /* Drop llh_size to 0 to mark llog as variable-size and write * header to make this change permanent. */ - llh->lgh_hdr->llh_size = 0; + llh->lgh_hdr->llh_flags &= ~LLOG_F_IS_FIXSIZE; llog_write(env, llh, &llh->lgh_hdr->llh_hdr, LLOG_HEADER_IDX); hdr->lrh_type = OBD_CFG_REC; -- 1.8.3.1