From: Mikhail Pershin Date: Tue, 16 Jun 2015 21:15:01 +0000 (+0300) Subject: LU-6714 llog: fix wrong offset in llog_process_thread() X-Git-Tag: 2.7.57~51 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=df45994ddcf5e6fdc379b3e1d43f1d26ba321a0e LU-6714 llog: fix wrong offset in llog_process_thread() - llh_cat_idx may become bigger than llog bitmap size in llog_cat_set_first_idx() function - it is wrong to use previous cur_offset as new buffer offset, new offset should be calculated from value returned by llog_next_block(). - optimize llog_skip_over() to find llog entry offset by index for llog with fixed-size records. Signed-off-by: Mikhail Pershin Change-Id: I0207ddd7e09161e25a2f5d56d69db4c993e650c3 Reviewed-on: http://review.whamcloud.com/15316 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: John L. Hammond Reviewed-by: James Simmons Reviewed-by: Oleg Drokin --- diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index df98cc9..56413ef 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -283,9 +283,8 @@ static int llog_process_thread(void *arg) struct llog_log_hdr *llh = loghandle->lgh_hdr; struct llog_process_cat_data *cd = lpi->lpi_catdata; char *buf; - int chunk_size; + size_t chunk_size; __u64 cur_offset; - __u64 last_offset; int rc = 0, index = 1, last_index; int saved_index = 0; int last_called_index = 0; @@ -296,6 +295,8 @@ static int llog_process_thread(void *arg) RETURN(-EINVAL); cur_offset = chunk_size = llh->llh_hdr.lrh_len; + /* expect chunk_size to be power of two */ + LASSERT(is_power_of_2(chunk_size)); OBD_ALLOC_LARGE(buf, chunk_size); if (buf == NULL) { @@ -312,75 +313,108 @@ static int llog_process_thread(void *arg) else last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1; - if (index > last_index) /* Record is not in this buffer. */ - GOTO(out, rc); - while (rc == 0) { struct llog_rec_hdr *rec; + off_t chunk_offset; + unsigned int buf_offset = 0; + bool partial_chunk; /* skip records not set in bitmap */ while (index <= last_index && !ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) ++index; - LASSERT(index <= last_index + 1); - if (index == last_index + 1) - break; -repeat: - CDEBUG(D_OTHER, "index: %d last_index %d\n", - index, last_index); + /* There are no indices prior the last_index */ + if (index > last_index) + break; + + CDEBUG(D_OTHER, "index: %d last_index %d\n", index, + last_index); +repeat: /* get the buf with our target record; avoid old garbage */ memset(buf, 0, chunk_size); - last_offset = cur_offset; rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index, index, &cur_offset, buf, chunk_size); if (rc != 0) GOTO(out, rc); + /* NB: after llog_next_block() call the cur_offset is the + * offset of the next block after read one. + * The absolute offset of the current chunk is calculated + * from cur_offset value and stored in chunk_offset variable. + */ + if (cur_offset % chunk_size != 0) { + partial_chunk = true; + chunk_offset = cur_offset & ~(chunk_size - 1); + } else { + partial_chunk = false; + chunk_offset = cur_offset - chunk_size; + } + /* NB: when rec->lrh_len is accessed it is already swabbed * since it is used at the "end" of the loop and the rec * swabbing is done at the beginning of the loop. */ - for (rec = (struct llog_rec_hdr *)buf; + for (rec = (struct llog_rec_hdr *)(buf + buf_offset); (char *)rec < buf + chunk_size; rec = llog_rec_hdr_next(rec)) { CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n", rec, rec->lrh_type); - if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) + if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) lustre_swab_llog_rec(rec); - CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n", - rec->lrh_type, rec->lrh_index); - - if (rec->lrh_index == 0) { - /* probably another rec just got added? */ - if (index <= loghandle->lgh_last_idx) - GOTO(repeat, rc = 0); - GOTO(out, rc = 0); /* no more records */ + CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n", + rec->lrh_type, rec->lrh_index); + + /* for partial chunk the end of it is zeroed, check + * for index 0 to distinguish it. */ + if (partial_chunk && rec->lrh_index == 0) { + /* concurrent llog_add() might add new records + * while llog_processing, check this is not + * the case and re-read the current chunk + * otherwise. */ + if (index > loghandle->lgh_last_idx) + GOTO(out, rc = 0); + CDEBUG(D_OTHER, "Re-read last llog buffer for " + "new records, index %u, last %u\n", + index, loghandle->lgh_last_idx); + /* save offset inside buffer for the re-read */ + buf_offset = (char *)rec - (char *)buf; + cur_offset = chunk_offset; + goto repeat; } + if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) { - CWARN("invalid length %d in llog record for " - "index %d/%d\n", rec->lrh_len, - rec->lrh_index, index); - GOTO(out, rc = -EINVAL); - } + CWARN("invalid length %d in llog record for " + "index %d/%d\n", rec->lrh_len, + rec->lrh_index, index); + GOTO(out, rc = -EINVAL); + } - if (rec->lrh_index < index) { - CDEBUG(D_OTHER, "skipping lrh_index %d\n", - rec->lrh_index); - continue; - } + if (rec->lrh_index < index) { + CDEBUG(D_OTHER, "skipping lrh_index %d\n", + rec->lrh_index); + continue; + } + + if (rec->lrh_index != index) { + CERROR("%s: Invalid record: index %u but " + "expected %u\n", + loghandle->lgh_ctxt->loc_obd->obd_name, + rec->lrh_index, index); + GOTO(out, rc = -ERANGE); + } CDEBUG(D_OTHER, "lrh_index: %d lrh_len: %d (%d remains)\n", rec->lrh_index, rec->lrh_len, (int)(buf + chunk_size - (char *)rec)); - loghandle->lgh_cur_idx = rec->lrh_index; - loghandle->lgh_cur_offset = (char *)rec - (char *)buf + - last_offset; + loghandle->lgh_cur_idx = rec->lrh_index; + loghandle->lgh_cur_offset = (char *)rec - (char *)buf + + chunk_offset; /* if set, process the callback on this record */ if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) { @@ -393,23 +427,20 @@ repeat: rc = llog_cancel_rec(lpi->lpi_env, loghandle, rec->lrh_index); - } - if (rc) - GOTO(out, rc); - } else { - CDEBUG(D_OTHER, "Skipped index %d\n", index); - } - - /* next record, still in buffer? */ - ++index; - if (index > last_index) - GOTO(out, rc = 0); - } - } + } + if (rc) + GOTO(out, rc); + } + /* exit if the last index is reached */ + if (index >= last_index) + GOTO(out, rc = 0); + ++index; + } + } out: - if (cd != NULL) - cd->lpcd_last_idx = last_called_index; + if (cd != NULL) + cd->lpcd_last_idx = last_called_index; if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) { /* something bad happened to the processing of a local diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index 9af09a7..e1ad2cd 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -748,34 +748,40 @@ int llog_cat_reverse_process(const struct lu_env *env, } EXPORT_SYMBOL(llog_cat_reverse_process); -static int llog_cat_set_first_idx(struct llog_handle *cathandle, int index) +static int llog_cat_set_first_idx(struct llog_handle *cathandle, int idx) { struct llog_log_hdr *llh = cathandle->lgh_hdr; - int i, bitmap_size, idx; + int bitmap_size; + ENTRY; bitmap_size = LLOG_HDR_BITMAP_SIZE(llh); - if (llh->llh_cat_idx == (index - 1)) { - idx = llh->llh_cat_idx + 1; + /* + * The llh_cat_idx equals to the first used index minus 1 + * so if we canceled the first index then llh_cat_idx + * must be renewed. + */ + if (llh->llh_cat_idx == (idx - 1)) { llh->llh_cat_idx = idx; - if (idx == cathandle->lgh_last_idx) - goto out; - - for (i = (index + 1) % bitmap_size; - i != cathandle->lgh_last_idx; - i = (i + 1) % bitmap_size) { - if (!ext2_test_bit(i, LLOG_HDR_BITMAP(llh))) { - idx = llh->llh_cat_idx + 1; + + do { + idx = (idx + 1) % bitmap_size; + if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) { + /* update llh_cat_idx for each unset bit, + * expecting the*/ llh->llh_cat_idx = idx; - } else if (i == 0) { - llh->llh_cat_idx = 0; + } else if (idx == 0) { + /* skip header bit */ + continue; } else { + /* the first index is found */ break; } - } -out: - CDEBUG(D_RPCTRACE, "set catlog "DOSTID" first idx %u\n", - POSTID(&cathandle->lgh_id.lgl_oi), llh->llh_cat_idx); + } while (idx != cathandle->lgh_last_idx); + + CDEBUG(D_RPCTRACE, "Set catlog "DOSTID" first idx %u," + " (last_idx %u)\n", POSTID(&cathandle->lgh_id.lgl_oi), + llh->llh_cat_idx, cathandle->lgh_last_idx); } RETURN(0); diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 0a09fb5..a79b1ec 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -641,13 +641,19 @@ out: * actual records are larger than minimum size) we just skip * some more records. */ -static inline void llog_skip_over(__u64 *off, int curr, int goal, - __u32 chunk_size) +static inline void llog_skip_over(struct llog_log_hdr *llh, __u64 *off, + int curr, int goal, __u32 chunk_size) { - if (goal <= curr) - return; - *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE) & - ~(chunk_size - 1); + if (goal > curr) { + if (llh->llh_size == 0) { + /* variable size records */ + *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE); + } else { + *off = chunk_size + (goal - 1) * llh->llh_size; + } + } + /* always align with lower chunk boundary*/ + *off &= ~(chunk_size - 1); } /** @@ -734,7 +740,8 @@ static int llog_osd_next_block(const struct lu_env *env, struct llog_rec_hdr *rec, *last_rec; struct llog_rec_tail *tail; - llog_skip_over(cur_offset, *cur_idx, next_idx, chunk_size); + llog_skip_over(loghandle->lgh_hdr, cur_offset, *cur_idx, + next_idx, chunk_size); /* read up to next llog chunk_size block */ lgi->lgi_buf.lb_len = chunk_size - @@ -864,7 +871,8 @@ static int llog_osd_prev_block(const struct lu_env *env, LASSERT(dt); cur_offset = chunk_size; - llog_skip_over(&cur_offset, 0, prev_idx, chunk_size); + llog_skip_over(loghandle->lgh_hdr, &cur_offset, 0, prev_idx, + chunk_size); rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc)