Whamcloud - gitweb
LU-15481 llog: Add LLOG_SKIP_PLAIN to skip llog plain
[fs/lustre-release.git] / lustre / obdclass / llog.c
index 5580286..1af4ae5 100644 (file)
@@ -487,6 +487,15 @@ int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec)
 }
 EXPORT_SYMBOL(llog_verify_record);
 
+static inline bool llog_is_index_skipable(int idx, struct llog_log_hdr *llh,
+                                         struct llog_process_cat_data *cd)
+{
+       if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW))
+               return false;
+
+       return !test_bit_le(idx, LLOG_HDR_BITMAP(llh));
+}
+
 static int llog_process_thread(void *arg)
 {
        struct llog_process_info        *lpi = arg;
@@ -524,8 +533,10 @@ static int llog_process_thread(void *arg)
                last_called_index = cd->lpcd_first_idx;
                index = cd->lpcd_first_idx + 1;
        }
-       if (cd != NULL && cd->lpcd_last_idx)
+       if (cd && cd->lpcd_last_idx)
                last_index = cd->lpcd_last_idx;
+       else if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW))
+               last_index = loghandle->lgh_last_idx;
        else
                last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
 
@@ -538,7 +549,7 @@ static int llog_process_thread(void *arg)
 
                /* skip records not set in bitmap */
                while (index <= last_index &&
-                      !test_bit_le(index, LLOG_HDR_BITMAP(llh)))
+                      llog_is_index_skipable(index, llh, cd))
                        ++index;
 
                /* There are no indices prior the last_index */
@@ -560,6 +571,8 @@ repeat:
                        CDEBUG(D_OTHER, "cur_offset %llu, chunk_offset %llu,"
                               " buf_offset %u, rc = %d\n", cur_offset,
                               (__u64)chunk_offset, buf_offset, rc);
+               if (rc == -ESTALE)
+                       GOTO(out, rc = 0);
                /* we`ve tried to reread the chunk, but there is no
                 * new records */
                if (rc == -EIO && repeated && (chunk_offset + buf_offset) ==
@@ -668,23 +681,15 @@ repeat:
                                continue;
                        }
 
-                       if (rec->lrh_index != index) {
-                               /*
-                                * the last time we couldn't parse the block due
-                                * to corruption, thus has no idea about the
-                                * next index, take it from the block, once.
-                                */
-                               if (refresh_idx) {
-                                       refresh_idx = false;
-                                       index = rec->lrh_index;
-                               } else {
-                                       CERROR("%s: "DFID" Invalid record: index"
-                                              " %u but expected %u\n",
-                                              loghandle2name(loghandle),
-                                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
-                                              rec->lrh_index, index);
-                                       GOTO(out, rc = -ERANGE);
-                               }
+                       if (rec->lrh_index > index) {
+                               /* the record itself looks good, but we met a
+                                * gap which can be result of old bugs, just
+                                * keep going */
+                               CERROR("%s: "DFID" index %u, expected %u\n",
+                                      loghandle2name(loghandle),
+                                      PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
+                                      rec->lrh_index, index);
+                               index = rec->lrh_index;
                        }
 
                        CDEBUG(D_OTHER,
@@ -696,8 +701,8 @@ repeat:
                        loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
                                                    chunk_offset;
 
-                       /* if set, process the callback on this record */
-                       if (test_bit_le(index, LLOG_HDR_BITMAP(llh))) {
+                       /* if needed, process the callback on this record */
+                       if (!llog_is_index_skipable(index, llh, cd)) {
                                struct llog_cookie *lgc;
                                __u64   tmp_off;
                                int     tmp_idx;
@@ -730,7 +735,8 @@ repeat:
                                        lgc->lgc_index = tmp_idx;
                                }
 
-                               if (rc == LLOG_PROC_BREAK) {
+                               if (rc == LLOG_PROC_BREAK ||
+                                   rc == LLOG_SKIP_PLAIN) {
                                        GOTO(out, rc);
                                } else if (rc == LLOG_DEL_RECORD) {
                                        rc = llog_cancel_rec(lpi->lpi_env,
@@ -866,11 +872,12 @@ int llog_process_or_fork(const struct lu_env *env,
        lpi->lpi_cbdata    = data;
        lpi->lpi_catdata   = catdata;
 
-       CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d\n",
+       CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d read_mode %d\n",
               PFID(&loghandle->lgh_id.lgl_oi.oi_fid), flags,
               (flags & LLOG_F_IS_CAT) && d ? d->lpd_startcat : -1,
               (flags & LLOG_F_IS_CAT) && d ? d->lpd_startidx : -1,
-              cd ? cd->lpcd_first_idx : -1, cd ? cd->lpcd_last_idx : -1);
+              cd ? cd->lpcd_first_idx : -1, cd ? cd->lpcd_last_idx : -1,
+              cd ? cd->lpcd_read_mode : -1);
        if (fork) {
                struct task_struct *task;
 
@@ -961,7 +968,7 @@ int llog_reverse_process(const struct lu_env *env,
 
                /* skip records not set in bitmap */
                while (index >= first_index &&
-                      !test_bit_le(index, LLOG_HDR_BITMAP(llh)))
+                      llog_is_index_skipable(index, llh, cd))
                        --index;
 
                LASSERT(index >= first_index - 1);
@@ -991,13 +998,14 @@ int llog_reverse_process(const struct lu_env *env,
                        if (tail->lrt_index == 0)
                                GOTO(out, rc = 0); /* no more records */
 
-                       /* if set, process the callback on this record */
-                       if (test_bit_le(index, LLOG_HDR_BITMAP(llh))) {
+                       /* if needed, process the callback on this record */
+                       if (!llog_is_index_skipable(index, llh, cd)) {
                                rec = (void *)tail - tail->lrt_len +
                                      sizeof(*tail);
 
                                rc = cb(env, loghandle, rec, data);
-                               if (rc == LLOG_PROC_BREAK) {
+                               if (rc == LLOG_PROC_BREAK ||
+                                   rc == LLOG_SKIP_PLAIN) {
                                        GOTO(out, rc);
                                } else if (rc == LLOG_DEL_RECORD) {
                                        rc = llog_cancel_rec(env, loghandle,
@@ -1423,13 +1431,24 @@ out:
 }
 EXPORT_SYMBOL(llog_is_empty);
 
+/* this callback run in raw read mode (canceled record are processed) */
 int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
                      struct llog_rec_hdr *rec, void *data)
 {
-       struct llog_handle      *copy_llh = data;
+       struct llog_handle *copy_llh = data;
+       int idx = rec->lrh_index;
+       int rc;
+
+       ENTRY;
 
        /* Append all records */
-       return llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
+       rc = llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
+
+       /* Cancel the record if it is canceled on the source */
+       if (!rc && !test_bit_le(idx, LLOG_HDR_BITMAP(llh->lgh_hdr)))
+               rc = llog_cancel_rec(env, copy_llh, copy_llh->lgh_last_idx);
+
+       RETURN(rc);
 }
 
 /* backup plain llog */
@@ -1437,8 +1456,9 @@ int llog_backup(const struct lu_env *env, struct obd_device *obd,
                struct llog_ctxt *ctxt, struct llog_ctxt *bctxt,
                char *name, char *backup)
 {
-       struct llog_handle      *llh, *bllh;
-       int                      rc;
+       struct llog_handle *llh, *bllh;
+       struct llog_process_cat_data cd = {0};
+       int rc;
 
        ENTRY;
 
@@ -1483,9 +1503,11 @@ int llog_backup(const struct lu_env *env, struct obd_device *obd,
        if (rc)
                GOTO(out_backup, rc);
 
+       /* Read canceled records to have an exact copy */
+       cd.lpcd_read_mode = LLOG_READ_MODE_RAW;
        /* Copy log record by record */
        rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh,
-                                 NULL, false);
+                                 &cd, false);
        if (rc)
                CERROR("%s: failed to backup log %s: rc = %d\n",
                       obd->obd_name, name, rc);