Whamcloud - gitweb
LU-6142 obdclass: Fix style issues for llog_test.c
[fs/lustre-release.git] / lustre / obdclass / llog_test.c
index 27f52aa..4fd4b1a 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2016, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -39,6 +39,8 @@
 
 #include <linux/module.h>
 #include <linux/init.h>
+#include <linux/kthread.h>
+#include <linux/delay.h>
 
 #include <obd_class.h>
 #include <lustre_fid.h>
@@ -54,8 +56,8 @@ static struct obd_uuid uuid = { .uuid = "test_uuid" };
 static struct llog_logid cat_logid;
 
 struct llog_mini_rec {
-        struct llog_rec_hdr     lmr_hdr;
-        struct llog_rec_tail    lmr_tail;
+       struct llog_rec_hdr lmr_hdr;
+       struct llog_rec_tail lmr_tail;
 } __attribute__((packed));
 
 static int verify_handle(char *test, struct llog_handle *llh, int num_recs)
@@ -101,8 +103,8 @@ static int verify_handle(char *test, struct llog_handle *llh, int num_recs)
 static int llog_test_1(const struct lu_env *env,
                       struct obd_device *obd, char *name)
 {
-       struct llog_handle      *llh;
-       struct llog_ctxt        *ctxt;
+       struct llog_handle *llh;
+       struct llog_ctxt *ctxt;
        int rc;
        int rc2;
 
@@ -148,11 +150,11 @@ static int test_2_cancel_cb(const struct lu_env *env, struct llog_handle *llh,
 static int llog_test_2(const struct lu_env *env, struct obd_device *obd,
                       char *name, struct llog_handle **llh)
 {
-       struct llog_ctxt        *ctxt;
-       struct llog_handle      *lgh;
-       struct llog_logid        logid;
-       int                      rc;
-       struct llog_mini_rec     lmr;
+       struct llog_ctxt *ctxt;
+       struct llog_handle *lgh;
+       struct llog_logid  logid;
+       int rc;
+       struct llog_mini_rec lmr;
 
        ENTRY;
 
@@ -301,8 +303,10 @@ static int test3_check_n_add_cb(const struct lu_env *env,
        } else {
                size_t chunk_size = lgh->lgh_hdr->llh_hdr.lrh_len;
 
-               /* For variable size records the start offset is unknown, trust
-                * the first value and check others are consistent with it. */
+               /*
+                * For variable size records the start offset is unknown, trust
+                * the first value and check others are consistent with it.
+                */
                if (test_3_rec_off == 0)
                        test_3_rec_off = lgh->lgh_cur_offset;
 
@@ -337,8 +341,10 @@ static int test3_check_n_add_cb(const struct lu_env *env,
        if (rc < 0)
                CERROR("cb_test_3: cannot modify record while processing\n");
 
-       /* Add new record to the llog at *last_rec position one by one to
-        * check that last block is re-read during processing */
+       /*
+        * Add new record to the llog at *last_rec position one by one to
+        * check that last block is re-read during processing
+        */
        if (cur_idx == *last_rec || cur_idx == (*last_rec + 1)) {
                rc = llog_write(env, lgh, rec, LLOG_NEXT_IDX);
                if (rc < 0)
@@ -404,7 +410,8 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
        llh->lgh_hdr->llh_size = sizeof(struct llog_gen_rec);
        llh->lgh_hdr->llh_flags |= LLOG_F_IS_FIXSIZE;
 
-       /* Fill the llog with 64-bytes records, use 1023 records,
+       /*
+        * Fill the llog with 64-bytes records, use 1023 records,
         * so last chunk will be partially full. Don't change this
         * value until record size is changed.
         */
@@ -466,14 +473,17 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
 
        CWARN("3b: write 566 variable size llog records\n");
 
-       /* Drop llh_size to 0 to mark llog as variable-size and write
-        * header to make this change permanent. */
+       /*
+        * Drop llh_size to 0 to mark llog as variable-size and write
+        * header to make this change permanent.
+        */
        llh->lgh_hdr->llh_flags &= ~LLOG_F_IS_FIXSIZE;
        llog_write(env, llh, &llh->lgh_hdr->llh_hdr, LLOG_HEADER_IDX);
 
        hdr->lrh_type = OBD_CFG_REC;
 
-       /* there are 1025 64-bytes records in llog already,
+       /*
+        * there are 1025 64-bytes records in llog already,
         * the last chunk contains single record, i.e. 64 bytes.
         * Each pair of variable size records is 200 bytes, so
         * we will have the following distribution per chunks:
@@ -566,15 +576,15 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
 /* Test catalogue additions */
 static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_handle      *cath;
-       char                     name[10];
-       int                      rc, rc2, i, buflen;
-       struct llog_mini_rec     lmr;
-       struct llog_cookie       cookie;
-       struct llog_ctxt        *ctxt;
-       int                      num_recs = 0;
-       char                    *buf;
-       struct llog_rec_hdr     *rec;
+       struct llog_handle *cath;
+       char name[10];
+       int rc, rc2, i, buflen;
+       struct llog_mini_rec lmr;
+       struct llog_cookie cookie;
+       struct llog_ctxt *ctxt;
+       int num_recs = 0;
+       char *buf;
+       struct llog_rec_hdr *rec;
 
        ENTRY;
 
@@ -680,8 +690,8 @@ static int cat_counter;
 static int cat_print_cb(const struct lu_env *env, struct llog_handle *llh,
                        struct llog_rec_hdr *rec, void *data)
 {
-       struct llog_logid_rec   *lir = (struct llog_logid_rec *)rec;
-       struct lu_fid            fid = {0};
+       struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
+       struct lu_fid fid = {0};
 
        if (rec->lrh_type != LLOG_LOGID_MAGIC) {
                CERROR("invalid record in catalog\n");
@@ -747,11 +757,11 @@ static int llog_cancel_rec_cb(const struct lu_env *env,
 /* Test log and catalogue processing */
 static int llog_test_5(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_handle      *llh = NULL;
-       char                     name[10];
-       int                      rc, rc2;
-       struct llog_mini_rec     lmr;
-       struct llog_ctxt        *ctxt;
+       struct llog_handle *llh = NULL;
+       char name[10];
+       int rc, rc2;
+       struct llog_mini_rec lmr;
+       struct llog_ctxt *ctxt;
 
        ENTRY;
 
@@ -857,14 +867,14 @@ out_put:
 static int llog_test_6(const struct lu_env *env, struct obd_device *obd,
                       char *name)
 {
-       struct obd_device       *mgc_obd;
-       struct llog_ctxt        *ctxt;
-       struct obd_uuid         *mgs_uuid;
-       struct obd_export       *exp;
-       struct obd_uuid          uuid = { "LLOG_TEST6_UUID" };
-       struct llog_handle      *llh = NULL;
-       struct llog_ctxt        *nctxt;
-       int                      rc, rc2;
+       struct obd_device *mgc_obd;
+       struct llog_ctxt *ctxt;
+       struct obd_uuid *mgs_uuid;
+       struct obd_export *exp;
+       struct obd_uuid uuid = { "LLOG_TEST6_UUID" };
+       struct llog_handle *llh = NULL;
+       struct llog_ctxt *nctxt;
+       int rc, rc2;
 
        ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
        LASSERT(ctxt);
@@ -973,9 +983,9 @@ static int test_7_cancel_cb(const struct lu_env *env, struct llog_handle *llh,
 
 static int llog_test_7_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
 {
-       struct llog_handle      *llh;
-       int                      rc = 0, i, process_count;
-       int                      num_recs = 0;
+       struct llog_handle *llh;
+       int rc = 0, i, process_count;
+       int num_recs = 0;
 
        ENTRY;
 
@@ -1058,8 +1068,8 @@ out_close:
 /* Test all llog records writing and processing */
 static int llog_test_7(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_ctxt        *ctxt;
-       int                      rc;
+       struct llog_ctxt *ctxt;
+       int rc;
 
        ENTRY;
 
@@ -1160,10 +1170,10 @@ out:
 
 static int llog_truncate(const struct lu_env *env, struct dt_object *o)
 {
-       struct lu_attr           la;
-       struct thandle          *th;
-       struct dt_device        *d;
-       int                      rc;
+       struct lu_attr la;
+       struct thandle *th;
+       struct dt_device *d;
+       int rc;
        ENTRY;
 
        LASSERT(o);
@@ -1222,13 +1232,13 @@ static int test_8_cb(const struct lu_env *env, struct llog_handle *llh,
 
 static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_handle      *llh = NULL;
-       char                     name[10];
-       int                      rc, rc2, i;
-       int                      orig_counter;
-       struct llog_mini_rec     lmr;
-       struct llog_ctxt        *ctxt;
-       struct dt_object        *obj = NULL;
+       struct llog_handle *llh = NULL;
+       char name[10];
+       int rc, rc2, i;
+       int orig_counter;
+       struct llog_mini_rec lmr;
+       struct llog_ctxt *ctxt;
+       struct dt_object *obj = NULL;
 
        ENTRY;
 
@@ -1302,7 +1312,7 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
                }
        }
        CWARN("8b: second llog "DFID"\n",
-               PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu)));
+             PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu)));
 
        rc2 = llog_cat_close(env, llh);
        if (rc2) {
@@ -1360,9 +1370,9 @@ out_put:
 
 static int llog_test_9_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
 {
-       struct llog_handle      *llh;
-       struct lu_fid            fid;
-       int                      rc = 0;
+       struct llog_handle *llh;
+       struct lu_fid fid;
+       int rc = 0;
 
        ENTRY;
 
@@ -1397,8 +1407,8 @@ out_close:
 /* Prepare different types of llog records for llog_reader test*/
 static int llog_test_9(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_ctxt        *ctxt;
-       int                      rc;
+       struct llog_ctxt *ctxt;
+       int rc;
 
        ENTRY;
 
@@ -1454,17 +1464,73 @@ out:
        RETURN(rc);
 }
 
+struct llog_process_info {
+       struct llog_handle *lpi_loghandle;
+       llog_cb_t lpi_cb;
+       void *lpi_cbdata;
+       void *lpi_catdata;
+       int lpi_rc;
+       struct completion lpi_completion;
+       const struct lu_env *lpi_env;
+       struct task_struct *lpi_reftask;
+};
+
+
+static int llog_test_process_thread(void *arg)
+{
+       struct llog_process_info *lpi = arg;
+
+       llog_process(NULL, lpi->lpi_loghandle, lpi->lpi_cb, lpi->lpi_cbdata,
+                    lpi->lpi_catdata);
+
+       complete(&lpi->lpi_completion);
+
+       return 0;
+}
+
+static int cat_check_old_cb(const struct lu_env *env, struct llog_handle *llh,
+                       struct llog_rec_hdr *rec, void *data)
+{
+       struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
+       struct lu_fid fid = {0};
+       struct lu_fid *prev_fid = data;
+
+       if (rec->lrh_type != LLOG_LOGID_MAGIC) {
+               CERROR("invalid record in catalog\n");
+               RETURN(-EINVAL);
+       }
+
+       logid_to_fid(&lir->lid_id, &fid);
+
+       CWARN("seeing record at index %d - "DFID" in log "DFID"\n",
+             rec->lrh_index, PFID(&fid),
+             PFID(lu_object_fid(&llh->lgh_obj->do_lu)));
+
+       if (prev_fid->f_oid > fid.f_oid) {
+               CWARN("processing old record, fail\n");
+               prev_fid->f_oid = 0xbad;
+               RETURN(LLOG_PROC_BREAK);
+       }
+
+       if (prev_fid->f_oid == 0)
+               cfs_fail_loc = OBD_FAIL_LLOG_PROCESS_TIMEOUT;
+
+       *prev_fid = fid;
+
+       RETURN(0);
+}
+
 /* test catalog wrap around */
 static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_handle      *cath;
-       char                     name[10];
-       int                      rc, rc2, i, enospc, eok;
-       struct llog_mini_rec     lmr;
-       struct llog_ctxt        *ctxt;
-       struct lu_attr           la;
-       __u64                    cat_max_size;
-       struct dt_device        *dt;
+       struct llog_handle *cath;
+       char name[10];
+       int rc, rc2, i, enospc, eok;
+       struct llog_mini_rec lmr;
+       struct llog_ctxt *ctxt;
+       struct lu_attr la;
+       __u64 cat_max_size;
+       struct dt_device *dt;
 
        ENTRY;
 
@@ -1490,9 +1556,11 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        cat_logid = cath->lgh_id;
        dt = lu2dt_dev(cath->lgh_obj->do_lu.lo_dev);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10c: sync failed: %d\n", rc);
@@ -1518,9 +1586,11 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10b: sync failed: %d\n", rc);
@@ -1542,17 +1612,21 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10c: sync failed: %d\n", rc);
                GOTO(out, rc);
        }
 
-       /* fill last allocated plain LLOG and reach -ENOSPC condition
-        * because no slot available in Catalog */
+       /*
+        * fill last allocated plain LLOG and reach -ENOSPC condition
+        * because no slot available in Catalog
+        */
        enospc = 0;
        eok = 0;
        CWARN("10c: write %d more log records\n", LLOG_TEST_RECNUM);
@@ -1563,8 +1637,10 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                               LLOG_TEST_RECNUM, i + 1, rc);
                        GOTO(out, rc);
                }
-               /* after last added plain LLOG has filled up, all new
-                * records add should fail with -ENOSPC */
+               /*
+                * after last added plain LLOG has filled up, all new
+                * records add should fail with -ENOSPC
+                */
                if (rc == -ENOSPC) {
                        enospc++;
                } else {
@@ -1595,15 +1671,19 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        }
        cat_max_size = la.la_size;
 
-       /* cancel all 1st plain llog records to empty it, this will also cause
-        * its catalog entry to be freed for next forced wrap in 10e */
+       /*
+        * cancel all 1st plain llog records to empty it, this will also cause
+        * its catalog entry to be freed for next forced wrap in 10e
+        */
        CWARN("10d: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10d: process with llog_cancel_rec_cb failed: %d\n", rc);
-               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
-                * not reached */
+               /*
+                * need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * not reached
+                */
                if (rc == 0)
                        rc = -ERANGE;
                GOTO(out, rc);
@@ -1626,9 +1706,11 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10d: sync failed: %d\n", rc);
@@ -1645,8 +1727,10 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                               LLOG_TEST_RECNUM, i + 1, rc);
                        GOTO(out, rc);
                }
-               /* after last added plain LLOG has filled up, all new
-                * records add should fail with -ENOSPC */
+               /*
+                * after last added plain LLOG has filled up, all new
+                * records add should fail with -ENOSPC
+                */
                if (rc == -ENOSPC) {
                        enospc++;
                } else {
@@ -1702,24 +1786,30 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        CWARN("10e: catalog successfully wrap around, last_idx %d, first %d\n",
              cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10e: sync failed: %d\n", rc);
                GOTO(out, rc);
        }
 
-       /* cancel more records to free one more slot in Catalog
-        * see if it is re-allocated when adding more records */
+       /*
+        * cancel more records to free one more slot in Catalog
+        * see if it is re-allocated when adding more records
+        */
        CWARN("10f: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10f: process with llog_cancel_rec_cb failed: %d\n", rc);
-               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
-                * not reached */
+               /*
+                * need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * not reached
+                */
                if (rc == 0)
                        rc = -ERANGE;
                GOTO(out, rc);
@@ -1742,9 +1832,11 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10f: sync failed: %d\n", rc);
@@ -1761,8 +1853,10 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                               LLOG_TEST_RECNUM, i + 1, rc);
                        GOTO(out, rc);
                }
-               /* after last added plain LLOG has filled up, all new
-                * records add should fail with -ENOSPC */
+               /*
+                * after last added plain LLOG has filled up, all new
+                * records add should fail with -ENOSPC
+                */
                if (rc == -ENOSPC) {
                        enospc++;
                } else {
@@ -1806,9 +1900,11 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                GOTO(out, rc = -EINVAL);
        }
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10f: sync failed: %d\n", rc);
@@ -1817,8 +1913,10 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        /* will llh_cat_idx also successfully wrap ? */
 
-       /* cancel all records in the plain LLOGs referenced by 2 last indexes in
-        * Catalog */
+       /*
+        * cancel all records in the plain LLOGs referenced by 2 last indexes in
+        * Catalog
+        */
 
        /* cancel more records to free one more slot in Catalog */
        CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
@@ -1850,9 +1948,11 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10g: sync failed: %d\n", rc);
@@ -1865,8 +1965,10 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
-               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
-                * not reached */
+               /*
+                * need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * not reached
+                */
                if (rc == 0)
                        rc = -ERANGE;
                GOTO(out, rc);
@@ -1897,9 +1999,11 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                GOTO(out, rc = -EINVAL);
        }
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10g: sync failed: %d\n", rc);
@@ -1912,8 +2016,10 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
-               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
-                * not reached */
+               /*
+                * need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * not reached
+                */
                if (rc == 0)
                        rc = -ERANGE;
                GOTO(out, rc);
@@ -1946,6 +2052,64 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10g: llh_cat_idx has also successfully wrapped!\n");
 
+       /*
+        * catalog has only one valid entry other slots has outdated
+        * records. Trying to race the llog_thread_process with llog_add
+        * llog_thread_process read buffer and loop record on it.
+        * llog_add adds a record and mark a record in bitmap.
+        * llog_thread_process process record with old data.
+        */
+       {
+       struct llog_process_info lpi;
+       struct lu_fid test_fid = {0};
+
+       lpi.lpi_loghandle = cath;
+       lpi.lpi_cb = cat_check_old_cb;
+       lpi.lpi_catdata = NULL;
+       lpi.lpi_cbdata = &test_fid;
+       init_completion(&lpi.lpi_completion);
+
+       kthread_run(llog_test_process_thread, &lpi, "llog_test_process_thread");
+
+       msleep(1 * MSEC_PER_SEC);
+       enospc = 0;
+       eok = 0;
+       CWARN("10h: write %d more log records\n", LLOG_TEST_RECNUM);
+       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+               rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
+               if (rc && rc != -ENOSPC) {
+                       CERROR("10h: write %d records failed at #%d: %d\n",
+                              LLOG_TEST_RECNUM, i + 1, rc);
+                       GOTO(out, rc);
+               }
+               /*
+                * after last added plain LLOG has filled up, all new
+                * records add should fail with -ENOSPC
+                */
+               if (rc == -ENOSPC) {
+                       enospc++;
+               } else {
+                       enospc = 0;
+                       eok++;
+               }
+       }
+
+       if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+               CERROR("10h: all last records adds should have failed with"
+                      " -ENOSPC\n");
+               GOTO(out, rc = -EINVAL);
+       }
+
+       CWARN("10h: wrote %d records then %d failed with ENOSPC\n", eok,
+             enospc);
+
+       wait_for_completion(&lpi.lpi_completion);
+
+       if (test_fid.f_oid == 0xbad) {
+               CERROR("10h: race happened, old record was processed\n");
+               GOTO(out, rc = -EINVAL);
+       }
+       }
 out:
        cfs_fail_loc = 0;
        cfs_fail_val = 0;
@@ -1962,15 +2126,17 @@ ctxt_release:
        RETURN(rc);
 }
 
-/* -------------------------------------------------------------------------
+/*
+ * -------------------------------------------------------------------------
  * Tests above, boring obd functions below
- * ------------------------------------------------------------------------- */
+ * -------------------------------------------------------------------------
+ */
 static int llog_run_tests(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_handle      *llh = NULL;
-       struct llog_ctxt        *ctxt;
-       int                      rc, err;
-       char                     name[10];
+       struct llog_handle *llh = NULL;
+       struct llog_ctxt *ctxt;
+       int rc, err;
+       char name[10];
 
        ENTRY;
        ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
@@ -2032,9 +2198,9 @@ cleanup_ctxt:
 
 static int llog_test_cleanup(struct obd_device *obd)
 {
-       struct obd_device       *tgt;
-       struct lu_env            env;
-       int                      rc;
+       struct obd_device *tgt;
+       struct lu_env env;
+       int rc;
 
        ENTRY;
 
@@ -2052,32 +2218,32 @@ static int llog_test_cleanup(struct obd_device *obd)
 
 static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
-       struct obd_device       *tgt;
-       struct llog_ctxt        *ctxt;
-       struct dt_object        *o;
-       struct lu_env            env;
-       struct lu_context        test_session;
-       int                      rc;
-
-        ENTRY;
-
-        if (lcfg->lcfg_bufcount < 2) {
-                CERROR("requires a TARGET OBD name\n");
-                RETURN(-EINVAL);
-        }
+       struct obd_device *tgt;
+       struct llog_ctxt *ctxt;
+       struct dt_object *o;
+       struct lu_env env;
+       struct lu_context test_session;
+       int rc;
 
-        if (lcfg->lcfg_buflens[1] < 1) {
-                CERROR("requires a TARGET OBD name\n");
-                RETURN(-EINVAL);
-        }
+       ENTRY;
 
-        /* disk obd */
-        tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
-        if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
-                CERROR("target device not attached or not set up (%s)\n",
-                       lustre_cfg_string(lcfg, 1));
-                RETURN(-EINVAL);
-        }
+       if (lcfg->lcfg_bufcount < 2) {
+               CERROR("requires a TARGET OBD name\n");
+               RETURN(-EINVAL);
+       }
+
+       if (lcfg->lcfg_buflens[1] < 1) {
+               CERROR("requires a TARGET OBD name\n");
+               RETURN(-EINVAL);
+       }
+
+       /* disk obd */
+       tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
+       if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
+               CERROR("target device not attached or not set up (%s)\n",
+                       lustre_cfg_string(lcfg, 1));
+               RETURN(-EINVAL);
+       }
 
        rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
        if (rc)
@@ -2126,14 +2292,14 @@ cleanup_env:
 }
 
 static struct obd_ops llog_obd_ops = {
-        .o_owner       = THIS_MODULE,
-        .o_setup       = llog_test_setup,
-        .o_cleanup     = llog_test_cleanup,
+       .o_owner       = THIS_MODULE,
+       .o_setup       = llog_test_setup,
+       .o_cleanup     = llog_test_cleanup,
 };
 
 static int __init llog_test_init(void)
 {
-       return class_register_type(&llog_obd_ops, NULL, true, NULL,
+       return class_register_type(&llog_obd_ops, NULL, false, NULL,
                                   "llog_test", NULL);
 }