}
EXPORT_SYMBOL(llog_verify_record);
+static inline bool llog_is_index_skipable(int idx, struct llog_log_hdr *llh,
+ struct llog_process_cat_data *cd)
+{
+ if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW))
+ return false;
+
+ return !test_bit_le(idx, LLOG_HDR_BITMAP(llh));
+}
+
static int llog_process_thread(void *arg)
{
struct llog_process_info *lpi = arg;
last_called_index = cd->lpcd_first_idx;
index = cd->lpcd_first_idx + 1;
}
- if (cd != NULL && cd->lpcd_last_idx)
+ if (cd && cd->lpcd_last_idx)
last_index = cd->lpcd_last_idx;
+ else if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW))
+ last_index = loghandle->lgh_last_idx;
else
last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
/* skip records not set in bitmap */
while (index <= last_index &&
- !test_bit_le(index, LLOG_HDR_BITMAP(llh)))
+ llog_is_index_skipable(index, llh, cd))
++index;
/* There are no indices prior the last_index */
loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
chunk_offset;
- /* if set, process the callback on this record */
- if (test_bit_le(index, LLOG_HDR_BITMAP(llh))) {
+ /* if needed, process the callback on this record */
+ if (!llog_is_index_skipable(index, llh, cd)) {
struct llog_cookie *lgc;
__u64 tmp_off;
int tmp_idx;
lpi->lpi_cbdata = data;
lpi->lpi_catdata = catdata;
- CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d\n",
+ CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d read_mode %d\n",
PFID(&loghandle->lgh_id.lgl_oi.oi_fid), flags,
(flags & LLOG_F_IS_CAT) && d ? d->lpd_startcat : -1,
(flags & LLOG_F_IS_CAT) && d ? d->lpd_startidx : -1,
- cd ? cd->lpcd_first_idx : -1, cd ? cd->lpcd_last_idx : -1);
+ cd ? cd->lpcd_first_idx : -1, cd ? cd->lpcd_last_idx : -1,
+ cd ? cd->lpcd_read_mode : -1);
if (fork) {
struct task_struct *task;
/* skip records not set in bitmap */
while (index >= first_index &&
- !test_bit_le(index, LLOG_HDR_BITMAP(llh)))
+ llog_is_index_skipable(index, llh, cd))
--index;
LASSERT(index >= first_index - 1);
if (tail->lrt_index == 0)
GOTO(out, rc = 0); /* no more records */
- /* if set, process the callback on this record */
- if (test_bit_le(index, LLOG_HDR_BITMAP(llh))) {
+ /* if needed, process the callback on this record */
+ if (!llog_is_index_skipable(index, llh, cd)) {
rec = (void *)tail - tail->lrt_len +
sizeof(*tail);
}
EXPORT_SYMBOL(llog_is_empty);
+/* this callback run in raw read mode (canceled record are processed) */
int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
struct llog_rec_hdr *rec, void *data)
{
- struct llog_handle *copy_llh = data;
+ struct llog_handle *copy_llh = data;
+ int idx = rec->lrh_index;
+ int rc;
+
+ ENTRY;
/* Append all records */
- return llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
+ rc = llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
+
+ /* Cancel the record if it is canceled on the source */
+ if (!rc && !test_bit_le(idx, LLOG_HDR_BITMAP(llh->lgh_hdr)))
+ rc = llog_cancel_rec(env, copy_llh, copy_llh->lgh_last_idx);
+
+ RETURN(rc);
}
/* backup plain llog */
struct llog_ctxt *ctxt, struct llog_ctxt *bctxt,
char *name, char *backup)
{
- struct llog_handle *llh, *bllh;
- int rc;
+ struct llog_handle *llh, *bllh;
+ struct llog_process_cat_data cd = {0};
+ int rc;
ENTRY;
if (rc)
GOTO(out_backup, rc);
+ /* Read canceled records to have an exact copy */
+ cd.lpcd_read_mode = LLOG_READ_MODE_RAW;
/* Copy log record by record */
rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh,
- NULL, false);
+ &cd, false);
if (rc)
CERROR("%s: failed to backup log %s: rc = %d\n",
obd->obd_name, name, rc);