Whamcloud - gitweb
LU-6714 llog: fix wrong offset in llog_process_thread() 16/15316/3
authorMikhail Pershin <mike.pershin@intel.com>
Tue, 16 Jun 2015 21:15:01 +0000 (00:15 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 10 Jul 2015 03:11:27 +0000 (03:11 +0000)
- llh_cat_idx may become bigger than llog bitmap size in
  llog_cat_set_first_idx() function
- it is wrong to use previous cur_offset as new buffer offset,
  new offset should be calculated from value returned by
  llog_next_block().
- optimize llog_skip_over() to find llog entry offset by index
  for llog with fixed-size records.

Signed-off-by: Mikhail Pershin <mike.pershin@intel.com>
Change-Id: I0207ddd7e09161e25a2f5d56d69db4c993e650c3
Reviewed-on: http://review.whamcloud.com/15316
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_osd.c

index df98cc9..56413ef 100644 (file)
@@ -283,9 +283,8 @@ static int llog_process_thread(void *arg)
        struct llog_log_hdr             *llh = loghandle->lgh_hdr;
        struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
        char                            *buf;
        struct llog_log_hdr             *llh = loghandle->lgh_hdr;
        struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
        char                            *buf;
-       int                              chunk_size;
+       size_t                           chunk_size;
        __u64                            cur_offset;
        __u64                            cur_offset;
-       __u64                            last_offset;
        int                              rc = 0, index = 1, last_index;
        int                              saved_index = 0;
        int                              last_called_index = 0;
        int                              rc = 0, index = 1, last_index;
        int                              saved_index = 0;
        int                              last_called_index = 0;
@@ -296,6 +295,8 @@ static int llog_process_thread(void *arg)
                RETURN(-EINVAL);
 
        cur_offset = chunk_size = llh->llh_hdr.lrh_len;
                RETURN(-EINVAL);
 
        cur_offset = chunk_size = llh->llh_hdr.lrh_len;
+       /* expect chunk_size to be power of two */
+       LASSERT(is_power_of_2(chunk_size));
 
        OBD_ALLOC_LARGE(buf, chunk_size);
        if (buf == NULL) {
 
        OBD_ALLOC_LARGE(buf, chunk_size);
        if (buf == NULL) {
@@ -312,75 +313,108 @@ static int llog_process_thread(void *arg)
        else
                last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
 
        else
                last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
 
-       if (index > last_index) /* Record is not in this buffer. */
-               GOTO(out, rc);
-
        while (rc == 0) {
                struct llog_rec_hdr *rec;
        while (rc == 0) {
                struct llog_rec_hdr *rec;
+               off_t chunk_offset;
+               unsigned int buf_offset = 0;
+               bool partial_chunk;
 
                /* skip records not set in bitmap */
                while (index <= last_index &&
                       !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
                        ++index;
 
 
                /* skip records not set in bitmap */
                while (index <= last_index &&
                       !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
                        ++index;
 
-                LASSERT(index <= last_index + 1);
-                if (index == last_index + 1)
-                        break;
-repeat:
-                CDEBUG(D_OTHER, "index: %d last_index %d\n",
-                       index, last_index);
+               /* There are no indices prior the last_index */
+               if (index > last_index)
+                       break;
+
+               CDEBUG(D_OTHER, "index: %d last_index %d\n", index,
+                      last_index);
 
 
+repeat:
                /* get the buf with our target record; avoid old garbage */
                memset(buf, 0, chunk_size);
                /* get the buf with our target record; avoid old garbage */
                memset(buf, 0, chunk_size);
-               last_offset = cur_offset;
                rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
                                     index, &cur_offset, buf, chunk_size);
                if (rc != 0)
                        GOTO(out, rc);
 
                rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
                                     index, &cur_offset, buf, chunk_size);
                if (rc != 0)
                        GOTO(out, rc);
 
+               /* NB: after llog_next_block() call the cur_offset is the
+                * offset of the next block after read one.
+                * The absolute offset of the current chunk is calculated
+                * from cur_offset value and stored in chunk_offset variable.
+                */
+               if (cur_offset % chunk_size != 0) {
+                       partial_chunk = true;
+                       chunk_offset = cur_offset & ~(chunk_size - 1);
+               } else {
+                       partial_chunk = false;
+                       chunk_offset = cur_offset - chunk_size;
+               }
+
                /* NB: when rec->lrh_len is accessed it is already swabbed
                 * since it is used at the "end" of the loop and the rec
                 * swabbing is done at the beginning of the loop. */
                /* NB: when rec->lrh_len is accessed it is already swabbed
                 * since it is used at the "end" of the loop and the rec
                 * swabbing is done at the beginning of the loop. */
-               for (rec = (struct llog_rec_hdr *)buf;
+               for (rec = (struct llog_rec_hdr *)(buf + buf_offset);
                     (char *)rec < buf + chunk_size;
                     rec = llog_rec_hdr_next(rec)) {
 
                        CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
                               rec, rec->lrh_type);
 
                     (char *)rec < buf + chunk_size;
                     rec = llog_rec_hdr_next(rec)) {
 
                        CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
                               rec, rec->lrh_type);
 
-                        if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
+                       if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
                                lustre_swab_llog_rec(rec);
 
                                lustre_swab_llog_rec(rec);
 
-                        CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
-                               rec->lrh_type, rec->lrh_index);
-
-                       if (rec->lrh_index == 0) {
-                               /* probably another rec just got added? */
-                               if (index <= loghandle->lgh_last_idx)
-                                       GOTO(repeat, rc = 0);
-                               GOTO(out, rc = 0); /* no more records */
+                       CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
+                              rec->lrh_type, rec->lrh_index);
+
+                       /* for partial chunk the end of it is zeroed, check
+                        * for index 0 to distinguish it. */
+                       if (partial_chunk && rec->lrh_index == 0) {
+                               /* concurrent llog_add() might add new records
+                                * while llog_processing, check this is not
+                                * the case and re-read the current chunk
+                                * otherwise. */
+                               if (index > loghandle->lgh_last_idx)
+                                       GOTO(out, rc = 0);
+                               CDEBUG(D_OTHER, "Re-read last llog buffer for "
+                                      "new records, index %u, last %u\n",
+                                      index, loghandle->lgh_last_idx);
+                               /* save offset inside buffer for the re-read */
+                               buf_offset = (char *)rec - (char *)buf;
+                               cur_offset = chunk_offset;
+                               goto repeat;
                        }
                        }
+
                        if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
                        if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
-                                CWARN("invalid length %d in llog record for "
-                                      "index %d/%d\n", rec->lrh_len,
-                                      rec->lrh_index, index);
-                                GOTO(out, rc = -EINVAL);
-                        }
+                               CWARN("invalid length %d in llog record for "
+                                     "index %d/%d\n", rec->lrh_len,
+                                     rec->lrh_index, index);
+                               GOTO(out, rc = -EINVAL);
+                       }
 
 
-                        if (rec->lrh_index < index) {
-                                CDEBUG(D_OTHER, "skipping lrh_index %d\n",
-                                       rec->lrh_index);
-                                continue;
-                        }
+                       if (rec->lrh_index < index) {
+                               CDEBUG(D_OTHER, "skipping lrh_index %d\n",
+                                      rec->lrh_index);
+                               continue;
+                       }
+
+                       if (rec->lrh_index != index) {
+                               CERROR("%s: Invalid record: index %u but "
+                                      "expected %u\n",
+                                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                                      rec->lrh_index, index);
+                               GOTO(out, rc = -ERANGE);
+                       }
 
                        CDEBUG(D_OTHER,
                               "lrh_index: %d lrh_len: %d (%d remains)\n",
                               rec->lrh_index, rec->lrh_len,
                               (int)(buf + chunk_size - (char *)rec));
 
 
                        CDEBUG(D_OTHER,
                               "lrh_index: %d lrh_len: %d (%d remains)\n",
                               rec->lrh_index, rec->lrh_len,
                               (int)(buf + chunk_size - (char *)rec));
 
-                        loghandle->lgh_cur_idx = rec->lrh_index;
-                        loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
-                                                    last_offset;
+                       loghandle->lgh_cur_idx = rec->lrh_index;
+                       loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
+                                                   chunk_offset;
 
                        /* if set, process the callback on this record */
                        if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
 
                        /* if set, process the callback on this record */
                        if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
@@ -393,23 +427,20 @@ repeat:
                                        rc = llog_cancel_rec(lpi->lpi_env,
                                                             loghandle,
                                                             rec->lrh_index);
                                        rc = llog_cancel_rec(lpi->lpi_env,
                                                             loghandle,
                                                             rec->lrh_index);
-                                }
-                                if (rc)
-                                        GOTO(out, rc);
-                        } else {
-                                CDEBUG(D_OTHER, "Skipped index %d\n", index);
-                        }
-
-                        /* next record, still in buffer? */
-                        ++index;
-                        if (index > last_index)
-                                GOTO(out, rc = 0);
-                }
-        }
+                               }
+                               if (rc)
+                                       GOTO(out, rc);
+                       }
+                       /* exit if the last index is reached */
+                       if (index >= last_index)
+                               GOTO(out, rc = 0);
+                       ++index;
+               }
+       }
 
 out:
 
 out:
-        if (cd != NULL)
-                cd->lpcd_last_idx = last_called_index;
+       if (cd != NULL)
+               cd->lpcd_last_idx = last_called_index;
 
        if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) {
                /* something bad happened to the processing of a local
 
        if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) {
                /* something bad happened to the processing of a local
index 9af09a7..e1ad2cd 100644 (file)
@@ -748,34 +748,40 @@ int llog_cat_reverse_process(const struct lu_env *env,
 }
 EXPORT_SYMBOL(llog_cat_reverse_process);
 
 }
 EXPORT_SYMBOL(llog_cat_reverse_process);
 
-static int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
+static int llog_cat_set_first_idx(struct llog_handle *cathandle, int idx)
 {
        struct llog_log_hdr *llh = cathandle->lgh_hdr;
 {
        struct llog_log_hdr *llh = cathandle->lgh_hdr;
-       int i, bitmap_size, idx;
+       int bitmap_size;
+
        ENTRY;
 
        bitmap_size = LLOG_HDR_BITMAP_SIZE(llh);
        ENTRY;
 
        bitmap_size = LLOG_HDR_BITMAP_SIZE(llh);
-       if (llh->llh_cat_idx == (index - 1)) {
-               idx = llh->llh_cat_idx + 1;
+       /*
+        * The llh_cat_idx equals to the first used index minus 1
+        * so if we canceled the first index then llh_cat_idx
+        * must be renewed.
+        */
+       if (llh->llh_cat_idx == (idx - 1)) {
                llh->llh_cat_idx = idx;
                llh->llh_cat_idx = idx;
-               if (idx == cathandle->lgh_last_idx)
-                       goto out;
-
-               for (i = (index + 1) % bitmap_size;
-                    i != cathandle->lgh_last_idx;
-                    i = (i + 1) % bitmap_size) {
-                       if (!ext2_test_bit(i, LLOG_HDR_BITMAP(llh))) {
-                               idx = llh->llh_cat_idx + 1;
+
+               do {
+                       idx = (idx + 1) % bitmap_size;
+                       if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) {
+                               /* update llh_cat_idx for each unset bit,
+                                * expecting the*/
                                llh->llh_cat_idx = idx;
                                llh->llh_cat_idx = idx;
-                       } else if (i == 0) {
-                               llh->llh_cat_idx = 0;
+                       } else if (idx == 0) {
+                               /* skip header bit */
+                               continue;
                        } else {
                        } else {
+                               /* the first index is found */
                                break;
                        }
                                break;
                        }
-               }
-out:
-               CDEBUG(D_RPCTRACE, "set catlog "DOSTID" first idx %u\n",
-                      POSTID(&cathandle->lgh_id.lgl_oi), llh->llh_cat_idx);
+               } while (idx != cathandle->lgh_last_idx);
+
+               CDEBUG(D_RPCTRACE, "Set catlog "DOSTID" first idx %u,"
+                      " (last_idx %u)\n", POSTID(&cathandle->lgh_id.lgl_oi),
+                      llh->llh_cat_idx, cathandle->lgh_last_idx);
        }
 
        RETURN(0);
        }
 
        RETURN(0);
index 0a09fb5..a79b1ec 100644 (file)
@@ -641,13 +641,19 @@ out:
  * actual records are larger than minimum size) we just skip
  * some more records.
  */
  * actual records are larger than minimum size) we just skip
  * some more records.
  */
-static inline void llog_skip_over(__u64 *off, int curr, int goal,
-                                 __u32 chunk_size)
+static inline void llog_skip_over(struct llog_log_hdr *llh, __u64 *off,
+                                 int curr, int goal, __u32 chunk_size)
 {
 {
-       if (goal <= curr)
-               return;
-       *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE) &
-               ~(chunk_size - 1);
+       if (goal > curr) {
+               if (llh->llh_size == 0) {
+                       /* variable size records */
+                       *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE);
+               } else {
+                       *off = chunk_size + (goal - 1) * llh->llh_size;
+               }
+       }
+       /* always align with lower chunk boundary*/
+       *off &= ~(chunk_size - 1);
 }
 
 /**
 }
 
 /**
@@ -734,7 +740,8 @@ static int llog_osd_next_block(const struct lu_env *env,
                struct llog_rec_hdr     *rec, *last_rec;
                struct llog_rec_tail    *tail;
 
                struct llog_rec_hdr     *rec, *last_rec;
                struct llog_rec_tail    *tail;
 
-               llog_skip_over(cur_offset, *cur_idx, next_idx, chunk_size);
+               llog_skip_over(loghandle->lgh_hdr, cur_offset, *cur_idx,
+                              next_idx, chunk_size);
 
                /* read up to next llog chunk_size block */
                lgi->lgi_buf.lb_len = chunk_size -
 
                /* read up to next llog chunk_size block */
                lgi->lgi_buf.lb_len = chunk_size -
@@ -864,7 +871,8 @@ static int llog_osd_prev_block(const struct lu_env *env,
        LASSERT(dt);
 
        cur_offset = chunk_size;
        LASSERT(dt);
 
        cur_offset = chunk_size;
-       llog_skip_over(&cur_offset, 0, prev_idx, chunk_size);
+       llog_skip_over(loghandle->lgh_hdr, &cur_offset, 0, prev_idx,
+                      chunk_size);
 
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
 
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)