struct llog_log_hdr *llh = loghandle->lgh_hdr;
struct llog_process_cat_data *cd = lpi->lpi_catdata;
char *buf;
- int chunk_size;
+ size_t chunk_size;
__u64 cur_offset;
- __u64 last_offset;
int rc = 0, index = 1, last_index;
int saved_index = 0;
int last_called_index = 0;
RETURN(-EINVAL);
cur_offset = chunk_size = llh->llh_hdr.lrh_len;
+ /* expect chunk_size to be power of two */
+ LASSERT(is_power_of_2(chunk_size));
OBD_ALLOC_LARGE(buf, chunk_size);
if (buf == NULL) {
else
last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
- if (index > last_index) /* Record is not in this buffer. */
- GOTO(out, rc);
-
while (rc == 0) {
struct llog_rec_hdr *rec;
+ off_t chunk_offset;
+ unsigned int buf_offset = 0;
+ bool partial_chunk;
/* skip records not set in bitmap */
while (index <= last_index &&
!ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
++index;
- LASSERT(index <= last_index + 1);
- if (index == last_index + 1)
- break;
-repeat:
- CDEBUG(D_OTHER, "index: %d last_index %d\n",
- index, last_index);
+ /* There are no indices prior the last_index */
+ if (index > last_index)
+ break;
+
+ CDEBUG(D_OTHER, "index: %d last_index %d\n", index,
+ last_index);
+repeat:
/* get the buf with our target record; avoid old garbage */
memset(buf, 0, chunk_size);
- last_offset = cur_offset;
rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
index, &cur_offset, buf, chunk_size);
if (rc != 0)
GOTO(out, rc);
+ /* NB: after llog_next_block() call the cur_offset is the
+ * offset of the next block after read one.
+ * The absolute offset of the current chunk is calculated
+ * from cur_offset value and stored in chunk_offset variable.
+ */
+ if (cur_offset % chunk_size != 0) {
+ partial_chunk = true;
+ chunk_offset = cur_offset & ~(chunk_size - 1);
+ } else {
+ partial_chunk = false;
+ chunk_offset = cur_offset - chunk_size;
+ }
+
/* NB: when rec->lrh_len is accessed it is already swabbed
* since it is used at the "end" of the loop and the rec
* swabbing is done at the beginning of the loop. */
- for (rec = (struct llog_rec_hdr *)buf;
+ for (rec = (struct llog_rec_hdr *)(buf + buf_offset);
(char *)rec < buf + chunk_size;
rec = llog_rec_hdr_next(rec)) {
CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
rec, rec->lrh_type);
- if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
+ if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
lustre_swab_llog_rec(rec);
- CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
- rec->lrh_type, rec->lrh_index);
-
- if (rec->lrh_index == 0) {
- /* probably another rec just got added? */
- if (index <= loghandle->lgh_last_idx)
- GOTO(repeat, rc = 0);
- GOTO(out, rc = 0); /* no more records */
+ CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
+ rec->lrh_type, rec->lrh_index);
+
+ /* for partial chunk the end of it is zeroed, check
+ * for index 0 to distinguish it. */
+ if (partial_chunk && rec->lrh_index == 0) {
+ /* concurrent llog_add() might add new records
+ * while llog_processing, check this is not
+ * the case and re-read the current chunk
+ * otherwise. */
+ if (index > loghandle->lgh_last_idx)
+ GOTO(out, rc = 0);
+ CDEBUG(D_OTHER, "Re-read last llog buffer for "
+ "new records, index %u, last %u\n",
+ index, loghandle->lgh_last_idx);
+ /* save offset inside buffer for the re-read */
+ buf_offset = (char *)rec - (char *)buf;
+ cur_offset = chunk_offset;
+ goto repeat;
}
+
if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
- CWARN("invalid length %d in llog record for "
- "index %d/%d\n", rec->lrh_len,
- rec->lrh_index, index);
- GOTO(out, rc = -EINVAL);
- }
+ CWARN("invalid length %d in llog record for "
+ "index %d/%d\n", rec->lrh_len,
+ rec->lrh_index, index);
+ GOTO(out, rc = -EINVAL);
+ }
- if (rec->lrh_index < index) {
- CDEBUG(D_OTHER, "skipping lrh_index %d\n",
- rec->lrh_index);
- continue;
- }
+ if (rec->lrh_index < index) {
+ CDEBUG(D_OTHER, "skipping lrh_index %d\n",
+ rec->lrh_index);
+ continue;
+ }
+
+ if (rec->lrh_index != index) {
+ CERROR("%s: Invalid record: index %u but "
+ "expected %u\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ rec->lrh_index, index);
+ GOTO(out, rc = -ERANGE);
+ }
CDEBUG(D_OTHER,
"lrh_index: %d lrh_len: %d (%d remains)\n",
rec->lrh_index, rec->lrh_len,
(int)(buf + chunk_size - (char *)rec));
- loghandle->lgh_cur_idx = rec->lrh_index;
- loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
- last_offset;
+ loghandle->lgh_cur_idx = rec->lrh_index;
+ loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
+ chunk_offset;
/* if set, process the callback on this record */
if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
rc = llog_cancel_rec(lpi->lpi_env,
loghandle,
rec->lrh_index);
- }
- if (rc)
- GOTO(out, rc);
- } else {
- CDEBUG(D_OTHER, "Skipped index %d\n", index);
- }
-
- /* next record, still in buffer? */
- ++index;
- if (index > last_index)
- GOTO(out, rc = 0);
- }
- }
+ }
+ if (rc)
+ GOTO(out, rc);
+ }
+ /* exit if the last index is reached */
+ if (index >= last_index)
+ GOTO(out, rc = 0);
+ ++index;
+ }
+ }
out:
- if (cd != NULL)
- cd->lpcd_last_idx = last_called_index;
+ if (cd != NULL)
+ cd->lpcd_last_idx = last_called_index;
if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) {
/* something bad happened to the processing of a local
}
EXPORT_SYMBOL(llog_cat_reverse_process);
-static int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
+static int llog_cat_set_first_idx(struct llog_handle *cathandle, int idx)
{
struct llog_log_hdr *llh = cathandle->lgh_hdr;
- int i, bitmap_size, idx;
+ int bitmap_size;
+
ENTRY;
bitmap_size = LLOG_HDR_BITMAP_SIZE(llh);
- if (llh->llh_cat_idx == (index - 1)) {
- idx = llh->llh_cat_idx + 1;
+ /*
+ * The llh_cat_idx equals to the first used index minus 1
+ * so if we canceled the first index then llh_cat_idx
+ * must be renewed.
+ */
+ if (llh->llh_cat_idx == (idx - 1)) {
llh->llh_cat_idx = idx;
- if (idx == cathandle->lgh_last_idx)
- goto out;
-
- for (i = (index + 1) % bitmap_size;
- i != cathandle->lgh_last_idx;
- i = (i + 1) % bitmap_size) {
- if (!ext2_test_bit(i, LLOG_HDR_BITMAP(llh))) {
- idx = llh->llh_cat_idx + 1;
+
+ do {
+ idx = (idx + 1) % bitmap_size;
+ if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) {
+ /* update llh_cat_idx for each unset bit,
+ * expecting the*/
llh->llh_cat_idx = idx;
- } else if (i == 0) {
- llh->llh_cat_idx = 0;
+ } else if (idx == 0) {
+ /* skip header bit */
+ continue;
} else {
+ /* the first index is found */
break;
}
- }
-out:
- CDEBUG(D_RPCTRACE, "set catlog "DOSTID" first idx %u\n",
- POSTID(&cathandle->lgh_id.lgl_oi), llh->llh_cat_idx);
+ } while (idx != cathandle->lgh_last_idx);
+
+ CDEBUG(D_RPCTRACE, "Set catlog "DOSTID" first idx %u,"
+ " (last_idx %u)\n", POSTID(&cathandle->lgh_id.lgl_oi),
+ llh->llh_cat_idx, cathandle->lgh_last_idx);
}
RETURN(0);
* actual records are larger than minimum size) we just skip
* some more records.
*/
-static inline void llog_skip_over(__u64 *off, int curr, int goal,
- __u32 chunk_size)
+static inline void llog_skip_over(struct llog_log_hdr *llh, __u64 *off,
+ int curr, int goal, __u32 chunk_size)
{
- if (goal <= curr)
- return;
- *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE) &
- ~(chunk_size - 1);
+ if (goal > curr) {
+ if (llh->llh_size == 0) {
+ /* variable size records */
+ *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE);
+ } else {
+ *off = chunk_size + (goal - 1) * llh->llh_size;
+ }
+ }
+ /* always align with lower chunk boundary*/
+ *off &= ~(chunk_size - 1);
}
/**
struct llog_rec_hdr *rec, *last_rec;
struct llog_rec_tail *tail;
- llog_skip_over(cur_offset, *cur_idx, next_idx, chunk_size);
+ llog_skip_over(loghandle->lgh_hdr, cur_offset, *cur_idx,
+ next_idx, chunk_size);
/* read up to next llog chunk_size block */
lgi->lgi_buf.lb_len = chunk_size -
LASSERT(dt);
cur_offset = chunk_size;
- llog_skip_over(&cur_offset, 0, prev_idx, chunk_size);
+ llog_skip_over(loghandle->lgh_hdr, &cur_offset, 0, prev_idx,
+ chunk_size);
rc = dt_attr_get(env, o, &lgi->lgi_attr);
if (rc)