Whamcloud - gitweb
LU-11975 test: fix for llog test 10h
[fs/lustre-release.git] / lustre / obdclass / llog_test.c
index b0737f3..856e8d0 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -43,6 +39,8 @@
 
 #include <linux/module.h>
 #include <linux/init.h>
+#include <linux/kthread.h>
+#include <linux/delay.h>
 
 #include <obd_class.h>
 #include <lustre_fid.h>
 /* This is slightly more than the number of records that can fit into a
  * single llog file, because the llog_log_header takes up some of the
  * space in the first block that cannot be used for the bitmap. */
-#define LLOG_TEST_RECNUM  (LLOG_MIN_CHUNK_SIZE * 8)
-
+static int llog_test_recnum = (LLOG_MIN_CHUNK_SIZE * 8);
 static int llog_test_rand;
 static struct obd_uuid uuid = { .uuid = "test_uuid" };
 static struct llog_logid cat_logid;
 
 struct llog_mini_rec {
-        struct llog_rec_hdr     lmr_hdr;
-        struct llog_rec_tail    lmr_tail;
+       struct llog_rec_hdr lmr_hdr;
+       struct llog_rec_tail lmr_tail;
 } __attribute__((packed));
 
 static int verify_handle(char *test, struct llog_handle *llh, int num_recs)
@@ -105,8 +102,8 @@ static int verify_handle(char *test, struct llog_handle *llh, int num_recs)
 static int llog_test_1(const struct lu_env *env,
                       struct obd_device *obd, char *name)
 {
-       struct llog_handle      *llh;
-       struct llog_ctxt        *ctxt;
+       struct llog_handle *llh;
+       struct llog_ctxt *ctxt;
        int rc;
        int rc2;
 
@@ -152,11 +149,11 @@ static int test_2_cancel_cb(const struct lu_env *env, struct llog_handle *llh,
 static int llog_test_2(const struct lu_env *env, struct obd_device *obd,
                       char *name, struct llog_handle **llh)
 {
-       struct llog_ctxt        *ctxt;
-       struct llog_handle      *lgh;
-       struct llog_logid        logid;
-       int                      rc;
-       struct llog_mini_rec     lmr;
+       struct llog_ctxt *ctxt;
+       struct llog_handle *lgh;
+       struct llog_logid  logid;
+       int rc;
+       struct llog_mini_rec lmr;
 
        ENTRY;
 
@@ -298,27 +295,31 @@ static int test3_check_n_add_cb(const struct lu_env *env,
                LASSERT(lgh->lgh_hdr->llh_size > 0);
                if (lgh->lgh_cur_offset != lgh->lgh_hdr->llh_hdr.lrh_len +
                                        (cur_idx - 1) * lgh->lgh_hdr->llh_size)
-                       CERROR("Wrong record offset in cur_off: "LPU64", should be %u\n",
+                       CERROR("Wrong record offset in cur_off: %llu, should be %u\n",
                               lgh->lgh_cur_offset,
                               lgh->lgh_hdr->llh_hdr.lrh_len +
                               (cur_idx - 1) * lgh->lgh_hdr->llh_size);
        } else {
                size_t chunk_size = lgh->lgh_hdr->llh_hdr.lrh_len;
 
-               /* For variable size records the start offset is unknown, trust
-                * the first value and check others are consistent with it. */
+               /*
+                * For variable size records the start offset is unknown, trust
+                * the first value and check others are consistent with it.
+                */
                if (test_3_rec_off == 0)
                        test_3_rec_off = lgh->lgh_cur_offset;
 
                if (lgh->lgh_cur_offset != test_3_rec_off) {
+                       __u64 tmp = lgh->lgh_cur_offset;
+
                        /* there can be padding record */
-                       if ((lgh->lgh_cur_offset % chunk_size == 0) &&
+                       if ((do_div(tmp, chunk_size) == 0) &&
                            (lgh->lgh_cur_offset - test_3_rec_off <
                             rec->lrh_len + LLOG_MIN_REC_SIZE)) {
                                test_3_rec_off = lgh->lgh_cur_offset;
                                test_3_paddings++;
                        } else {
-                               CERROR("Wrong record offset in cur_off: "LPU64
+                               CERROR("Wrong record offset in cur_off: %llu"
                                       ", should be %lld (rec len %u)\n",
                                       lgh->lgh_cur_offset,
                                       (long long)test_3_rec_off,
@@ -339,8 +340,10 @@ static int test3_check_n_add_cb(const struct lu_env *env,
        if (rc < 0)
                CERROR("cb_test_3: cannot modify record while processing\n");
 
-       /* Add new record to the llog at *last_rec position one by one to
-        * check that last block is re-read during processing */
+       /*
+        * Add new record to the llog at *last_rec position one by one to
+        * check that last block is re-read during processing
+        */
        if (cur_idx == *last_rec || cur_idx == (*last_rec + 1)) {
                rc = llog_write(env, lgh, rec, LLOG_NEXT_IDX);
                if (rc < 0)
@@ -406,7 +409,8 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
        llh->lgh_hdr->llh_size = sizeof(struct llog_gen_rec);
        llh->lgh_hdr->llh_flags |= LLOG_F_IS_FIXSIZE;
 
-       /* Fill the llog with 64-bytes records, use 1023 records,
+       /*
+        * Fill the llog with 64-bytes records, use 1023 records,
         * so last chunk will be partially full. Don't change this
         * value until record size is changed.
         */
@@ -468,14 +472,17 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
 
        CWARN("3b: write 566 variable size llog records\n");
 
-       /* Drop llh_size to 0 to mark llog as variable-size and write
-        * header to make this change permanent. */
+       /*
+        * Drop llh_size to 0 to mark llog as variable-size and write
+        * header to make this change permanent.
+        */
        llh->lgh_hdr->llh_flags &= ~LLOG_F_IS_FIXSIZE;
        llog_write(env, llh, &llh->lgh_hdr->llh_hdr, LLOG_HEADER_IDX);
 
        hdr->lrh_type = OBD_CFG_REC;
 
-       /* there are 1025 64-bytes records in llog already,
+       /*
+        * there are 1025 64-bytes records in llog already,
         * the last chunk contains single record, i.e. 64 bytes.
         * Each pair of variable size records is 200 bytes, so
         * we will have the following distribution per chunks:
@@ -568,15 +575,15 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
 /* Test catalogue additions */
 static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_handle      *cath;
-       char                     name[10];
-       int                      rc, rc2, i, buflen;
-       struct llog_mini_rec     lmr;
-       struct llog_cookie       cookie;
-       struct llog_ctxt        *ctxt;
-       int                      num_recs = 0;
-       char                    *buf;
-       struct llog_rec_hdr     *rec;
+       struct llog_handle *cath, *llh;
+       char name[10];
+       int rc, rc2, i, buflen;
+       struct llog_mini_rec lmr;
+       struct llog_cookie cookie;
+       struct llog_ctxt *ctxt;
+       int num_recs = 0;
+       char *buf;
+       struct llog_rec_hdr *rec;
 
        ENTRY;
 
@@ -617,6 +624,18 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
+       /* estimate the max number of record for the plain llog
+        * cause it depends on disk size
+        */
+       llh = cath->u.chd.chd_current_log;
+       if (llh->lgh_max_size != 0) {
+               llog_test_recnum = (llh->lgh_max_size -
+                       sizeof(struct llog_log_hdr)) / LLOG_MIN_REC_SIZE;
+       }
+
+       if (llog_test_recnum >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr))
+               llog_test_recnum = LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1;
+
        CWARN("4c: cancel 1 log record\n");
        rc = llog_cat_cancel_records(env, cath, 1, &cookie);
        if (rc) {
@@ -629,12 +648,12 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
-       CWARN("4d: write %d more log records\n", LLOG_TEST_RECNUM);
-       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+       CWARN("4d: write %d more log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc) {
                        CERROR("4d: write %d records failed at #%d: %d\n",
-                              LLOG_TEST_RECNUM, i + 1, rc);
+                              llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
                num_recs++;
@@ -682,8 +701,8 @@ static int cat_counter;
 static int cat_print_cb(const struct lu_env *env, struct llog_handle *llh,
                        struct llog_rec_hdr *rec, void *data)
 {
-       struct llog_logid_rec   *lir = (struct llog_logid_rec *)rec;
-       struct lu_fid            fid = {0};
+       struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
+       struct lu_fid fid = {0};
 
        if (rec->lrh_type != LLOG_LOGID_MAGIC) {
                CERROR("invalid record in catalog\n");
@@ -741,7 +760,7 @@ static int llog_cancel_rec_cb(const struct lu_env *env,
 
        llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle, 1, &cookie);
        cancel_count++;
-       if (cancel_count == LLOG_TEST_RECNUM)
+       if (cancel_count == llog_test_recnum)
                RETURN(-LLOG_EEMPTY);
        RETURN(0);
 }
@@ -749,11 +768,11 @@ static int llog_cancel_rec_cb(const struct lu_env *env,
 /* Test log and catalogue processing */
 static int llog_test_5(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_handle      *llh = NULL;
-       char                     name[10];
-       int                      rc, rc2;
-       struct llog_mini_rec     lmr;
-       struct llog_ctxt        *ctxt;
+       struct llog_handle *llh = NULL;
+       char name[10];
+       int rc, rc2;
+       struct llog_mini_rec lmr;
+       struct llog_ctxt *ctxt;
 
        ENTRY;
 
@@ -788,7 +807,7 @@ static int llog_test_5(const struct lu_env *env, struct obd_device *obd)
                GOTO(out, rc = -EINVAL);
        }
 
-       CWARN("5c: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       CWARN("5c: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, llh, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
@@ -859,14 +878,14 @@ out_put:
 static int llog_test_6(const struct lu_env *env, struct obd_device *obd,
                       char *name)
 {
-       struct obd_device       *mgc_obd;
-       struct llog_ctxt        *ctxt;
-       struct obd_uuid         *mgs_uuid;
-       struct obd_export       *exp;
-       struct obd_uuid          uuid = { "LLOG_TEST6_UUID" };
-       struct llog_handle      *llh = NULL;
-       struct llog_ctxt        *nctxt;
-       int                      rc, rc2;
+       struct obd_device *mgc_obd;
+       struct llog_ctxt *ctxt;
+       struct obd_uuid *mgs_uuid;
+       struct obd_export *exp;
+       struct obd_uuid uuid = { "LLOG_TEST6_UUID" };
+       struct llog_handle *llh = NULL;
+       struct llog_ctxt *nctxt;
+       int rc, rc2;
 
        ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
        LASSERT(ctxt);
@@ -944,6 +963,7 @@ static union {
        struct llog_logid_rec           llr;   /* LLOG_LOGID_MAGIC */
        struct llog_unlink64_rec        lur;   /* MDS_UNLINK64_REC */
        struct llog_setattr64_rec       lsr64; /* MDS_SETATTR64_REC */
+       struct llog_setattr64_rec_v2    lsr64_v2; /* MDS_SETATTR64_REC */
        struct llog_size_change_rec     lscr;  /* OST_SZ_REC */
        struct llog_changelog_rec       lcr;   /* CHANGELOG_REC */
        struct llog_changelog_user_rec  lcur;  /* CHANGELOG_USER_REC */
@@ -974,9 +994,9 @@ static int test_7_cancel_cb(const struct lu_env *env, struct llog_handle *llh,
 
 static int llog_test_7_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
 {
-       struct llog_handle      *llh;
-       int                      rc = 0, i, process_count;
-       int                      num_recs = 0;
+       struct llog_handle *llh;
+       int rc = 0, i, process_count;
+       int num_recs = 0;
 
        ENTRY;
 
@@ -1059,8 +1079,8 @@ out_close:
 /* Test all llog records writing and processing */
 static int llog_test_7(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_ctxt        *ctxt;
-       int                      rc;
+       struct llog_ctxt *ctxt;
+       int rc;
 
        ENTRY;
 
@@ -1143,63 +1163,19 @@ static int llog_test_7(const struct lu_env *env, struct obd_device *obd)
                CERROR("7g: llog_size_change_rec test failed\n");
                GOTO(out, rc);
        }
-out:
-       llog_ctxt_put(ctxt);
-       RETURN(rc);
-}
-
-static int llog_truncate(const struct lu_env *env, struct dt_object *o)
-{
-       struct lu_attr           la;
-       struct thandle          *th;
-       struct dt_device        *d;
-       int                      rc;
-       ENTRY;
-
-       LASSERT(o);
-       d = lu2dt_dev(o->do_lu.lo_dev);
-       LASSERT(d);
 
-       rc = dt_attr_get(env, o, &la);
-       if (rc)
-               RETURN(rc);
+       CWARN("7h: test llog_setattr64_rec_v2\n");
+       llog_records.lsr64.lsr_hdr.lrh_len = sizeof(llog_records.lsr64_v2);
+       llog_records.lsr64.lsr_tail.lrt_len = sizeof(llog_records.lsr64_v2);
+       llog_records.lsr64.lsr_hdr.lrh_type = MDS_SETATTR64_REC;
 
-       CDEBUG(D_OTHER, "original size "LPU64"\n", la.la_size);
-       rc = sizeof(struct llog_log_hdr) + sizeof(struct llog_mini_rec);
-       if (la.la_size < rc) {
-               CERROR("too small llog: "LPU64"\n", la.la_size);
-               RETURN(0);
+       rc = llog_test_7_sub(env, ctxt);
+       if (rc) {
+               CERROR("7h: llog_setattr64_rec_v2 test failed\n");
+               GOTO(out, rc);
        }
-
-       /* drop 2 records */
-       la.la_size = la.la_size - (sizeof(struct llog_mini_rec) * 2);
-       la.la_valid = LA_SIZE;
-
-       th = dt_trans_create(env, d);
-       if (IS_ERR(th))
-               RETURN(PTR_ERR(th));
-
-       rc = dt_declare_attr_set(env, o, &la, th);
-       if (rc)
-               GOTO(stop, rc);
-
-       rc = dt_declare_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
-
-       rc = dt_trans_start_local(env, d, th);
-       if (rc)
-               GOTO(stop, rc);
-
-       rc = dt_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
-       if (rc)
-               GOTO(stop, rc);
-
-       rc = dt_attr_set(env, o, &la, th);
-       if (rc)
-               GOTO(stop, rc);
-
-stop:
-       dt_trans_stop(env, d, th);
-
+out:
+       llog_ctxt_put(ctxt);
        RETURN(rc);
 }
 
@@ -1212,13 +1188,13 @@ static int test_8_cb(const struct lu_env *env, struct llog_handle *llh,
 
 static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_handle      *llh = NULL;
-       char                     name[10];
-       int                      rc, rc2, i;
-       int                      orig_counter;
-       struct llog_mini_rec     lmr;
-       struct llog_ctxt        *ctxt;
-       struct dt_object        *obj = NULL;
+       struct llog_handle *llh = NULL;
+       char name[10];
+       int rc, rc2, i;
+       int orig_counter;
+       struct llog_mini_rec lmr;
+       struct llog_ctxt *ctxt;
+       struct dt_object *obj = NULL;
 
        ENTRY;
 
@@ -1292,7 +1268,7 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
                }
        }
        CWARN("8b: second llog "DFID"\n",
-               PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu)));
+             PFID(lu_object_fid(&llh->u.chd.chd_current_log->lgh_obj->do_lu)));
 
        rc2 = llog_cat_close(env, llh);
        if (rc2) {
@@ -1302,8 +1278,10 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
                GOTO(out_put, rc);
        }
 
-       CWARN("8c: drop two records from the first plain llog\n");
-       llog_truncate(env, obj);
+       /* Here was 8c: drop two records from the first plain llog
+        * llog_truncate was bad idea cause it creates a wrong state,
+        * lgh_last_idx is wrong and two records belongs to zeroed buffer
+        */
 
        CWARN("8d: count survived records\n");
        rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
@@ -1325,9 +1303,9 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
                GOTO(out, rc);
        }
 
-       if (orig_counter + 200 - 2 != plain_counter) {
+       if (orig_counter + 200 != plain_counter) {
                CERROR("found %d records (expected %d)\n", plain_counter,
-                      orig_counter + 200 - 2);
+                      orig_counter + 200);
                rc = -EIO;
        }
 
@@ -1343,16 +1321,16 @@ out_put:
        llog_ctxt_put(ctxt);
 
        if (obj != NULL)
-               lu_object_put(env, &obj->do_lu);
+               dt_object_put(env, obj);
 
        RETURN(rc);
 }
 
 static int llog_test_9_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
 {
-       struct llog_handle      *llh;
-       struct lu_fid            fid;
-       int                      rc = 0;
+       struct llog_handle *llh;
+       struct lu_fid fid;
+       int rc = 0;
 
        ENTRY;
 
@@ -1387,8 +1365,8 @@ out_close:
 /* Prepare different types of llog records for llog_reader test*/
 static int llog_test_9(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_ctxt        *ctxt;
-       int                      rc;
+       struct llog_ctxt *ctxt;
+       int rc;
 
        ENTRY;
 
@@ -1444,17 +1422,80 @@ out:
        RETURN(rc);
 }
 
+struct llog_process_info {
+       struct llog_handle *lpi_loghandle;
+       llog_cb_t lpi_cb;
+       void *lpi_cbdata;
+       void *lpi_catdata;
+       int lpi_rc;
+       struct completion lpi_completion;
+       const struct lu_env *lpi_env;
+       struct task_struct *lpi_reftask;
+};
+
+
+static int llog_test_process_thread(void *arg)
+{
+       struct llog_process_info *lpi = arg;
+       int rc;
+
+       rc = llog_cat_process_or_fork(NULL, lpi->lpi_loghandle, lpi->lpi_cb,
+                                     NULL, lpi->lpi_cbdata, 1, 0, true);
+
+       complete(&lpi->lpi_completion);
+
+       lpi->lpi_rc = rc;
+       if (rc)
+               CWARN("10h: Error during catalog processing %d\n", rc);
+       return rc;
+}
+
+static int cat_check_old_cb(const struct lu_env *env, struct llog_handle *llh,
+                       struct llog_rec_hdr *rec, void *data)
+{
+       struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
+       struct lu_fid fid = {0};
+       struct lu_fid *prev_fid = data;
+
+       if (rec->lrh_type != LLOG_LOGID_MAGIC) {
+               CERROR("invalid record in catalog\n");
+               RETURN(-EINVAL);
+       }
+
+       logid_to_fid(&lir->lid_id, &fid);
+
+       CWARN("seeing record at index %d - "DFID" in log "DFID"\n",
+             rec->lrh_index, PFID(&fid),
+             PFID(lu_object_fid(&llh->lgh_obj->do_lu)));
+
+       if (prev_fid->f_oid > fid.f_oid) {
+               CWARN("processing old record, fail\n");
+               prev_fid->f_oid = 0xbad;
+               RETURN(-LLOG_EEMPTY);
+       }
+
+       if (prev_fid->f_oid == 0) {
+               cfs_fail_loc = OBD_FAIL_ONCE | OBD_FAIL_LLOG_PROCESS_TIMEOUT;
+               cfs_fail_val = (unsigned int) (llh->lgh_id.lgl_oi.oi.oi_id &
+                                              0xFFFFFFFF);
+               msleep(1 * MSEC_PER_SEC);
+       }
+       *prev_fid = fid;
+
+       RETURN(0);
+}
+
 /* test catalog wrap around */
 static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_handle      *cath;
-       char                     name[10];
-       int                      rc, rc2, i, enospc, eok;
-       struct llog_mini_rec     lmr;
-       struct llog_ctxt        *ctxt;
-       struct lu_attr           la;
-       __u64                    cat_max_size;
-       struct dt_device        *dt;
+       struct llog_handle *cath;
+       char name[10];
+       int rc, rc2, i, enospc, eok;
+       struct llog_mini_rec lmr;
+       struct llog_ctxt *ctxt;
+       struct lu_attr la;
+       __u64 cat_max_size;
+       struct dt_device *dt;
 
        ENTRY;
 
@@ -1480,16 +1521,27 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        cat_logid = cath->lgh_id;
        dt = lu2dt_dev(cath->lgh_obj->do_lu.lo_dev);
 
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
+        * to consume a huge space with delayed journal commit callbacks
+        * particularly on low memory nodes or VMs
+        */
+       rc = dt_sync(env, dt);
+       if (rc) {
+               CERROR("10c: sync failed: %d\n", rc);
+               GOTO(out, rc);
+       }
+
        /* force catalog wrap for 5th plain LLOG */
        cfs_fail_loc = CFS_FAIL_SKIP|OBD_FAIL_CAT_RECORDS;
        cfs_fail_val = 4;
 
-       CWARN("10b: write %d log records\n", LLOG_TEST_RECNUM);
-       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+       CWARN("10b: write %d log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc) {
                        CERROR("10b: write %d records failed at #%d: %d\n",
-                              LLOG_TEST_RECNUM, i + 1, rc);
+                              llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
        }
@@ -1499,21 +1551,23 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10b: sync failed: %d\n", rc);
                GOTO(out, rc);
        }
 
-       CWARN("10c: write %d more log records\n", 2 * LLOG_TEST_RECNUM);
-       for (i = 0; i < 2 * LLOG_TEST_RECNUM; i++) {
+       CWARN("10c: write %d more log records\n", 2 * llog_test_recnum);
+       for (i = 0; i < 2 * llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc) {
                        CERROR("10c: write %d records failed at #%d: %d\n",
-                              2*LLOG_TEST_RECNUM, i + 1, rc);
+                              2*llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
        }
@@ -1523,29 +1577,35 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10c: sync failed: %d\n", rc);
                GOTO(out, rc);
        }
 
-       /* fill last allocated plain LLOG and reach -ENOSPC condition
-        * because no slot available in Catalog */
+       /*
+        * fill last allocated plain LLOG and reach -ENOSPC condition
+        * because no slot available in Catalog
+        */
        enospc = 0;
        eok = 0;
-       CWARN("10c: write %d more log records\n", LLOG_TEST_RECNUM);
-       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+       CWARN("10c: write %d more log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc && rc != -ENOSPC) {
                        CERROR("10c: write %d records failed at #%d: %d\n",
-                              LLOG_TEST_RECNUM, i + 1, rc);
+                              llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
-               /* after last added plain LLOG has filled up, all new
-                * records add should fail with -ENOSPC */
+               /*
+                * after last added plain LLOG has filled up, all new
+                * records add should fail with -ENOSPC
+                */
                if (rc == -ENOSPC) {
                        enospc++;
                } else {
@@ -1554,7 +1614,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                }
        }
 
-       if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+       if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
                CERROR("10c: all last records adds should have failed with"
                       " -ENOSPC\n");
                GOTO(out, rc = -EINVAL);
@@ -1576,15 +1636,19 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        }
        cat_max_size = la.la_size;
 
-       /* cancel all 1st plain llog records to empty it, this will also cause
-        * its catalog entry to be freed for next forced wrap in 10e */
-       CWARN("10d: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       /*
+        * cancel all 1st plain llog records to empty it, this will also cause
+        * its catalog entry to be freed for next forced wrap in 10e
+        */
+       CWARN("10d: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10d: process with llog_cancel_rec_cb failed: %d\n", rc);
-               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
-                * not reached */
+               /*
+                * need to indicate error if for any reason llog_test_recnum is
+                * not reached
+                */
                if (rc == 0)
                        rc = -ERANGE;
                GOTO(out, rc);
@@ -1607,9 +1671,11 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10d: sync failed: %d\n", rc);
@@ -1618,16 +1684,18 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        enospc = 0;
        eok = 0;
-       CWARN("10e: write %d more log records\n", LLOG_TEST_RECNUM);
-       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+       CWARN("10e: write %d more log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc && rc != -ENOSPC) {
                        CERROR("10e: write %d records failed at #%d: %d\n",
-                              LLOG_TEST_RECNUM, i + 1, rc);
+                              llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
-               /* after last added plain LLOG has filled up, all new
-                * records add should fail with -ENOSPC */
+               /*
+                * after last added plain LLOG has filled up, all new
+                * records add should fail with -ENOSPC
+                */
                if (rc == -ENOSPC) {
                        enospc++;
                } else {
@@ -1636,7 +1704,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                }
        }
 
-       if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+       if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
                CERROR("10e: all last records adds should have failed with"
                       " -ENOSPC\n");
                GOTO(out, rc = -EINVAL);
@@ -1647,13 +1715,14 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10e: print the catalog entries.. we expect 4\n");
        cat_counter = 0;
-       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+                                     0, 0, false);
        if (rc) {
-               CERROR("10d: process with cat_print_cb failed: %d\n", rc);
+               CERROR("10e: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
        }
        if (cat_counter != 4) {
-               CERROR("10d: %d entries in catalog\n", cat_counter);
+               CERROR("10e: %d entries in catalog\n", cat_counter);
                GOTO(out, rc = -EINVAL);
        }
 
@@ -1676,31 +1745,37 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        if (la.la_size != cat_max_size) {
                CERROR("10e: catalog size has changed after it has wrap around,"
-                      " current size = "LPU64", expected size = "LPU64"\n",
+                      " current size = %llu, expected size = %llu\n",
                       la.la_size, cat_max_size);
                GOTO(out, rc = -EINVAL);
        }
        CWARN("10e: catalog successfully wrap around, last_idx %d, first %d\n",
              cath->lgh_last_idx, cath->lgh_hdr->llh_cat_idx);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10e: sync failed: %d\n", rc);
                GOTO(out, rc);
        }
 
-       /* cancel more records to free one more slot in Catalog
-        * see if it is re-allocated when adding more records */
-       CWARN("10f: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       /*
+        * cancel more records to free one more slot in Catalog
+        * see if it is re-allocated when adding more records
+        */
+       CWARN("10f: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10f: process with llog_cancel_rec_cb failed: %d\n", rc);
-               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
-                * not reached */
+               /*
+                * need to indicate error if for any reason llog_test_recnum is
+                * not reached
+                */
                if (rc == 0)
                        rc = -ERANGE;
                GOTO(out, rc);
@@ -1708,7 +1783,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10f: print the catalog entries.. we expect 3\n");
        cat_counter = 0;
-       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+                                     0, 0, false);
        if (rc) {
                CERROR("10f: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
@@ -1723,9 +1799,11 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10f: sync failed: %d\n", rc);
@@ -1734,16 +1812,18 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        enospc = 0;
        eok = 0;
-       CWARN("10f: write %d more log records\n", LLOG_TEST_RECNUM);
-       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+       CWARN("10f: write %d more log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc && rc != -ENOSPC) {
                        CERROR("10f: write %d records failed at #%d: %d\n",
-                              LLOG_TEST_RECNUM, i + 1, rc);
+                              llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
-               /* after last added plain LLOG has filled up, all new
-                * records add should fail with -ENOSPC */
+               /*
+                * after last added plain LLOG has filled up, all new
+                * records add should fail with -ENOSPC
+                */
                if (rc == -ENOSPC) {
                        enospc++;
                } else {
@@ -1752,7 +1832,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                }
        }
 
-       if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+       if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
                CERROR("10f: all last records adds should have failed with"
                       " -ENOSPC\n");
                GOTO(out, rc = -EINVAL);
@@ -1782,14 +1862,16 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        if (la.la_size != cat_max_size) {
                CERROR("10f: catalog size has changed after it has wrap around,"
-                      " current size = "LPU64", expected size = "LPU64"\n",
+                      " current size = %llu, expected size = %llu\n",
                       la.la_size, cat_max_size);
                GOTO(out, rc = -EINVAL);
        }
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10f: sync failed: %d\n", rc);
@@ -1798,16 +1880,18 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        /* will llh_cat_idx also successfully wrap ? */
 
-       /* cancel all records in the plain LLOGs referenced by 2 last indexes in
-        * Catalog */
+       /*
+        * cancel all records in the plain LLOGs referenced by 2 last indexes in
+        * Catalog
+        */
 
        /* cancel more records to free one more slot in Catalog */
-       CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
-               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
+               /* need to indicate error if for any reason llog_test_recnum is
                 * not reached */
                if (rc == 0)
                        rc = -ERANGE;
@@ -1816,7 +1900,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10g: print the catalog entries.. we expect 3\n");
        cat_counter = 0;
-       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+                                     0, 0, false);
        if (rc) {
                CERROR("10g: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
@@ -1831,9 +1916,11 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10g: sync failed: %d\n", rc);
@@ -1841,13 +1928,15 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        }
 
        /* cancel more records to free one more slot in Catalog */
-       CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
-               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
-                * not reached */
+               /*
+                * need to indicate error if for any reason llog_test_recnum is
+                * not reached
+                */
                if (rc == 0)
                        rc = -ERANGE;
                GOTO(out, rc);
@@ -1855,7 +1944,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10g: print the catalog entries.. we expect 2\n");
        cat_counter = 0;
-       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+                                     0, 0, false);
        if (rc) {
                CERROR("10g: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
@@ -1878,9 +1968,11 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                GOTO(out, rc = -EINVAL);
        }
 
-       /* sync device to commit all recent LLOG changes to disk and avoid
+       /*
+        * sync device to commit all recent LLOG changes to disk and avoid
         * to consume a huge space with delayed journal commit callbacks
-        * particularly on low memory nodes or VMs */
+        * particularly on low memory nodes or VMs
+        */
        rc = dt_sync(env, dt);
        if (rc) {
                CERROR("10g: sync failed: %d\n", rc);
@@ -1888,13 +1980,15 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        }
 
        /* cancel more records to free one more slot in Catalog */
-       CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
-               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
-                * not reached */
+               /*
+                * need to indicate error if for any reason llog_test_recnum is
+                * not reached
+                */
                if (rc == 0)
                        rc = -ERANGE;
                GOTO(out, rc);
@@ -1902,7 +1996,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10g: print the catalog entries.. we expect 1\n");
        cat_counter = 0;
-       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+                                     0, 0, false);
        if (rc) {
                CERROR("10g: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
@@ -1927,6 +2022,64 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10g: llh_cat_idx has also successfully wrapped!\n");
 
+       /*
+        * catalog has only one valid entry other slots has outdated
+        * records. Trying to race the llog_thread_process with llog_add
+        * llog_thread_process read buffer and loop record on it.
+        * llog_add adds a record and mark a record in bitmap.
+        * llog_thread_process process record with old data.
+        */
+       {
+       struct llog_process_info lpi;
+       struct lu_fid test_fid = {0};
+
+       lpi.lpi_loghandle = cath;
+       lpi.lpi_cb = cat_check_old_cb;
+       lpi.lpi_catdata = NULL;
+       lpi.lpi_cbdata = &test_fid;
+       init_completion(&lpi.lpi_completion);
+
+       kthread_run(llog_test_process_thread, &lpi, "llog_test_process_thread");
+
+       msleep(1 * MSEC_PER_SEC / 2);
+       enospc = 0;
+       eok = 0;
+       CWARN("10h: write %d more log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
+               rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
+               if (rc && rc != -ENOSPC) {
+                       CERROR("10h: write %d records failed at #%d: %d\n",
+                              llog_test_recnum, i + 1, rc);
+                       GOTO(out, rc);
+               }
+               /*
+                * after last added plain LLOG has filled up, all new
+                * records add should fail with -ENOSPC
+                */
+               if (rc == -ENOSPC) {
+                       enospc++;
+               } else {
+                       enospc = 0;
+                       eok++;
+               }
+       }
+
+       if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
+               CERROR("10h: all last records adds should have failed with"
+                      " -ENOSPC\n");
+               GOTO(out, rc = -EINVAL);
+       }
+
+       CWARN("10h: wrote %d records then %d failed with ENOSPC\n", eok,
+             enospc);
+
+       wait_for_completion(&lpi.lpi_completion);
+
+       if (lpi.lpi_rc != 0) {
+               CERROR("10h: race happened, old record was processed\n");
+               GOTO(out, rc = -EINVAL);
+       }
+       }
 out:
        cfs_fail_loc = 0;
        cfs_fail_val = 0;
@@ -1943,15 +2096,17 @@ ctxt_release:
        RETURN(rc);
 }
 
-/* -------------------------------------------------------------------------
+/*
+ * -------------------------------------------------------------------------
  * Tests above, boring obd functions below
- * ------------------------------------------------------------------------- */
+ * -------------------------------------------------------------------------
+ */
 static int llog_run_tests(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_handle      *llh = NULL;
-       struct llog_ctxt        *ctxt;
-       int                      rc, err;
-       char                     name[10];
+       struct llog_handle *llh = NULL;
+       struct llog_ctxt *ctxt;
+       int rc, err;
+       char name[10];
 
        ENTRY;
        ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
@@ -2013,9 +2168,9 @@ cleanup_ctxt:
 
 static int llog_test_cleanup(struct obd_device *obd)
 {
-       struct obd_device       *tgt;
-       struct lu_env            env;
-       int                      rc;
+       struct obd_device *tgt;
+       struct lu_env env;
+       int rc;
 
        ENTRY;
 
@@ -2033,32 +2188,32 @@ static int llog_test_cleanup(struct obd_device *obd)
 
 static int llog_test_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
-       struct obd_device       *tgt;
-       struct llog_ctxt        *ctxt;
-       struct dt_object        *o;
-       struct lu_env            env;
-       struct lu_context        test_session;
-       int                      rc;
-
-        ENTRY;
-
-        if (lcfg->lcfg_bufcount < 2) {
-                CERROR("requires a TARGET OBD name\n");
-                RETURN(-EINVAL);
-        }
+       struct obd_device *tgt;
+       struct llog_ctxt *ctxt;
+       struct dt_object *o;
+       struct lu_env env;
+       struct lu_context test_session;
+       int rc;
 
-        if (lcfg->lcfg_buflens[1] < 1) {
-                CERROR("requires a TARGET OBD name\n");
-                RETURN(-EINVAL);
-        }
+       ENTRY;
 
-        /* disk obd */
-        tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
-        if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
-                CERROR("target device not attached or not set up (%s)\n",
-                       lustre_cfg_string(lcfg, 1));
-                RETURN(-EINVAL);
-        }
+       if (lcfg->lcfg_bufcount < 2) {
+               CERROR("requires a TARGET OBD name\n");
+               RETURN(-EINVAL);
+       }
+
+       if (lcfg->lcfg_buflens[1] < 1) {
+               CERROR("requires a TARGET OBD name\n");
+               RETURN(-EINVAL);
+       }
+
+       /* disk obd */
+       tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
+       if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
+               CERROR("target device not attached or not set up (%s)\n",
+                       lustre_cfg_string(lcfg, 1));
+               RETURN(-EINVAL);
+       }
 
        rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
        if (rc)
@@ -2107,14 +2262,14 @@ cleanup_env:
 }
 
 static struct obd_ops llog_obd_ops = {
-        .o_owner       = THIS_MODULE,
-        .o_setup       = llog_test_setup,
-        .o_cleanup     = llog_test_cleanup,
+       .o_owner       = THIS_MODULE,
+       .o_setup       = llog_test_setup,
+       .o_cleanup     = llog_test_cleanup,
 };
 
 static int __init llog_test_init(void)
 {
-       return class_register_type(&llog_obd_ops, NULL, true, NULL,
+       return class_register_type(&llog_obd_ops, NULL, false, NULL,
                                   "llog_test", NULL);
 }