Whamcloud - gitweb
LU-6556 obdclass: re-allow catalog to wrap around 12/14912/28
authorBruno Faccini <bruno.faccini@intel.com>
Thu, 21 May 2015 18:02:50 +0000 (20:02 +0200)
committerOleg Drokin <oleg.drokin@intel.com>
Sat, 17 Oct 2015 23:32:20 +0000 (23:32 +0000)
Since patch for LU-4528 a LLOG catalog is no longer allowed to
wrap around. This is a regression and it can also cause catalog
corruption (grow behind max-size/records) upon upgrading if
catalog has already wrap around.

This patch reintroduces catalog wrap around capability, and also
introduces a new test to extensively check it.

Signed-off-by: Bruno Faccini <bruno.faccini@intel.com>
Change-Id: Ife9a452199895ed9d9f43eb9fdeeac15322e272a
Reviewed-on: http://review.whamcloud.com/14912
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_log.h
lustre/include/obd_support.h
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_osd.c
lustre/obdclass/llog_test.c

index a1b9417..c314664 100644 (file)
@@ -3335,8 +3335,10 @@ struct llog_log_hdr {
        __u32                   llh_bitmap_offset;
        __u32                   llh_size;
        __u32                   llh_flags;
+       /* for a catalog the first/oldest and still in-use plain slot is just
+        * next to it. It will serve as the upper limit after Catalog has
+        * wrapped around */
        __u32                   llh_cat_idx;
-       /* for a catalog the first plain slot is next to it */
        struct obd_uuid         llh_tgtuuid;
        __u32                   llh_reserved[LLOG_HEADER_SIZE/sizeof(__u32)-23];
        /* These fields must always be at the end of the llog_log_hdr.
index c048739..6922954 100644 (file)
@@ -264,6 +264,10 @@ struct llog_handle {
        struct llog_log_hdr     *lgh_hdr;
        size_t                  lgh_hdr_size;
        struct dt_object        *lgh_obj;
+       /* For a Catalog, is the last/newest used index for a plain slot.
+        * Used in conjunction with llh_cat_idx to handle Catalog wrap-around
+        * case, after it will have reached LLOG_HDR_BITMAP_SIZE, llh_cat_idx
+        * will become its upper limit */
        int                      lgh_last_idx;
        int                      lgh_cur_idx; /* used during llog_process */
        __u64                    lgh_cur_offset; /* used during llog_process */
index dfc5dc2..5d897fd 100644 (file)
@@ -492,6 +492,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_LLOG_CATINFO_NET                   0x1309
 #define OBD_FAIL_MDS_SYNC_CAPA_SL                   0x1310
 #define OBD_FAIL_SEQ_ALLOC                          0x1311
+#define OBD_FAIL_CAT_RECORDS                       0x1312
 
 #define OBD_FAIL_LLITE                              0x1400
 #define OBD_FAIL_LLITE_FAULT_TRUNC_RACE             0x1401
index c6b4122..afa7653 100644 (file)
@@ -65,12 +65,35 @@ static int llog_cat_new_log(const struct lu_env *env,
 {
        struct llog_thread_info *lgi = llog_info(env);
        struct llog_logid_rec   *rec = &lgi->lgi_logid;
-       int                      rc;
        struct thandle *handle = NULL;
        struct dt_device *dt = NULL;
+       struct llog_log_hdr     *llh = cathandle->lgh_hdr;
+       int                      rc, index;
 
        ENTRY;
 
+       index = (cathandle->lgh_last_idx + 1) %
+               (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ? (cfs_fail_val + 1) :
+                                               LLOG_HDR_BITMAP_SIZE(llh));
+
+       /* check that new llog index will not overlap with the first one.
+        * - llh_cat_idx is the index just before the first/oldest still in-use
+        *      index in catalog
+        * - lgh_last_idx is the last/newest used index in catalog
+        *
+        * When catalog is not wrapped yet then lgh_last_idx is always larger
+        * than llh_cat_idx. After the wrap around lgh_last_idx re-starts
+        * from 0 and llh_cat_idx becomes the upper limit for it
+        *
+        * Check if catalog has already wrapped around or not by comparing
+        * last_idx and cat_idx */
+       if ((index == llh->llh_cat_idx + 1 && llh->llh_count > 1) ||
+           (index == 0 && llh->llh_cat_idx == 0)) {
+               CWARN("%s: there are no more free slots in catalog\n",
+                     loghandle->lgh_ctxt->loc_obd->obd_name);
+               RETURN(-ENOSPC);
+       }
+
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
                RETURN(-ENOSPC);
 
@@ -146,7 +169,7 @@ static int llog_cat_new_log(const struct lu_env *env,
        if (rc < 0)
                GOTO(out, rc);
 
-       CDEBUG(D_OTHER, "new recovery log "DOSTID":%x for index %u of catalog"
+       CDEBUG(D_OTHER, "new plain log "DOSTID":%x for index %u of catalog"
               DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi),
               loghandle->lgh_id.lgl_ogen, rec->lid_hdr.lrh_index,
               POSTID(&cathandle->lgh_id.lgl_oi));
@@ -669,7 +692,8 @@ int llog_cat_process_or_fork(const struct lu_env *env,
         d.lpd_startcat = startcat;
         d.lpd_startidx = startidx;
 
-        if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
+       if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
+           llh->llh_count > 1) {
                 struct llog_process_cat_data cd;
 
                 CWARN("catlog "DOSTID" crosses index zero\n",
@@ -777,7 +801,8 @@ int llog_cat_reverse_process(const struct lu_env *env,
         d.lpd_data = data;
         d.lpd_cb = cb;
 
-       if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
+       if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
+           llh->llh_count > 1) {
                CWARN("catalog "DOSTID" crosses index zero\n",
                      POSTID(&cat_llh->lgh_id.lgl_oi));
 
index a099df2..fb8fba4 100644 (file)
@@ -382,6 +382,12 @@ static int llog_osd_write_rec(const struct lu_env *env,
        if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len)
                RETURN(-E2BIG);
 
+       /* sanity check for fixed-records llog */
+       if (idx != LLOG_HEADER_IDX && (llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
+               LASSERT(llh->llh_size != 0);
+               LASSERT(llh->llh_size == reclen);
+       }
+
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
                RETURN(rc);
@@ -479,15 +485,8 @@ static int llog_osd_write_rec(const struct lu_env *env,
                               POSTID(&loghandle->lgh_id.lgl_oi), idx,
                               rec->lrh_len, (long long)lgi->lgi_off);
                } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
-                       if (llh->llh_size == 0 ||
-                           llh->llh_size != rec->lrh_len) {
-                               CERROR("%s: wrong record size, llh_size is %u"
-                                      " but record size is %u\n",
-                                      o->do_lu.lo_dev->ld_obd->obd_name,
-                                      llh->llh_size, rec->lrh_len);
-                               RETURN(-EINVAL);
-                       }
-                       lgi->lgi_off = sizeof(*llh) + (idx - 1) * reclen;
+                       lgi->lgi_off = llh->llh_hdr.lrh_len +
+                                      (idx - 1) * reclen;
                } else {
                        /* This can be result of lgh_cur_idx is not set during
                         * llog processing or llh_size is not set to proper
@@ -534,9 +533,17 @@ static int llog_osd_write_rec(const struct lu_env *env,
                        RETURN(rc);
                loghandle->lgh_last_idx++; /* for pad rec */
        }
-       /* if it's the last idx in log file, then return -ENOSPC */
-       if (loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1)
-               RETURN(-ENOSPC);
+       /* if it's the last idx in log file, then return -ENOSPC
+        * or wrap around if a catalog */
+       if ((loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) ||
+           unlikely(llh->llh_flags & LLOG_F_IS_CAT &&
+                    OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) &&
+                    loghandle->lgh_last_idx >= cfs_fail_val)) {
+               if (llh->llh_flags & LLOG_F_IS_CAT)
+                       loghandle->lgh_last_idx = 0;
+               else
+                       RETURN(-ENOSPC);
+       }
 
        /* increment the last_idx along with llh_tail index, they should
         * be equal for a llog lifetime */
@@ -565,9 +572,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
        }
        llh->llh_count++;
 
-       if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
-               LASSERT(llh->llh_size == reclen);
-       } else {
+       if (!(llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
                /* Update the minimum size of the llog record */
                if (llh->llh_size == 0)
                        llh->llh_size = reclen;
@@ -623,12 +628,20 @@ out_unlock:
        if (rc)
                GOTO(out, rc);
 
-       rc = dt_attr_get(env, o, &lgi->lgi_attr);
-       if (rc)
-               GOTO(out, rc);
+       /* computed index can be used to determine offset for fixed-size
+        * records. This also allows to handle Catalog wrap around case */
+       if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
+               lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
+       } else {
+               rc = dt_attr_get(env, o, &lgi->lgi_attr);
+               if (rc)
+                       GOTO(out, rc);
+
+               LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
+               lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size,
+                                    lgi->lgi_off);
+       }
 
-       LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
-       lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, lgi->lgi_off);
        lgi->lgi_buf.lb_len = reclen;
        lgi->lgi_buf.lb_buf = rec;
        rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
@@ -659,7 +672,11 @@ out:
        mutex_unlock(&loghandle->lgh_hdr_mutex);
 
        /* restore llog last_idx */
-       loghandle->lgh_last_idx--;
+       if (--loghandle->lgh_last_idx == 0 &&
+           (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
+               /* catalog had just wrap-around case */
+               loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+       }
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
 
        RETURN(rc);
index 4d85962..ef4fc60 100644 (file)
@@ -87,11 +87,13 @@ static int verify_handle(char *test, struct llog_handle *llh, int num_recs)
                 RETURN(-ERANGE);
         }
 
-        if (llh->lgh_last_idx < last_idx) {
-                CERROR("%s: handle->last_idx is %d, expected %d after write\n",
-                       test, llh->lgh_last_idx, last_idx);
-                RETURN(-ERANGE);
-        }
+       /* a catalog may wrap */
+       if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_CAT) &&
+           (llh->lgh_last_idx < last_idx)) {
+               CERROR("%s: handle->last_idx is %d, expected %d after write\n",
+                      test, llh->lgh_last_idx, last_idx);
+               RETURN(-ERANGE);
+       }
 
         RETURN(0);
 }
@@ -1384,6 +1386,432 @@ out:
        llog_ctxt_put(ctxt);
        RETURN(rc);
 }
+
+/* test catalog wrap around */
+static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
+{
+       struct llog_handle      *cath;
+       char                     name[10];
+       int                      rc, rc2, i, enospc, eok;
+       struct llog_mini_rec     lmr;
+       struct llog_ctxt        *ctxt;
+       struct lu_attr           la;
+       __u64                    cat_max_size;
+
+       ENTRY;
+
+       ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
+       LASSERT(ctxt);
+
+       lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
+       lmr.lmr_hdr.lrh_type = 0xf00f00;
+
+       snprintf(name, sizeof(name), "%x", llog_test_rand + 2);
+       CWARN("10a: create a catalog log with name: %s\n", name);
+       rc = llog_open_create(env, ctxt, &cath, NULL, name);
+       if (rc) {
+               CERROR("10a: llog_create with name %s failed: %d\n", name, rc);
+               GOTO(ctxt_release, rc);
+       }
+       rc = llog_init_handle(env, cath, LLOG_F_IS_CAT, &uuid);
+       if (rc) {
+               CERROR("10a: can't init llog handle: %d\n", rc);
+               GOTO(out, rc);
+       }
+
+       cat_logid = cath->lgh_id;
+
+       /* force catalog wrap for 5th plain LLOG */
+       cfs_fail_loc = CFS_FAIL_SKIP|OBD_FAIL_CAT_RECORDS;
+       cfs_fail_val = 4;
+
+       CWARN("10b: write %d log records\n", LLOG_TEST_RECNUM);
+       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+               rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
+               if (rc) {
+                       CERROR("10b: write %d records failed at #%d: %d\n",
+                              LLOG_TEST_RECNUM, i + 1, rc);
+                       GOTO(out, rc);
+               }
+       }
+
+       /* make sure 2 new plain llog appears in catalog (+1 with hdr) */
+       rc = verify_handle("10b", cath, 3);
+       if (rc)
+               GOTO(out, rc);
+
+       CWARN("10c: write %d more log records\n", 2 * LLOG_TEST_RECNUM);
+       for (i = 0; i < 2 * LLOG_TEST_RECNUM; i++) {
+               rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
+               if (rc) {
+                       CERROR("10c: write %d records failed at #%d: %d\n",
+                              2*LLOG_TEST_RECNUM, i + 1, rc);
+                       GOTO(out, rc);
+               }
+       }
+
+       /* make sure 2 new plain llog appears in catalog (+1 with hdr) */
+       rc = verify_handle("10c", cath, 5);
+       if (rc)
+               GOTO(out, rc);
+
+       /* fill last allocated plain LLOG and reach -ENOSPC condition
+        * because no slot available in Catalog */
+       enospc = 0;
+       eok = 0;
+       CWARN("10c: write %d more log records\n", LLOG_TEST_RECNUM);
+       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+               rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
+               if (rc && rc != -ENOSPC) {
+                       CERROR("10c: write %d records failed at #%d: %d\n",
+                              LLOG_TEST_RECNUM, i + 1, rc);
+                       GOTO(out, rc);
+               }
+               /* after last added plain LLOG has filled up, all new
+                * records add should fail with -ENOSPC */
+               if (rc == -ENOSPC) {
+                       enospc++;
+               } else {
+                       enospc = 0;
+                       eok++;
+               }
+       }
+
+       if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+               CERROR("10c: all last records adds should have failed with"
+                      " -ENOSPC\n");
+               GOTO(out, rc = -EINVAL);
+       }
+
+       CWARN("10c: wrote %d records then %d failed with ENOSPC\n", eok,
+             enospc);
+
+       /* make sure no new record in Catalog */
+       rc = verify_handle("10c", cath, 5);
+       if (rc)
+               GOTO(out, rc);
+
+       /* Catalog should have reached its max size for test */
+       rc = dt_attr_get(env, cath->lgh_obj, &la);
+       if (rc) {
+               CERROR("10c: failed to get catalog attrs: %d\n", rc);
+               GOTO(out, rc);
+       }
+       cat_max_size = la.la_size;
+
+       /* cancel all 1st plain llog records to empty it, this will also cause
+        * its catalog entry to be freed for next forced wrap in 10e */
+       CWARN("10d: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       cancel_count = 0;
+       rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
+       if (rc != -LLOG_EEMPTY) {
+               CERROR("10d: process with llog_cancel_rec_cb failed: %d\n", rc);
+               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * not reached */
+               if (rc == 0)
+                       rc = -ERANGE;
+               GOTO(out, rc);
+       }
+
+       CWARN("10d: print the catalog entries.. we expect 3\n");
+       cat_counter = 0;
+       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       if (rc) {
+               CERROR("10d: process with cat_print_cb failed: %d\n", rc);
+               GOTO(out, rc);
+       }
+       if (cat_counter != 3) {
+               CERROR("10d: %d entries in catalog\n", cat_counter);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       /* verify one down in catalog (+1 with hdr) */
+       rc = verify_handle("10d", cath, 4);
+       if (rc)
+               GOTO(out, rc);
+
+       enospc = 0;
+       eok = 0;
+       CWARN("10e: write %d more log records\n", LLOG_TEST_RECNUM);
+       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+               rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
+               if (rc && rc != -ENOSPC) {
+                       CERROR("10e: write %d records failed at #%d: %d\n",
+                              LLOG_TEST_RECNUM, i + 1, rc);
+                       GOTO(out, rc);
+               }
+               /* after last added plain LLOG has filled up, all new
+                * records add should fail with -ENOSPC */
+               if (rc == -ENOSPC) {
+                       enospc++;
+               } else {
+                       enospc = 0;
+                       eok++;
+               }
+       }
+
+       if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+               CERROR("10e: all last records adds should have failed with"
+                      " -ENOSPC\n");
+               GOTO(out, rc = -EINVAL);
+       }
+
+       CWARN("10e: wrote %d records then %d failed with ENOSPC\n", eok,
+             enospc);
+
+       CWARN("10e: print the catalog entries.. we expect 4\n");
+       cat_counter = 0;
+       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       if (rc) {
+               CERROR("10d: process with cat_print_cb failed: %d\n", rc);
+               GOTO(out, rc);
+       }
+       if (cat_counter != 4) {
+               CERROR("10d: %d entries in catalog\n", cat_counter);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       /* make sure 1 new plain llog appears in catalog (+1 with hdr) */
+       rc = verify_handle("10e", cath, 5);
+       if (rc)
+               GOTO(out, rc);
+
+       /* verify catalog has wrap around */
+       if (cath->lgh_last_idx > cath->lgh_hdr->llh_cat_idx) {
+               CERROR("10e: catalog failed to wrap around\n");
+               GOTO(out, rc = -EINVAL);
+       }
+
+       rc = dt_attr_get(env, cath->lgh_obj, &la);
+       if (rc) {
+               CERROR("10e: failed to get catalog attrs: %d\n", rc);
+               GOTO(out, rc);
+       }
+
+       if (la.la_size != cat_max_size) {
+               CERROR("10e: catalog size has changed after it has wrap around,"
+                      " current size = "LPU64", expected size = "LPU64"\n",
+                      la.la_size, cat_max_size);
+               GOTO(out, rc = -EINVAL);
+       }
+       CWARN("10e: catalog successfully wrap around, last_idx %d, first %d\n",
+             cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
+
+       /* cancel more records to free one more slot in Catalog
+        * see if it is re-allocated when adding more records */
+       CWARN("10f: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       cancel_count = 0;
+       rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
+       if (rc != -LLOG_EEMPTY) {
+               CERROR("10f: process with llog_cancel_rec_cb failed: %d\n", rc);
+               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * not reached */
+               if (rc == 0)
+                       rc = -ERANGE;
+               GOTO(out, rc);
+       }
+
+       CWARN("10f: print the catalog entries.. we expect 3\n");
+       cat_counter = 0;
+       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       if (rc) {
+               CERROR("10f: process with cat_print_cb failed: %d\n", rc);
+               GOTO(out, rc);
+       }
+       if (cat_counter != 3) {
+               CERROR("10f: %d entries in catalog\n", cat_counter);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       /* verify one down in catalog (+1 with hdr) */
+       rc = verify_handle("10f", cath, 4);
+       if (rc)
+               GOTO(out, rc);
+
+       enospc = 0;
+       eok = 0;
+       CWARN("10f: write %d more log records\n", LLOG_TEST_RECNUM);
+       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+               rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
+               if (rc && rc != -ENOSPC) {
+                       CERROR("10f: write %d records failed at #%d: %d\n",
+                              LLOG_TEST_RECNUM, i + 1, rc);
+                       GOTO(out, rc);
+               }
+               /* after last added plain LLOG has filled up, all new
+                * records add should fail with -ENOSPC */
+               if (rc == -ENOSPC) {
+                       enospc++;
+               } else {
+                       enospc = 0;
+                       eok++;
+               }
+       }
+
+       if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+               CERROR("10f: all last records adds should have failed with"
+                      " -ENOSPC\n");
+               GOTO(out, rc = -EINVAL);
+       }
+
+       CWARN("10f: wrote %d records then %d failed with ENOSPC\n", eok,
+             enospc);
+
+       /* make sure 1 new plain llog appears in catalog (+1 with hdr) */
+       rc = verify_handle("10f", cath, 5);
+       if (rc)
+               GOTO(out, rc);
+
+       /* verify lgh_last_idx = llh_cat_idx = 2 now */
+       if (cath->lgh_last_idx != cath->lgh_hdr->llh_cat_idx ||
+           cath->lgh_last_idx != 2) {
+               CERROR("10f: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 2\n",
+                      cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       rc = dt_attr_get(env, cath->lgh_obj, &la);
+       if (rc) {
+               CERROR("10f: failed to get catalog attrs: %d\n", rc);
+               GOTO(out, rc);
+       }
+
+       if (la.la_size != cat_max_size) {
+               CERROR("10f: catalog size has changed after it has wrap around,"
+                      " current size = "LPU64", expected size = "LPU64"\n",
+                      la.la_size, cat_max_size);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       /* will llh_cat_idx also successfully wrap ? */
+
+       /* cancel all records in the plain LLOGs referenced by 2 last indexes in
+        * Catalog */
+
+       /* cancel more records to free one more slot in Catalog */
+       CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       cancel_count = 0;
+       rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
+       if (rc != -LLOG_EEMPTY) {
+               CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
+               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * not reached */
+               if (rc == 0)
+                       rc = -ERANGE;
+               GOTO(out, rc);
+       }
+
+       CWARN("10g: print the catalog entries.. we expect 3\n");
+       cat_counter = 0;
+       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       if (rc) {
+               CERROR("10g: process with cat_print_cb failed: %d\n", rc);
+               GOTO(out, rc);
+       }
+       if (cat_counter != 3) {
+               CERROR("10g: %d entries in catalog\n", cat_counter);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       /* verify one down in catalog (+1 with hdr) */
+       rc = verify_handle("10g", cath, 4);
+       if (rc)
+               GOTO(out, rc);
+
+       /* cancel more records to free one more slot in Catalog */
+       CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       cancel_count = 0;
+       rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
+       if (rc != -LLOG_EEMPTY) {
+               CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
+               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * not reached */
+               if (rc == 0)
+                       rc = -ERANGE;
+               GOTO(out, rc);
+       }
+
+       CWARN("10g: print the catalog entries.. we expect 2\n");
+       cat_counter = 0;
+       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       if (rc) {
+               CERROR("10g: process with cat_print_cb failed: %d\n", rc);
+               GOTO(out, rc);
+       }
+       if (cat_counter != 2) {
+               CERROR("10g: %d entries in catalog\n", cat_counter);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       /* verify one down in catalog (+1 with hdr) */
+       rc = verify_handle("10g", cath, 3);
+       if (rc)
+               GOTO(out, rc);
+
+       /* verify lgh_last_idx = 2 and llh_cat_idx = 0 now */
+       if (cath->lgh_hdr->llh_cat_idx != 0 ||
+           cath->lgh_last_idx != 2) {
+               CERROR("10g: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 0\n",
+                      cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       /* cancel more records to free one more slot in Catalog */
+       CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       cancel_count = 0;
+       rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
+       if (rc != -LLOG_EEMPTY) {
+               CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
+               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * not reached */
+               if (rc == 0)
+                       rc = -ERANGE;
+               GOTO(out, rc);
+       }
+
+       CWARN("10g: print the catalog entries.. we expect 1\n");
+       cat_counter = 0;
+       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       if (rc) {
+               CERROR("10g: process with cat_print_cb failed: %d\n", rc);
+               GOTO(out, rc);
+       }
+       if (cat_counter != 1) {
+               CERROR("10g: %d entries in catalog\n", cat_counter);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       /* verify one down in catalog (+1 with hdr) */
+       rc = verify_handle("10g", cath, 2);
+       if (rc)
+               GOTO(out, rc);
+
+       /* verify lgh_last_idx = 2 and llh_cat_idx = 1 now */
+       if (cath->lgh_hdr->llh_cat_idx != 1 ||
+           cath->lgh_last_idx != 2) {
+               CERROR("10g: lgh_last_idx = %d vs 2, llh_cat_idx = %d vs 1\n",
+                      cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       CWARN("10g: llh_cat_idx has also successfully wrapped!\n");
+
+out:
+       cfs_fail_loc = 0;
+       cfs_fail_val = 0;
+
+       CWARN("10: put newly-created catalog\n");
+       rc2 = llog_cat_close(env, cath);
+       if (rc2) {
+               CERROR("10: close log %s failed: %d\n", name, rc2);
+               if (rc == 0)
+                       rc = rc2;
+       }
+ctxt_release:
+       llog_ctxt_put(ctxt);
+       RETURN(rc);
+}
+
 /* -------------------------------------------------------------------------
  * Tests above, boring obd functions below
  * ------------------------------------------------------------------------- */
@@ -1435,6 +1863,11 @@ static int llog_run_tests(const struct lu_env *env, struct obd_device *obd)
        rc = llog_test_9(env, obd);
        if (rc != 0)
                GOTO(cleanup, rc);
+
+       rc = llog_test_10(env, obd);
+       if (rc)
+               GOTO(cleanup, rc);
+
 cleanup:
        err = llog_destroy(env, llh);
        if (err)