}
EXPORT_SYMBOL(llog_init_handle);
+#define LLOG_ERROR_REC(lgh, rec, format, a...) \
+ CERROR("%s: "DFID" rec type=%x idx=%u len=%u, " format "\n" , \
+ loghandle2name(lgh), PLOGID(&lgh->lgh_id), (rec)->lrh_type, \
+ (rec)->lrh_index, (rec)->lrh_len, ##a)
+
int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec)
{
int chunk_size = llh->lgh_hdr->llh_hdr.lrh_len;
- if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
- CERROR("%s: record is too large: %d > %d\n",
- loghandle2name(llh), rec->lrh_len, chunk_size);
- return -EINVAL;
- }
- if (rec->lrh_index >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) {
- CERROR("%s: index is too high: %d\n",
- loghandle2name(llh), rec->lrh_index);
- return -EINVAL;
- }
- if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC) {
- CERROR("%s: magic %x is bad\n",
- loghandle2name(llh), rec->lrh_type);
- return -EINVAL;
- }
+ if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC)
+ LLOG_ERROR_REC(llh, rec, "magic is bad");
+ else if (rec->lrh_len == 0 || rec->lrh_len > chunk_size)
+ LLOG_ERROR_REC(llh, rec, "bad record len, chunk size is %d",
+ chunk_size);
+ else if (rec->lrh_index >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr))
+ LLOG_ERROR_REC(llh, rec, "index is too high");
+ else
+ return 0;
- return 0;
+ return -EINVAL;
}
EXPORT_SYMBOL(llog_verify_record);
static int llog_process_thread(void *arg)
{
- struct llog_process_info *lpi = arg;
- struct llog_handle *loghandle = lpi->lpi_loghandle;
- struct llog_log_hdr *llh = loghandle->lgh_hdr;
- struct llog_process_cat_data *cd = lpi->lpi_catdata;
- struct llog_thread_info *lti;
- char *buf;
- size_t chunk_size;
- __u64 cur_offset;
- int rc = 0, index = 1, last_index;
- int saved_index = 0;
- int last_called_index = 0;
- bool repeated = false;
- bool refresh_idx = false;
+ struct llog_process_info *lpi = arg;
+ struct llog_handle *loghandle = lpi->lpi_loghandle;
+ struct llog_log_hdr *llh = loghandle->lgh_hdr;
+ struct llog_process_cat_data *cd = lpi->lpi_catdata;
+ struct llog_thread_info *lti;
+ char *buf;
+ size_t chunk_size;
+ __u64 cur_offset;
+ int rc = 0, index = 1, last_index;
+ int saved_index = 0;
+ int last_called_index = 0;
+ bool repeated = false;
ENTRY;
struct llog_rec_hdr *rec;
off_t chunk_offset = 0;
unsigned int buf_offset = 0;
- int lh_last_idx;
- int synced_idx = 0;
+ int lh_last_idx;
+ int synced_idx = 0;
/* skip records not set in bitmap */
while (index <= last_index &&
if (repeated && (chunk_offset + buf_offset) == cur_offset &&
(rc == -EBADR || rc == -EIO))
GOTO(out, rc = 0);
+ /* EOF while trying to skip to the next chunk */
+ if (!index && rc == -EBADR)
+ GOTO(out, rc = 0);
if (rc != 0)
GOTO(out, rc);
CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
rec->lrh_type, rec->lrh_index);
+ /* start with first rec if block was skipped */
+ if (!index) {
+ CDEBUG(D_OTHER,
+ "%s: skipping to the index %u\n",
+ loghandle2name(loghandle),
+ rec->lrh_index);
+ index = rec->lrh_index;
+ }
+
if (index == (synced_idx + 1) &&
synced_idx == LLOG_HDR_TAIL(llh)->lrt_index)
GOTO(out, rc = 0);
* it turns to
* lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index
* This exception is working for catalog only.
+ * The last check is for the partial chunk boundary,
+ * if it is reached then try to re-read for possible
+ * new records once.
*/
-
if ((index == lh_last_idx && synced_idx != index) ||
(index == (lh_last_idx + 1) &&
lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index) ||
- (rec->lrh_index == 0 && !repeated)) {
-
+ (((char *)rec - buf >= cur_offset - chunk_offset) &&
+ !repeated)) {
/* save offset inside buffer for the re-read */
buf_offset = (char *)rec - (char *)buf;
cur_offset = chunk_offset;
up_read(&loghandle->lgh_last_sem);
CDEBUG(D_OTHER, "synced_idx: %d\n", synced_idx);
goto repeat;
-
}
-
repeated = false;
rc = llog_verify_record(loghandle, rec);
if (rc) {
- CERROR("%s: invalid record in llog "DFID
- " record for index %d/%d: rc = %d\n",
- loghandle2name(loghandle),
- PLOGID(&loghandle->lgh_id),
- rec->lrh_index, index, rc);
+ CDEBUG(D_OTHER, "invalid record at index %d\n",
+ index);
/*
- * the block seem to be corrupted, let's try
- * with the next one. reset rc to go to the
- * next chunk.
+ * for fixed-sized llogs we can skip one record
+ * by using llh_size from llog header.
+ * Otherwise skip the next llog chunk.
*/
- refresh_idx = true;
+ rc = 0;
+ if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
+ rec->lrh_len = llh->llh_size;
+ goto next_rec;
+ }
+ /* make sure that is always next block */
+ cur_offset = chunk_offset + chunk_size;
+ /* no goal to find, just next block to read */
index = 0;
- GOTO(repeat, rc = 0);
+ break;
}
if (rec->lrh_index < index) {
/* the record itself looks good, but we met a
* gap which can be result of old bugs, just
* keep going */
- CERROR("%s: "DFID" index %u, expected %u\n",
- loghandle2name(loghandle),
- PLOGID(&loghandle->lgh_id),
- rec->lrh_index, index);
+ LLOG_ERROR_REC(loghandle, rec,
+ "gap in index, expected %u",
+ index);
index = rec->lrh_index;
}
loghandle->lgh_hdr->llh_count == 1)
GOTO(out, rc = 0);
}
+next_rec:
/* exit if the last index is reached */
if (index >= last_index)
GOTO(out, rc = 0);
return 0;
}
+static int llog_zeroes(const struct lu_env *env, struct dt_object *o,
+ __u64 start, __u64 end)
+{
+ struct lu_attr la;
+ struct thandle *th;
+ struct dt_device *d;
+ char *buf;
+ loff_t pos = start;
+ struct lu_buf lb;
+ int rc;
+
+ ENTRY;
+
+ OBD_ALLOC(buf, end - start);
+ if (!buf)
+ RETURN(-ENOMEM);
+
+ LASSERT(o);
+ d = lu2dt_dev(o->do_lu.lo_dev);
+ LASSERT(d);
+
+ rc = dt_attr_get(env, o, &la);
+ if (rc)
+ GOTO(free, rc);
+
+ CDEBUG(D_OTHER, "llog size %llu\n", la.la_size);
+ rc = sizeof(struct llog_log_hdr) + sizeof(struct llog_mini_rec);
+ if (la.la_size < end) {
+ CERROR("llog size %llu is small for punch at [%llu;%llu]\n",
+ la.la_size, start, end);
+ GOTO(free, rc = 0);
+ }
+
+ th = dt_trans_create(env, d);
+ if (IS_ERR(th))
+ GOTO(free, rc = PTR_ERR(th));
+
+ lb.lb_buf = buf;
+ lb.lb_len = end - start;
+ rc = dt_declare_write(env, o, &lb, pos, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = dt_trans_start_local(env, d, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = dt_write(env, o, &lb, &pos, th);
+ if (rc)
+ GOTO(stop, rc);
+stop:
+ dt_trans_stop(env, d, th);
+free:
+ OBD_FREE(buf, end - start);
+
+ RETURN(rc);
+}
+
+struct llog_test8_rec {
+ struct llog_rec_hdr ltr_hdr;
+ __u64 padding[29];
+ struct llog_rec_tail ltr_tail;
+} __attribute__((packed));
+
static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
{
struct llog_handle *llh = NULL;
char name[10];
int rc, rc2, i;
int orig_counter;
- struct llog_mini_rec lmr;
+ struct llog_test8_rec ltr;
struct llog_ctxt *ctxt;
struct dt_object *obj = NULL;
+ int plain_pos;
+ int reclen = sizeof(ltr);
ENTRY;
ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
LASSERT(ctxt);
- lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
- lmr.lmr_hdr.lrh_type = LLOG_OP_MAGIC;
+ /* simulate generic llog records with 256-bytes size */
+ ltr.ltr_hdr.lrh_len = ltr.ltr_tail.lrt_len = reclen;
+ ltr.ltr_hdr.lrh_type = LLOG_OP_MAGIC;
CWARN("8a: fill the first plain llog\n");
rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
plain_counter = 0;
rc = llog_cat_process(env, llh, test_8_cb, "foobar", 0, 0);
if (rc != 0) {
- CERROR("5a: process with test_8_cb failed: %d\n", rc);
+ CERROR("8a: process with test_8_cb failed: %d\n", rc);
GOTO(out, rc);
}
orig_counter = plain_counter;
- for (i = 0; i < 100; i++) {
- rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
+ for (i = 0; i < 20; i++) {
+ rc = llog_cat_add(env, llh, <r.ltr_hdr, NULL);
if (rc) {
CERROR("5a: add record failed\n");
GOTO(out, rc);
}
}
+ CWARN("8b: first llog "DFID"\n",
+ PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu)));
- /* grab the current plain llog, we'll corrupt it later */
- obj = llh->u.chd.chd_current_log->lgh_obj;
- LASSERT(obj);
- lu_object_get(&obj->do_lu);
- CWARN("8a: pin llog "DFID"\n", PFID(lu_object_fid(&obj->do_lu)));
+ /* get llog index in catalog to clear it later */
+ plain_pos = (llh->lgh_last_idx - 1) * sizeof(struct llog_logid_rec);
+ /* destroy plain llog to don't leave it orphaned */
+ list_del_init(&llh->u.chd.chd_current_log->u.phd.phd_entry);
+ llog_destroy(env, llh->u.chd.chd_current_log);
+ llog_close(env, llh->u.chd.chd_current_log);
+ llh->u.chd.chd_current_log = NULL;
rc2 = llog_cat_close(env, llh);
if (rc2) {
}
for (i = 0; i < 100; i++) {
- rc = llog_cat_add(env, llh, &lmr.lmr_hdr, NULL);
+ rc = llog_cat_add(env, llh, <r.ltr_hdr, NULL);
if (rc) {
CERROR("8b: add record failed\n");
GOTO(out, rc);
}
}
- CWARN("8b: second llog "DFID"\n",
- PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu)));
+ /* grab the current plain llog, we'll corrupt it later */
+ obj = llh->u.chd.chd_current_log->lgh_obj;
+ LASSERT(obj);
+ lu_object_get(&obj->do_lu);
+ CWARN("8b: pin llog "DFID"\n", PFID(lu_object_fid(&obj->do_lu)));
+
+ /* must lost all 20 records */
+ CWARN("8b: clean first llog record in catalog\n");
+ llog_zeroes(env, llh->lgh_obj, 8192 + plain_pos,
+ 8192 + plain_pos + sizeof(struct llog_logid_rec));
rc2 = llog_cat_close(env, llh);
if (rc2) {
GOTO(out_put, rc);
}
- /* Here was 8c: drop two records from the first plain llog
- * llog_truncate was bad idea cause it creates a wrong state,
- * lgh_last_idx is wrong and two records belongs to zeroed buffer
- */
+ /* lost 28 records, from 5 to 32 in block */
+ CWARN("8c: corrupt first chunk in the middle\n");
+ llog_zeroes(env, obj, 8192 + reclen * 4, 8192 + reclen * 10);
+ /* lost whole chunk - 32 records */
+ CWARN("8c: corrupt second chunk at start\n");
+ llog_zeroes(env, obj, 16384, 16384 + reclen);
CWARN("8d: count survived records\n");
rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
GOTO(out, rc);
}
- if (orig_counter + 200 != plain_counter) {
+ /* if llog processing skips bad data as expected then 80
+ * records from 120 should be lost
+ */
+ if (orig_counter + 120 - 80 != plain_counter) {
CERROR("found %d records (expected %d)\n", plain_counter,
- orig_counter + 200);
+ orig_counter + 120 - 80);
rc = -EIO;
}
NULL, lpi->lpi_cbdata, 1, 0, true);
complete(&lpi->lpi_completion);
-
+ msleep(MSEC_PER_SEC / 2);
lpi->lpi_rc = rc;
if (rc)
CWARN("10h: Error during catalog processing %d\n", rc);
struct llog_ctxt *ctxt;
struct dt_object *o;
struct lu_env env;
- struct lu_context test_session;
int rc;
ENTRY;
if (rc)
RETURN(rc);
- rc = lu_context_init(&test_session, LCT_SERVER_SESSION);
- if (rc)
- GOTO(cleanup_env, rc);
- test_session.lc_thread = (struct ptlrpc_thread *)current;
- lu_context_enter(&test_session);
- env.le_ses = &test_session;
-
CWARN("Setup llog-test device over %s device\n",
lustre_cfg_string(lcfg, 1));
rc = llog_setup(&env, tgt, &tgt->obd_olg, LLOG_TEST_ORIG_CTXT, tgt,
&llog_osd_ops);
if (rc)
- GOTO(cleanup_session, rc);
+ GOTO(cleanup_env, rc);
/* use MGS llog dir for tests */
ctxt = llog_get_context(tgt, LLOG_CONFIG_ORIG_CTXT);
rc = llog_run_tests(&env, tgt);
if (rc)
llog_test_cleanup(obd);
-cleanup_session:
- lu_context_exit(&test_session);
- lu_context_fini(&test_session);
+
cleanup_env:
lu_env_fini(&env);
RETURN(rc);