From: Alexander Boyko Date: Thu, 29 Nov 2018 13:58:30 +0000 (-0500) Subject: LU-11591 llog: add synchronization for the last record X-Git-Tag: 2.12.52~78 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=ec4194e4e78c959ace97afeacd580d3609658115;ds=sidebyside LU-11591 llog: add synchronization for the last record The initial problem was a race between llog_process_thread and llog_osd_write_rec for a last record with lgh_last_idx. The catalog should be wrapped for the problem. The lgh_last_idx could be increased with a modification of llog bitmap, and a writing record happen a bit later. When llog_process_thread processing lgh_last_idx after modification and before a write it operates with old record data. The patch adds synchronization when lgh_last_idx is processed. The patch changes llog_test 10h to check race between llog_process_thread and llog_osd_write_rec. 1 Thread with write 2 Thread with read llog_osd_write_rec() llog_process_thread() lgh_last_idx++ lock lgh_hdr_mutex ext2_set_bit() dt_write_rec (write header) ext2_test_bit() check lgh_last_idx was changed dt_read_rec() reread the record, and here we got the old value of record unlock lgh_hdr_mutex dt_write_rec (write the record) Signed-off-by: Alexander Boyko Cray-bug-id: LUS-6683 Change-Id: I642b488655940b9456ca8e2f2174c98a966ba242 Reviewed-on: https://review.whamcloud.com/33683 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Alexander Zarochentsev Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index aa0a0b5..d47d990 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -268,6 +268,7 @@ struct llog_handle { * case, after it will have reached LLOG_HDR_BITMAP_SIZE, llh_cat_idx * will become its upper limit */ int lgh_last_idx; + struct rw_semaphore lgh_last_sem; __u64 lgh_cur_offset; /* used for test only */ struct llog_ctxt *lgh_ctxt; union { diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index 9046c65..6ecc0a8 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -64,6 +64,7 @@ static struct llog_handle *llog_alloc_handle(void) init_rwsem(&loghandle->lgh_lock); mutex_init(&loghandle->lgh_hdr_mutex); + init_rwsem(&loghandle->lgh_last_sem); INIT_LIST_HEAD(&loghandle->u.phd.phd_entry); atomic_set(&loghandle->lgh_refcount, 1); @@ -477,6 +478,7 @@ static int llog_process_thread(void *arg) unsigned int buf_offset = 0; bool partial_chunk; int lh_last_idx; + int synced_idx = 0; /* skip records not set in bitmap */ while (index <= last_index && @@ -494,7 +496,8 @@ repeat: /* get the buf with our target record; avoid old garbage */ memset(buf, 0, chunk_size); /* the record index for outdated chunk data */ - lh_last_idx = loghandle->lgh_last_idx + 1; + /* it is safe to process buffer until saved lgh_last_idx */ + lh_last_idx = LLOG_HDR_TAIL(llh)->lrt_index; rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index, index, &cur_offset, buf, chunk_size); if (repeated && rc) @@ -538,40 +541,42 @@ repeat: CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n", rec->lrh_type, rec->lrh_index); - /* for partial chunk the end of it is zeroed, check - * for index 0 to distinguish it. */ - if (partial_chunk && rec->lrh_index == 0) { - /* concurrent llog_add() might add new records - * while llog_processing, check this is not - * the case and re-read the current chunk - * otherwise. */ - int records; - /* lgh_last_idx could be less then index - * for catalog, if catalog is wrapped */ - if ((index > loghandle->lgh_last_idx && - !(loghandle->lgh_hdr->llh_flags & - LLOG_F_IS_CAT)) || repeated || - (loghandle->lgh_obj != NULL && - dt_object_remote(loghandle->lgh_obj))) - GOTO(out, rc = 0); - /* <2 records means no more records - * if the last record we processed was - * the final one, then the underlying - * object might have been destroyed yet. - * we better don't access that.. */ - mutex_lock(&loghandle->lgh_hdr_mutex); - records = loghandle->lgh_hdr->llh_count; - mutex_unlock(&loghandle->lgh_hdr_mutex); - if (records <= 1) - GOTO(out, rc = 0); - CDEBUG(D_OTHER, "Re-read last llog buffer for " - "new records, index %u, last %u\n", - index, loghandle->lgh_last_idx); + if (index == (synced_idx + 1) && + synced_idx == LLOG_HDR_TAIL(llh)->lrt_index) + GOTO(out, rc = 0); + + /* the bitmap could be changed during processing + * records from the chunk. For wrapped catalog + * it means we can read deleted record and try to + * process it. Check this case and reread the chunk. + * It is safe to process to lh_last_idx, including + * lh_last_idx if it was synced. We can not do <= + * comparison, cause for wrapped catalog lgh_last_idx + * could be less than index. So we detect last index + * for processing as index == lh_last_idx+1. But when + * catalog is wrapped and full lgh_last_idx=llh_cat_idx, + * the first processing index is llh_cat_idx+1. + */ + + if ((index == lh_last_idx && synced_idx != index) || + (index == (lh_last_idx + 1) && + !(index == (llh->llh_cat_idx + 1) && + (llh->llh_flags & LLOG_F_IS_CAT))) || + (rec->lrh_index == 0 && !repeated)) { + /* save offset inside buffer for the re-read */ buf_offset = (char *)rec - (char *)buf; cur_offset = chunk_offset; repeated = true; + /* We need to be sure lgh_last_idx + * record was saved to disk + */ + down_read(&loghandle->lgh_last_sem); + synced_idx = LLOG_HDR_TAIL(llh)->lrt_index; + up_read(&loghandle->lgh_last_sem); + CDEBUG(D_OTHER, "synced_idx: %d\n", synced_idx); goto repeat; + } repeated = false; @@ -611,30 +616,24 @@ repeat: loghandle->lgh_cur_offset = (char *)rec - (char *)buf + chunk_offset; - OBD_FAIL_TIMEOUT(OBD_FAIL_LLOG_PROCESS_TIMEOUT, 2); - + if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) && + index == lh_last_idx && + cfs_fail_val == (unsigned int) + (loghandle->lgh_id.lgl_oi.oi.oi_id & + 0xFFFFFFFF)) { + OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT); + } /* if set, process the callback on this record */ if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) { struct llog_cookie *lgc; __u64 tmp_off; int tmp_idx; - /* the bitmap could be changed during processing - * records from the chunk. For wrapped catalog - * it means we can read deleted record and try to - * process it. Check this case and reread the chunk. - * Checking the race with llog_add the bit is set - * after incrementation of lgh_last_idx */ - if (index == lh_last_idx && - lh_last_idx != - (loghandle->lgh_last_idx + 1)) { - /* save offset inside buffer for - * the re-read */ - buf_offset = (char *)rec - (char *)buf; - cur_offset = chunk_offset; - repeated = true; - goto repeat; - } + CDEBUG(D_OTHER, "index: %d, lh_last_idx: %d " + "synced_idx: %d lgh_last_idx: %d\n", + index, lh_last_idx, synced_idx, + loghandle->lgh_last_idx); + if (lti != NULL) { lgc = <i->lgi_cookie; /* store lu_env for recursive calls */ diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index f939e8b..56f8f11 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -880,7 +880,10 @@ int llog_cat_process_or_fork(const struct lu_env *env, /* processing the catalog part at the end */ cd.lpcd_first_idx = (startcat ? startcat : llh->llh_cat_idx); - cd.lpcd_last_idx = 0; + if (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS)) + cd.lpcd_last_idx = cfs_fail_val; + else + cd.lpcd_last_idx = 0; rc = llog_process_or_fork(env, cat_llh, cat_cb, &d, &cd, fork); /* Reset the startcat becasue it has already reached @@ -907,6 +910,7 @@ int llog_cat_process_or_fork(const struct lu_env *env, RETURN(rc); } +EXPORT_SYMBOL(llog_cat_process_or_fork); int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh, llog_cb_t cb, void *data, int startcat, int startidx) diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 9511e00..fc0b16f 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -44,6 +44,8 @@ #define DEBUG_SUBSYSTEM S_LOG +#include + #include #include #include @@ -590,6 +592,7 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(-ENOSPC); } + down_write(&loghandle->lgh_last_sem); /* increment the last_idx along with llh_tail index, they should * be equal for a llog lifetime */ loghandle->lgh_last_idx++; @@ -673,6 +676,12 @@ out_unlock: if (rc) GOTO(out, rc); + if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) && + cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id & + 0xFFFFFFFF)) { + OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT); + msleep(1 * MSEC_PER_SEC); + } /* computed index can be used to determine offset for fixed-size * records. This also allows to handle Catalog wrap around case */ if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { @@ -693,6 +702,8 @@ out_unlock: if (rc < 0) GOTO(out, rc); + up_write(&loghandle->lgh_last_sem); + CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n", PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len, lgi->lgi_off); @@ -726,6 +737,7 @@ out: } LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx; + up_write(&loghandle->lgh_last_sem); RETURN(rc); } diff --git a/lustre/obdclass/llog_test.c b/lustre/obdclass/llog_test.c index 4fd4b1a..6b2628f 100644 --- a/lustre/obdclass/llog_test.c +++ b/lustre/obdclass/llog_test.c @@ -49,8 +49,7 @@ /* This is slightly more than the number of records that can fit into a * single llog file, because the llog_log_header takes up some of the * space in the first block that cannot be used for the bitmap. */ -#define LLOG_TEST_RECNUM (LLOG_MIN_CHUNK_SIZE * 8) - +static int llog_test_recnum = (LLOG_MIN_CHUNK_SIZE * 8); static int llog_test_rand; static struct obd_uuid uuid = { .uuid = "test_uuid" }; static struct llog_logid cat_logid; @@ -576,7 +575,7 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd, /* Test catalogue additions */ static int llog_test_4(const struct lu_env *env, struct obd_device *obd) { - struct llog_handle *cath; + struct llog_handle *cath, *llh; char name[10]; int rc, rc2, i, buflen; struct llog_mini_rec lmr; @@ -625,6 +624,18 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd) if (rc) GOTO(out, rc); + /* estimate the max number of record for the plain llog + * cause it depends on disk size + */ + llh = cath->u.chd.chd_current_log; + if (llh->lgh_max_size != 0) { + llog_test_recnum = (llh->lgh_max_size - + sizeof(struct llog_log_hdr)) / LLOG_MIN_REC_SIZE; + } + + if (llog_test_recnum >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) + llog_test_recnum = LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1; + CWARN("4c: cancel 1 log record\n"); rc = llog_cat_cancel_records(env, cath, 1, &cookie); if (rc) { @@ -637,12 +648,12 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd) if (rc) GOTO(out, rc); - CWARN("4d: write %d more log records\n", LLOG_TEST_RECNUM); - for (i = 0; i < LLOG_TEST_RECNUM; i++) { + CWARN("4d: write %d more log records\n", llog_test_recnum); + for (i = 0; i < llog_test_recnum; i++) { rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); if (rc) { CERROR("4d: write %d records failed at #%d: %d\n", - LLOG_TEST_RECNUM, i + 1, rc); + llog_test_recnum, i + 1, rc); GOTO(out, rc); } num_recs++; @@ -749,7 +760,7 @@ static int llog_cancel_rec_cb(const struct lu_env *env, llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle, 1, &cookie); cancel_count++; - if (cancel_count == LLOG_TEST_RECNUM) + if (cancel_count == llog_test_recnum) RETURN(-LLOG_EEMPTY); RETURN(0); } @@ -796,7 +807,7 @@ static int llog_test_5(const struct lu_env *env, struct obd_device *obd) GOTO(out, rc = -EINVAL); } - CWARN("5c: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM); + CWARN("5c: Cancel %d records, see one log zapped\n", llog_test_recnum); cancel_count = 0; rc = llog_cat_process(env, llh, llog_cancel_rec_cb, "foobar", 0, 0); if (rc != -LLOG_EEMPTY) { @@ -1168,61 +1179,6 @@ out: RETURN(rc); } -static int llog_truncate(const struct lu_env *env, struct dt_object *o) -{ - struct lu_attr la; - struct thandle *th; - struct dt_device *d; - int rc; - ENTRY; - - LASSERT(o); - d = lu2dt_dev(o->do_lu.lo_dev); - LASSERT(d); - - rc = dt_attr_get(env, o, &la); - if (rc) - RETURN(rc); - - CDEBUG(D_OTHER, "original size %llu\n", la.la_size); - rc = sizeof(struct llog_log_hdr) + sizeof(struct llog_mini_rec); - if (la.la_size < rc) { - CERROR("too small llog: %llu\n", la.la_size); - RETURN(0); - } - - /* drop 2 records */ - la.la_size = la.la_size - (sizeof(struct llog_mini_rec) * 2); - la.la_valid = LA_SIZE; - - th = dt_trans_create(env, d); - if (IS_ERR(th)) - RETURN(PTR_ERR(th)); - - rc = dt_declare_attr_set(env, o, &la, th); - if (rc) - GOTO(stop, rc); - - rc = dt_declare_punch(env, o, la.la_size, OBD_OBJECT_EOF, th); - - rc = dt_trans_start_local(env, d, th); - if (rc) - GOTO(stop, rc); - - rc = dt_punch(env, o, la.la_size, OBD_OBJECT_EOF, th); - if (rc) - GOTO(stop, rc); - - rc = dt_attr_set(env, o, &la, th); - if (rc) - GOTO(stop, rc); - -stop: - dt_trans_stop(env, d, th); - - RETURN(rc); -} - static int test_8_cb(const struct lu_env *env, struct llog_handle *llh, struct llog_rec_hdr *rec, void *data) { @@ -1322,8 +1278,10 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd) GOTO(out_put, rc); } - CWARN("8c: drop two records from the first plain llog\n"); - llog_truncate(env, obj); + /* Here was 8c: drop two records from the first plain llog + * llog_truncate was bad idea cause it creates a wrong state, + * lgh_last_idx is wrong and two records belongs to zeroed buffer + */ CWARN("8d: count survived records\n"); rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS); @@ -1345,9 +1303,9 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd) GOTO(out, rc); } - if (orig_counter + 200 - 2 != plain_counter) { + if (orig_counter + 200 != plain_counter) { CERROR("found %d records (expected %d)\n", plain_counter, - orig_counter + 200 - 2); + orig_counter + 200); rc = -EIO; } @@ -1479,13 +1437,17 @@ struct llog_process_info { static int llog_test_process_thread(void *arg) { struct llog_process_info *lpi = arg; + int rc; - llog_process(NULL, lpi->lpi_loghandle, lpi->lpi_cb, lpi->lpi_cbdata, - lpi->lpi_catdata); + rc = llog_cat_process_or_fork(NULL, lpi->lpi_loghandle, lpi->lpi_cb, + NULL, lpi->lpi_cbdata, 1, 0, true); complete(&lpi->lpi_completion); - return 0; + lpi->lpi_rc = rc; + if (rc) + CWARN("10h: Error during catalog processing %d\n", rc); + return rc; } static int cat_check_old_cb(const struct lu_env *env, struct llog_handle *llh, @@ -1509,12 +1471,14 @@ static int cat_check_old_cb(const struct lu_env *env, struct llog_handle *llh, if (prev_fid->f_oid > fid.f_oid) { CWARN("processing old record, fail\n"); prev_fid->f_oid = 0xbad; - RETURN(LLOG_PROC_BREAK); + RETURN(-LLOG_EEMPTY); } - if (prev_fid->f_oid == 0) - cfs_fail_loc = OBD_FAIL_LLOG_PROCESS_TIMEOUT; - + if (prev_fid->f_oid == 0) { + cfs_fail_loc = OBD_FAIL_ONCE | OBD_FAIL_LLOG_PROCESS_TIMEOUT; + cfs_fail_val = (unsigned int) (llh->lgh_id.lgl_oi.oi.oi_id & + 0xFFFFFFFF); + } *prev_fid = fid; RETURN(0); @@ -1571,12 +1535,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) cfs_fail_loc = CFS_FAIL_SKIP|OBD_FAIL_CAT_RECORDS; cfs_fail_val = 4; - CWARN("10b: write %d log records\n", LLOG_TEST_RECNUM); - for (i = 0; i < LLOG_TEST_RECNUM; i++) { + CWARN("10b: write %d log records\n", llog_test_recnum); + for (i = 0; i < llog_test_recnum; i++) { rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); if (rc) { CERROR("10b: write %d records failed at #%d: %d\n", - LLOG_TEST_RECNUM, i + 1, rc); + llog_test_recnum, i + 1, rc); GOTO(out, rc); } } @@ -1597,12 +1561,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) GOTO(out, rc); } - CWARN("10c: write %d more log records\n", 2 * LLOG_TEST_RECNUM); - for (i = 0; i < 2 * LLOG_TEST_RECNUM; i++) { + CWARN("10c: write %d more log records\n", 2 * llog_test_recnum); + for (i = 0; i < 2 * llog_test_recnum; i++) { rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); if (rc) { CERROR("10c: write %d records failed at #%d: %d\n", - 2*LLOG_TEST_RECNUM, i + 1, rc); + 2*llog_test_recnum, i + 1, rc); GOTO(out, rc); } } @@ -1629,12 +1593,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) */ enospc = 0; eok = 0; - CWARN("10c: write %d more log records\n", LLOG_TEST_RECNUM); - for (i = 0; i < LLOG_TEST_RECNUM; i++) { + CWARN("10c: write %d more log records\n", llog_test_recnum); + for (i = 0; i < llog_test_recnum; i++) { rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); if (rc && rc != -ENOSPC) { CERROR("10c: write %d records failed at #%d: %d\n", - LLOG_TEST_RECNUM, i + 1, rc); + llog_test_recnum, i + 1, rc); GOTO(out, rc); } /* @@ -1649,7 +1613,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) } } - if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) { + if ((enospc == 0) && (enospc+eok != llog_test_recnum)) { CERROR("10c: all last records adds should have failed with" " -ENOSPC\n"); GOTO(out, rc = -EINVAL); @@ -1675,13 +1639,13 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) * cancel all 1st plain llog records to empty it, this will also cause * its catalog entry to be freed for next forced wrap in 10e */ - CWARN("10d: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM); + CWARN("10d: Cancel %d records, see one log zapped\n", llog_test_recnum); cancel_count = 0; rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0); if (rc != -LLOG_EEMPTY) { CERROR("10d: process with llog_cancel_rec_cb failed: %d\n", rc); /* - * need to indicate error if for any reason LLOG_TEST_RECNUM is + * need to indicate error if for any reason llog_test_recnum is * not reached */ if (rc == 0) @@ -1719,12 +1683,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) enospc = 0; eok = 0; - CWARN("10e: write %d more log records\n", LLOG_TEST_RECNUM); - for (i = 0; i < LLOG_TEST_RECNUM; i++) { + CWARN("10e: write %d more log records\n", llog_test_recnum); + for (i = 0; i < llog_test_recnum; i++) { rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); if (rc && rc != -ENOSPC) { CERROR("10e: write %d records failed at #%d: %d\n", - LLOG_TEST_RECNUM, i + 1, rc); + llog_test_recnum, i + 1, rc); GOTO(out, rc); } /* @@ -1739,7 +1703,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) } } - if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) { + if ((enospc == 0) && (enospc+eok != llog_test_recnum)) { CERROR("10e: all last records adds should have failed with" " -ENOSPC\n"); GOTO(out, rc = -EINVAL); @@ -1750,13 +1714,14 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) CWARN("10e: print the catalog entries.. we expect 4\n"); cat_counter = 0; - rc = llog_process(env, cath, cat_print_cb, "test 10", NULL); + rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10", + 0, 0, false); if (rc) { - CERROR("10d: process with cat_print_cb failed: %d\n", rc); + CERROR("10e: process with cat_print_cb failed: %d\n", rc); GOTO(out, rc); } if (cat_counter != 4) { - CERROR("10d: %d entries in catalog\n", cat_counter); + CERROR("10e: %d entries in catalog\n", cat_counter); GOTO(out, rc = -EINVAL); } @@ -1801,13 +1766,13 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) * cancel more records to free one more slot in Catalog * see if it is re-allocated when adding more records */ - CWARN("10f: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM); + CWARN("10f: Cancel %d records, see one log zapped\n", llog_test_recnum); cancel_count = 0; rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0); if (rc != -LLOG_EEMPTY) { CERROR("10f: process with llog_cancel_rec_cb failed: %d\n", rc); /* - * need to indicate error if for any reason LLOG_TEST_RECNUM is + * need to indicate error if for any reason llog_test_recnum is * not reached */ if (rc == 0) @@ -1817,7 +1782,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) CWARN("10f: print the catalog entries.. we expect 3\n"); cat_counter = 0; - rc = llog_process(env, cath, cat_print_cb, "test 10", NULL); + rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10", + 0, 0, false); if (rc) { CERROR("10f: process with cat_print_cb failed: %d\n", rc); GOTO(out, rc); @@ -1845,12 +1811,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) enospc = 0; eok = 0; - CWARN("10f: write %d more log records\n", LLOG_TEST_RECNUM); - for (i = 0; i < LLOG_TEST_RECNUM; i++) { + CWARN("10f: write %d more log records\n", llog_test_recnum); + for (i = 0; i < llog_test_recnum; i++) { rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); if (rc && rc != -ENOSPC) { CERROR("10f: write %d records failed at #%d: %d\n", - LLOG_TEST_RECNUM, i + 1, rc); + llog_test_recnum, i + 1, rc); GOTO(out, rc); } /* @@ -1865,7 +1831,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) } } - if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) { + if ((enospc == 0) && (enospc+eok != llog_test_recnum)) { CERROR("10f: all last records adds should have failed with" " -ENOSPC\n"); GOTO(out, rc = -EINVAL); @@ -1919,12 +1885,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) */ /* cancel more records to free one more slot in Catalog */ - CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM); + CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum); cancel_count = 0; rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0); if (rc != -LLOG_EEMPTY) { CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc); - /* need to indicate error if for any reason LLOG_TEST_RECNUM is + /* need to indicate error if for any reason llog_test_recnum is * not reached */ if (rc == 0) rc = -ERANGE; @@ -1933,7 +1899,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) CWARN("10g: print the catalog entries.. we expect 3\n"); cat_counter = 0; - rc = llog_process(env, cath, cat_print_cb, "test 10", NULL); + rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10", + 0, 0, false); if (rc) { CERROR("10g: process with cat_print_cb failed: %d\n", rc); GOTO(out, rc); @@ -1960,13 +1927,13 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) } /* cancel more records to free one more slot in Catalog */ - CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM); + CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum); cancel_count = 0; rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0); if (rc != -LLOG_EEMPTY) { CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc); /* - * need to indicate error if for any reason LLOG_TEST_RECNUM is + * need to indicate error if for any reason llog_test_recnum is * not reached */ if (rc == 0) @@ -1976,7 +1943,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) CWARN("10g: print the catalog entries.. we expect 2\n"); cat_counter = 0; - rc = llog_process(env, cath, cat_print_cb, "test 10", NULL); + rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10", + 0, 0, false); if (rc) { CERROR("10g: process with cat_print_cb failed: %d\n", rc); GOTO(out, rc); @@ -2011,13 +1979,13 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) } /* cancel more records to free one more slot in Catalog */ - CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM); + CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum); cancel_count = 0; rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0); if (rc != -LLOG_EEMPTY) { CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc); /* - * need to indicate error if for any reason LLOG_TEST_RECNUM is + * need to indicate error if for any reason llog_test_recnum is * not reached */ if (rc == 0) @@ -2027,7 +1995,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) CWARN("10g: print the catalog entries.. we expect 1\n"); cat_counter = 0; - rc = llog_process(env, cath, cat_print_cb, "test 10", NULL); + rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10", + 0, 0, false); if (rc) { CERROR("10g: process with cat_print_cb failed: %d\n", rc); GOTO(out, rc); @@ -2071,15 +2040,14 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) kthread_run(llog_test_process_thread, &lpi, "llog_test_process_thread"); - msleep(1 * MSEC_PER_SEC); enospc = 0; eok = 0; - CWARN("10h: write %d more log records\n", LLOG_TEST_RECNUM); - for (i = 0; i < LLOG_TEST_RECNUM; i++) { + CWARN("10h: write %d more log records\n", llog_test_recnum); + for (i = 0; i < llog_test_recnum; i++) { rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); if (rc && rc != -ENOSPC) { CERROR("10h: write %d records failed at #%d: %d\n", - LLOG_TEST_RECNUM, i + 1, rc); + llog_test_recnum, i + 1, rc); GOTO(out, rc); } /* @@ -2094,7 +2062,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) } } - if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) { + if ((enospc == 0) && (enospc+eok != llog_test_recnum)) { CERROR("10h: all last records adds should have failed with" " -ENOSPC\n"); GOTO(out, rc = -EINVAL); @@ -2105,7 +2073,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd) wait_for_completion(&lpi.lpi_completion); - if (test_fid.f_oid == 0xbad) { + if (lpi.lpi_rc != 0) { CERROR("10h: race happened, old record was processed\n"); GOTO(out, rc = -EINVAL); } diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index df7f53f..1e0196c 100755 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -6157,7 +6157,7 @@ test_60a() { skip_env "missing subtest run-llog.sh" log "$TEST60_HEAD - from kernel mode" - do_facet mgs "$LCTL set_param debug=warning; $LCTL dk > /dev/null" + do_facet mgs "$LCTL dk > /dev/null" do_facet mgs "bash run-llog.sh" || error "run-llog.sh failed" do_facet mgs $LCTL dk > $TMP/$tfile