Whamcloud - gitweb
LU-11591 llog: add synchronization for the last record 83/33683/6
authorAlexander Boyko <c17825@cray.com>
Thu, 29 Nov 2018 13:58:30 +0000 (08:58 -0500)
committerOleg Drokin <green@whamcloud.com>
Mon, 18 Feb 2019 06:37:30 +0000 (06:37 +0000)
The initial problem was a race between llog_process_thread
and llog_osd_write_rec for a last record with lgh_last_idx.
The catalog should be wrapped for the problem. The lgh_last_idx
could be increased with a modification of llog bitmap, and a writing
record happen a bit later. When llog_process_thread processing
lgh_last_idx after modification and before a write it operates
with old record data.

The patch adds synchronization when lgh_last_idx is processed.

The patch changes llog_test 10h to check race between
llog_process_thread and llog_osd_write_rec.

1 Thread with write                  2 Thread with read
llog_osd_write_rec()                llog_process_thread()
lgh_last_idx++
lock lgh_hdr_mutex
ext2_set_bit()
dt_write_rec (write header)         ext2_test_bit()
         check lgh_last_idx was changed
         dt_read_rec()
         reread the record, and here we
         got the old value of record
unlock lgh_hdr_mutex
dt_write_rec (write the record)

Signed-off-by: Alexander Boyko <c17825@cray.com>
Cray-bug-id: LUS-6683
Change-Id: I642b488655940b9456ca8e2f2174c98a966ba242
Reviewed-on: https://review.whamcloud.com/33683
Tested-by: Jenkins
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Alexander Zarochentsev <c17826@cray.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_log.h
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_osd.c
lustre/obdclass/llog_test.c
lustre/tests/sanity.sh

index aa0a0b5..d47d990 100644 (file)
@@ -268,6 +268,7 @@ struct llog_handle {
         * case, after it will have reached LLOG_HDR_BITMAP_SIZE, llh_cat_idx
         * will become its upper limit */
        int                      lgh_last_idx;
         * case, after it will have reached LLOG_HDR_BITMAP_SIZE, llh_cat_idx
         * will become its upper limit */
        int                      lgh_last_idx;
+       struct rw_semaphore      lgh_last_sem;
        __u64                    lgh_cur_offset; /* used for test only */
        struct llog_ctxt        *lgh_ctxt;
        union {
        __u64                    lgh_cur_offset; /* used for test only */
        struct llog_ctxt        *lgh_ctxt;
        union {
index 9046c65..6ecc0a8 100644 (file)
@@ -64,6 +64,7 @@ static struct llog_handle *llog_alloc_handle(void)
 
        init_rwsem(&loghandle->lgh_lock);
        mutex_init(&loghandle->lgh_hdr_mutex);
 
        init_rwsem(&loghandle->lgh_lock);
        mutex_init(&loghandle->lgh_hdr_mutex);
+       init_rwsem(&loghandle->lgh_last_sem);
        INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
        atomic_set(&loghandle->lgh_refcount, 1);
 
        INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
        atomic_set(&loghandle->lgh_refcount, 1);
 
@@ -477,6 +478,7 @@ static int llog_process_thread(void *arg)
                unsigned int buf_offset = 0;
                bool partial_chunk;
                int     lh_last_idx;
                unsigned int buf_offset = 0;
                bool partial_chunk;
                int     lh_last_idx;
+               int     synced_idx = 0;
 
                /* skip records not set in bitmap */
                while (index <= last_index &&
 
                /* skip records not set in bitmap */
                while (index <= last_index &&
@@ -494,7 +496,8 @@ repeat:
                /* get the buf with our target record; avoid old garbage */
                memset(buf, 0, chunk_size);
                /* the record index for outdated chunk data */
                /* get the buf with our target record; avoid old garbage */
                memset(buf, 0, chunk_size);
                /* the record index for outdated chunk data */
-               lh_last_idx = loghandle->lgh_last_idx + 1;
+               /* it is safe to process buffer until saved lgh_last_idx */
+               lh_last_idx = LLOG_HDR_TAIL(llh)->lrt_index;
                rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
                                     index, &cur_offset, buf, chunk_size);
                if (repeated && rc)
                rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
                                     index, &cur_offset, buf, chunk_size);
                if (repeated && rc)
@@ -538,40 +541,42 @@ repeat:
                        CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
                               rec->lrh_type, rec->lrh_index);
 
                        CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
                               rec->lrh_type, rec->lrh_index);
 
-                       /* for partial chunk the end of it is zeroed, check
-                        * for index 0 to distinguish it. */
-                       if (partial_chunk && rec->lrh_index == 0) {
-                               /* concurrent llog_add() might add new records
-                                * while llog_processing, check this is not
-                                * the case and re-read the current chunk
-                                * otherwise. */
-                               int records;
-                               /* lgh_last_idx could be less then index
-                                * for catalog, if catalog is wrapped */
-                               if ((index > loghandle->lgh_last_idx &&
-                                   !(loghandle->lgh_hdr->llh_flags &
-                                     LLOG_F_IS_CAT)) || repeated ||
-                                   (loghandle->lgh_obj != NULL &&
-                                    dt_object_remote(loghandle->lgh_obj)))
-                                       GOTO(out, rc = 0);
-                               /* <2 records means no more records
-                                * if the last record we processed was
-                                * the final one, then the underlying
-                                * object might have been destroyed yet.
-                                * we better don't access that.. */
-                               mutex_lock(&loghandle->lgh_hdr_mutex);
-                               records = loghandle->lgh_hdr->llh_count;
-                               mutex_unlock(&loghandle->lgh_hdr_mutex);
-                               if (records <= 1)
-                                       GOTO(out, rc = 0);
-                               CDEBUG(D_OTHER, "Re-read last llog buffer for "
-                                      "new records, index %u, last %u\n",
-                                      index, loghandle->lgh_last_idx);
+                       if (index == (synced_idx + 1) &&
+                           synced_idx == LLOG_HDR_TAIL(llh)->lrt_index)
+                               GOTO(out, rc = 0);
+
+                       /* the bitmap could be changed during processing
+                        * records from the chunk. For wrapped catalog
+                        * it means we can read deleted record and try to
+                        * process it. Check this case and reread the chunk.
+                        * It is safe to process to lh_last_idx, including
+                        * lh_last_idx if it was synced. We can not do <=
+                        * comparison, cause for wrapped catalog lgh_last_idx
+                        * could be less than index. So we detect last index
+                        * for processing as index == lh_last_idx+1. But when
+                        * catalog is wrapped and full lgh_last_idx=llh_cat_idx,
+                        * the first processing index is llh_cat_idx+1.
+                        */
+
+                       if ((index == lh_last_idx && synced_idx != index) ||
+                           (index == (lh_last_idx + 1) &&
+                            !(index == (llh->llh_cat_idx + 1) &&
+                              (llh->llh_flags & LLOG_F_IS_CAT))) ||
+                           (rec->lrh_index == 0 && !repeated)) {
+
                                /* save offset inside buffer for the re-read */
                                buf_offset = (char *)rec - (char *)buf;
                                cur_offset = chunk_offset;
                                repeated = true;
                                /* save offset inside buffer for the re-read */
                                buf_offset = (char *)rec - (char *)buf;
                                cur_offset = chunk_offset;
                                repeated = true;
+                               /* We need to be sure lgh_last_idx
+                                * record was saved to disk
+                                */
+                               down_read(&loghandle->lgh_last_sem);
+                               synced_idx = LLOG_HDR_TAIL(llh)->lrt_index;
+                               up_read(&loghandle->lgh_last_sem);
+                               CDEBUG(D_OTHER, "synced_idx: %d\n", synced_idx);
                                goto repeat;
                                goto repeat;
+
                        }
 
                        repeated = false;
                        }
 
                        repeated = false;
@@ -611,30 +616,24 @@ repeat:
                        loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
                                                    chunk_offset;
 
                        loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
                                                    chunk_offset;
 
-                       OBD_FAIL_TIMEOUT(OBD_FAIL_LLOG_PROCESS_TIMEOUT, 2);
-
+                       if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) &&
+                               index == lh_last_idx &&
+                               cfs_fail_val == (unsigned int)
+                                       (loghandle->lgh_id.lgl_oi.oi.oi_id &
+                                        0xFFFFFFFF)) {
+                               OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT);
+                       }
                        /* if set, process the callback on this record */
                        if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
                                struct llog_cookie *lgc;
                                __u64   tmp_off;
                                int     tmp_idx;
                        /* if set, process the callback on this record */
                        if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
                                struct llog_cookie *lgc;
                                __u64   tmp_off;
                                int     tmp_idx;
-                       /* the bitmap could be changed during processing
-                        * records from the chunk. For wrapped catalog
-                        * it means we can read deleted record and try to
-                        * process it. Check this case and reread the chunk.
-                        * Checking the race with llog_add the bit is set
-                        * after incrementation of lgh_last_idx */
-                               if (index == lh_last_idx &&
-                                   lh_last_idx !=
-                                   (loghandle->lgh_last_idx + 1)) {
-                                       /* save offset inside buffer for
-                                        *  the re-read */
-                                       buf_offset = (char *)rec - (char *)buf;
-                                       cur_offset = chunk_offset;
-                                       repeated = true;
-                                       goto repeat;
 
 
-                               }
+                               CDEBUG(D_OTHER, "index: %d, lh_last_idx: %d "
+                                      "synced_idx: %d lgh_last_idx: %d\n",
+                                      index, lh_last_idx, synced_idx,
+                                      loghandle->lgh_last_idx);
+
                                if (lti != NULL) {
                                        lgc = &lti->lgi_cookie;
                                        /* store lu_env for recursive calls */
                                if (lti != NULL) {
                                        lgc = &lti->lgi_cookie;
                                        /* store lu_env for recursive calls */
index f939e8b..56f8f11 100644 (file)
@@ -880,7 +880,10 @@ int llog_cat_process_or_fork(const struct lu_env *env,
                        /* processing the catalog part at the end */
                        cd.lpcd_first_idx = (startcat ? startcat :
                                             llh->llh_cat_idx);
                        /* processing the catalog part at the end */
                        cd.lpcd_first_idx = (startcat ? startcat :
                                             llh->llh_cat_idx);
-                       cd.lpcd_last_idx = 0;
+                       if (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS))
+                               cd.lpcd_last_idx = cfs_fail_val;
+                       else
+                               cd.lpcd_last_idx = 0;
                        rc = llog_process_or_fork(env, cat_llh, cat_cb,
                                                  &d, &cd, fork);
                        /* Reset the startcat becasue it has already reached
                        rc = llog_process_or_fork(env, cat_llh, cat_cb,
                                                  &d, &cd, fork);
                        /* Reset the startcat becasue it has already reached
@@ -907,6 +910,7 @@ int llog_cat_process_or_fork(const struct lu_env *env,
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
+EXPORT_SYMBOL(llog_cat_process_or_fork);
 
 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
                     llog_cb_t cb, void *data, int startcat, int startidx)
 
 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
                     llog_cb_t cb, void *data, int startcat, int startidx)
index 9511e00..fc0b16f 100644 (file)
@@ -44,6 +44,8 @@
 
 #define DEBUG_SUBSYSTEM S_LOG
 
 
 #define DEBUG_SUBSYSTEM S_LOG
 
+#include <linux/delay.h>
+
 #include <dt_object.h>
 #include <llog_swab.h>
 #include <lustre_fid.h>
 #include <dt_object.h>
 #include <llog_swab.h>
 #include <lustre_fid.h>
@@ -590,6 +592,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                        RETURN(-ENOSPC);
        }
 
                        RETURN(-ENOSPC);
        }
 
+       down_write(&loghandle->lgh_last_sem);
        /* increment the last_idx along with llh_tail index, they should
         * be equal for a llog lifetime */
        loghandle->lgh_last_idx++;
        /* increment the last_idx along with llh_tail index, they should
         * be equal for a llog lifetime */
        loghandle->lgh_last_idx++;
@@ -673,6 +676,12 @@ out_unlock:
        if (rc)
                GOTO(out, rc);
 
        if (rc)
                GOTO(out, rc);
 
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) &&
+          cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id &
+                                         0xFFFFFFFF)) {
+               OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT);
+               msleep(1 * MSEC_PER_SEC);
+       }
        /* computed index can be used to determine offset for fixed-size
         * records. This also allows to handle Catalog wrap around case */
        if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
        /* computed index can be used to determine offset for fixed-size
         * records. This also allows to handle Catalog wrap around case */
        if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
@@ -693,6 +702,8 @@ out_unlock:
        if (rc < 0)
                GOTO(out, rc);
 
        if (rc < 0)
                GOTO(out, rc);
 
+       up_write(&loghandle->lgh_last_sem);
+
        CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n",
               PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
               lgi->lgi_off);
        CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n",
               PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
               lgi->lgi_off);
@@ -726,6 +737,7 @@ out:
        }
 
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
        }
 
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
+       up_write(&loghandle->lgh_last_sem);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
index 4fd4b1a..6b2628f 100644 (file)
@@ -49,8 +49,7 @@
 /* This is slightly more than the number of records that can fit into a
  * single llog file, because the llog_log_header takes up some of the
  * space in the first block that cannot be used for the bitmap. */
 /* This is slightly more than the number of records that can fit into a
  * single llog file, because the llog_log_header takes up some of the
  * space in the first block that cannot be used for the bitmap. */
-#define LLOG_TEST_RECNUM  (LLOG_MIN_CHUNK_SIZE * 8)
-
+static int llog_test_recnum = (LLOG_MIN_CHUNK_SIZE * 8);
 static int llog_test_rand;
 static struct obd_uuid uuid = { .uuid = "test_uuid" };
 static struct llog_logid cat_logid;
 static int llog_test_rand;
 static struct obd_uuid uuid = { .uuid = "test_uuid" };
 static struct llog_logid cat_logid;
@@ -576,7 +575,7 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
 /* Test catalogue additions */
 static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
 {
 /* Test catalogue additions */
 static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
 {
-       struct llog_handle *cath;
+       struct llog_handle *cath, *llh;
        char name[10];
        int rc, rc2, i, buflen;
        struct llog_mini_rec lmr;
        char name[10];
        int rc, rc2, i, buflen;
        struct llog_mini_rec lmr;
@@ -625,6 +624,18 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
        if (rc)
                GOTO(out, rc);
 
+       /* estimate the max number of record for the plain llog
+        * cause it depends on disk size
+        */
+       llh = cath->u.chd.chd_current_log;
+       if (llh->lgh_max_size != 0) {
+               llog_test_recnum = (llh->lgh_max_size -
+                       sizeof(struct llog_log_hdr)) / LLOG_MIN_REC_SIZE;
+       }
+
+       if (llog_test_recnum >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr))
+               llog_test_recnum = LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1;
+
        CWARN("4c: cancel 1 log record\n");
        rc = llog_cat_cancel_records(env, cath, 1, &cookie);
        if (rc) {
        CWARN("4c: cancel 1 log record\n");
        rc = llog_cat_cancel_records(env, cath, 1, &cookie);
        if (rc) {
@@ -637,12 +648,12 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
        if (rc)
                GOTO(out, rc);
 
        if (rc)
                GOTO(out, rc);
 
-       CWARN("4d: write %d more log records\n", LLOG_TEST_RECNUM);
-       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+       CWARN("4d: write %d more log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc) {
                        CERROR("4d: write %d records failed at #%d: %d\n",
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc) {
                        CERROR("4d: write %d records failed at #%d: %d\n",
-                              LLOG_TEST_RECNUM, i + 1, rc);
+                              llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
                num_recs++;
                        GOTO(out, rc);
                }
                num_recs++;
@@ -749,7 +760,7 @@ static int llog_cancel_rec_cb(const struct lu_env *env,
 
        llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle, 1, &cookie);
        cancel_count++;
 
        llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle, 1, &cookie);
        cancel_count++;
-       if (cancel_count == LLOG_TEST_RECNUM)
+       if (cancel_count == llog_test_recnum)
                RETURN(-LLOG_EEMPTY);
        RETURN(0);
 }
                RETURN(-LLOG_EEMPTY);
        RETURN(0);
 }
@@ -796,7 +807,7 @@ static int llog_test_5(const struct lu_env *env, struct obd_device *obd)
                GOTO(out, rc = -EINVAL);
        }
 
                GOTO(out, rc = -EINVAL);
        }
 
-       CWARN("5c: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       CWARN("5c: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, llh, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
        cancel_count = 0;
        rc = llog_cat_process(env, llh, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
@@ -1168,61 +1179,6 @@ out:
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static int llog_truncate(const struct lu_env *env, struct dt_object *o)
-{
-       struct lu_attr la;
-       struct thandle *th;
-       struct dt_device *d;
-       int rc;
-       ENTRY;
-
-       LASSERT(o);
-       d = lu2dt_dev(o->do_lu.lo_dev);
-       LASSERT(d);
-
-       rc = dt_attr_get(env, o, &la);
-       if (rc)
-               RETURN(rc);
-
-       CDEBUG(D_OTHER, "original size %llu\n", la.la_size);
-       rc = sizeof(struct llog_log_hdr) + sizeof(struct llog_mini_rec);
-       if (la.la_size < rc) {
-               CERROR("too small llog: %llu\n", la.la_size);
-               RETURN(0);
-       }
-
-       /* drop 2 records */
-       la.la_size = la.la_size - (sizeof(struct llog_mini_rec) * 2);
-       la.la_valid = LA_SIZE;
-
-       th = dt_trans_create(env, d);
-       if (IS_ERR(th))
-               RETURN(PTR_ERR(th));
-
-       rc = dt_declare_attr_set(env, o, &la, th);
-       if (rc)
-               GOTO(stop, rc);
-
-       rc = dt_declare_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
-
-       rc = dt_trans_start_local(env, d, th);
-       if (rc)
-               GOTO(stop, rc);
-
-       rc = dt_punch(env, o, la.la_size, OBD_OBJECT_EOF, th);
-       if (rc)
-               GOTO(stop, rc);
-
-       rc = dt_attr_set(env, o, &la, th);
-       if (rc)
-               GOTO(stop, rc);
-
-stop:
-       dt_trans_stop(env, d, th);
-
-       RETURN(rc);
-}
-
 static int test_8_cb(const struct lu_env *env, struct llog_handle *llh,
                          struct llog_rec_hdr *rec, void *data)
 {
 static int test_8_cb(const struct lu_env *env, struct llog_handle *llh,
                          struct llog_rec_hdr *rec, void *data)
 {
@@ -1322,8 +1278,10 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
                GOTO(out_put, rc);
        }
 
                GOTO(out_put, rc);
        }
 
-       CWARN("8c: drop two records from the first plain llog\n");
-       llog_truncate(env, obj);
+       /* Here was 8c: drop two records from the first plain llog
+        * llog_truncate was bad idea cause it creates a wrong state,
+        * lgh_last_idx is wrong and two records belongs to zeroed buffer
+        */
 
        CWARN("8d: count survived records\n");
        rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
 
        CWARN("8d: count survived records\n");
        rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
@@ -1345,9 +1303,9 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
                GOTO(out, rc);
        }
 
                GOTO(out, rc);
        }
 
-       if (orig_counter + 200 - 2 != plain_counter) {
+       if (orig_counter + 200 != plain_counter) {
                CERROR("found %d records (expected %d)\n", plain_counter,
                CERROR("found %d records (expected %d)\n", plain_counter,
-                      orig_counter + 200 - 2);
+                      orig_counter + 200);
                rc = -EIO;
        }
 
                rc = -EIO;
        }
 
@@ -1479,13 +1437,17 @@ struct llog_process_info {
 static int llog_test_process_thread(void *arg)
 {
        struct llog_process_info *lpi = arg;
 static int llog_test_process_thread(void *arg)
 {
        struct llog_process_info *lpi = arg;
+       int rc;
 
 
-       llog_process(NULL, lpi->lpi_loghandle, lpi->lpi_cb, lpi->lpi_cbdata,
-                    lpi->lpi_catdata);
+       rc = llog_cat_process_or_fork(NULL, lpi->lpi_loghandle, lpi->lpi_cb,
+                                     NULL, lpi->lpi_cbdata, 1, 0, true);
 
        complete(&lpi->lpi_completion);
 
 
        complete(&lpi->lpi_completion);
 
-       return 0;
+       lpi->lpi_rc = rc;
+       if (rc)
+               CWARN("10h: Error during catalog processing %d\n", rc);
+       return rc;
 }
 
 static int cat_check_old_cb(const struct lu_env *env, struct llog_handle *llh,
 }
 
 static int cat_check_old_cb(const struct lu_env *env, struct llog_handle *llh,
@@ -1509,12 +1471,14 @@ static int cat_check_old_cb(const struct lu_env *env, struct llog_handle *llh,
        if (prev_fid->f_oid > fid.f_oid) {
                CWARN("processing old record, fail\n");
                prev_fid->f_oid = 0xbad;
        if (prev_fid->f_oid > fid.f_oid) {
                CWARN("processing old record, fail\n");
                prev_fid->f_oid = 0xbad;
-               RETURN(LLOG_PROC_BREAK);
+               RETURN(-LLOG_EEMPTY);
        }
 
        }
 
-       if (prev_fid->f_oid == 0)
-               cfs_fail_loc = OBD_FAIL_LLOG_PROCESS_TIMEOUT;
-
+       if (prev_fid->f_oid == 0) {
+               cfs_fail_loc = OBD_FAIL_ONCE | OBD_FAIL_LLOG_PROCESS_TIMEOUT;
+               cfs_fail_val = (unsigned int) (llh->lgh_id.lgl_oi.oi.oi_id &
+                                              0xFFFFFFFF);
+       }
        *prev_fid = fid;
 
        RETURN(0);
        *prev_fid = fid;
 
        RETURN(0);
@@ -1571,12 +1535,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        cfs_fail_loc = CFS_FAIL_SKIP|OBD_FAIL_CAT_RECORDS;
        cfs_fail_val = 4;
 
        cfs_fail_loc = CFS_FAIL_SKIP|OBD_FAIL_CAT_RECORDS;
        cfs_fail_val = 4;
 
-       CWARN("10b: write %d log records\n", LLOG_TEST_RECNUM);
-       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+       CWARN("10b: write %d log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc) {
                        CERROR("10b: write %d records failed at #%d: %d\n",
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc) {
                        CERROR("10b: write %d records failed at #%d: %d\n",
-                              LLOG_TEST_RECNUM, i + 1, rc);
+                              llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
        }
                        GOTO(out, rc);
                }
        }
@@ -1597,12 +1561,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                GOTO(out, rc);
        }
 
                GOTO(out, rc);
        }
 
-       CWARN("10c: write %d more log records\n", 2 * LLOG_TEST_RECNUM);
-       for (i = 0; i < 2 * LLOG_TEST_RECNUM; i++) {
+       CWARN("10c: write %d more log records\n", 2 * llog_test_recnum);
+       for (i = 0; i < 2 * llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc) {
                        CERROR("10c: write %d records failed at #%d: %d\n",
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc) {
                        CERROR("10c: write %d records failed at #%d: %d\n",
-                              2*LLOG_TEST_RECNUM, i + 1, rc);
+                              2*llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
        }
                        GOTO(out, rc);
                }
        }
@@ -1629,12 +1593,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
         */
        enospc = 0;
        eok = 0;
         */
        enospc = 0;
        eok = 0;
-       CWARN("10c: write %d more log records\n", LLOG_TEST_RECNUM);
-       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+       CWARN("10c: write %d more log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc && rc != -ENOSPC) {
                        CERROR("10c: write %d records failed at #%d: %d\n",
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc && rc != -ENOSPC) {
                        CERROR("10c: write %d records failed at #%d: %d\n",
-                              LLOG_TEST_RECNUM, i + 1, rc);
+                              llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
                /*
                        GOTO(out, rc);
                }
                /*
@@ -1649,7 +1613,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                }
        }
 
                }
        }
 
-       if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+       if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
                CERROR("10c: all last records adds should have failed with"
                       " -ENOSPC\n");
                GOTO(out, rc = -EINVAL);
                CERROR("10c: all last records adds should have failed with"
                       " -ENOSPC\n");
                GOTO(out, rc = -EINVAL);
@@ -1675,13 +1639,13 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
         * cancel all 1st plain llog records to empty it, this will also cause
         * its catalog entry to be freed for next forced wrap in 10e
         */
         * cancel all 1st plain llog records to empty it, this will also cause
         * its catalog entry to be freed for next forced wrap in 10e
         */
-       CWARN("10d: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       CWARN("10d: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10d: process with llog_cancel_rec_cb failed: %d\n", rc);
                /*
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10d: process with llog_cancel_rec_cb failed: %d\n", rc);
                /*
-                * need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * need to indicate error if for any reason llog_test_recnum is
                 * not reached
                 */
                if (rc == 0)
                 * not reached
                 */
                if (rc == 0)
@@ -1719,12 +1683,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        enospc = 0;
        eok = 0;
 
        enospc = 0;
        eok = 0;
-       CWARN("10e: write %d more log records\n", LLOG_TEST_RECNUM);
-       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+       CWARN("10e: write %d more log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc && rc != -ENOSPC) {
                        CERROR("10e: write %d records failed at #%d: %d\n",
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc && rc != -ENOSPC) {
                        CERROR("10e: write %d records failed at #%d: %d\n",
-                              LLOG_TEST_RECNUM, i + 1, rc);
+                              llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
                /*
                        GOTO(out, rc);
                }
                /*
@@ -1739,7 +1703,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                }
        }
 
                }
        }
 
-       if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+       if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
                CERROR("10e: all last records adds should have failed with"
                       " -ENOSPC\n");
                GOTO(out, rc = -EINVAL);
                CERROR("10e: all last records adds should have failed with"
                       " -ENOSPC\n");
                GOTO(out, rc = -EINVAL);
@@ -1750,13 +1714,14 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10e: print the catalog entries.. we expect 4\n");
        cat_counter = 0;
 
        CWARN("10e: print the catalog entries.. we expect 4\n");
        cat_counter = 0;
-       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+                                     0, 0, false);
        if (rc) {
        if (rc) {
-               CERROR("10d: process with cat_print_cb failed: %d\n", rc);
+               CERROR("10e: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
        }
        if (cat_counter != 4) {
                GOTO(out, rc);
        }
        if (cat_counter != 4) {
-               CERROR("10d: %d entries in catalog\n", cat_counter);
+               CERROR("10e: %d entries in catalog\n", cat_counter);
                GOTO(out, rc = -EINVAL);
        }
 
                GOTO(out, rc = -EINVAL);
        }
 
@@ -1801,13 +1766,13 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
         * cancel more records to free one more slot in Catalog
         * see if it is re-allocated when adding more records
         */
         * cancel more records to free one more slot in Catalog
         * see if it is re-allocated when adding more records
         */
-       CWARN("10f: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       CWARN("10f: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10f: process with llog_cancel_rec_cb failed: %d\n", rc);
                /*
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10f: process with llog_cancel_rec_cb failed: %d\n", rc);
                /*
-                * need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * need to indicate error if for any reason llog_test_recnum is
                 * not reached
                 */
                if (rc == 0)
                 * not reached
                 */
                if (rc == 0)
@@ -1817,7 +1782,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10f: print the catalog entries.. we expect 3\n");
        cat_counter = 0;
 
        CWARN("10f: print the catalog entries.. we expect 3\n");
        cat_counter = 0;
-       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+                                     0, 0, false);
        if (rc) {
                CERROR("10f: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
        if (rc) {
                CERROR("10f: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
@@ -1845,12 +1811,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        enospc = 0;
        eok = 0;
 
        enospc = 0;
        eok = 0;
-       CWARN("10f: write %d more log records\n", LLOG_TEST_RECNUM);
-       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+       CWARN("10f: write %d more log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc && rc != -ENOSPC) {
                        CERROR("10f: write %d records failed at #%d: %d\n",
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc && rc != -ENOSPC) {
                        CERROR("10f: write %d records failed at #%d: %d\n",
-                              LLOG_TEST_RECNUM, i + 1, rc);
+                              llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
                /*
                        GOTO(out, rc);
                }
                /*
@@ -1865,7 +1831,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                }
        }
 
                }
        }
 
-       if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+       if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
                CERROR("10f: all last records adds should have failed with"
                       " -ENOSPC\n");
                GOTO(out, rc = -EINVAL);
                CERROR("10f: all last records adds should have failed with"
                       " -ENOSPC\n");
                GOTO(out, rc = -EINVAL);
@@ -1919,12 +1885,12 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
         */
 
        /* cancel more records to free one more slot in Catalog */
         */
 
        /* cancel more records to free one more slot in Catalog */
-       CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
-               /* need to indicate error if for any reason LLOG_TEST_RECNUM is
+               /* need to indicate error if for any reason llog_test_recnum is
                 * not reached */
                if (rc == 0)
                        rc = -ERANGE;
                 * not reached */
                if (rc == 0)
                        rc = -ERANGE;
@@ -1933,7 +1899,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10g: print the catalog entries.. we expect 3\n");
        cat_counter = 0;
 
        CWARN("10g: print the catalog entries.. we expect 3\n");
        cat_counter = 0;
-       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+                                     0, 0, false);
        if (rc) {
                CERROR("10g: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
        if (rc) {
                CERROR("10g: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
@@ -1960,13 +1927,13 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        }
 
        /* cancel more records to free one more slot in Catalog */
        }
 
        /* cancel more records to free one more slot in Catalog */
-       CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
                /*
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
                /*
-                * need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * need to indicate error if for any reason llog_test_recnum is
                 * not reached
                 */
                if (rc == 0)
                 * not reached
                 */
                if (rc == 0)
@@ -1976,7 +1943,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10g: print the catalog entries.. we expect 2\n");
        cat_counter = 0;
 
        CWARN("10g: print the catalog entries.. we expect 2\n");
        cat_counter = 0;
-       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+                                     0, 0, false);
        if (rc) {
                CERROR("10g: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
        if (rc) {
                CERROR("10g: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
@@ -2011,13 +1979,13 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        }
 
        /* cancel more records to free one more slot in Catalog */
        }
 
        /* cancel more records to free one more slot in Catalog */
-       CWARN("10g: Cancel %d records, see one log zapped\n", LLOG_TEST_RECNUM);
+       CWARN("10g: Cancel %d records, see one log zapped\n", llog_test_recnum);
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
                /*
        cancel_count = 0;
        rc = llog_cat_process(env, cath, llog_cancel_rec_cb, "foobar", 0, 0);
        if (rc != -LLOG_EEMPTY) {
                CERROR("10g: process with llog_cancel_rec_cb failed: %d\n", rc);
                /*
-                * need to indicate error if for any reason LLOG_TEST_RECNUM is
+                * need to indicate error if for any reason llog_test_recnum is
                 * not reached
                 */
                if (rc == 0)
                 * not reached
                 */
                if (rc == 0)
@@ -2027,7 +1995,8 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        CWARN("10g: print the catalog entries.. we expect 1\n");
        cat_counter = 0;
 
        CWARN("10g: print the catalog entries.. we expect 1\n");
        cat_counter = 0;
-       rc = llog_process(env, cath, cat_print_cb, "test 10", NULL);
+       rc = llog_cat_process_or_fork(env, cath, cat_print_cb, NULL, "test 10",
+                                     0, 0, false);
        if (rc) {
                CERROR("10g: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
        if (rc) {
                CERROR("10g: process with cat_print_cb failed: %d\n", rc);
                GOTO(out, rc);
@@ -2071,15 +2040,14 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        kthread_run(llog_test_process_thread, &lpi, "llog_test_process_thread");
 
 
        kthread_run(llog_test_process_thread, &lpi, "llog_test_process_thread");
 
-       msleep(1 * MSEC_PER_SEC);
        enospc = 0;
        eok = 0;
        enospc = 0;
        eok = 0;
-       CWARN("10h: write %d more log records\n", LLOG_TEST_RECNUM);
-       for (i = 0; i < LLOG_TEST_RECNUM; i++) {
+       CWARN("10h: write %d more log records\n", llog_test_recnum);
+       for (i = 0; i < llog_test_recnum; i++) {
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc && rc != -ENOSPC) {
                        CERROR("10h: write %d records failed at #%d: %d\n",
                rc = llog_cat_add(env, cath, &lmr.lmr_hdr, NULL);
                if (rc && rc != -ENOSPC) {
                        CERROR("10h: write %d records failed at #%d: %d\n",
-                              LLOG_TEST_RECNUM, i + 1, rc);
+                              llog_test_recnum, i + 1, rc);
                        GOTO(out, rc);
                }
                /*
                        GOTO(out, rc);
                }
                /*
@@ -2094,7 +2062,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
                }
        }
 
                }
        }
 
-       if ((enospc == 0) && (enospc+eok != LLOG_TEST_RECNUM)) {
+       if ((enospc == 0) && (enospc+eok != llog_test_recnum)) {
                CERROR("10h: all last records adds should have failed with"
                       " -ENOSPC\n");
                GOTO(out, rc = -EINVAL);
                CERROR("10h: all last records adds should have failed with"
                       " -ENOSPC\n");
                GOTO(out, rc = -EINVAL);
@@ -2105,7 +2073,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
 
        wait_for_completion(&lpi.lpi_completion);
 
 
        wait_for_completion(&lpi.lpi_completion);
 
-       if (test_fid.f_oid == 0xbad) {
+       if (lpi.lpi_rc != 0) {
                CERROR("10h: race happened, old record was processed\n");
                GOTO(out, rc = -EINVAL);
        }
                CERROR("10h: race happened, old record was processed\n");
                GOTO(out, rc = -EINVAL);
        }
index df7f53f..1e0196c 100755 (executable)
@@ -6157,7 +6157,7 @@ test_60a() {
                        skip_env "missing subtest run-llog.sh"
 
        log "$TEST60_HEAD - from kernel mode"
                        skip_env "missing subtest run-llog.sh"
 
        log "$TEST60_HEAD - from kernel mode"
-       do_facet mgs "$LCTL set_param debug=warning; $LCTL dk > /dev/null"
+       do_facet mgs "$LCTL dk > /dev/null"
        do_facet mgs "bash run-llog.sh" || error "run-llog.sh failed"
        do_facet mgs $LCTL dk > $TMP/$tfile
 
        do_facet mgs "bash run-llog.sh" || error "run-llog.sh failed"
        do_facet mgs $LCTL dk > $TMP/$tfile