*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/obdclass/llog.c
*
}
out_unlock:
+ if (rc < 0) {
+ /* restore bitmap while holding a mutex */
+ if (subtract_count) {
+ loghandle->lgh_hdr->llh_count += num;
+ subtract_count = false;
+ }
+ for (i = i - 1; i >= 0; i--)
+ set_bit_le(index[i], LLOG_HDR_BITMAP(llh));
+ }
mutex_unlock(&loghandle->lgh_hdr_mutex);
up_write(&loghandle->lgh_lock);
out_trans:
rc1 = dt_trans_stop(env, dt, th);
if (rc == 0)
rc = rc1;
- if (rc < 0) {
+ if (rc1 < 0) {
mutex_lock(&loghandle->lgh_hdr_mutex);
if (subtract_count)
loghandle->lgh_hdr->llh_count += num;
}
EXPORT_SYMBOL(llog_verify_record);
+static inline bool llog_is_index_skipable(int idx, struct llog_log_hdr *llh,
+ struct llog_process_cat_data *cd)
+{
+ if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW))
+ return false;
+
+ return !test_bit_le(idx, LLOG_HDR_BITMAP(llh));
+}
+
static int llog_process_thread(void *arg)
{
struct llog_process_info *lpi = arg;
last_called_index = cd->lpcd_first_idx;
index = cd->lpcd_first_idx + 1;
}
- if (cd != NULL && cd->lpcd_last_idx)
+ if (cd && cd->lpcd_last_idx)
last_index = cd->lpcd_last_idx;
+ else if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW))
+ last_index = loghandle->lgh_last_idx;
else
last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
/* skip records not set in bitmap */
while (index <= last_index &&
- !test_bit_le(index, LLOG_HDR_BITMAP(llh)))
+ llog_is_index_skipable(index, llh, cd))
++index;
/* There are no indices prior the last_index */
CDEBUG(D_OTHER, "cur_offset %llu, chunk_offset %llu,"
" buf_offset %u, rc = %d\n", cur_offset,
(__u64)chunk_offset, buf_offset, rc);
+ if (rc == -ESTALE)
+ GOTO(out, rc = 0);
/* we`ve tried to reread the chunk, but there is no
* new records */
if (rc == -EIO && repeated && (chunk_offset + buf_offset) ==
continue;
}
- if (rec->lrh_index != index) {
- /*
- * the last time we couldn't parse the block due
- * to corruption, thus has no idea about the
- * next index, take it from the block, once.
- */
- if (refresh_idx) {
- refresh_idx = false;
- index = rec->lrh_index;
- } else {
- CERROR("%s: "DFID" Invalid record: index"
- " %u but expected %u\n",
- loghandle2name(loghandle),
- PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
- rec->lrh_index, index);
- GOTO(out, rc = -ERANGE);
- }
+ if (rec->lrh_index > index) {
+ /* the record itself looks good, but we met a
+ * gap which can be result of old bugs, just
+ * keep going */
+ CERROR("%s: "DFID" index %u, expected %u\n",
+ loghandle2name(loghandle),
+ PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
+ rec->lrh_index, index);
+ index = rec->lrh_index;
}
CDEBUG(D_OTHER,
loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
chunk_offset;
- /* if set, process the callback on this record */
- if (test_bit_le(index, LLOG_HDR_BITMAP(llh))) {
+ /* if needed, process the callback on this record */
+ if (!llog_is_index_skipable(index, llh, cd)) {
struct llog_cookie *lgc;
__u64 tmp_off;
int tmp_idx;
lgc->lgc_index = tmp_idx;
}
- if (rc == LLOG_PROC_BREAK) {
+ if (rc == LLOG_PROC_BREAK ||
+ rc == LLOG_SKIP_PLAIN) {
GOTO(out, rc);
} else if (rc == LLOG_DEL_RECORD) {
rc = llog_cancel_rec(lpi->lpi_env,
loghandle,
rec->lrh_index);
+ /* Allow parallel cancelling, ENOENT
+ * means record was canceled at another
+ * processing thread or callback
+ */
+ if (rc == -ENOENT)
+ rc = 0;
}
if (rc)
GOTO(out, rc);
lpi->lpi_cbdata = data;
lpi->lpi_catdata = catdata;
- CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d\n",
+ CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d read_mode %d\n",
PFID(&loghandle->lgh_id.lgl_oi.oi_fid), flags,
(flags & LLOG_F_IS_CAT) && d ? d->lpd_startcat : -1,
(flags & LLOG_F_IS_CAT) && d ? d->lpd_startidx : -1,
- cd ? cd->lpcd_first_idx : -1, cd ? cd->lpcd_last_idx : -1);
+ cd ? cd->lpcd_first_idx : -1, cd ? cd->lpcd_last_idx : -1,
+ cd ? cd->lpcd_read_mode : -1);
if (fork) {
struct task_struct *task;
/* skip records not set in bitmap */
while (index >= first_index &&
- !test_bit_le(index, LLOG_HDR_BITMAP(llh)))
+ llog_is_index_skipable(index, llh, cd))
--index;
LASSERT(index >= first_index - 1);
if (tail->lrt_index == 0)
GOTO(out, rc = 0); /* no more records */
- /* if set, process the callback on this record */
- if (test_bit_le(index, LLOG_HDR_BITMAP(llh))) {
+ /* if needed, process the callback on this record */
+ if (!llog_is_index_skipable(index, llh, cd)) {
rec = (void *)tail - tail->lrt_len +
sizeof(*tail);
rc = cb(env, loghandle, rec, data);
- if (rc == LLOG_PROC_BREAK) {
+ if (rc == LLOG_PROC_BREAK ||
+ rc == LLOG_SKIP_PLAIN) {
GOTO(out, rc);
} else if (rc == LLOG_DEL_RECORD) {
rc = llog_cancel_rec(env, loghandle,
}
EXPORT_SYMBOL(llog_is_empty);
+/* this callback run in raw read mode (canceled record are processed) */
int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
struct llog_rec_hdr *rec, void *data)
{
- struct llog_handle *copy_llh = data;
+ struct llog_handle *copy_llh = data;
+ int idx = rec->lrh_index;
+ int rc;
+
+ ENTRY;
/* Append all records */
- return llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
+ rc = llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
+
+ /* Cancel the record if it is canceled on the source */
+ if (!rc && !test_bit_le(idx, LLOG_HDR_BITMAP(llh->lgh_hdr)))
+ rc = llog_cancel_rec(env, copy_llh, copy_llh->lgh_last_idx);
+
+ RETURN(rc);
}
/* backup plain llog */
struct llog_ctxt *ctxt, struct llog_ctxt *bctxt,
char *name, char *backup)
{
- struct llog_handle *llh, *bllh;
- int rc;
+ struct llog_handle *llh, *bllh;
+ struct llog_process_cat_data cd = {0};
+ int rc;
ENTRY;
if (rc)
GOTO(out_backup, rc);
+ /* Read canceled records to have an exact copy */
+ cd.lpcd_read_mode = LLOG_READ_MODE_RAW;
/* Copy log record by record */
rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh,
- NULL, false);
+ &cd, false);
if (rc)
CERROR("%s: failed to backup log %s: rc = %d\n",
obd->obd_name, name, rc);