Whamcloud - gitweb
LU-6714 llog: test plain llog more throughly 35/15835/6
authorMikhail Pershin <mike.pershin@intel.com>
Wed, 24 Jun 2015 06:49:38 +0000 (09:49 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 18 Aug 2015 11:31:09 +0000 (11:31 +0000)
Modify llog test 3 to check more aspects of its
functionality:
- search the record by index
- check all records are processed
- check modification in-place
- check that processing is seeing concurrently added
  new records
- check the correctness of lgh_cur_offset and lgh_cur_idx

Signed-off-by: Mikhail Pershin <mike.pershin@intel.com>
Change-Id: I11a7c6474a7420393987a964e9112d61263bdc81
Reviewed-on: http://review.whamcloud.com/15835
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/obdclass/llog_osd.c
lustre/obdclass/llog_test.c

index b820139..519fd27 100644 (file)
@@ -637,7 +637,7 @@ static inline void llog_skip_over(struct llog_log_hdr *llh, __u64 *off,
        if (goal > curr) {
                if (llh->llh_size == 0) {
                        /* variable size records */
-                       *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE);
+                       *off = *off + (goal - curr - 1) * LLOG_MIN_REC_SIZE;
                } else {
                        *off = chunk_size + (goal - 1) * llh->llh_size;
                }
index 12b6383..259c83b 100644 (file)
@@ -216,74 +216,287 @@ out_put:
        RETURN(rc);
 }
 
-/* Test record writing, single and in bulk */
+static int records;
+static off_t rec_offset;
+static int paddings;
+static int start_idx;
+
+/*
+ * Test 3 callback.
+ * - check lgh_cur_offset correctness
+ * - check record index consistency
+ * - modify each record in-place
+ * - add new record during *last_idx processing
+ */
+static int test3_check_n_add_cb(const struct lu_env *env,
+                               struct llog_handle *lgh,
+                               struct llog_rec_hdr *rec, void *data)
+{
+       struct llog_gen_rec *lgr = (struct llog_gen_rec *)rec;
+       int *last_rec = data;
+       int rc;
+
+       if (lgh->lgh_hdr->llh_size > 0) {
+               if (lgh->lgh_cur_offset != lgh->lgh_hdr->llh_hdr.lrh_len +
+                               (start_idx + records - 1) *
+                               lgh->lgh_hdr->llh_size)
+                       CERROR("Wrong record offset in cur_off: "LPU64", should"
+                              " be %u\n", lgh->lgh_cur_offset,
+                              lgh->lgh_hdr->llh_hdr.lrh_len +
+                              (start_idx + records - 1) *
+                              lgh->lgh_hdr->llh_size);
+       } else {
+               size_t chunk_size = lgh->lgh_hdr->llh_hdr.lrh_len;
+
+               /* For variable size records the start offset is unknown, trust
+                * the first value and check others are consistent with it. */
+               if (rec_offset == 0)
+                       rec_offset = lgh->lgh_cur_offset;
+
+               if (lgh->lgh_cur_offset != rec_offset) {
+                       /* there can be padding record */
+                       if ((lgh->lgh_cur_offset % chunk_size == 0) &&
+                           (lgh->lgh_cur_offset - rec_offset <
+                            rec->lrh_len + LLOG_MIN_REC_SIZE)) {
+                               rec_offset = lgh->lgh_cur_offset;
+                               paddings++;
+                       } else {
+                               CERROR("Wrong record offset in cur_off: "LPU64
+                                      ", should be %lld (rec len %u)\n",
+                                      lgh->lgh_cur_offset,
+                                      (long long)rec_offset, rec->lrh_len);
+                       }
+               }
+               rec_offset += rec->lrh_len;
+       }
+
+       if ((start_idx + records + paddings) != rec->lrh_index)
+               CERROR("Record with wrong index was read: %u, expected %u\n",
+                      rec->lrh_index, start_idx + records + paddings);
+
+       /* modify all records in place */
+       lgr->lgr_gen.conn_cnt = rec->lrh_index;
+       rc = llog_write(env, lgh, rec, rec->lrh_index);
+       if (rc < 0)
+               CERROR("cb_test_3: cannot modify record while processing\n");
+
+       /* Add new record to the llog at *last_rec position one by one to
+        * check that last block is re-read during processing */
+       if ((start_idx + records + paddings) == *last_rec ||
+           (start_idx + records + paddings) == (*last_rec + 1)) {
+               rc = llog_write(env, lgh, rec, LLOG_NEXT_IDX);
+               if (rc < 0)
+                       CERROR("cb_test_3: cannot add new record while "
+                              "processing\n");
+       }
+       records++;
+
+       return rc;
+}
+
+/* Check in-place modifications were done for all records*/
+static int test3_check_cb(const struct lu_env *env, struct llog_handle *lgh,
+                         struct llog_rec_hdr *rec, void *data)
+{
+       struct llog_gen_rec *lgr = (struct llog_gen_rec *)rec;
+
+       if (lgr->lgr_gen.conn_cnt != rec->lrh_index) {
+               CERROR("cb_test_3: record %u is not modified\n",
+                      rec->lrh_index);
+               return -EINVAL;
+       }
+       records++;
+       return 0;
+}
+
+static int llog_test3_process(const struct lu_env *env,
+                             struct llog_handle *lgh,
+                             llog_cb_t cb, int start)
+{
+       struct llog_process_cat_data cd;
+       int last_idx; /* new record will be injected here */
+       int rc = 0;
+
+       CWARN("test3: processing records from index %d to the end\n",
+             start);
+       cd.lpcd_first_idx = start - 1;
+       cd.lpcd_last_idx = 0;
+       records = paddings = 0;
+       last_idx = lgh->lgh_last_idx;
+       rc = llog_process(env, lgh, cb, &last_idx, &cd);
+       if (rc < 0)
+               return rc;
+       CWARN("test3: total %u records processed with %u paddings\n",
+             records, paddings);
+       return records;
+}
+
+/* Test plain llog functionality */
 static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
                       struct llog_handle *llh)
 {
-       struct llog_gen_rec      lgr;
-       int                      rc, i;
-       int                      num_recs = 1; /* 1 for the header */
+       char buf[128];
+       struct llog_rec_hdr *hdr = (void *)buf;
+       int rc, i;
+       int num_recs = 1; /* 1 for the header */
+       int expected;
 
        ENTRY;
 
-       lgr.lgr_hdr.lrh_len = lgr.lgr_tail.lrt_len = sizeof(lgr);
-       lgr.lgr_hdr.lrh_type = LLOG_GEN_REC;
+       hdr->lrh_len = sizeof(struct llog_gen_rec);
+       hdr->lrh_type = LLOG_GEN_REC;
+       llh->lgh_hdr->llh_size = sizeof(struct llog_gen_rec);
 
-       CWARN("3a: write one create_rec\n");
-       rc = llog_write(env, llh,  &lgr.lgr_hdr, LLOG_NEXT_IDX);
-       num_recs++;
-       if (rc < 0) {
-               CERROR("3a: write one log record failed: %d\n", rc);
-               RETURN(rc);
+       /* Fill the llog with 64-bytes records, use 1023 records,
+        * so last chunk will be partially full. Don't change this
+        * value until record size is changed.
+        */
+       CWARN("3a: write 1023 fixed-size llog records\n");
+       for (i = 0; i < 1023; i++) {
+               rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
+               if (rc < 0) {
+                       CERROR("3a: write 1023 records failed at #%d: %d\n",
+                              i + 1, rc);
+                       RETURN(rc);
+               }
+               num_recs++;
        }
 
        rc = verify_handle("3a", llh, num_recs);
        if (rc)
                RETURN(rc);
 
-       CWARN("3c: write 1000 more log records\n");
-       for (i = 0; i < 1000; i++) {
-               rc = llog_write(env, llh, &lgr.lgr_hdr, LLOG_NEXT_IDX);
+       /*
+        * Test fixed-size records processing:
+        * - search the needed index
+        * - go through all records from that index
+        * - check all indices are growing monotonically and exist
+        * - modify each record
+        *
+        * NB: test3_check_n_add adds two new records while processing
+        * after last record. There were 1023 records created so the last chunk
+        * misses exactly one record. Therefore one of new records will be
+        * the last in the current chunk and second causes the new chunk to be
+        * created.
+        */
+       rec_offset = 0;
+       start_idx = 501;
+       expected = 525;
+       rc = llog_test3_process(env, llh, test3_check_n_add_cb, start_idx);
+       if (rc < 0)
+               RETURN(rc);
+
+       /* extra record is created during llog_process() */
+       if (rc != expected) {
+               CERROR("3a: process total %d records but expect %d\n",
+                      rc, expected);
+               RETURN(-ERANGE);
+       }
+
+       num_recs += 2;
+
+       /* test modification in place */
+       rc = llog_test3_process(env, llh, test3_check_cb, start_idx);
+       if (rc < 0)
+               RETURN(rc);
+
+       if (rc != expected) {
+               CERROR("3a: process total %d records but expect %d\n",
+                      rc, expected);
+               RETURN(-ERANGE);
+       }
+
+       CWARN("3b: write 566 variable size llog records\n");
+
+       /* Drop llh_size to 0 to mark llog as variable-size and write
+        * header to make this change permanent. */
+       llh->lgh_hdr->llh_size = 0;
+       llog_write(env, llh, &llh->lgh_hdr->llh_hdr, LLOG_HEADER_IDX);
+
+       hdr->lrh_type = OBD_CFG_REC;
+
+       /* there are 1025 64-bytes records in llog already,
+        * the last chunk contains single record, i.e. 64 bytes.
+        * Each pair of variable size records is 200 bytes, so
+        * we will have the following distribution per chunks:
+        * block 1: 64 + 80(80/120) + 80 + 48(pad) = 81 iterations
+        * block 2: 80(120/80) + 120 + 72(pad) = 81 itereations
+        * block 3: 80(80/120) + 80 + 112(pad) = 81 iterations
+        * -- the same as block 2 again and so on.
+        * block 7: 80(80/120) = 80 iterations and 192 bytes remain
+        * Total 6 * 81 + 80 = 566 itereations.
+        * Callback will add another 120 bytes in the end of the last chunk
+        * and another 120 bytes will cause padding (72 bytes) plus 120
+        * bytes in the new block.
+        */
+       for (i = 0; i < 566; i++) {
+               if ((i % 2) == 0)
+                       hdr->lrh_len = 80;
+               else
+                       hdr->lrh_len = 120;
+
+               rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
                if (rc < 0) {
-                       CERROR("3c: write 1000 records failed at #%d: %d\n",
+                       CERROR("3a: write 566 records failed at #%d: %d\n",
                               i + 1, rc);
                        RETURN(rc);
                }
                num_recs++;
        }
 
-       rc = verify_handle("3c", llh, num_recs);
+       rc = verify_handle("3b", llh, num_recs);
        if (rc)
                RETURN(rc);
 
-       CWARN("3d: write records with variable size until BITMAP_SIZE, "
-             "return -ENOSPC\n");
-       for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) + 1; i++) {
-               char                     buf[64];
-               struct llog_rec_hdr     *hdr = (void *)&buf;
+       start_idx = 1026;
+       expected = 568;
+       rc = llog_test3_process(env, llh, test3_check_n_add_cb, start_idx);
+       if (rc < 0)
+               RETURN(rc);
 
-               memset(buf, 0, sizeof buf);
-               if ((i % 2) == 0)
-                       hdr->lrh_len = 40;
+       if (rc != expected) {
+               CERROR("3b: process total %d records but expect %d\n",
+                      rc, expected);
+               RETURN(-ERANGE);
+       }
+
+       num_recs += 2;
+
+       /* test modification in place */
+       rc = llog_test3_process(env, llh, test3_check_cb, start_idx);
+       if (rc < 0)
+               RETURN(rc);
+
+       if (rc != expected) {
+               CERROR("3b: process total %d records but expect %d\n",
+                      rc, expected);
+               RETURN(-ERANGE);
+       }
+
+       CWARN("3c: write records with variable size until BITMAP_SIZE, "
+             "return -ENOSPC\n");
+       while (num_recs < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) {
+               if ((num_recs % 2) == 0)
+                       hdr->lrh_len = 80;
                else
-                       hdr->lrh_len = 64;
-               hdr->lrh_type = OBD_CFG_REC;
-               rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
+                       hdr->lrh_len = 128;
 
+               rc = llog_write(env, llh, hdr, LLOG_NEXT_IDX);
                if (rc == -ENOSPC) {
                        break;
                } else if (rc < 0) {
-                       CERROR("3d: write recs failed at #%d: %d\n",
-                              i + 1, rc);
+                       CERROR("3c: write recs failed at #%d: %d\n",
+                              num_recs, rc);
                        RETURN(rc);
                }
                num_recs++;
        }
+
        if (rc != -ENOSPC) {
-               CWARN("3d: write record more than BITMAP size!\n");
+               CWARN("3c: write record more than BITMAP size!\n");
                RETURN(-EINVAL);
        }
-       CWARN("3d: wrote %d more records before end of llog is reached\n",
+       CWARN("3c: wrote %d more records before end of llog is reached\n",
              num_recs);
 
        rc = verify_handle("3d", llh, num_recs);