Whamcloud - gitweb
LU-6602 obdclass: variable llog chunk size 83/14883/6
authorAndreas Dilger <andreas.dilger@intel.com>
Wed, 20 May 2015 09:27:24 +0000 (02:27 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 11 Jun 2015 16:11:50 +0000 (16:11 +0000)
Do not use fix LLOG_CHUNK_SIZE (8192 bytes), and
it will get the llog_chunk_size from llog_log_hdr.
Accordingly llog header will be variable too, so
we can enlarge the bitmap in the header, then
have more records in each llog file.

Signed-off-by: Andreas Dilger <andreas.dilger@intel.com>
Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I1d18c4a502c5ab51d574bb026c94752ddb7dff8a
Reviewed-on: http://review.whamcloud.com/14883
Tested-by: Jenkins
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
15 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_log.h
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_internal.h
lustre/obdclass/llog_obd.c
lustre/obdclass/llog_osd.c
lustre/obdclass/llog_swab.c
lustre/obdclass/llog_test.c
lustre/osd-zfs/osd_io.c
lustre/ptlrpc/llog_client.c
lustre/ptlrpc/llog_server.c
lustre/target/update_records.c
lustre/utils/llog_reader.c
lustre/utils/wirecheck.c

index 0a129dc..538fb22 100644 (file)
@@ -3420,13 +3420,6 @@ struct llog_gen_rec {
        struct llog_rec_tail    lgr_tail;
 };
 
-/* On-disk header structure of each log object, stored in little endian order */
-#define LLOG_CHUNK_SIZE         8192
-#define LLOG_HEADER_SIZE        (96)
-#define LLOG_BITMAP_BYTES       (LLOG_CHUNK_SIZE - LLOG_HEADER_SIZE)
-
-#define LLOG_MIN_REC_SIZE       (24) /* round(llog_rec_hdr + llog_rec_tail) */
-
 /* flags for the logs */
 enum llog_flag {
        LLOG_F_ZAP_WHEN_EMPTY   = 0x1,
@@ -3437,24 +3430,46 @@ enum llog_flag {
        LLOG_F_EXT_MASK = LLOG_F_EXT_JOBID,
 };
 
+/* On-disk header structure of each log object, stored in little endian order */
+#define LLOG_MIN_CHUNK_SIZE    8192
+#define LLOG_HEADER_SIZE        (96) /* sizeof (llog_log_hdr) + sizeof(llh_tail)
+                                     * - sizeof(llh_bitmap) */
+#define LLOG_BITMAP_BYTES       (LLOG_MIN_CHUNK_SIZE - LLOG_HEADER_SIZE)
+#define LLOG_MIN_REC_SIZE       (24) /* round(llog_rec_hdr + llog_rec_tail) */
+
 struct llog_log_hdr {
        struct llog_rec_hdr     llh_hdr;
        __s64                   llh_timestamp;
-        __u32                   llh_count;
-        __u32                   llh_bitmap_offset;
-        __u32                   llh_size;
-        __u32                   llh_flags;
-        __u32                   llh_cat_idx;
-        /* for a catalog the first plain slot is next to it */
-        struct obd_uuid         llh_tgtuuid;
-        __u32                   llh_reserved[LLOG_HEADER_SIZE/sizeof(__u32) - 23];
-        __u32                   llh_bitmap[LLOG_BITMAP_BYTES/sizeof(__u32)];
-        struct llog_rec_tail    llh_tail;
+       __u32                   llh_count;
+       __u32                   llh_bitmap_offset;
+       __u32                   llh_size;
+       __u32                   llh_flags;
+       __u32                   llh_cat_idx;
+       /* for a catalog the first plain slot is next to it */
+       struct obd_uuid         llh_tgtuuid;
+       __u32                   llh_reserved[LLOG_HEADER_SIZE/sizeof(__u32)-23];
+       /* These fields must always be at the end of the llog_log_hdr.
+        * Note: llh_bitmap size is variable because llog chunk size could be
+        * bigger than LLOG_MIN_CHUNK_SIZE, i.e. sizeof(llog_log_hdr) > 8192
+        * bytes, and the real size is stored in llh_hdr.lrh_len, which means
+        * llh_tail should only be refered by LLOG_HDR_TAIL().
+        * But this structure is also used by client/server llog interface
+        * (see llog_client.c), it will be kept in its original way to avoid
+        * compatiblity issue. */
+       __u32                   llh_bitmap[LLOG_BITMAP_BYTES / sizeof(__u32)];
+       struct llog_rec_tail    llh_tail;
 } __attribute__((packed));
-
-#define LLOG_BITMAP_SIZE(llh)  (__u32)((llh->llh_hdr.lrh_len -         \
-                                       llh->llh_bitmap_offset -        \
-                                       sizeof(llh->llh_tail)) * 8)
+#undef LLOG_HEADER_SIZE
+#undef LLOG_BITMAP_BYTES
+
+#define LLOG_HDR_BITMAP_SIZE(llh)      (__u32)((llh->llh_hdr.lrh_len - \
+                                        llh->llh_bitmap_offset -       \
+                                        sizeof(llh->llh_tail)) * 8)
+#define LLOG_HDR_BITMAP(llh)   (__u32 *)((char *)(llh) +               \
+                                         (llh)->llh_bitmap_offset)
+#define LLOG_HDR_TAIL(llh)     ((struct llog_rec_tail *)((char *)llh + \
+                                                llh->llh_hdr.lrh_len - \
+                                                sizeof(llh->llh_tail)))
 
 /** log cookies are used to reference a specific log file and a record therein */
 struct llog_cookie {
index 18e9a3a..27573c8 100644 (file)
@@ -295,6 +295,7 @@ struct llog_handle {
        struct rw_semaphore      lgh_hdr_lock; /* protect lgh_hdr data */
        struct llog_logid        lgh_id; /* id of this log */
        struct llog_log_hdr     *lgh_hdr;
+       size_t                  lgh_hdr_size;
        struct dt_object        *lgh_obj;
        int                      lgh_last_idx;
        int                      lgh_cur_idx; /* used during llog_process */
@@ -342,6 +343,9 @@ struct llog_ctxt {
        struct dt_object        *loc_dir;
        struct local_oid_storage *loc_los_nameless;
        struct local_oid_storage *loc_los_named;
+       /* llog chunk size, and llog record size can not be bigger than
+        * loc_chunk_size */
+       __u32                    loc_chunk_size;
 };
 
 #define LLOG_PROC_BREAK 0x0001
index 33d2ea3..707fde9 100644 (file)
@@ -80,15 +80,14 @@ static void llog_free_handle(struct llog_handle *loghandle)
        LASSERT(loghandle != NULL);
 
        /* failed llog_init_handle */
-       if (!loghandle->lgh_hdr)
+       if (loghandle->lgh_hdr == NULL)
                goto out;
 
        if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
                LASSERT(list_empty(&loghandle->u.phd.phd_entry));
        else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
                LASSERT(list_empty(&loghandle->u.chd.chd_head));
-       LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
-       OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+       OBD_FREE_LARGE(loghandle->lgh_hdr, loghandle->lgh_hdr_size);
 out:
        OBD_FREE_PTR(loghandle);
 }
@@ -122,7 +121,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
         }
 
        down_write(&loghandle->lgh_hdr_lock);
-       if (!ext2_clear_bit(index, llh->llh_bitmap)) {
+       if (!ext2_clear_bit(index, LLOG_HDR_BITMAP(llh))) {
                up_write(&loghandle->lgh_hdr_lock);
                CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
                RETURN(-ENOENT);
@@ -132,7 +131,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
 
        if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
            (llh->llh_count == 1) &&
-           (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
+           (loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1)) {
                up_write(&loghandle->lgh_hdr_lock);
                rc = llog_destroy(env, loghandle);
                if (rc < 0) {
@@ -159,7 +158,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
        RETURN(0);
 out_err:
        down_write(&loghandle->lgh_hdr_lock);
-       ext2_set_bit(index, llh->llh_bitmap);
+       ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
        llh->llh_count++;
        up_write(&loghandle->lgh_hdr_lock);
        return rc;
@@ -183,17 +182,22 @@ static int llog_read_header(const struct lu_env *env,
        if (rc == LLOG_EEMPTY) {
                struct llog_log_hdr *llh = handle->lgh_hdr;
 
+               /* lrh_len should be initialized in llog_init_handle */
                handle->lgh_last_idx = 0; /* header is record with index 0 */
                llh->llh_count = 1;         /* for the header record */
                llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
-               llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
-               llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
+               LASSERT(handle->lgh_ctxt->loc_chunk_size >=
+                                               LLOG_MIN_CHUNK_SIZE);
+               llh->llh_hdr.lrh_len = handle->lgh_ctxt->loc_chunk_size;
+               llh->llh_hdr.lrh_index = 0;
                llh->llh_timestamp = cfs_time_current_sec();
                if (uuid)
                        memcpy(&llh->llh_tgtuuid, uuid,
                               sizeof(llh->llh_tgtuuid));
                llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
-               ext2_set_bit(0, llh->llh_bitmap);
+               ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
+               LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
+               LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
                rc = 0;
        }
        return rc;
@@ -205,14 +209,18 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
        struct llog_log_hdr     *llh;
        enum llog_flag           fmt = flags & LLOG_F_EXT_MASK;
        int                      rc;
-
+       int                     chunk_size = handle->lgh_ctxt->loc_chunk_size;
        ENTRY;
+
        LASSERT(handle->lgh_hdr == NULL);
 
-       OBD_ALLOC_PTR(llh);
+       LASSERT(chunk_size >= LLOG_MIN_CHUNK_SIZE);
+       OBD_ALLOC_LARGE(llh, chunk_size);
        if (llh == NULL)
                RETURN(-ENOMEM);
+
        handle->lgh_hdr = llh;
+       handle->lgh_hdr_size = chunk_size;
        /* first assign flags to use llog_client_ops */
        llh->llh_flags = flags;
        rc = llog_read_header(env, handle, uuid);
@@ -261,7 +269,7 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
        llh->llh_flags |= fmt;
 out:
        if (rc) {
-               OBD_FREE_PTR(llh);
+               OBD_FREE_LARGE(llh, chunk_size);
                handle->lgh_hdr = NULL;
        }
        RETURN(rc);
@@ -275,7 +283,8 @@ static int llog_process_thread(void *arg)
        struct llog_log_hdr             *llh = loghandle->lgh_hdr;
        struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
        char                            *buf;
-       __u64                            cur_offset = LLOG_CHUNK_SIZE;
+       int                              chunk_size;
+       __u64                            cur_offset;
        __u64                            last_offset;
        int                              rc = 0, index = 1, last_index;
        int                              saved_index = 0;
@@ -283,35 +292,36 @@ static int llog_process_thread(void *arg)
 
        ENTRY;
 
-        LASSERT(llh);
+       if (llh == NULL)
+               RETURN(-EINVAL);
+
+       cur_offset = chunk_size = llh->llh_hdr.lrh_len;
 
-        OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
-        if (!buf) {
-                lpi->lpi_rc = -ENOMEM;
+       OBD_ALLOC_LARGE(buf, chunk_size);
+       if (buf == NULL) {
+               lpi->lpi_rc = -ENOMEM;
                RETURN(0);
-        }
+       }
 
-        if (cd != NULL) {
-                last_called_index = cd->lpcd_first_idx;
-                index = cd->lpcd_first_idx + 1;
-        }
-        if (cd != NULL && cd->lpcd_last_idx)
-                last_index = cd->lpcd_last_idx;
-        else
-                last_index = LLOG_BITMAP_BYTES * 8 - 1;
+       if (cd != NULL) {
+               last_called_index = cd->lpcd_first_idx;
+               index = cd->lpcd_first_idx + 1;
+       }
+       if (cd != NULL && cd->lpcd_last_idx)
+               last_index = cd->lpcd_last_idx;
+       else
+               last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
 
-       if (index > last_index) {
-               /* Record is not in this buffer. */
+       if (index > last_index) /* Record is not in this buffer. */
                GOTO(out, rc);
-       }
 
-        while (rc == 0) {
-                struct llog_rec_hdr *rec;
+       while (rc == 0) {
+               struct llog_rec_hdr *rec;
 
-                /* skip records not set in bitmap */
-                while (index <= last_index &&
-                       !ext2_test_bit(index, llh->llh_bitmap))
-                        ++index;
+               /* skip records not set in bitmap */
+               while (index <= last_index &&
+                      !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
+                       ++index;
 
                 LASSERT(index <= last_index + 1);
                 if (index == last_index + 1)
@@ -320,19 +330,19 @@ repeat:
                 CDEBUG(D_OTHER, "index: %d last_index %d\n",
                        index, last_index);
 
-                /* get the buf with our target record; avoid old garbage */
-                memset(buf, 0, LLOG_CHUNK_SIZE);
-                last_offset = cur_offset;
+               /* get the buf with our target record; avoid old garbage */
+               memset(buf, 0, chunk_size);
+               last_offset = cur_offset;
                rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
-                                    index, &cur_offset, buf, LLOG_CHUNK_SIZE);
-                if (rc)
-                        GOTO(out, rc);
+                                    index, &cur_offset, buf, chunk_size);
+               if (rc != 0)
+                       GOTO(out, rc);
 
                /* NB: when rec->lrh_len is accessed it is already swabbed
                 * since it is used at the "end" of the loop and the rec
                 * swabbing is done at the beginning of the loop. */
                for (rec = (struct llog_rec_hdr *)buf;
-                    (char *)rec < buf + LLOG_CHUNK_SIZE;
+                    (char *)rec < buf + chunk_size;
                     rec = llog_rec_hdr_next(rec)) {
 
                        CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
@@ -350,8 +360,7 @@ repeat:
                                        GOTO(repeat, rc = 0);
                                GOTO(out, rc = 0); /* no more records */
                        }
-                       if (rec->lrh_len == 0 ||
-                           rec->lrh_len > LLOG_CHUNK_SIZE) {
+                       if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
                                 CWARN("invalid length %d in llog record for "
                                       "index %d/%d\n", rec->lrh_len,
                                       rec->lrh_index, index);
@@ -364,17 +373,17 @@ repeat:
                                 continue;
                         }
 
-                        CDEBUG(D_OTHER,
-                               "lrh_index: %d lrh_len: %d (%d remains)\n",
-                               rec->lrh_index, rec->lrh_len,
-                               (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
+                       CDEBUG(D_OTHER,
+                              "lrh_index: %d lrh_len: %d (%d remains)\n",
+                              rec->lrh_index, rec->lrh_len,
+                              (int)(buf + chunk_size - (char *)rec));
 
                         loghandle->lgh_cur_idx = rec->lrh_index;
                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
                                                     last_offset;
 
-                        /* if set, process the callback on this record */
-                        if (ext2_test_bit(index, llh->llh_bitmap)) {
+                       /* if set, process the callback on this record */
+                       if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
                                rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
                                                 lpi->lpi_cbdata);
                                last_called_index = index;
@@ -409,14 +418,14 @@ out:
                 * remaining bits in the header */
                CERROR("Local llog found corrupted\n");
                while (index <= last_index) {
-                       if (ext2_test_bit(index, llh->llh_bitmap) != 0)
+                       if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh)) != 0)
                                llog_cancel_rec(lpi->lpi_env, loghandle, index);
                        index++;
                }
                rc = 0;
        }
 
-       OBD_FREE(buf, LLOG_CHUNK_SIZE);
+       OBD_FREE_LARGE(buf, chunk_size);
         lpi->lpi_rc = rc;
         return 0;
 }
@@ -507,36 +516,36 @@ int llog_reverse_process(const struct lu_env *env,
         struct llog_process_cat_data *cd = catdata;
         void *buf;
         int rc = 0, first_index = 1, index, idx;
+       __u32   chunk_size = llh->llh_hdr.lrh_len;
         ENTRY;
 
-        OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
-        if (!buf)
-                RETURN(-ENOMEM);
-
-        if (cd != NULL)
-                first_index = cd->lpcd_first_idx + 1;
-        if (cd != NULL && cd->lpcd_last_idx)
-                index = cd->lpcd_last_idx;
-        else
-                index = LLOG_BITMAP_BYTES * 8 - 1;
-
-        while (rc == 0) {
-                struct llog_rec_hdr *rec;
-                struct llog_rec_tail *tail;
-
-                /* skip records not set in bitmap */
-                while (index >= first_index &&
-                       !ext2_test_bit(index, llh->llh_bitmap))
-                        --index;
-
-                LASSERT(index >= first_index - 1);
-                if (index == first_index - 1)
-                        break;
+       OBD_ALLOC_LARGE(buf, chunk_size);
+       if (buf == NULL)
+               RETURN(-ENOMEM);
 
-                /* get the buf with our target record; avoid old garbage */
-                memset(buf, 0, LLOG_CHUNK_SIZE);
-               rc = llog_prev_block(env, loghandle, index, buf,
-                                    LLOG_CHUNK_SIZE);
+       if (cd != NULL)
+               first_index = cd->lpcd_first_idx + 1;
+       if (cd != NULL && cd->lpcd_last_idx)
+               index = cd->lpcd_last_idx;
+       else
+               index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+
+       while (rc == 0) {
+               struct llog_rec_hdr *rec;
+               struct llog_rec_tail *tail;
+
+               /* skip records not set in bitmap */
+               while (index >= first_index &&
+                      !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
+                       --index;
+
+               LASSERT(index >= first_index - 1);
+               if (index == first_index - 1)
+                       break;
+
+               /* get the buf with our target record; avoid old garbage */
+               memset(buf, 0, chunk_size);
+               rc = llog_prev_block(env, loghandle, index, buf, chunk_size);
                if (rc)
                        GOTO(out, rc);
 
@@ -552,13 +561,13 @@ int llog_reverse_process(const struct lu_env *env,
                LASSERT(idx == index);
                tail = (void *)rec + rec->lrh_len - sizeof(*tail);
 
-                /* process records in buffer, starting where we found one */
-                while ((void *)tail > buf) {
+               /* process records in buffer, starting where we found one */
+               while ((void *)tail > buf) {
                        if (tail->lrt_index == 0)
                                GOTO(out, rc = 0); /* no more records */
 
-                        /* if set, process the callback on this record */
-                        if (ext2_test_bit(index, llh->llh_bitmap)) {
+                       /* if set, process the callback on this record */
+                       if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
                                rec = (void *)tail - tail->lrt_len +
                                      sizeof(*tail);
 
@@ -582,8 +591,8 @@ int llog_reverse_process(const struct lu_env *env,
         }
 
 out:
-        if (buf)
-                OBD_FREE(buf, LLOG_CHUNK_SIZE);
+       if (buf != NULL)
+               OBD_FREE_LARGE(buf, chunk_size);
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_reverse_process);
index bc5c126..9af09a7 100644 (file)
@@ -264,7 +264,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
                down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
                llh = loghandle->lgh_hdr;
                if (llh == NULL ||
-                   loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
+                   loghandle->lgh_last_idx < LLOG_HDR_BITMAP_SIZE(llh) - 1) {
                        up_read(&cathandle->lgh_lock);
                         RETURN(loghandle);
                 } else {
@@ -284,12 +284,12 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
                down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
                llh = loghandle->lgh_hdr;
                LASSERT(llh);
-                if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
+               if (loghandle->lgh_last_idx < LLOG_HDR_BITMAP_SIZE(llh) - 1) {
                        up_write(&cathandle->lgh_lock);
-                        RETURN(loghandle);
-                } else {
+                       RETURN(loghandle);
+               } else {
                        up_write(&loghandle->lgh_lock);
-                }
+               }
         }
 
        CDEBUG(D_INODE, "use next log\n");
@@ -316,7 +316,7 @@ int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
         int rc;
         ENTRY;
 
-        LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
+       LASSERT(rec->lrh_len <= cathandle->lgh_ctxt->loc_chunk_size);
        loghandle = llog_cat_current_log(cathandle, th);
        LASSERT(!IS_ERR(loghandle));
 
@@ -750,28 +750,29 @@ EXPORT_SYMBOL(llog_cat_reverse_process);
 
 static int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
 {
-        struct llog_log_hdr *llh = cathandle->lgh_hdr;
-        int i, bitmap_size, idx;
-        ENTRY;
+       struct llog_log_hdr *llh = cathandle->lgh_hdr;
+       int i, bitmap_size, idx;
+       ENTRY;
 
-        bitmap_size = LLOG_BITMAP_SIZE(llh);
-        if (llh->llh_cat_idx == (index - 1)) {
-                idx = llh->llh_cat_idx + 1;
-                llh->llh_cat_idx = idx;
-                if (idx == cathandle->lgh_last_idx)
-                        goto out;
-                for (i = (index + 1) % bitmap_size;
-                     i != cathandle->lgh_last_idx;
-                     i = (i + 1) % bitmap_size) {
-                        if (!ext2_test_bit(i, llh->llh_bitmap)) {
-                                idx = llh->llh_cat_idx + 1;
-                                llh->llh_cat_idx = idx;
-                        } else if (i == 0) {
-                                llh->llh_cat_idx = 0;
-                        } else {
-                                break;
-                        }
-                }
+       bitmap_size = LLOG_HDR_BITMAP_SIZE(llh);
+       if (llh->llh_cat_idx == (index - 1)) {
+               idx = llh->llh_cat_idx + 1;
+               llh->llh_cat_idx = idx;
+               if (idx == cathandle->lgh_last_idx)
+                       goto out;
+
+               for (i = (index + 1) % bitmap_size;
+                    i != cathandle->lgh_last_idx;
+                    i = (i + 1) % bitmap_size) {
+                       if (!ext2_test_bit(i, LLOG_HDR_BITMAP(llh))) {
+                               idx = llh->llh_cat_idx + 1;
+                               llh->llh_cat_idx = idx;
+                       } else if (i == 0) {
+                               llh->llh_cat_idx = 0;
+                       } else {
+                               break;
+                       }
+               }
 out:
                CDEBUG(D_RPCTRACE, "set catlog "DOSTID" first idx %u\n",
                       POSTID(&cathandle->lgh_id.lgl_oi), llh->llh_cat_idx);
index e0763da..2950864 100644 (file)
@@ -47,7 +47,6 @@ struct llog_process_info {
         int                 lpi_rc;
        struct completion       lpi_completion;
        const struct lu_env     *lpi_env;
-
 };
 
 struct llog_thread_info {
index 2fc816f..e80150f 100644 (file)
@@ -171,6 +171,7 @@ int llog_setup(const struct lu_env *env, struct obd_device *obd,
                ctxt->loc_exp = class_export_get(obd->obd_self_export);
 
        ctxt->loc_flags = LLOG_CTXT_FLAG_UNINITIALIZED;
+       ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE;
 
         rc = llog_group_set_ctxt(olg, ctxt, index);
         if (rc) {
index ad843e9..13bd82c 100644 (file)
@@ -116,7 +116,7 @@ static int llog_osd_create_new_object(const struct lu_env *env,
  * This function writes a padding record to the end of llog. That may
  * be needed if llog contains records of variable size, e.g. config logs
  * or changelogs.
- * The padding record just aligns llog to the LLOG_CHUNK_SIZE boundary if
+ * The padding record just aligns llog to the llog chunk_size boundary if
  * the current record doesn't fit in the remaining space.
  *
  * It allocates full length to avoid two separate writes for header and tail.
@@ -189,11 +189,10 @@ static int llog_osd_read_header(const struct lu_env *env,
        struct llog_thread_info *lgi;
        enum llog_flag           flags;
        int                      rc;
+       __u32                   max_size = handle->lgh_hdr_size;
 
        ENTRY;
 
-       LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
-
        o = handle->lgh_obj;
        LASSERT(o);
 
@@ -214,8 +213,7 @@ static int llog_osd_read_header(const struct lu_env *env,
 
        lgi->lgi_off = 0;
        lgi->lgi_buf.lb_buf = handle->lgh_hdr;
-       lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE;
-
+       lgi->lgi_buf.lb_len = max_size;
        rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
        if (rc) {
                CERROR("%s: error reading log header from "DFID": rc = %d\n",
@@ -235,19 +233,20 @@ static int llog_osd_read_header(const struct lu_env *env,
                       PFID(lu_object_fid(&o->do_lu)),
                       llh_hdr->lrh_type, LLOG_HDR_MAGIC);
                RETURN(-EIO);
-       } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
+       } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
+                  llh_hdr->lrh_len > max_size) {
                CERROR("%s: incorrectly sized log %s "DFID" header: "
-                      "%#x (expected %#x)\n"
+                      "%#x (expected at least %#x)\n"
                       "you may need to re-run lconf --write_conf.\n",
                       o->do_lu.lo_dev->ld_obd->obd_name,
                       handle->lgh_name ? handle->lgh_name : "",
                       PFID(lu_object_fid(&o->do_lu)),
-                      llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
+                      llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE);
                RETURN(-EIO);
        }
 
        handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
-       handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
+       handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
 
        RETURN(0);
 }
@@ -286,7 +285,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env,
        LASSERT(th);
        LASSERT(loghandle);
        LASSERT(rec);
-       LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
+       LASSERT(rec->lrh_len <= loghandle->lgh_ctxt->loc_chunk_size);
 
        o = loghandle->lgh_obj;
        LASSERT(o);
@@ -346,6 +345,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
        int                      index, rc;
        struct llog_rec_tail    *lrt;
        struct dt_object        *o;
+       __u32                   chunk_size;
        size_t                   left;
 
        ENTRY;
@@ -357,11 +357,12 @@ static int llog_osd_write_rec(const struct lu_env *env,
        LASSERT(o);
        LASSERT(th);
 
+       chunk_size = llh->llh_hdr.lrh_len;
        CDEBUG(D_OTHER, "new record %x to "DFID"\n",
               rec->lrh_type, PFID(lu_object_fid(&o->do_lu)));
 
-       /* record length should not bigger than LLOG_CHUNK_SIZE */
-       if (reclen > LLOG_CHUNK_SIZE)
+       /* record length should not bigger than  */
+       if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len)
                RETURN(-E2BIG);
 
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
@@ -389,7 +390,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                /* llog can be empty only when first record is being written */
                LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0));
 
-               if (!ext2_test_bit(idx, llh->llh_bitmap)) {
+               if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) {
                        CERROR("%s: modify unset record %u\n",
                               o->do_lu.lo_dev->ld_obd->obd_name, idx);
                        RETURN(-ENOENT);
@@ -404,7 +405,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
 
                if (idx == LLOG_HEADER_IDX) {
                        /* llog header update */
-                       LASSERT(reclen == sizeof(struct llog_log_hdr));
+                       LASSERT(reclen >= sizeof(struct llog_log_hdr));
                        LASSERT(rec == &llh->llh_hdr);
 
                        lgi->lgi_off = 0;
@@ -475,7 +476,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
         */
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
        lgi->lgi_off = lgi->lgi_attr.la_size;
-       left = LLOG_CHUNK_SIZE - (lgi->lgi_off & (LLOG_CHUNK_SIZE - 1));
+       left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
        /* NOTE: padding is a record, but no bit is set */
        if (left != 0 && left != reclen &&
            left < (reclen + LLOG_MIN_REC_SIZE)) {
@@ -486,20 +487,20 @@ static int llog_osd_write_rec(const struct lu_env *env,
                loghandle->lgh_last_idx++; /* for pad rec */
        }
        /* if it's the last idx in log file, then return -ENOSPC */
-       if (loghandle->lgh_last_idx >= LLOG_BITMAP_SIZE(llh) - 1)
+       if (loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1)
                RETURN(-ENOSPC);
 
        /* increment the last_idx along with llh_tail index, they should
         * be equal for a llog lifetime */
        loghandle->lgh_last_idx++;
        index = loghandle->lgh_last_idx;
-       llh->llh_tail.lrt_index = index;
+       LLOG_HDR_TAIL(llh)->lrt_index = index;
        /**
         * NB: the caller should make sure only 1 process access
         * the lgh_last_idx, e.g. append should be exclusive.
         * Otherwise it might hit the assert.
         */
-       LASSERT(index < LLOG_BITMAP_SIZE(llh));
+       LASSERT(index < LLOG_HDR_BITMAP_SIZE(llh));
        rec->lrh_index = index;
        lrt = rec_tail(rec);
        lrt->lrt_len = rec->lrh_len;
@@ -508,7 +509,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
        /* the lgh_hdr_lock protects llog header data from concurrent
         * update/cancel, the llh_count and llh_bitmap are protected */
        down_write(&loghandle->lgh_hdr_lock);
-       if (ext2_set_bit(index, llh->llh_bitmap)) {
+       if (ext2_set_bit(index, LLOG_HDR_BITMAP(llh))) {
                CERROR("%s: index %u already set in log bitmap\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, index);
                up_write(&loghandle->lgh_hdr_lock);
@@ -603,13 +604,13 @@ out_remote_unlock:
 out:
        /* cleanup llog for error case */
        down_write(&loghandle->lgh_hdr_lock);
-       ext2_clear_bit(index, llh->llh_bitmap);
+       ext2_clear_bit(index, LLOG_HDR_BITMAP(llh));
        llh->llh_count--;
        up_write(&loghandle->lgh_hdr_lock);
 
        /* restore llog last_idx */
        loghandle->lgh_last_idx--;
-       llh->llh_tail.lrt_index = loghandle->lgh_last_idx;
+       LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
 
        RETURN(rc);
 }
@@ -621,12 +622,13 @@ out:
  * actual records are larger than minimum size) we just skip
  * some more records.
  */
-static inline void llog_skip_over(__u64 *off, int curr, int goal)
+static inline void llog_skip_over(__u64 *off, int curr, int goal,
+                                 __u32 chunk_size)
 {
        if (goal <= curr)
                return;
        *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE) &
-               ~(LLOG_CHUNK_SIZE - 1);
+               ~(chunk_size - 1);
 }
 
 /**
@@ -668,7 +670,7 @@ static void changelog_block_trim_ext(struct llog_rec_hdr *hdr,
  * \param[in,out] cur_offset   furtherst point read in the file
  * \param[in]     buf          pointer to data buffer to fill
  * \param[in]     len          required len to read, it is
- *                             LLOG_CHUNK_SIZE usually.
+ *                             usually llog chunk_size.
  *
  * \retval                     0 on successful buffer read
  * \retval                     negative value on error
@@ -682,13 +684,15 @@ static int llog_osd_next_block(const struct lu_env *env,
        struct dt_object        *o;
        struct dt_device        *dt;
        int                      rc;
+       __u32                   chunk_size;
 
        ENTRY;
 
        LASSERT(env);
        LASSERT(lgi);
 
-       if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
+       chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
+       if (len == 0 || len & (chunk_size - 1))
                RETURN(-EINVAL);
 
        CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
@@ -711,11 +715,11 @@ static int llog_osd_next_block(const struct lu_env *env,
                struct llog_rec_hdr     *rec, *last_rec;
                struct llog_rec_tail    *tail;
 
-               llog_skip_over(cur_offset, *cur_idx, next_idx);
+               llog_skip_over(cur_offset, *cur_idx, next_idx, chunk_size);
 
-               /* read up to next LLOG_CHUNK_SIZE block */
-               lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE -
-                                     (*cur_offset & (LLOG_CHUNK_SIZE - 1));
+               /* read up to next llog chunk_size block */
+               lgi->lgi_buf.lb_len = chunk_size -
+                                     (*cur_offset & (chunk_size - 1));
                lgi->lgi_buf.lb_buf = buf;
 
                rc = dt_read(env, o, &lgi->lgi_buf, cur_offset);
@@ -807,7 +811,7 @@ out:
  * \param[in] loghandle        llog handle of the current llog
  * \param[in] prev_idx target index to find
  * \param[in] buf      pointer to data buffer to fill
- * \param[in] len      required len to read, it is LLOG_CHUNK_SIZE usually.
+ * \param[in] len      required len to read, it is llog_chunk_size usually.
  *
  * \retval             0 on successful buffer read
  * \retval             negative value on error
@@ -820,11 +824,13 @@ static int llog_osd_prev_block(const struct lu_env *env,
        struct dt_object        *o;
        struct dt_device        *dt;
        loff_t                   cur_offset;
+       __u32                   chunk_size;
        int                      rc;
 
        ENTRY;
 
-       if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
+       chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
+       if (len == 0 || len & (chunk_size - 1))
                RETURN(-EINVAL);
 
        CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx);
@@ -838,8 +844,8 @@ static int llog_osd_prev_block(const struct lu_env *env,
        dt = lu2dt_dev(o->do_lu.lo_dev);
        LASSERT(dt);
 
-       cur_offset = LLOG_CHUNK_SIZE;
-       llog_skip_over(&cur_offset, 0, prev_idx);
+       cur_offset = chunk_size;
+       llog_skip_over(&cur_offset, 0, prev_idx, chunk_size);
 
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
index 366dc19..57483da 100644 (file)
@@ -252,23 +252,23 @@ void lustre_swab_llog_rec(struct llog_rec_hdr *rec)
                __swab64s(&lsr->lsr_valid);
                tail = &lsr->lsr_tail;
                break;
-        }
-        case OBD_CFG_REC:
-                /* these are swabbed as they are consumed */
-                break;
+       }
+       case OBD_CFG_REC:
+               /* these are swabbed as they are consumed */
+               break;
        case LLOG_HDR_MAGIC:
        {
-                struct llog_log_hdr *llh = (struct llog_log_hdr *)rec;
-
-                __swab64s(&llh->llh_timestamp);
-                __swab32s(&llh->llh_count);
-                __swab32s(&llh->llh_bitmap_offset);
-                __swab32s(&llh->llh_flags);
-                __swab32s(&llh->llh_size);
-                __swab32s(&llh->llh_cat_idx);
-               tail = &llh->llh_tail;
-                break;
-        }
+               struct llog_log_hdr *llh = (struct llog_log_hdr *)rec;
+
+               __swab64s(&llh->llh_timestamp);
+               __swab32s(&llh->llh_count);
+               __swab32s(&llh->llh_bitmap_offset);
+               __swab32s(&llh->llh_flags);
+               __swab32s(&llh->llh_size);
+               __swab32s(&llh->llh_cat_idx);
+               tail = LLOG_HDR_TAIL(llh);
+               break;
+       }
        case LLOG_LOGID_MAGIC:
        {
                struct llog_logid_rec *lid = (struct llog_logid_rec *)rec;
@@ -320,18 +320,20 @@ void lustre_swab_llog_rec(struct llog_rec_hdr *rec)
 
 static void print_llog_hdr(struct llog_log_hdr *h)
 {
-        CDEBUG(D_OTHER, "llog header: %p\n", h);
-        CDEBUG(D_OTHER, "\tllh_hdr.lrh_index: %#x\n", h->llh_hdr.lrh_index);
-        CDEBUG(D_OTHER, "\tllh_hdr.lrh_len: %#x\n", h->llh_hdr.lrh_len);
-        CDEBUG(D_OTHER, "\tllh_hdr.lrh_type: %#x\n", h->llh_hdr.lrh_type);
-        CDEBUG(D_OTHER, "\tllh_timestamp: "LPX64"\n", h->llh_timestamp);
-        CDEBUG(D_OTHER, "\tllh_count: %#x\n", h->llh_count);
-        CDEBUG(D_OTHER, "\tllh_bitmap_offset: %#x\n", h->llh_bitmap_offset);
-        CDEBUG(D_OTHER, "\tllh_flags: %#x\n", h->llh_flags);
-        CDEBUG(D_OTHER, "\tllh_size: %#x\n", h->llh_size);
-        CDEBUG(D_OTHER, "\tllh_cat_idx: %#x\n", h->llh_cat_idx);
-        CDEBUG(D_OTHER, "\tllh_tail.lrt_index: %#x\n", h->llh_tail.lrt_index);
-        CDEBUG(D_OTHER, "\tllh_tail.lrt_len: %#x\n", h->llh_tail.lrt_len);
+       CDEBUG(D_OTHER, "llog header: %p\n", h);
+       CDEBUG(D_OTHER, "\tllh_hdr.lrh_index: %#x\n", h->llh_hdr.lrh_index);
+       CDEBUG(D_OTHER, "\tllh_hdr.lrh_len: %#x\n", h->llh_hdr.lrh_len);
+       CDEBUG(D_OTHER, "\tllh_hdr.lrh_type: %#x\n", h->llh_hdr.lrh_type);
+       CDEBUG(D_OTHER, "\tllh_timestamp: "LPX64"\n", h->llh_timestamp);
+       CDEBUG(D_OTHER, "\tllh_count: %#x\n", h->llh_count);
+       CDEBUG(D_OTHER, "\tllh_bitmap_offset: %#x\n", h->llh_bitmap_offset);
+       CDEBUG(D_OTHER, "\tllh_flags: %#x\n", h->llh_flags);
+       CDEBUG(D_OTHER, "\tllh_size: %#x\n", h->llh_size);
+       CDEBUG(D_OTHER, "\tllh_cat_idx: %#x\n", h->llh_cat_idx);
+       CDEBUG(D_OTHER, "\tllh_tail.lrt_index: %#x\n",
+              LLOG_HDR_TAIL(h)->lrt_index);
+       CDEBUG(D_OTHER, "\tllh_tail.lrt_len: %#x\n",
+              LLOG_HDR_TAIL(h)->lrt_len);
 }
 
 void lustre_swab_llog_hdr (struct llog_log_hdr *h)
index 902bbaa..12b6383 100644 (file)
@@ -51,7 +51,7 @@
 /* This is slightly more than the number of records that can fit into a
  * single llog file, because the llog_log_header takes up some of the
  * space in the first block that cannot be used for the bitmap. */
-#define LLOG_TEST_RECNUM  (LLOG_CHUNK_SIZE * 8)
+#define LLOG_TEST_RECNUM  (LLOG_MIN_CHUNK_SIZE * 8)
 
 static int llog_test_rand;
 static struct obd_uuid uuid = { .uuid = "test_uuid" };
@@ -64,16 +64,16 @@ struct llog_mini_rec {
 
 static int verify_handle(char *test, struct llog_handle *llh, int num_recs)
 {
-        int i;
-        int last_idx = 0;
-        int active_recs = 0;
-
-        for (i = 0; i < LLOG_BITMAP_BYTES * 8; i++) {
-                if (ext2_test_bit(i, llh->lgh_hdr->llh_bitmap)) {
-                        last_idx = i;
-                        active_recs++;
-                }
-        }
+       int i;
+       int last_idx = 0;
+       int active_recs = 0;
+
+       for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr); i++) {
+               if (ext2_test_bit(i, LLOG_HDR_BITMAP(llh->lgh_hdr))) {
+                       last_idx = i;
+                       active_recs++;
+               }
+       }
 
         if (active_recs != num_recs) {
                 CERROR("%s: expected %d active recs after write, found %d\n",
@@ -258,7 +258,7 @@ static int llog_test_3(const struct lu_env *env, struct obd_device *obd,
 
        CWARN("3d: write records with variable size until BITMAP_SIZE, "
              "return -ENOSPC\n");
-       for (i = 0; i < LLOG_BITMAP_SIZE(llh->lgh_hdr) + 1; i++) {
+       for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) + 1; i++) {
                char                     buf[64];
                struct llog_rec_hdr     *hdr = (void *)&buf;
 
@@ -372,7 +372,7 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
                GOTO(out, rc);
 
        CWARN("4e: add 5 large records, one record per block\n");
-       buflen = LLOG_CHUNK_SIZE;
+       buflen = LLOG_MIN_CHUNK_SIZE;
        OBD_ALLOC(buf, buflen);
        if (buf == NULL)
                GOTO(out, rc = -ENOMEM);
@@ -719,7 +719,7 @@ static int llog_test_7_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
                CERROR("7_sub: can't init llog handle: %d\n", rc);
                GOTO(out_close, rc);
        }
-       for (i = 0; i < LLOG_BITMAP_SIZE(llh->lgh_hdr); i++) {
+       for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr); i++) {
                rc = llog_write(env, llh, &llog_records.lrh, LLOG_NEXT_IDX);
                if (rc == -ENOSPC) {
                        break;
@@ -740,9 +740,9 @@ static int llog_test_7_sub(const struct lu_env *env, struct llog_ctxt *ctxt)
                CERROR("7_sub: verify handle failed: %d\n", rc);
                GOTO(out_close, rc);
        }
-       if (num_recs < LLOG_BITMAP_SIZE(llh->lgh_hdr) - 1)
+       if (num_recs < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1)
                CWARN("7_sub: records are not aligned, written %d from %u\n",
-                     num_recs, LLOG_BITMAP_SIZE(llh->lgh_hdr) - 1);
+                     num_recs, LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
 
        plain_counter = 0;
        rc = llog_process(env, llh, test_7_print_cb, "test 7", NULL);
index 49c69c1..9b650f5 100644 (file)
@@ -49,7 +49,7 @@
 #include <obd_class.h>
 #include <lustre_disk.h>
 #include <lustre_fid.h>
-#include <lustre/lustre_idl.h> /* LLOG_CHUNK_SIZE definition */
+#include <lustre/lustre_idl.h> /* LLOG_MIN_CHUNK_SIZE definition */
 
 #include "osd_internal.h"
 
@@ -183,9 +183,9 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
 
        /* XXX: we still miss for append declaration support in ZFS
         *      -1 means append which is used by llog mostly, llog
-        *      can grow upto LLOG_CHUNK_SIZE*8 records */
+        *      can grow upto LLOG_MIN_CHUNK_SIZE*8 records */
        if (pos == -1)
-               pos = max_t(loff_t, 256 * 8 * LLOG_CHUNK_SIZE,
+               pos = max_t(loff_t, 256 * 8 * LLOG_MIN_CHUNK_SIZE,
                            obj->oo_attr.la_size + (2 << 20));
        dmu_tx_hold_write(oh->ot_tx, oid, pos, buf->lb_len);
 
index b514727..e618714 100644 (file)
@@ -310,23 +310,31 @@ static int llog_client_read_header(const struct lu_env *env,
         if (hdr == NULL)
                 GOTO(out, rc =-EFAULT);
 
-        memcpy(handle->lgh_hdr, hdr, sizeof (*hdr));
-        handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
-
-        /* sanity checks */
-        llh_hdr = &handle->lgh_hdr->llh_hdr;
-        if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
-                CERROR("bad log header magic: %#x (expecting %#x)\n",
-                       llh_hdr->lrh_type, LLOG_HDR_MAGIC);
-                rc = -EIO;
-        } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
-                CERROR("incorrectly sized log header: %#x "
-                       "(expecting %#x)\n",
-                       llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
-                CERROR("you may need to re-run lconf --write_conf.\n");
-                rc = -EIO;
-        }
-        EXIT;
+       if (handle->lgh_hdr_size < hdr->llh_hdr.lrh_len)
+               GOTO(out, rc = -EFAULT);
+
+       memcpy(handle->lgh_hdr, hdr, hdr->llh_hdr.lrh_len);
+       handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
+
+       /* sanity checks */
+       llh_hdr = &handle->lgh_hdr->llh_hdr;
+       if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
+               CERROR("bad log header magic: %#x (expecting %#x)\n",
+                      llh_hdr->lrh_type, LLOG_HDR_MAGIC);
+               rc = -EIO;
+       } else if (llh_hdr->lrh_len !=
+                  LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len ||
+                  (llh_hdr->lrh_len & (llh_hdr->lrh_len - 1)) != 0 ||
+                  llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
+                  llh_hdr->lrh_len > handle->lgh_hdr_size) {
+               CERROR("incorrectly sized log header: %#x, "
+                      "expecting %#x (power of two > 8192)\n",
+                      llh_hdr->lrh_len,
+                      LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len);
+               CERROR("you may need to re-run lconf --write_conf.\n");
+               rc = -EIO;
+       }
+       EXIT;
 out:
         ptlrpc_req_finished(req);
 err_exit:
index b7a550d..bde9651 100644 (file)
@@ -171,7 +171,7 @@ int llog_origin_handle_next_block(struct ptlrpc_request *req)
                RETURN(err_serious(-EFAULT));
 
        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
-                            LLOG_CHUNK_SIZE);
+                            LLOG_MIN_CHUNK_SIZE);
        rc = req_capsule_server_pack(&req->rq_pill);
        if (rc)
                RETURN(err_serious(-ENOMEM));
@@ -203,7 +203,8 @@ int llog_origin_handle_next_block(struct ptlrpc_request *req)
        ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
        rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
                             &repbody->lgd_saved_index, repbody->lgd_index,
-                            &repbody->lgd_cur_offset, ptr, LLOG_CHUNK_SIZE);
+                            &repbody->lgd_cur_offset, ptr,
+                            LLOG_MIN_CHUNK_SIZE);
        if (rc)
                GOTO(out_close, rc);
        EXIT;
@@ -231,7 +232,7 @@ int llog_origin_handle_prev_block(struct ptlrpc_request *req)
                RETURN(err_serious(-EFAULT));
 
        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
-                            LLOG_CHUNK_SIZE);
+                            LLOG_MIN_CHUNK_SIZE);
        rc = req_capsule_server_pack(&req->rq_pill);
        if (rc)
                RETURN(err_serious(-ENOMEM));
@@ -262,7 +263,7 @@ int llog_origin_handle_prev_block(struct ptlrpc_request *req)
 
        ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
        rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
-                            body->lgd_index, ptr, LLOG_CHUNK_SIZE);
+                            body->lgd_index, ptr, LLOG_MIN_CHUNK_SIZE);
        if (rc)
                GOTO(out_close, rc);
 
index a460e3d..698dacb 100644 (file)
@@ -895,7 +895,7 @@ int check_and_prepare_update_record(const struct lu_env *env,
        lur->lur_update_rec.ur_master_transno = 0;
        lur->lur_update_rec.ur_batchid = 0;
        lur->lur_update_rec.ur_flags = 0;
-       lur->lur_hdr.lrh_len = LLOG_CHUNK_SIZE;
+       lur->lur_hdr.lrh_len = LLOG_MIN_CHUNK_SIZE;
 
        tur->tur_update_param_count = 0;
 
index fed31ee..cf84329 100644 (file)
@@ -202,18 +202,18 @@ int llog_pack_buffer(int fd, struct llog_log_hdr **llog,
 
                cur_rec = (struct llog_rec_hdr *)ptr;
                idx = le32_to_cpu(cur_rec->lrh_index);
-                recs_pr[i] = cur_rec;
-
-                if (ext2_test_bit(idx, (*llog)->llh_bitmap)) {
-                        if (le32_to_cpu(cur_rec->lrh_type) != OBD_CFG_REC)
-                                printf("rec #%d type=%x len=%u\n", idx,
-                                       cur_rec->lrh_type, cur_rec->lrh_len);
-                } else {
-                        printf("Bit %d of %d not set\n", idx, recs_num);
-                        cur_rec->lrh_id = CANCELLED;
-                        /* The header counts only set records */
-                        i--;
-                }
+               recs_pr[i] = cur_rec;
+
+               if (ext2_test_bit(idx, LLOG_HDR_BITMAP(*llog))) {
+                       if (le32_to_cpu(cur_rec->lrh_type) != OBD_CFG_REC)
+                               printf("rec #%d type=%x len=%u\n", idx,
+                                      cur_rec->lrh_type, cur_rec->lrh_len);
+               } else {
+                       printf("Bit %d of %d not set\n", idx, recs_num);
+                       cur_rec->lrh_id = CANCELLED;
+                       /* The header counts only set records */
+                       i--;
+               }
 
                 ptr += le32_to_cpu(cur_rec->lrh_len);
                 if ((ptr - file_buf) > file_size) {
index 2e997c7..243693a 100644 (file)
@@ -1629,9 +1629,6 @@ check_llog_log_hdr(void)
        CHECK_MEMBER(llog_log_hdr, llh_flags);
        CHECK_MEMBER(llog_log_hdr, llh_cat_idx);
        CHECK_MEMBER(llog_log_hdr, llh_tgtuuid);
-       CHECK_MEMBER(llog_log_hdr, llh_reserved);
-       CHECK_MEMBER(llog_log_hdr, llh_bitmap);
-       CHECK_MEMBER(llog_log_hdr, llh_tail);
 }
 
 static void