X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog.c;h=56413eff997656c61503567f26fa357c4efaace9;hp=df98cc950c2d4cc04b72d21f878c95f2d39a82ec;hb=df45994ddcf5e6fdc379b3e1d43f1d26ba321a0e;hpb=33754bef1a26b40687f2bb8677fb3aadcc2d5c23 diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index df98cc9..56413ef 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -283,9 +283,8 @@ static int llog_process_thread(void *arg) struct llog_log_hdr *llh = loghandle->lgh_hdr; struct llog_process_cat_data *cd = lpi->lpi_catdata; char *buf; - int chunk_size; + size_t chunk_size; __u64 cur_offset; - __u64 last_offset; int rc = 0, index = 1, last_index; int saved_index = 0; int last_called_index = 0; @@ -296,6 +295,8 @@ static int llog_process_thread(void *arg) RETURN(-EINVAL); cur_offset = chunk_size = llh->llh_hdr.lrh_len; + /* expect chunk_size to be power of two */ + LASSERT(is_power_of_2(chunk_size)); OBD_ALLOC_LARGE(buf, chunk_size); if (buf == NULL) { @@ -312,75 +313,108 @@ static int llog_process_thread(void *arg) else last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1; - if (index > last_index) /* Record is not in this buffer. */ - GOTO(out, rc); - while (rc == 0) { struct llog_rec_hdr *rec; + off_t chunk_offset; + unsigned int buf_offset = 0; + bool partial_chunk; /* skip records not set in bitmap */ while (index <= last_index && !ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) ++index; - LASSERT(index <= last_index + 1); - if (index == last_index + 1) - break; -repeat: - CDEBUG(D_OTHER, "index: %d last_index %d\n", - index, last_index); + /* There are no indices prior the last_index */ + if (index > last_index) + break; + + CDEBUG(D_OTHER, "index: %d last_index %d\n", index, + last_index); +repeat: /* get the buf with our target record; avoid old garbage */ memset(buf, 0, chunk_size); - last_offset = cur_offset; rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index, index, &cur_offset, buf, chunk_size); if (rc != 0) GOTO(out, rc); + /* NB: after llog_next_block() call the cur_offset is the + * offset of the next block after read one. + * The absolute offset of the current chunk is calculated + * from cur_offset value and stored in chunk_offset variable. + */ + if (cur_offset % chunk_size != 0) { + partial_chunk = true; + chunk_offset = cur_offset & ~(chunk_size - 1); + } else { + partial_chunk = false; + chunk_offset = cur_offset - chunk_size; + } + /* NB: when rec->lrh_len is accessed it is already swabbed * since it is used at the "end" of the loop and the rec * swabbing is done at the beginning of the loop. */ - for (rec = (struct llog_rec_hdr *)buf; + for (rec = (struct llog_rec_hdr *)(buf + buf_offset); (char *)rec < buf + chunk_size; rec = llog_rec_hdr_next(rec)) { CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n", rec, rec->lrh_type); - if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) + if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) lustre_swab_llog_rec(rec); - CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n", - rec->lrh_type, rec->lrh_index); - - if (rec->lrh_index == 0) { - /* probably another rec just got added? */ - if (index <= loghandle->lgh_last_idx) - GOTO(repeat, rc = 0); - GOTO(out, rc = 0); /* no more records */ + CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n", + rec->lrh_type, rec->lrh_index); + + /* for partial chunk the end of it is zeroed, check + * for index 0 to distinguish it. */ + if (partial_chunk && rec->lrh_index == 0) { + /* concurrent llog_add() might add new records + * while llog_processing, check this is not + * the case and re-read the current chunk + * otherwise. */ + if (index > loghandle->lgh_last_idx) + GOTO(out, rc = 0); + CDEBUG(D_OTHER, "Re-read last llog buffer for " + "new records, index %u, last %u\n", + index, loghandle->lgh_last_idx); + /* save offset inside buffer for the re-read */ + buf_offset = (char *)rec - (char *)buf; + cur_offset = chunk_offset; + goto repeat; } + if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) { - CWARN("invalid length %d in llog record for " - "index %d/%d\n", rec->lrh_len, - rec->lrh_index, index); - GOTO(out, rc = -EINVAL); - } + CWARN("invalid length %d in llog record for " + "index %d/%d\n", rec->lrh_len, + rec->lrh_index, index); + GOTO(out, rc = -EINVAL); + } - if (rec->lrh_index < index) { - CDEBUG(D_OTHER, "skipping lrh_index %d\n", - rec->lrh_index); - continue; - } + if (rec->lrh_index < index) { + CDEBUG(D_OTHER, "skipping lrh_index %d\n", + rec->lrh_index); + continue; + } + + if (rec->lrh_index != index) { + CERROR("%s: Invalid record: index %u but " + "expected %u\n", + loghandle->lgh_ctxt->loc_obd->obd_name, + rec->lrh_index, index); + GOTO(out, rc = -ERANGE); + } CDEBUG(D_OTHER, "lrh_index: %d lrh_len: %d (%d remains)\n", rec->lrh_index, rec->lrh_len, (int)(buf + chunk_size - (char *)rec)); - loghandle->lgh_cur_idx = rec->lrh_index; - loghandle->lgh_cur_offset = (char *)rec - (char *)buf + - last_offset; + loghandle->lgh_cur_idx = rec->lrh_index; + loghandle->lgh_cur_offset = (char *)rec - (char *)buf + + chunk_offset; /* if set, process the callback on this record */ if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) { @@ -393,23 +427,20 @@ repeat: rc = llog_cancel_rec(lpi->lpi_env, loghandle, rec->lrh_index); - } - if (rc) - GOTO(out, rc); - } else { - CDEBUG(D_OTHER, "Skipped index %d\n", index); - } - - /* next record, still in buffer? */ - ++index; - if (index > last_index) - GOTO(out, rc = 0); - } - } + } + if (rc) + GOTO(out, rc); + } + /* exit if the last index is reached */ + if (index >= last_index) + GOTO(out, rc = 0); + ++index; + } + } out: - if (cd != NULL) - cd->lpcd_last_idx = last_called_index; + if (cd != NULL) + cd->lpcd_last_idx = last_called_index; if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) { /* something bad happened to the processing of a local