Whamcloud - gitweb
LU-7050 llog: record the minimum record size
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index bc9951b..7913294 100644 (file)
@@ -478,8 +478,9 @@ static int llog_osd_write_rec(const struct lu_env *env,
                               "len:%u offset %llu\n",
                               POSTID(&loghandle->lgh_id.lgl_oi), idx,
                               rec->lrh_len, (long long)lgi->lgi_off);
-               } else if (llh->llh_size > 0) {
-                       if (llh->llh_size != rec->lrh_len) {
+               } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
+                       if (llh->llh_size == 0 ||
+                           llh->llh_size != rec->lrh_len) {
                                CERROR("%s: wrong record size, llh_size is %u"
                                       " but record size is %u\n",
                                       o->do_lu.lo_dev->ld_obd->obd_name,
@@ -564,6 +565,16 @@ static int llog_osd_write_rec(const struct lu_env *env,
        }
        llh->llh_count++;
 
+       if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
+               LASSERT(llh->llh_size == reclen);
+       } else {
+               /* Update the minimum size of the llog record */
+               if (llh->llh_size == 0)
+                       llh->llh_size = reclen;
+               else if (reclen < llh->llh_size)
+                       llh->llh_size = reclen;
+       }
+
        if (lgi->lgi_attr.la_size == 0) {
                lgi->lgi_off = 0;
                lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
@@ -582,9 +593,9 @@ static int llog_osd_write_rec(const struct lu_env *env,
                 * the RPC (1MB limit), if we write 8K for each operation, which
                 * will cost a lot space, and keep us adding more updates to one
                 * update log.*/
-               lgi->lgi_off = offsetof(typeof(*llh), llh_count);
-               lgi->lgi_buf.lb_len = sizeof(llh->llh_count);
-               lgi->lgi_buf.lb_buf = &llh->llh_count;
+               lgi->lgi_off = 0;
+               lgi->lgi_buf.lb_len = llh->llh_bitmap_offset;
+               lgi->lgi_buf.lb_buf = &llh->llh_hdr;
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
                        GOTO(out_unlock, rc);
@@ -660,16 +671,38 @@ out:
  * that we are not far enough along the log (because the
  * actual records are larger than minimum size) we just skip
  * some more records.
+ *
+ * Note: in llog_process_thread, it will use bitmap offset as
+ * the index to locate the record, which also includs some pad
+ * records, whose record size is very small, and it also does not
+ * consider pad record when recording minimum record size (otherwise
+ * min_record size might be too small), so in some rare cases,
+ * it might skip too much record for @goal, see llog_osd_next_block().
+ *
+ * When force_mini_rec is true, it means we have to use LLOG_MIN_REC_SIZE
+ * as the min record size to skip over, usually because in the previous
+ * try, it skip too much record, see loog_osd_next(prev)_block().
  */
-static inline void llog_skip_over(struct llog_log_hdr *llh, __u64 *off,
-                                 int curr, int goal, __u32 chunk_size)
+static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off,
+                                 int curr, int goal, __u32 chunk_size,
+                                 bool force_mini_rec)
 {
+       struct llog_log_hdr *llh = lgh->lgh_hdr;
+
+       /* Goal should not bigger than the record count */
+       if (goal > lgh->lgh_last_idx)
+               goal = lgh->lgh_last_idx;
+
        if (goal > curr) {
-               if (llh->llh_size == 0) {
-                       /* variable size records */
-                       *off = *off + (goal - curr - 1) * LLOG_MIN_REC_SIZE;
-               } else {
+               if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
                        *off = chunk_size + (goal - 1) * llh->llh_size;
+               } else {
+                       __u64 min_rec_size = LLOG_MIN_REC_SIZE;
+
+                       if (llh->llh_size > 0 && !force_mini_rec)
+                               min_rec_size = llh->llh_size;
+
+                       *off = *off + (goal - curr - 1) * min_rec_size;
                }
        }
        /* always align with lower chunk boundary*/
@@ -730,6 +763,9 @@ static int llog_osd_next_block(const struct lu_env *env,
        struct dt_device        *dt;
        int                      rc;
        __u32                   chunk_size;
+       int last_idx = *cur_idx;
+       __u64 last_offset = *cur_offset;
+       bool force_mini_rec = false;
 
        ENTRY;
 
@@ -760,8 +796,8 @@ static int llog_osd_next_block(const struct lu_env *env,
                struct llog_rec_hdr     *rec, *last_rec;
                struct llog_rec_tail    *tail;
 
-               llog_skip_over(loghandle->lgh_hdr, cur_offset, *cur_idx,
-                              next_idx, chunk_size);
+               llog_skip_over(loghandle, cur_offset, *cur_idx,
+                              next_idx, chunk_size, force_mini_rec);
 
                /* read up to next llog chunk_size block */
                lgi->lgi_buf.lb_len = chunk_size -
@@ -770,6 +806,9 @@ static int llog_osd_next_block(const struct lu_env *env,
 
                rc = dt_read(env, o, &lgi->lgi_buf, cur_offset);
                if (rc < 0) {
+                       if (rc == -EBADR && !force_mini_rec)
+                               goto retry;
+
                        CERROR("%s: can't read llog block from log "DFID
                               " offset "LPU64": rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
@@ -784,10 +823,16 @@ static int llog_osd_next_block(const struct lu_env *env,
                        memset(buf + rc, 0, len - rc);
                }
 
-               if (rc == 0) /* end of file, nothing to do */
+               if (rc == 0) { /* end of file, nothing to do */
+                       if (!force_mini_rec)
+                               goto retry;
                        GOTO(out, rc);
+               }
 
                if (rc < sizeof(*tail)) {
+                       if (!force_mini_rec)
+                               goto retry;
+
                        CERROR("%s: invalid llog block at log id "DOSTID"/%u "
                               "offset "LPU64"\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
@@ -821,12 +866,18 @@ static int llog_osd_next_block(const struct lu_env *env,
                               loghandle->lgh_id.lgl_ogen, *cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
-               if (tail->lrt_index < next_idx)
+               if (tail->lrt_index < next_idx) {
+                       last_idx = *cur_idx;
+                       last_offset = *cur_offset;
                        continue;
+               }
 
                /* sanity check that the start of the new buffer is no farther
                 * than the record that we wanted.  This shouldn't happen. */
                if (rec->lrh_index > next_idx) {
+                       if (!force_mini_rec && next_idx > last_idx)
+                               goto retry;
+
                        CERROR("%s: missed desired record? %u > %u\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
                               rec->lrh_index, next_idx);
@@ -839,6 +890,14 @@ static int llog_osd_next_block(const struct lu_env *env,
                                                 CLF_VERSION | CLF_RENAME);
 
                GOTO(out, rc = 0);
+
+retry:
+               /* Note: because there are some pad records in the
+                * llog, so llog_skip_over() might skip too much
+                * records, let's try skip again with minimum record */
+               force_mini_rec = true;
+               *cur_offset = last_offset;
+               *cur_idx = last_idx;
        }
        GOTO(out, rc = -EIO);
 out:
@@ -890,9 +949,11 @@ static int llog_osd_prev_block(const struct lu_env *env,
        dt = lu2dt_dev(o->do_lu.lo_dev);
        LASSERT(dt);
 
+       /* Let's only use mini record size for previous block read
+        * for now XXX */
        cur_offset = chunk_size;
-       llog_skip_over(loghandle->lgh_hdr, &cur_offset, 0, prev_idx,
-                      chunk_size);
+       llog_skip_over(loghandle, &cur_offset, 0, prev_idx,
+                      chunk_size, true);
 
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)