Whamcloud - gitweb
LU-7050 llog: record the minimum record size 03/16103/6
authorwang di <di.wang@intel.com>
Mon, 24 Aug 2015 17:28:03 +0000 (10:28 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 10 Sep 2015 04:06:56 +0000 (04:06 +0000)
Remember the minimum record size in the llog header, so that
llog_skip_over() can skip records properly. The current
implementation uses LLOG_MIN_REC_SIZE, which is only 24 bytes;
that is far too small for update records (usually more than 1000
bytes), and causes update recovery to read a lot of useless update
records from other MDTs.

The minimum record size is recorded in llh_size, which until now
was only used by fixed-size record llogs. Also add a new flag,
LLOG_F_IS_FIXSIZE, to mark an llog whose records are fixed size.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: Ia62684d1fb744e3aca74107f22683b2ee63a2d16
Reviewed-on: http://review.whamcloud.com/16103
Tested-by: Jenkins
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre/lustre_idl.h
lustre/obdclass/llog.c
lustre/obdclass/llog_osd.c
lustre/obdclass/llog_test.c

index 589b2c6..fccdc1b 100644 (file)
@@ -3308,7 +3308,12 @@ enum llog_flag {
        LLOG_F_IS_CAT           = 0x2,
        LLOG_F_IS_PLAIN         = 0x4,
        LLOG_F_EXT_JOBID        = 0x8,
+       LLOG_F_IS_FIXSIZE       = 0x10,
 
+       /* Note: Flags covered by LLOG_F_EXT_MASK will be inherited from
+        * catlog to plain log, so do not add LLOG_F_IS_FIXSIZE here,
+        * because the catlog record is usually fixed size, but its plain
+        * log record can be variable */
        LLOG_F_EXT_MASK = LLOG_F_EXT_JOBID,
 };
 
index 7bdd90f..6d1e07c 100644 (file)
@@ -379,6 +379,7 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
                LASSERT(list_empty(&handle->u.chd.chd_head));
                INIT_LIST_HEAD(&handle->u.chd.chd_head);
                llh->llh_size = sizeof(struct llog_logid_rec);
+               llh->llh_flags |= LLOG_F_IS_FIXSIZE;
        } else if (!(flags & LLOG_F_IS_PLAIN)) {
                CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
                       handle->lgh_ctxt->loc_obd->obd_name,
index bc9951b..7913294 100644 (file)
@@ -478,8 +478,9 @@ static int llog_osd_write_rec(const struct lu_env *env,
                               "len:%u offset %llu\n",
                               POSTID(&loghandle->lgh_id.lgl_oi), idx,
                               rec->lrh_len, (long long)lgi->lgi_off);
-               } else if (llh->llh_size > 0) {
-                       if (llh->llh_size != rec->lrh_len) {
+               } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
+                       if (llh->llh_size == 0 ||
+                           llh->llh_size != rec->lrh_len) {
                                CERROR("%s: wrong record size, llh_size is %u"
                                       " but record size is %u\n",
                                       o->do_lu.lo_dev->ld_obd->obd_name,
@@ -564,6 +565,16 @@ static int llog_osd_write_rec(const struct lu_env *env,
        }
        llh->llh_count++;
 
+       if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
+               LASSERT(llh->llh_size == reclen);
+       } else {
+               /* Update the minimum size of the llog record */
+               if (llh->llh_size == 0)
+                       llh->llh_size = reclen;
+               else if (reclen < llh->llh_size)
+                       llh->llh_size = reclen;
+       }
+
        if (lgi->lgi_attr.la_size == 0) {
                lgi->lgi_off = 0;
                lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
@@ -582,9 +593,9 @@ static int llog_osd_write_rec(const struct lu_env *env,
                 * the RPC (1MB limit), if we write 8K for each operation, which
                 * will cost a lot space, and keep us adding more updates to one
                 * update log.*/
-               lgi->lgi_off = offsetof(typeof(*llh), llh_count);
-               lgi->lgi_buf.lb_len = sizeof(llh->llh_count);
-               lgi->lgi_buf.lb_buf = &llh->llh_count;
+               lgi->lgi_off = 0;
+               lgi->lgi_buf.lb_len = llh->llh_bitmap_offset;
+               lgi->lgi_buf.lb_buf = &llh->llh_hdr;
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
                        GOTO(out_unlock, rc);
@@ -660,16 +671,38 @@ out:
  * that we are not far enough along the log (because the
  * actual records are larger than minimum size) we just skip
  * some more records.
+ *
+ * Note: llog_process_thread() uses the bitmap offset as the index
+ * to locate a record, and that index also counts pad records, whose
+ * record size is very small. Pad records are not considered when
+ * recording the minimum record size (otherwise the minimum record
+ * size might be too small), so in some rare cases this function
+ * might skip too many records for @goal, see llog_osd_next_block().
+ *
+ * When force_mini_rec is true, LLOG_MIN_REC_SIZE has to be used as
+ * the min record size to skip over, usually because the previous
+ * try skipped too many records, see llog_osd_{next,prev}_block().
  */
-static inline void llog_skip_over(struct llog_log_hdr *llh, __u64 *off,
-                                 int curr, int goal, __u32 chunk_size)
+static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off,
+                                 int curr, int goal, __u32 chunk_size,
+                                 bool force_mini_rec)
 {
+       struct llog_log_hdr *llh = lgh->lgh_hdr;
+
+       /* Goal should not be bigger than the record count */
+       if (goal > lgh->lgh_last_idx)
+               goal = lgh->lgh_last_idx;
+
        if (goal > curr) {
-               if (llh->llh_size == 0) {
-                       /* variable size records */
-                       *off = *off + (goal - curr - 1) * LLOG_MIN_REC_SIZE;
-               } else {
+               if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
                        *off = chunk_size + (goal - 1) * llh->llh_size;
+               } else {
+                       __u64 min_rec_size = LLOG_MIN_REC_SIZE;
+
+                       if (llh->llh_size > 0 && !force_mini_rec)
+                               min_rec_size = llh->llh_size;
+
+                       *off = *off + (goal - curr - 1) * min_rec_size;
                }
        }
        /* always align with lower chunk boundary*/
@@ -730,6 +763,9 @@ static int llog_osd_next_block(const struct lu_env *env,
        struct dt_device        *dt;
        int                      rc;
        __u32                   chunk_size;
+       int last_idx = *cur_idx;
+       __u64 last_offset = *cur_offset;
+       bool force_mini_rec = false;
 
        ENTRY;
 
@@ -760,8 +796,8 @@ static int llog_osd_next_block(const struct lu_env *env,
                struct llog_rec_hdr     *rec, *last_rec;
                struct llog_rec_tail    *tail;
 
-               llog_skip_over(loghandle->lgh_hdr, cur_offset, *cur_idx,
-                              next_idx, chunk_size);
+               llog_skip_over(loghandle, cur_offset, *cur_idx,
+                              next_idx, chunk_size, force_mini_rec);
 
                /* read up to next llog chunk_size block */
                lgi->lgi_buf.lb_len = chunk_size -
@@ -770,6 +806,9 @@ static int llog_osd_next_block(const struct lu_env *env,
 
                rc = dt_read(env, o, &lgi->lgi_buf, cur_offset);
                if (rc < 0) {
+                       if (rc == -EBADR && !force_mini_rec)
+                               goto retry;
+
                        CERROR("%s: can't read llog block from log "DFID
                               " offset "LPU64": rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
@@ -784,10 +823,16 @@ static int llog_osd_next_block(const struct lu_env *env,
                        memset(buf + rc, 0, len - rc);
                }
 
-               if (rc == 0) /* end of file, nothing to do */
+               if (rc == 0) { /* end of file, nothing to do */
+                       if (!force_mini_rec)
+                               goto retry;
                        GOTO(out, rc);
+               }
 
                if (rc < sizeof(*tail)) {
+                       if (!force_mini_rec)
+                               goto retry;
+
                        CERROR("%s: invalid llog block at log id "DOSTID"/%u "
                               "offset "LPU64"\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
@@ -821,12 +866,18 @@ static int llog_osd_next_block(const struct lu_env *env,
                               loghandle->lgh_id.lgl_ogen, *cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
-               if (tail->lrt_index < next_idx)
+               if (tail->lrt_index < next_idx) {
+                       last_idx = *cur_idx;
+                       last_offset = *cur_offset;
                        continue;
+               }
 
                /* sanity check that the start of the new buffer is no farther
                 * than the record that we wanted.  This shouldn't happen. */
                if (rec->lrh_index > next_idx) {
+                       if (!force_mini_rec && next_idx > last_idx)
+                               goto retry;
+
                        CERROR("%s: missed desired record? %u > %u\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               rec->lrh_index, next_idx);
@@ -839,6 +890,14 @@ static int llog_osd_next_block(const struct lu_env *env,
                                                 CLF_VERSION | CLF_RENAME);
 
                GOTO(out, rc = 0);
+
+retry:
+               /* Note: because there are some pad records in the
+                * llog, llog_skip_over() might skip too many
+                * records, so try skipping again with the minimum
+                * record size */
+               force_mini_rec = true;
+               *cur_offset = last_offset;
+               *cur_idx = last_idx;
        }
        GOTO(out, rc = -EIO);
 out:
@@ -890,9 +949,11 @@ static int llog_osd_prev_block(const struct lu_env *env,
        dt = lu2dt_dev(o->do_lu.lo_dev);
        LASSERT(dt);
 
+       /* Let's only use mini record size for previous block read
+        * for now XXX */
        cur_offset = chunk_size;
-       llog_skip_over(loghandle->lgh_hdr, &cur_offset, 0, prev_idx,
-                      chunk_size);
+       llog_skip_over(loghandle, &cur_offset, 0, prev_idx,
+                      chunk_size, true);
 
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
index 259c83b..aff9204 100644 (file)
@@ -236,7 +236,8 @@ static int test3_check_n_add_cb(const struct lu_env *env,
        int *last_rec = data;
        int rc;
 
-       if (lgh->lgh_hdr->llh_size > 0) {
+       if (lgh->lgh_hdr->llh_flags & LLOG_F_IS_FIXSIZE) {
+               LASSERT(lgh->lgh_hdr->llh_size > 0);
                if (lgh->lgh_cur_offset != lgh->lgh_hdr->llh_hdr.lrh_len +
                                (start_idx + records - 1) *
                                lgh->lgh_hdr->llh_size)
@@ -346,6 +347,7 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
        hdr->lrh_len = sizeof(struct llog_gen_rec);
        hdr->lrh_type = LLOG_GEN_REC;
        llh->lgh_hdr->llh_size = sizeof(struct llog_gen_rec);
+       llh->lgh_hdr->llh_flags |= LLOG_F_IS_FIXSIZE;
 
        /* Fill the llog with 64-bytes records, use 1023 records,
         * so last chunk will be partially full. Don't change this
@@ -410,7 +412,7 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
 
        /* Clear LLOG_F_IS_FIXSIZE to mark llog as variable-size and write
         * header to make this change permanent. */
-       llh->lgh_hdr->llh_size = 0;
+       llh->lgh_hdr->llh_flags &= ~LLOG_F_IS_FIXSIZE;
        llog_write(env, llh, &llh->lgh_hdr->llh_hdr, LLOG_HEADER_IDX);
 
        hdr->lrh_type = OBD_CFG_REC;