}
EXPORT_SYMBOL(llog_verify_record);
+static inline bool llog_is_index_skipable(int idx, struct llog_log_hdr *llh,
+ struct llog_process_cat_data *cd)
+{
+ if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW))
+ return false;
+
+ return !test_bit_le(idx, LLOG_HDR_BITMAP(llh));
+}
+
static int llog_process_thread(void *arg)
{
struct llog_process_info *lpi = arg;
last_called_index = cd->lpcd_first_idx;
index = cd->lpcd_first_idx + 1;
}
- if (cd != NULL && cd->lpcd_last_idx)
+ if (cd && cd->lpcd_last_idx)
last_index = cd->lpcd_last_idx;
+ else if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW))
+ last_index = loghandle->lgh_last_idx;
else
last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
/* skip records not set in bitmap */
while (index <= last_index &&
- !test_bit_le(index, LLOG_HDR_BITMAP(llh)))
+ llog_is_index_skipable(index, llh, cd))
++index;
/* There are no indices prior the last_index */
CDEBUG(D_OTHER, "cur_offset %llu, chunk_offset %llu,"
" buf_offset %u, rc = %d\n", cur_offset,
(__u64)chunk_offset, buf_offset, rc);
+ if (rc == -ESTALE)
+ GOTO(out, rc = 0);
/* we`ve tried to reread the chunk, but there is no
* new records */
if (rc == -EIO && repeated && (chunk_offset + buf_offset) ==
continue;
}
- if (rec->lrh_index != index) {
- /*
- * the last time we couldn't parse the block due
- * to corruption, thus has no idea about the
- * next index, take it from the block, once.
- */
- if (refresh_idx) {
- refresh_idx = false;
- index = rec->lrh_index;
- } else {
- CERROR("%s: "DFID" Invalid record: index"
- " %u but expected %u\n",
- loghandle2name(loghandle),
- PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
- rec->lrh_index, index);
- GOTO(out, rc = -ERANGE);
- }
+ if (rec->lrh_index > index) {
+ /* the record itself looks good, but we met a
+ * gap which can be result of old bugs, just
+ * keep going */
+ CERROR("%s: "DFID" index %u, expected %u\n",
+ loghandle2name(loghandle),
+ PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
+ rec->lrh_index, index);
+ index = rec->lrh_index;
}
CDEBUG(D_OTHER,
loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
chunk_offset;
- /* if set, process the callback on this record */
- if (test_bit_le(index, LLOG_HDR_BITMAP(llh))) {
+ /* if needed, process the callback on this record */
+ if (!llog_is_index_skipable(index, llh, cd)) {
struct llog_cookie *lgc;
__u64 tmp_off;
int tmp_idx;
lgc->lgc_index = tmp_idx;
}
- if (rc == LLOG_PROC_BREAK) {
+ if (rc == LLOG_PROC_BREAK ||
+ rc == LLOG_SKIP_PLAIN) {
GOTO(out, rc);
} else if (rc == LLOG_DEL_RECORD) {
rc = llog_cancel_rec(lpi->lpi_env,
lpi->lpi_cbdata = data;
lpi->lpi_catdata = catdata;
- CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d\n",
+ CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d read_mode %d\n",
PFID(&loghandle->lgh_id.lgl_oi.oi_fid), flags,
(flags & LLOG_F_IS_CAT) && d ? d->lpd_startcat : -1,
(flags & LLOG_F_IS_CAT) && d ? d->lpd_startidx : -1,
- cd ? cd->lpcd_first_idx : -1, cd ? cd->lpcd_last_idx : -1);
+ cd ? cd->lpcd_first_idx : -1, cd ? cd->lpcd_last_idx : -1,
+ cd ? cd->lpcd_read_mode : -1);
if (fork) {
struct task_struct *task;
/* skip records not set in bitmap */
while (index >= first_index &&
- !test_bit_le(index, LLOG_HDR_BITMAP(llh)))
+ llog_is_index_skipable(index, llh, cd))
--index;
LASSERT(index >= first_index - 1);
if (tail->lrt_index == 0)
GOTO(out, rc = 0); /* no more records */
- /* if set, process the callback on this record */
- if (test_bit_le(index, LLOG_HDR_BITMAP(llh))) {
+ /* if needed, process the callback on this record */
+ if (!llog_is_index_skipable(index, llh, cd)) {
rec = (void *)tail - tail->lrt_len +
sizeof(*tail);
rc = cb(env, loghandle, rec, data);
- if (rc == LLOG_PROC_BREAK) {
+ if (rc == LLOG_PROC_BREAK ||
+ rc == LLOG_SKIP_PLAIN) {
GOTO(out, rc);
} else if (rc == LLOG_DEL_RECORD) {
rc = llog_cancel_rec(env, loghandle,
}
EXPORT_SYMBOL(llog_is_empty);
+/* this callback run in raw read mode (canceled record are processed) */
int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
struct llog_rec_hdr *rec, void *data)
{
- struct llog_handle *copy_llh = data;
+ struct llog_handle *copy_llh = data;
+ int idx = rec->lrh_index;
+ int rc;
+
+ ENTRY;
/* Append all records */
- return llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
+ rc = llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
+
+ /* Cancel the record if it is canceled on the source */
+ if (!rc && !test_bit_le(idx, LLOG_HDR_BITMAP(llh->lgh_hdr)))
+ rc = llog_cancel_rec(env, copy_llh, copy_llh->lgh_last_idx);
+
+ RETURN(rc);
}
/* backup plain llog */
struct llog_ctxt *ctxt, struct llog_ctxt *bctxt,
char *name, char *backup)
{
- struct llog_handle *llh, *bllh;
- int rc;
+ struct llog_handle *llh, *bllh;
+ struct llog_process_cat_data cd = {0};
+ int rc;
ENTRY;
if (rc)
GOTO(out_backup, rc);
+ /* Read canceled records to have an exact copy */
+ cd.lpcd_read_mode = LLOG_READ_MODE_RAW;
/* Copy log record by record */
rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh,
- NULL, false);
+ &cd, false);
if (rc)
CERROR("%s: failed to backup log %s: rc = %d\n",
obd->obd_name, name, rc);