init_rwsem(&loghandle->lgh_lock);
mutex_init(&loghandle->lgh_hdr_mutex);
+ init_rwsem(&loghandle->lgh_last_sem);
INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
atomic_set(&loghandle->lgh_refcount, 1);
unsigned int buf_offset = 0;
bool partial_chunk;
int lh_last_idx;
+ int synced_idx = 0;
/* skip records not set in bitmap */
while (index <= last_index &&
/* get the buf with our target record; avoid old garbage */
memset(buf, 0, chunk_size);
/* the record index for outdated chunk data */
- lh_last_idx = loghandle->lgh_last_idx + 1;
+ /* it is safe to process buffer until saved lgh_last_idx */
+ lh_last_idx = LLOG_HDR_TAIL(llh)->lrt_index;
rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
index, &cur_offset, buf, chunk_size);
if (repeated && rc)
CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
rec->lrh_type, rec->lrh_index);
- /* for partial chunk the end of it is zeroed, check
- * for index 0 to distinguish it. */
- if (partial_chunk && rec->lrh_index == 0) {
- /* concurrent llog_add() might add new records
- * while llog_processing, check this is not
- * the case and re-read the current chunk
- * otherwise. */
- int records;
- /* lgh_last_idx could be less then index
- * for catalog, if catalog is wrapped */
- if ((index > loghandle->lgh_last_idx &&
- !(loghandle->lgh_hdr->llh_flags &
- LLOG_F_IS_CAT)) || repeated ||
- (loghandle->lgh_obj != NULL &&
- dt_object_remote(loghandle->lgh_obj)))
- GOTO(out, rc = 0);
- /* <2 records means no more records
- * if the last record we processed was
- * the final one, then the underlying
- * object might have been destroyed yet.
- * we better don't access that.. */
- mutex_lock(&loghandle->lgh_hdr_mutex);
- records = loghandle->lgh_hdr->llh_count;
- mutex_unlock(&loghandle->lgh_hdr_mutex);
- if (records <= 1)
- GOTO(out, rc = 0);
- CDEBUG(D_OTHER, "Re-read last llog buffer for "
- "new records, index %u, last %u\n",
- index, loghandle->lgh_last_idx);
+ if (index == (synced_idx + 1) &&
+ synced_idx == LLOG_HDR_TAIL(llh)->lrt_index)
+ GOTO(out, rc = 0);
+
+ /* the bitmap could be changed during processing
+ * records from the chunk. For wrapped catalog
+ * it means we can read deleted record and try to
+ * process it. Check this case and reread the chunk.
+ * It is safe to process to lh_last_idx, including
+ * lh_last_idx if it was synced. We can not do <=
+ * comparison, cause for wrapped catalog lgh_last_idx
+ * could be less than index. So we detect last index
+ * for processing as index == lh_last_idx+1. But when
+ * catalog is wrapped and full lgh_last_idx=llh_cat_idx,
+ * the first processing index is llh_cat_idx+1.
+ */
+
+ if ((index == lh_last_idx && synced_idx != index) ||
+ (index == (lh_last_idx + 1) &&
+ !(index == (llh->llh_cat_idx + 1) &&
+ (llh->llh_flags & LLOG_F_IS_CAT))) ||
+ (rec->lrh_index == 0 && !repeated)) {
+
/* save offset inside buffer for the re-read */
buf_offset = (char *)rec - (char *)buf;
cur_offset = chunk_offset;
repeated = true;
+ /* We need to be sure lgh_last_idx
+ * record was saved to disk
+ */
+ down_read(&loghandle->lgh_last_sem);
+ synced_idx = LLOG_HDR_TAIL(llh)->lrt_index;
+ up_read(&loghandle->lgh_last_sem);
+ CDEBUG(D_OTHER, "synced_idx: %d\n", synced_idx);
goto repeat;
+
}
repeated = false;
loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
chunk_offset;
- OBD_FAIL_TIMEOUT(OBD_FAIL_LLOG_PROCESS_TIMEOUT, 2);
-
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) &&
+ index == lh_last_idx &&
+ cfs_fail_val == (unsigned int)
+ (loghandle->lgh_id.lgl_oi.oi.oi_id &
+ 0xFFFFFFFF)) {
+ OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT);
+ }
/* if set, process the callback on this record */
if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
struct llog_cookie *lgc;
__u64 tmp_off;
int tmp_idx;
- /* the bitmap could be changed during processing
- * records from the chunk. For wrapped catalog
- * it means we can read deleted record and try to
- * process it. Check this case and reread the chunk.
- * Checking the race with llog_add the bit is set
- * after incrementation of lgh_last_idx */
- if (index == lh_last_idx &&
- lh_last_idx !=
- (loghandle->lgh_last_idx + 1)) {
- /* save offset inside buffer for
- * the re-read */
- buf_offset = (char *)rec - (char *)buf;
- cur_offset = chunk_offset;
- repeated = true;
- goto repeat;
- }
+ CDEBUG(D_OTHER, "index: %d, lh_last_idx: %d "
+ "synced_idx: %d lgh_last_idx: %d\n",
+ index, lh_last_idx, synced_idx,
+ loghandle->lgh_last_idx);
+
if (lti != NULL) {
lgc = <i->lgi_cookie;
/* store lu_env for recursive calls */
/* This is slightly more than the number of records that can fit into a
* single llog file, because the llog_log_header takes up some of the
* space in the first block that cannot be used for the bitmap. */
-#define LLOG_TEST_RECNUM (LLOG_MIN_CHUNK_SIZE * 8)
-
+static int llog_test_recnum = (LLOG_MIN_CHUNK_SIZE * 8);
static int llog_test_rand;
static struct obd_uuid uuid = { .uuid = "test_uuid" };
static struct llog_logid cat_logid;
/* Test catalogue additions */
static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
{
- struct llog_handle *cath;
+ struct llog_handle *cath, *llh;
char name[10];
int rc, rc2, i, buflen;
struct llog_mini_rec lmr;
if (rc)
GOTO(out, rc);
+ /* estimate the max number of record for the plain llog
+ * cause it depends on disk size
+ */
+ llh = cath->u.chd.chd_current_log;
+ if (llh->lgh_max_size != 0) {
+ llog_test_recnum = (llh->lgh_max_size -
+ sizeof(struct llog_log_hdr)) / LLOG_MIN_REC_SIZE;
+ }
+
+ if (llog_test_recnum >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr))
+ llog_test_recnum = LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1;
+
CWARN("4c: cancel 1 log record\n");
rc = llog_cat_cancel_records(env, cath, 1, &cookie);
if (rc) {
if (rc)
GOTO(out, rc);
- CWARN("4d: write %d more log records\n", LLOG_TEST_RECNUM);
- for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+ CWARN("4d: write %d more log records\n", llog_test_recnum);
+ for (i = 0; i < llog_test_recnum; i++) {
rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
if (rc) {
CERROR("4d: write %d records failed at #%d: %d\n",
- LLOG_TEST_RECNUM, i + 1, rc);
+ llog_test_recnum, i + 1, rc);
GOTO(out, rc);
}
num_recs++;
llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle, 1, &cookie);
cancel_count++;
- if (cancel_count == LLOG_TEST_RECNUM)
+ if (cancel_count == llog_test_recnum)
RETURN(-LLOG_EEMPTY);
RETURN(0);
}
GOTO(out, rc = -EINVAL);
}
- CWARN("5c: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+ CWARN("5c: Cancel %d records, see one log zapped\n", llog_test_recnum);
cancel_count = 0;
rc = llog_cat_process(env, llh, llog_cancel_rec_cb, "foobar", 0, 0);
if (rc != -LLOG_EEMPTY) {
RETURN(rc);
}
-static int llog_truncate(const struct lu_env *env, struct dt_object *o)
-{
- struct lu_attr la;
- struct thandle *th;
- struct dt_device *d;
- int rc;
- ENTRY;
-
- LASSERT(o);
- d = lu2dt_dev(o->do_lu.lo_dev);
- LASSERT(d);
-
- rc = dt_attr_get(env, o, &la);
- if (rc)
- RETURN(rc);
-
- CDEBUG(D_OTHER, "original size %llu\n", la.la_size);
- rc = sizeof(struct llog_log_hdr) + sizeof(struct llog_mini_rec);
- if (la.la_size < rc) {
- CERROR("too small llog: %llu\n", la.la_size);
- RETURN(0);
- }
-
- /* drop 2 records */
- la.la_size = la.la_size - (sizeof(struct llog_mini_rec) * 2);
- la.la_valid = LA_SIZE;
-
- th = dt_trans_create(env, d);
- if (IS_ERR(th))
- RETURN(PTR_ERR(th));
-
- rc = dt_declare_attr_set(env, o, &la, th);
- if (rc)
- GOTO(stop, rc);
-
- rc = dt_declare_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
-
- rc = dt_trans_start_local(env, d, th);
- if (rc)
- GOTO(stop, rc);
-
- rc = dt_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
- if (rc)
- GOTO(stop, rc);
-
- rc = dt_attr_set(env, o, &la, th);
- if (rc)
- GOTO(stop, rc);
-
-stop:
- dt_trans_stop(env, d, th);
-
- RETURN(rc);
-}
-
static int test_8_cb(const struct lu_env *env, struct llog_handle *llh,
struct llog_rec_hdr *rec, void *data)
{
GOTO(out_put, rc);
}
- CWARN("8c: drop two records from the first plain llog\n");
- llog_truncate(env, obj);
+ /* Here was 8c: drop two records from the first plain llog
+ * llog_truncate was bad idea cause it creates a wrong state,
+ * lgh_last_idx is wrong and two records belongs to zeroed buffer
+ */
CWARN("8d: count survived records\n");
rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
GOTO(out, rc);
}
- if (orig_counter + 200 - 2 != plain_counter) {
+ if (orig_counter + 200 != plain_counter) {
CERROR("found %d records (expected %d)\n", plain_counter,
- orig_counter + 200 - 2);
+ orig_counter + 200);
rc = -EIO;
}
static int llog_test_process_thread(void *arg)
{
struct llog_process_info *lpi = arg;
+ int rc;
- llog_process(NULL, lpi->lpi_loghandle, lpi->lpi_cb, lpi->lpi_cbdata,
- lpi->lpi_catdata);
+ rc = llog_cat_process_or_fork(NULL, lpi->lpi_loghandle, lpi->lpi_cb,
+ NULL, lpi->lpi_cbdata, 1, 0, true);
complete(&lpi->lpi_completion);
- return 0;
+ lpi->lpi_rc = rc;
+ if (rc)
+ CWARN("10h: Error during catalog processing %d\n", rc);
+ return rc;
}
static int cat_check_old_cb(const struct lu_env *env, struct llog_handle *llh,
if (prev_fid->f_oid > fid.f_oid) {
CWARN("processing old record, fail\n");
prev_fid->f_oid = 0xbad;
- RETURN(LLOG_PROC_BREAK);
+ RETURN(-LLOG_EEMPTY);
}
- if (prev_fid->f_oid == 0)
- cfs_fail_loc = OBD_FAIL_LLOG_PROCESS_TIMEOUT;
-
+ if (prev_fid->f_oid == 0) {
+ cfs_fail_loc = OBD_FAIL_ONCE | OBD_FAIL_LLOG_PROCESS_TIMEOUT;
+ cfs_fail_val = (unsigned int) (llh->lgh_id.lgl_oi.oi.oi_id &
+ 0xFFFFFFFF);
+ }
*prev_fid = fid;
RETURN(0);
cfs_fail_loc = CFS_FAIL_SKIP|OBD_FAIL_CAT_RECORDS;
cfs_fail_val = 4;
- CWARN("10b: write %d log records\n", LLOG_TEST_RECNUM);
- for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+ CWARN("10b: write %d log records\n", llog_test_recnum);
+ for (i = 0; i < llog_test_recnum; i++) {
rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
if (rc) {
CERROR("10b: write %d records failed at #%d: %d\n",
- LLOG_TEST_RECNUM, i + 1, rc);
+ llog_test_recnum, i + 1, rc);
GOTO(out, rc);
}
}
GOTO(out, rc);
}
- CWARN("10c: write %d more log records\n", 2 * LLOG_TEST_RECNUM);
- for (i = 0; i < 2 * LLOG_TEST_RECNUM; i++) {
+ CWARN("10c: write %d more log records\n", 2 * llog_test_recnum);
+ for (i = 0; i < 2 * llog_test_recnum; i++) {
rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
if (rc) {
CERROR("10c: write %d records failed at #%d: %d\n",
- 2*LLOG_TEST_RECNUM, i + 1, rc);
+ 2*llog_test_recnum, i + 1, rc);
GOTO(out, rc);
}
}
*/
enospc = 0;
eok = 0;
- CWARN("10c: write %d more log records\n", LLOG_TEST_RECNUM);
- for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+ CWARN("10c: write %d more log records\n", llog_test_recnum);
+ for (i = 0; i < llog_test_recnum; i++) {
rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
if (rc && rc != -ENOSPC) {
CERROR("10c: write %d records failed at #%d: %d\n",
- LLOG_TEST_RECNUM, i + 1, rc);
+ llog_test_recnum, i + 1, rc);
GOTO(out, rc);
}
/*
}
}
- if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+ if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
CERROR("10c: all last records adds should have failed with"
" -ENOSPC\n");
GOTO(out, rc = -EINVAL);
* cancel all 1st plain llog records to empty it, this will also cause
* its catalog entry to be freed for next forced wrap in 10e
*/
- CWARN("10d: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+ CWARN("10d: Cancel %d records, see one log zapped\n", llog_test_recnum);
cancel_count = 0;
rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
if (rc != -LLOG_EEMPTY) {
CERROR("10d: process with llog_cancel_rec_cb failed: %d\n", rc);
/*
- * need to indicate error if for any reason LLOG_TEST_RECNUM is
+ * need to indicate error if for any reason llog_test_recnum is
* not reached
*/
if (rc == 0)
enospc = 0;
eok = 0;
- CWARN("10e: write %d more log records\n", LLOG_TEST_RECNUM);
- for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+ CWARN("10e: write %d more log records\n", llog_test_recnum);
+ for (i = 0; i < llog_test_recnum; i++) {
rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
if (rc && rc != -ENOSPC) {
CERROR("10e: write %d records failed at #%d: %d\n",
- LLOG_TEST_RECNUM, i + 1, rc);
+ llog_test_recnum, i + 1, rc);
GOTO(out, rc);
}
/*
}
}
- if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+ if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
CERROR("10e: all last records adds should have failed with"
" -ENOSPC\n");
GOTO(out, rc = -EINVAL);
CWARN("10e: print the catalog entries.. we expect 4\n");
cat_counter = 0;
- rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+ rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+ 0, 0, false);
if (rc) {
- CERROR("10d: process with cat_print_cb failed: %d\n", rc);
+ CERROR("10e: process with cat_print_cb failed: %d\n", rc);
GOTO(out, rc);
}
if (cat_counter != 4) {
- CERROR("10d: %d entries in catalog\n", cat_counter);
+ CERROR("10e: %d entries in catalog\n", cat_counter);
GOTO(out, rc = -EINVAL);
}
* cancel more records to free one more slot in Catalog
* see if it is re-allocated when adding more records
*/
- CWARN("10f: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+ CWARN("10f: Cancel %d records, see one log zapped\n", llog_test_recnum);
cancel_count = 0;
rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
if (rc != -LLOG_EEMPTY) {
CERROR("10f: process with llog_cancel_rec_cb failed: %d\n", rc);
/*
- * need to indicate error if for any reason LLOG_TEST_RECNUM is
+ * need to indicate error if for any reason llog_test_recnum is
* not reached
*/
if (rc == 0)
CWARN("10f: print the catalog entries.. we expect 3\n");
cat_counter = 0;
- rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+ rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+ 0, 0, false);
if (rc) {
CERROR("10f: process with cat_print_cb failed: %d\n", rc);
GOTO(out, rc);
enospc = 0;
eok = 0;
- CWARN("10f: write %d more log records\n", LLOG_TEST_RECNUM);
- for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+ CWARN("10f: write %d more log records\n", llog_test_recnum);
+ for (i = 0; i < llog_test_recnum; i++) {
rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
if (rc && rc != -ENOSPC) {
CERROR("10f: write %d records failed at #%d: %d\n",
- LLOG_TEST_RECNUM, i + 1, rc);
+ llog_test_recnum, i + 1, rc);
GOTO(out, rc);
}
/*
}
}
- if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+ if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
CERROR("10f: all last records adds should have failed with"
" -ENOSPC\n");
GOTO(out, rc = -EINVAL);
*/
/* cancel more records to free one more slot in Catalog */
- CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+ CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum);
cancel_count = 0;
rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
if (rc != -LLOG_EEMPTY) {
CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
- /* need to indicate error if for any reason LLOG_TEST_RECNUM is
+ /* need to indicate error if for any reason llog_test_recnum is
* not reached */
if (rc == 0)
rc = -ERANGE;
CWARN("10g: print the catalog entries.. we expect 3\n");
cat_counter = 0;
- rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+ rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+ 0, 0, false);
if (rc) {
CERROR("10g: process with cat_print_cb failed: %d\n", rc);
GOTO(out, rc);
}
/* cancel more records to free one more slot in Catalog */
- CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+ CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum);
cancel_count = 0;
rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
if (rc != -LLOG_EEMPTY) {
CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
/*
- * need to indicate error if for any reason LLOG_TEST_RECNUM is
+ * need to indicate error if for any reason llog_test_recnum is
* not reached
*/
if (rc == 0)
CWARN("10g: print the catalog entries.. we expect 2\n");
cat_counter = 0;
- rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+ rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+ 0, 0, false);
if (rc) {
CERROR("10g: process with cat_print_cb failed: %d\n", rc);
GOTO(out, rc);
}
/* cancel more records to free one more slot in Catalog */
- CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+ CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum);
cancel_count = 0;
rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
if (rc != -LLOG_EEMPTY) {
CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
/*
- * need to indicate error if for any reason LLOG_TEST_RECNUM is
+ * need to indicate error if for any reason llog_test_recnum is
* not reached
*/
if (rc == 0)
CWARN("10g: print the catalog entries.. we expect 1\n");
cat_counter = 0;
- rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+ rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+ 0, 0, false);
if (rc) {
CERROR("10g: process with cat_print_cb failed: %d\n", rc);
GOTO(out, rc);
kthread_run(llog_test_process_thread, &lpi, "llog_test_process_thread");
- msleep(1 * MSEC_PER_SEC);
enospc = 0;
eok = 0;
- CWARN("10h: write %d more log records\n", LLOG_TEST_RECNUM);
- for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+ CWARN("10h: write %d more log records\n", llog_test_recnum);
+ for (i = 0; i < llog_test_recnum; i++) {
rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
if (rc && rc != -ENOSPC) {
CERROR("10h: write %d records failed at #%d: %d\n",
- LLOG_TEST_RECNUM, i + 1, rc);
+ llog_test_recnum, i + 1, rc);
GOTO(out, rc);
}
/*
}
}
- if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+ if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
CERROR("10h: all last records adds should have failed with"
" -ENOSPC\n");
GOTO(out, rc = -EINVAL);
wait_for_completion(&lpi.lpi_completion);
- if (test_fid.f_oid == 0xbad) {
+ if (lpi.lpi_rc != 0) {
CERROR("10h: race happened, old record was processed\n");
GOTO(out, rc = -EINVAL);
}