From 4691290f6d39bffaa3e463697fbc3ac351015e76 Mon Sep 17 00:00:00 2001 From: Bruno Faccini Date: Thu, 21 May 2015 20:02:50 +0200 Subject: [PATCH] LU-6556 obdclass: re-allow catalog to wrap around Since patch for LU-4528 a LLOG catalog is no longer allowed to wrap around. This is a regression and it can also cause catalog corruption (grow behind max-size/records) upon upgrading if catalog has already wrap around. This patch reintroduces catalog wrap around capability, and also introduces a new test to extensively check it. Signed-off-by: Bruno Faccini Change-Id: Ife9a452199895ed9d9f43eb9fdeeac15322e272a Reviewed-on: http://review.whamcloud.com/14912 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: wangdi Reviewed-by: Mike Pershin Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin --- lustre/include/lustre/lustre_idl.h | 4 +- lustre/include/lustre_log.h | 4 + lustre/include/obd_support.h | 1 + lustre/obdclass/llog_cat.c | 33 ++- lustre/obdclass/llog_osd.c | 59 +++-- lustre/obdclass/llog_test.c | 443 ++++++++++++++++++++++++++++++++++++- 6 files changed, 513 insertions(+), 31 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index a1b9417..c314664 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -3335,8 +3335,10 @@ struct llog_log_hdr { __u32 llh_bitmap_offset; __u32 llh_size; __u32 llh_flags; + /* for a catalog the first/oldest and still in-use plain slot is just + * next to it. It will serve as the upper limit after Catalog has + * wrapped around */ __u32 llh_cat_idx; - /* for a catalog the first plain slot is next to it */ struct obd_uuid llh_tgtuuid; __u32 llh_reserved[LLOG_HEADER_SIZE/sizeof(__u32)-23]; /* These fields must always be at the end of the llog_log_hdr. diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index c048739..6922954 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -264,6 +264,10 @@ struct llog_handle { struct llog_log_hdr *lgh_hdr; size_t lgh_hdr_size; struct dt_object *lgh_obj; + /* For a Catalog, is the last/newest used index for a plain slot. + * Used in conjunction with llh_cat_idx to handle Catalog wrap-around + * case, after it will have reached LLOG_HDR_BITMAP_SIZE, llh_cat_idx + * will become its upper limit */ int lgh_last_idx; int lgh_cur_idx; /* used during llog_process */ __u64 lgh_cur_offset; /* used during llog_process */ diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index dfc5dc2..5d897fd 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -492,6 +492,7 @@ extern char obd_jobid_var[]; #define OBD_FAIL_LLOG_CATINFO_NET 0x1309 #define OBD_FAIL_MDS_SYNC_CAPA_SL 0x1310 #define OBD_FAIL_SEQ_ALLOC 0x1311 +#define OBD_FAIL_CAT_RECORDS 0x1312 #define OBD_FAIL_LLITE 0x1400 #define OBD_FAIL_LLITE_FAULT_TRUNC_RACE 0x1401 diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index c6b4122..afa7653 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -65,12 +65,35 @@ static int llog_cat_new_log(const struct lu_env *env, { struct llog_thread_info *lgi = llog_info(env); struct llog_logid_rec *rec = &lgi->lgi_logid; - int rc; struct thandle *handle = NULL; struct dt_device *dt = NULL; + struct llog_log_hdr *llh = cathandle->lgh_hdr; + int rc, index; ENTRY; + index = (cathandle->lgh_last_idx + 1) % + (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ? (cfs_fail_val + 1) : + LLOG_HDR_BITMAP_SIZE(llh)); + + /* check that new llog index will not overlap with the first one. + * - llh_cat_idx is the index just before the first/oldest still in-use + * index in catalog + * - lgh_last_idx is the last/newest used index in catalog + * + * When catalog is not wrapped yet then lgh_last_idx is always larger + * than llh_cat_idx. After the wrap around lgh_last_idx re-starts + * from 0 and llh_cat_idx becomes the upper limit for it + * + * Check if catalog has already wrapped around or not by comparing + * last_idx and cat_idx */ + if ((index == llh->llh_cat_idx + 1 && llh->llh_count > 1) || + (index == 0 && llh->llh_cat_idx == 0)) { + CWARN("%s: there are no more free slots in catalog\n", + loghandle->lgh_ctxt->loc_obd->obd_name); + RETURN(-ENOSPC); + } + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED)) RETURN(-ENOSPC); @@ -146,7 +169,7 @@ static int llog_cat_new_log(const struct lu_env *env, if (rc < 0) GOTO(out, rc); - CDEBUG(D_OTHER, "new recovery log "DOSTID":%x for index %u of catalog" + CDEBUG(D_OTHER, "new plain log "DOSTID":%x for index %u of catalog" DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen, rec->lid_hdr.lrh_index, POSTID(&cathandle->lgh_id.lgl_oi)); @@ -669,7 +692,8 @@ int llog_cat_process_or_fork(const struct lu_env *env, d.lpd_startcat = startcat; d.lpd_startidx = startidx; - if (llh->llh_cat_idx > cat_llh->lgh_last_idx) { + if (llh->llh_cat_idx >= cat_llh->lgh_last_idx && + llh->llh_count > 1) { struct llog_process_cat_data cd; CWARN("catlog "DOSTID" crosses index zero\n", @@ -777,7 +801,8 @@ int llog_cat_reverse_process(const struct lu_env *env, d.lpd_data = data; d.lpd_cb = cb; - if (llh->llh_cat_idx > cat_llh->lgh_last_idx) { + if (llh->llh_cat_idx >= cat_llh->lgh_last_idx && + llh->llh_count > 1) { CWARN("catalog "DOSTID" crosses index zero\n", POSTID(&cat_llh->lgh_id.lgl_oi)); diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index a099df2..fb8fba4 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -382,6 +382,12 @@ static int llog_osd_write_rec(const struct lu_env *env, if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len) RETURN(-E2BIG); + /* sanity check for fixed-records llog */ + if (idx != LLOG_HEADER_IDX && (llh->llh_flags & LLOG_F_IS_FIXSIZE)) { + LASSERT(llh->llh_size != 0); + LASSERT(llh->llh_size == reclen); + } + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) RETURN(rc); @@ -479,15 +485,8 @@ static int llog_osd_write_rec(const struct lu_env *env, POSTID(&loghandle->lgh_id.lgl_oi), idx, rec->lrh_len, (long long)lgi->lgi_off); } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { - if (llh->llh_size == 0 || - llh->llh_size != rec->lrh_len) { - CERROR("%s: wrong record size, llh_size is %u" - " but record size is %u\n", - o->do_lu.lo_dev->ld_obd->obd_name, - llh->llh_size, rec->lrh_len); - RETURN(-EINVAL); - } - lgi->lgi_off = sizeof(*llh) + (idx - 1) * reclen; + lgi->lgi_off = llh->llh_hdr.lrh_len + + (idx - 1) * reclen; } else { /* This can be result of lgh_cur_idx is not set during * llog processing or llh_size is not set to proper @@ -534,9 +533,17 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(rc); loghandle->lgh_last_idx++; /* for pad rec */ } - /* if it's the last idx in log file, then return -ENOSPC */ - if (loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) - RETURN(-ENOSPC); + /* if it's the last idx in log file, then return -ENOSPC + * or wrap around if a catalog */ + if ((loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) || + unlikely(llh->llh_flags & LLOG_F_IS_CAT && + OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) && + loghandle->lgh_last_idx >= cfs_fail_val)) { + if (llh->llh_flags & LLOG_F_IS_CAT) + loghandle->lgh_last_idx = 0; + else + RETURN(-ENOSPC); + } /* increment the last_idx along with llh_tail index, they should * be equal for a llog lifetime */ @@ -565,9 +572,7 @@ static int llog_osd_write_rec(const struct lu_env *env, } llh->llh_count++; - if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { - LASSERT(llh->llh_size == reclen); - } else { + if (!(llh->llh_flags & LLOG_F_IS_FIXSIZE)) { /* Update the minimum size of the llog record */ if (llh->llh_size == 0) llh->llh_size = reclen; @@ -623,12 +628,20 @@ out_unlock: if (rc) GOTO(out, rc); - rc = dt_attr_get(env, o, &lgi->lgi_attr); - if (rc) - GOTO(out, rc); + /* computed index can be used to determine offset for fixed-size + * records. This also allows to handle Catalog wrap around case */ + if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { + lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen; + } else { + rc = dt_attr_get(env, o, &lgi->lgi_attr); + if (rc) + GOTO(out, rc); + + LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); + lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, + lgi->lgi_off); + } - LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); - lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, lgi->lgi_off); lgi->lgi_buf.lb_len = reclen; lgi->lgi_buf.lb_buf = rec; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); @@ -659,7 +672,11 @@ out: mutex_unlock(&loghandle->lgh_hdr_mutex); /* restore llog last_idx */ - loghandle->lgh_last_idx--; + if (--loghandle->lgh_last_idx == 0 && + (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) { + /* catalog had just wrap-around case */ + loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1; + } LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx; RETURN(rc); diff --git a/lustre/obdclass/llog_test.c b/lustre/obdclass/llog_test.c index 4d85962..ef4fc60 100644 --- a/lustre/obdclass/llog_test.c +++ b/lustre/obdclass/llog_test.c @@ -87,11 +87,13 @@ static int verify_handle(char *test, struct llog_handle *llh, int num_recs) RETURN(-ERANGE); } - if (llh->lgh_last_idx < last_idx) { - CERROR("%s: handle->last_idx is %d, expected %d after write\n", - test, llh->lgh_last_idx, last_idx); - RETURN(-ERANGE); - } + /* a catalog may wrap */ + if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_CAT) && + (llh->lgh_last_idx < last_idx)) { + CERROR("%s: handle->last_idx is %d, expected %d after write\n", + test, llh->lgh_last_idx, last_idx); + RETURN(-ERANGE); + } RETURN(0); } @@ -1384,6 +1386,432 @@ out: llog_ctxt_put(ctxt); RETURN(rc); } + +/* test catalog wrap around */ +static int llog_test_10(const struct lu_env *env, struct obd_device *obd) +{ + struct llog_handle *cath; + char name[10]; + int rc, rc2, i, enospc, eok; + struct llog_mini_rec lmr; + struct llog_ctxt *ctxt; + struct lu_attr la; + __u64 cat_max_size; + + ENTRY; + + ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT); + LASSERT(ctxt); + + lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE; + lmr.lmr_hdr.lrh_type = 0xf00f00; + + snprintf(name, sizeof(name), "%x", llog_test_rand + 2); + CWARN("10a: create a catalog log with name: %s\n", name); + rc = llog_open_create(env, ctxt, &cath, NULL, name); + if (rc) { + CERROR("10a: llog_create with name %s failed: %d\n", name, rc); + GOTO(ctxt_release, rc); + } + rc = llog_init_handle(env, cath, LLOG_F_IS_CAT, &uuid); + if (rc) { + CERROR("10a: can't init llog handle: %d\n", rc); + GOTO(out, rc); + } + + cat_logid = cath->lgh_id; + + /* force catalog wrap for 5th plain LLOG */ + cfs_fail_loc = CFS_FAIL_SKIP|OBD_FAIL_CAT_RECORDS; + cfs_fail_val = 4; + + CWARN("10b: write %d log records\n", LLOG_TEST_RECNUM); + for (i = 0; i < LLOG_TEST_RECNUM; i++) { + rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); + if (rc) { + CERROR("10b: write %d records failed at #%d: %d\n", + LLOG_TEST_RECNUM, i + 1, rc); + GOTO(out, rc); + } + } + + /* make sure 2 new plain llog appears in catalog (+1 with hdr) */ + rc = verify_handle("10b", cath, 3); + if (rc) + GOTO(out, rc); + + CWARN("10c: write %d more log records\n", 2 * LLOG_TEST_RECNUM); + for (i = 0; i < 2 * LLOG_TEST_RECNUM; i++) { + rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); + if (rc) { + CERROR("10c: write %d records failed at #%d: %d\n", + 2*LLOG_TEST_RECNUM, i + 1, rc); + GOTO(out, rc); + } + } + + /* make sure 2 new plain llog appears in catalog (+1 with hdr) */ + rc = verify_handle("10c", cath, 5); + if (rc) + GOTO(out, rc); + + /* fill last allocated plain LLOG and reach -ENOSPC condition + * because no slot available in Catalog */ + enospc = 0; + eok = 0; + CWARN("10c: write %d more log records\n", LLOG_TEST_RECNUM); + for (i = 0; i < LLOG_TEST_RECNUM; i++) { + rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); + if (rc && rc != -ENOSPC) { + CERROR("10c: write %d records failed at #%d: %d\n", + LLOG_TEST_RECNUM, i + 1, rc); + GOTO(out, rc); + } + /* after last added plain LLOG has filled up, all new + * records add should fail with -ENOSPC */ + if (rc == -ENOSPC) { + enospc++; + } else { + enospc = 0; + eok++; + } + } + + if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) { + CERROR("10c: all last records adds should have failed with" + " -ENOSPC\n"); + GOTO(out, rc = -EINVAL); + } + + CWARN("10c: wrote %d records then %d failed with ENOSPC\n", eok, + enospc); + + /* make sure no new record in Catalog */ + rc = verify_handle("10c", cath, 5); + if (rc) + GOTO(out, rc); + + /* Catalog should have reached its max size for test */ + rc = dt_attr_get(env, cath->lgh_obj, &la); + if (rc) { + CERROR("10c: failed to get catalog attrs: %d\n", rc); + GOTO(out, rc); + } + cat_max_size = la.la_size; + + /* cancel all 1st plain llog records to empty it, this will also cause + * its catalog entry to be freed for next forced wrap in 10e */ + CWARN("10d: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM); + cancel_count = 0; + rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0); + if (rc != -LLOG_EEMPTY) { + CERROR("10d: process with llog_cancel_rec_cb failed: %d\n", rc); + /* need to indicate error if for any reason LLOG_TEST_RECNUM is + * not reached */ + if (rc == 0) + rc = -ERANGE; + GOTO(out, rc); + } + + CWARN("10d: print the catalog entries.. we expect 3\n"); + cat_counter = 0; + rc = llog_process(env, cath, cat_print_cb, "test 10", NULL); + if (rc) { + CERROR("10d: process with cat_print_cb failed: %d\n", rc); + GOTO(out, rc); + } + if (cat_counter != 3) { + CERROR("10d: %d entries in catalog\n", cat_counter); + GOTO(out, rc = -EINVAL); + } + + /* verify one down in catalog (+1 with hdr) */ + rc = verify_handle("10d", cath, 4); + if (rc) + GOTO(out, rc); + + enospc = 0; + eok = 0; + CWARN("10e: write %d more log records\n", LLOG_TEST_RECNUM); + for (i = 0; i < LLOG_TEST_RECNUM; i++) { + rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); + if (rc && rc != -ENOSPC) { + CERROR("10e: write %d records failed at #%d: %d\n", + LLOG_TEST_RECNUM, i + 1, rc); + GOTO(out, rc); + } + /* after last added plain LLOG has filled up, all new + * records add should fail with -ENOSPC */ + if (rc == -ENOSPC) { + enospc++; + } else { + enospc = 0; + eok++; + } + } + + if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) { + CERROR("10e: all last records adds should have failed with" + " -ENOSPC\n"); + GOTO(out, rc = -EINVAL); + } + + CWARN("10e: wrote %d records then %d failed with ENOSPC\n", eok, + enospc); + + CWARN("10e: print the catalog entries.. we expect 4\n"); + cat_counter = 0; + rc = llog_process(env, cath, cat_print_cb, "test 10", NULL); + if (rc) { + CERROR("10d: process with cat_print_cb failed: %d\n", rc); + GOTO(out, rc); + } + if (cat_counter != 4) { + CERROR("10d: %d entries in catalog\n", cat_counter); + GOTO(out, rc = -EINVAL); + } + + /* make sure 1 new plain llog appears in catalog (+1 with hdr) */ + rc = verify_handle("10e", cath, 5); + if (rc) + GOTO(out, rc); + + /* verify catalog has wrap around */ + if (cath->lgh_last_idx > cath->lgh_hdr->llh_cat_idx) { + CERROR("10e: catalog failed to wrap around\n"); + GOTO(out, rc = -EINVAL); + } + + rc = dt_attr_get(env, cath->lgh_obj, &la); + if (rc) { + CERROR("10e: failed to get catalog attrs: %d\n", rc); + GOTO(out, rc); + } + + if (la.la_size != cat_max_size) { + CERROR("10e: catalog size has changed after it has wrap around," + " current size = "LPU64", expected size = "LPU64"\n", + la.la_size, cat_max_size); + GOTO(out, rc = -EINVAL); + } + CWARN("10e: catalog successfully wrap around, last_idx %d, first %d\n", + cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx); + + /* cancel more records to free one more slot in Catalog + * see if it is re-allocated when adding more records */ + CWARN("10f: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM); + cancel_count = 0; + rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0); + if (rc != -LLOG_EEMPTY) { + CERROR("10f: process with llog_cancel_rec_cb failed: %d\n", rc); + /* need to indicate error if for any reason LLOG_TEST_RECNUM is + * not reached */ + if (rc == 0) + rc = -ERANGE; + GOTO(out, rc); + } + + CWARN("10f: print the catalog entries.. we expect 3\n"); + cat_counter = 0; + rc = llog_process(env, cath, cat_print_cb, "test 10", NULL); + if (rc) { + CERROR("10f: process with cat_print_cb failed: %d\n", rc); + GOTO(out, rc); + } + if (cat_counter != 3) { + CERROR("10f: %d entries in catalog\n", cat_counter); + GOTO(out, rc = -EINVAL); + } + + /* verify one down in catalog (+1 with hdr) */ + rc = verify_handle("10f", cath, 4); + if (rc) + GOTO(out, rc); + + enospc = 0; + eok = 0; + CWARN("10f: write %d more log records\n", LLOG_TEST_RECNUM); + for (i = 0; i < LLOG_TEST_RECNUM; i++) { + rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL); + if (rc && rc != -ENOSPC) { + CERROR("10f: write %d records failed at #%d: %d\n", + LLOG_TEST_RECNUM, i + 1, rc); + GOTO(out, rc); + } + /* after last added plain LLOG has filled up, all new + * records add should fail with -ENOSPC */ + if (rc == -ENOSPC) { + enospc++; + } else { + enospc = 0; + eok++; + } + } + + if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) { + CERROR("10f: all last records adds should have failed with" + " -ENOSPC\n"); + GOTO(out, rc = -EINVAL); + } + + CWARN("10f: wrote %d records then %d failed with ENOSPC\n", eok, + enospc); + + /* make sure 1 new plain llog appears in catalog (+1 with hdr) */ + rc = verify_handle("10f", cath, 5); + if (rc) + GOTO(out, rc); + + /* verify lgh_last_idx = llh_cat_idx = 2 now */ + if (cath->lgh_last_idx != cath->lgh_hdr->llh_cat_idx || + cath->lgh_last_idx != 2) { + CERROR("10f: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 2\n", + cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx); + GOTO(out, rc = -EINVAL); + } + + rc = dt_attr_get(env, cath->lgh_obj, &la); + if (rc) { + CERROR("10f: failed to get catalog attrs: %d\n", rc); + GOTO(out, rc); + } + + if (la.la_size != cat_max_size) { + CERROR("10f: catalog size has changed after it has wrap around," + " current size = "LPU64", expected size = "LPU64"\n", + la.la_size, cat_max_size); + GOTO(out, rc = -EINVAL); + } + + /* will llh_cat_idx also successfully wrap ? */ + + /* cancel all records in the plain LLOGs referenced by 2 last indexes in + * Catalog */ + + /* cancel more records to free one more slot in Catalog */ + CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM); + cancel_count = 0; + rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0); + if (rc != -LLOG_EEMPTY) { + CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc); + /* need to indicate error if for any reason LLOG_TEST_RECNUM is + * not reached */ + if (rc == 0) + rc = -ERANGE; + GOTO(out, rc); + } + + CWARN("10g: print the catalog entries.. we expect 3\n"); + cat_counter = 0; + rc = llog_process(env, cath, cat_print_cb, "test 10", NULL); + if (rc) { + CERROR("10g: process with cat_print_cb failed: %d\n", rc); + GOTO(out, rc); + } + if (cat_counter != 3) { + CERROR("10g: %d entries in catalog\n", cat_counter); + GOTO(out, rc = -EINVAL); + } + + /* verify one down in catalog (+1 with hdr) */ + rc = verify_handle("10g", cath, 4); + if (rc) + GOTO(out, rc); + + /* cancel more records to free one more slot in Catalog */ + CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM); + cancel_count = 0; + rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0); + if (rc != -LLOG_EEMPTY) { + CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc); + /* need to indicate error if for any reason LLOG_TEST_RECNUM is + * not reached */ + if (rc == 0) + rc = -ERANGE; + GOTO(out, rc); + } + + CWARN("10g: print the catalog entries.. we expect 2\n"); + cat_counter = 0; + rc = llog_process(env, cath, cat_print_cb, "test 10", NULL); + if (rc) { + CERROR("10g: process with cat_print_cb failed: %d\n", rc); + GOTO(out, rc); + } + if (cat_counter != 2) { + CERROR("10g: %d entries in catalog\n", cat_counter); + GOTO(out, rc = -EINVAL); + } + + /* verify one down in catalog (+1 with hdr) */ + rc = verify_handle("10g", cath, 3); + if (rc) + GOTO(out, rc); + + /* verify lgh_last_idx = 2 and llh_cat_idx = 0 now */ + if (cath->lgh_hdr->llh_cat_idx != 0 || + cath->lgh_last_idx != 2) { + CERROR("10g: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 0\n", + cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx); + GOTO(out, rc = -EINVAL); + } + + /* cancel more records to free one more slot in Catalog */ + CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM); + cancel_count = 0; + rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0); + if (rc != -LLOG_EEMPTY) { + CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc); + /* need to indicate error if for any reason LLOG_TEST_RECNUM is + * not reached */ + if (rc == 0) + rc = -ERANGE; + GOTO(out, rc); + } + + CWARN("10g: print the catalog entries.. we expect 1\n"); + cat_counter = 0; + rc = llog_process(env, cath, cat_print_cb, "test 10", NULL); + if (rc) { + CERROR("10g: process with cat_print_cb failed: %d\n", rc); + GOTO(out, rc); + } + if (cat_counter != 1) { + CERROR("10g: %d entries in catalog\n", cat_counter); + GOTO(out, rc = -EINVAL); + } + + /* verify one down in catalog (+1 with hdr) */ + rc = verify_handle("10g", cath, 2); + if (rc) + GOTO(out, rc); + + /* verify lgh_last_idx = 2 and llh_cat_idx = 1 now */ + if (cath->lgh_hdr->llh_cat_idx != 1 || + cath->lgh_last_idx != 2) { + CERROR("10g: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 1\n", + cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx); + GOTO(out, rc = -EINVAL); + } + + CWARN("10g: llh_cat_idx has also successfully wrapped!\n"); + +out: + cfs_fail_loc = 0; + cfs_fail_val = 0; + + CWARN("10: put newly-created catalog\n"); + rc2 = llog_cat_close(env, cath); + if (rc2) { + CERROR("10: close log %s failed: %d\n", name, rc2); + if (rc == 0) + rc = rc2; + } +ctxt_release: + llog_ctxt_put(ctxt); + RETURN(rc); +} + /* ------------------------------------------------------------------------- * Tests above, boring obd functions below * ------------------------------------------------------------------------- */ @@ -1435,6 +1863,11 @@ static int llog_run_tests(const struct lu_env *env, struct obd_device *obd) rc = llog_test_9(env, obd); if (rc != 0) GOTO(cleanup, rc); + + rc = llog_test_10(env, obd); + if (rc) + GOTO(cleanup, rc); + cleanup: err = llog_destroy(env, llh); if (err) -- 1.8.3.1