struct llog_rec_tail lgr_tail;
};
-/* On-disk header structure of each log object, stored in little endian order */
-#define LLOG_CHUNK_SIZE 8192
-#define LLOG_HEADER_SIZE (96)
-#define LLOG_BITMAP_BYTES (LLOG_CHUNK_SIZE - LLOG_HEADER_SIZE)
-
-#define LLOG_MIN_REC_SIZE (24) /* round(llog_rec_hdr + llog_rec_tail) */
-
/* flags for the logs */
enum llog_flag {
LLOG_F_ZAP_WHEN_EMPTY = 0x1,
LLOG_F_EXT_MASK = LLOG_F_EXT_JOBID,
};
+/* On-disk header structure of each log object, stored in little endian order */
+#define LLOG_MIN_CHUNK_SIZE 8192
+#define LLOG_HEADER_SIZE (96) /* sizeof (llog_log_hdr) + sizeof(llh_tail)
+ * - sizeof(llh_bitmap) */
+#define LLOG_BITMAP_BYTES (LLOG_MIN_CHUNK_SIZE - LLOG_HEADER_SIZE)
+#define LLOG_MIN_REC_SIZE (24) /* round(llog_rec_hdr + llog_rec_tail) */
+
struct llog_log_hdr {
struct llog_rec_hdr llh_hdr;
__s64 llh_timestamp;
- __u32 llh_count;
- __u32 llh_bitmap_offset;
- __u32 llh_size;
- __u32 llh_flags;
- __u32 llh_cat_idx;
- /* for a catalog the first plain slot is next to it */
- struct obd_uuid llh_tgtuuid;
- __u32 llh_reserved[LLOG_HEADER_SIZE/sizeof(__u32) - 23];
- __u32 llh_bitmap[LLOG_BITMAP_BYTES/sizeof(__u32)];
- struct llog_rec_tail llh_tail;
+ __u32 llh_count;
+ __u32 llh_bitmap_offset;
+ __u32 llh_size;
+ __u32 llh_flags;
+ __u32 llh_cat_idx;
+ /* for a catalog the first plain slot is next to it */
+ struct obd_uuid llh_tgtuuid;
+ __u32 llh_reserved[LLOG_HEADER_SIZE/sizeof(__u32)-23];
+ /* These fields must always be at the end of the llog_log_hdr.
+ * Note: llh_bitmap size is variable because llog chunk size could be
+ * bigger than LLOG_MIN_CHUNK_SIZE, i.e. sizeof(llog_log_hdr) > 8192
+ * bytes, and the real size is stored in llh_hdr.lrh_len, which means
+ * llh_tail should only be refered by LLOG_HDR_TAIL().
+ * But this structure is also used by client/server llog interface
+ * (see llog_client.c), it will be kept in its original way to avoid
+ * compatiblity issue. */
+ __u32 llh_bitmap[LLOG_BITMAP_BYTES / sizeof(__u32)];
+ struct llog_rec_tail llh_tail;
} __attribute__((packed));
-
-#define LLOG_BITMAP_SIZE(llh) (__u32)((llh->llh_hdr.lrh_len - \
- llh->llh_bitmap_offset - \
- sizeof(llh->llh_tail)) * 8)
+#undef LLOG_HEADER_SIZE
+#undef LLOG_BITMAP_BYTES
+
+#define LLOG_HDR_BITMAP_SIZE(llh) (__u32)((llh->llh_hdr.lrh_len - \
+ llh->llh_bitmap_offset - \
+ sizeof(llh->llh_tail)) * 8)
+#define LLOG_HDR_BITMAP(llh) (__u32 *)((char *)(llh) + \
+ (llh)->llh_bitmap_offset)
+#define LLOG_HDR_TAIL(llh) ((struct llog_rec_tail *)((char *)llh + \
+ llh->llh_hdr.lrh_len - \
+ sizeof(llh->llh_tail)))
/** log cookies are used to reference a specific log file and a record therein */
struct llog_cookie {
struct rw_semaphore lgh_hdr_lock; /* protect lgh_hdr data */
struct llog_logid lgh_id; /* id of this log */
struct llog_log_hdr *lgh_hdr;
+ size_t lgh_hdr_size;
struct dt_object *lgh_obj;
int lgh_last_idx;
int lgh_cur_idx; /* used during llog_process */
struct dt_object *loc_dir;
struct local_oid_storage *loc_los_nameless;
struct local_oid_storage *loc_los_named;
+ /* llog chunk size, and llog record size can not be bigger than
+ * loc_chunk_size */
+ __u32 loc_chunk_size;
};
#define LLOG_PROC_BREAK 0x0001
LASSERT(loghandle != NULL);
/* failed llog_init_handle */
- if (!loghandle->lgh_hdr)
+ if (loghandle->lgh_hdr == NULL)
goto out;
if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
LASSERT(list_empty(&loghandle->u.phd.phd_entry));
else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
LASSERT(list_empty(&loghandle->u.chd.chd_head));
- LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
- OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
+ OBD_FREE_LARGE(loghandle->lgh_hdr, loghandle->lgh_hdr_size);
out:
OBD_FREE_PTR(loghandle);
}
}
down_write(&loghandle->lgh_hdr_lock);
- if (!ext2_clear_bit(index, llh->llh_bitmap)) {
+ if (!ext2_clear_bit(index, LLOG_HDR_BITMAP(llh))) {
up_write(&loghandle->lgh_hdr_lock);
CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
RETURN(-ENOENT);
if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
(llh->llh_count == 1) &&
- (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
+ (loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1)) {
up_write(&loghandle->lgh_hdr_lock);
rc = llog_destroy(env, loghandle);
if (rc < 0) {
RETURN(0);
out_err:
down_write(&loghandle->lgh_hdr_lock);
- ext2_set_bit(index, llh->llh_bitmap);
+ ext2_set_bit(index, LLOG_HDR_BITMAP(llh));
llh->llh_count++;
up_write(&loghandle->lgh_hdr_lock);
return rc;
if (rc == LLOG_EEMPTY) {
struct llog_log_hdr *llh = handle->lgh_hdr;
+ /* lrh_len should be initialized in llog_init_handle */
handle->lgh_last_idx = 0; /* header is record with index 0 */
llh->llh_count = 1; /* for the header record */
llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
- llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
- llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
+ LASSERT(handle->lgh_ctxt->loc_chunk_size >=
+ LLOG_MIN_CHUNK_SIZE);
+ llh->llh_hdr.lrh_len = handle->lgh_ctxt->loc_chunk_size;
+ llh->llh_hdr.lrh_index = 0;
llh->llh_timestamp = cfs_time_current_sec();
if (uuid)
memcpy(&llh->llh_tgtuuid, uuid,
sizeof(llh->llh_tgtuuid));
llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
- ext2_set_bit(0, llh->llh_bitmap);
+ ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
+ LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
+ LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
rc = 0;
}
return rc;
struct llog_log_hdr *llh;
enum llog_flag fmt = flags & LLOG_F_EXT_MASK;
int rc;
-
+ int chunk_size = handle->lgh_ctxt->loc_chunk_size;
ENTRY;
+
LASSERT(handle->lgh_hdr == NULL);
- OBD_ALLOC_PTR(llh);
+ LASSERT(chunk_size >= LLOG_MIN_CHUNK_SIZE);
+ OBD_ALLOC_LARGE(llh, chunk_size);
if (llh == NULL)
RETURN(-ENOMEM);
+
handle->lgh_hdr = llh;
+ handle->lgh_hdr_size = chunk_size;
/* first assign flags to use llog_client_ops */
llh->llh_flags = flags;
rc = llog_read_header(env, handle, uuid);
llh->llh_flags |= fmt;
out:
if (rc) {
- OBD_FREE_PTR(llh);
+ OBD_FREE_LARGE(llh, chunk_size);
handle->lgh_hdr = NULL;
}
RETURN(rc);
struct llog_log_hdr *llh = loghandle->lgh_hdr;
struct llog_process_cat_data *cd = lpi->lpi_catdata;
char *buf;
- __u64 cur_offset = LLOG_CHUNK_SIZE;
+ int chunk_size;
+ __u64 cur_offset;
__u64 last_offset;
int rc = 0, index = 1, last_index;
int saved_index = 0;
ENTRY;
- LASSERT(llh);
+ if (llh == NULL)
+ RETURN(-EINVAL);
+
+ cur_offset = chunk_size = llh->llh_hdr.lrh_len;
- OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
- if (!buf) {
- lpi->lpi_rc = -ENOMEM;
+ OBD_ALLOC_LARGE(buf, chunk_size);
+ if (buf == NULL) {
+ lpi->lpi_rc = -ENOMEM;
RETURN(0);
- }
+ }
- if (cd != NULL) {
- last_called_index = cd->lpcd_first_idx;
- index = cd->lpcd_first_idx + 1;
- }
- if (cd != NULL && cd->lpcd_last_idx)
- last_index = cd->lpcd_last_idx;
- else
- last_index = LLOG_BITMAP_BYTES * 8 - 1;
+ if (cd != NULL) {
+ last_called_index = cd->lpcd_first_idx;
+ index = cd->lpcd_first_idx + 1;
+ }
+ if (cd != NULL && cd->lpcd_last_idx)
+ last_index = cd->lpcd_last_idx;
+ else
+ last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
- if (index > last_index) {
- /* Record is not in this buffer. */
+ if (index > last_index) /* Record is not in this buffer. */
GOTO(out, rc);
- }
- while (rc == 0) {
- struct llog_rec_hdr *rec;
+ while (rc == 0) {
+ struct llog_rec_hdr *rec;
- /* skip records not set in bitmap */
- while (index <= last_index &&
- !ext2_test_bit(index, llh->llh_bitmap))
- ++index;
+ /* skip records not set in bitmap */
+ while (index <= last_index &&
+ !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
+ ++index;
LASSERT(index <= last_index + 1);
if (index == last_index + 1)
CDEBUG(D_OTHER, "index: %d last_index %d\n",
index, last_index);
- /* get the buf with our target record; avoid old garbage */
- memset(buf, 0, LLOG_CHUNK_SIZE);
- last_offset = cur_offset;
+ /* get the buf with our target record; avoid old garbage */
+ memset(buf, 0, chunk_size);
+ last_offset = cur_offset;
rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
- index, &cur_offset, buf, LLOG_CHUNK_SIZE);
- if (rc)
- GOTO(out, rc);
+ index, &cur_offset, buf, chunk_size);
+ if (rc != 0)
+ GOTO(out, rc);
/* NB: when rec->lrh_len is accessed it is already swabbed
* since it is used at the "end" of the loop and the rec
* swabbing is done at the beginning of the loop. */
for (rec = (struct llog_rec_hdr *)buf;
- (char *)rec < buf + LLOG_CHUNK_SIZE;
+ (char *)rec < buf + chunk_size;
rec = llog_rec_hdr_next(rec)) {
CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
GOTO(repeat, rc = 0);
GOTO(out, rc = 0); /* no more records */
}
- if (rec->lrh_len == 0 ||
- rec->lrh_len > LLOG_CHUNK_SIZE) {
+ if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
CWARN("invalid length %d in llog record for "
"index %d/%d\n", rec->lrh_len,
rec->lrh_index, index);
continue;
}
- CDEBUG(D_OTHER,
- "lrh_index: %d lrh_len: %d (%d remains)\n",
- rec->lrh_index, rec->lrh_len,
- (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
+ CDEBUG(D_OTHER,
+ "lrh_index: %d lrh_len: %d (%d remains)\n",
+ rec->lrh_index, rec->lrh_len,
+ (int)(buf + chunk_size - (char *)rec));
loghandle->lgh_cur_idx = rec->lrh_index;
loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
last_offset;
- /* if set, process the callback on this record */
- if (ext2_test_bit(index, llh->llh_bitmap)) {
+ /* if set, process the callback on this record */
+ if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
lpi->lpi_cbdata);
last_called_index = index;
* remaining bits in the header */
CERROR("Local llog found corrupted\n");
while (index <= last_index) {
- if (ext2_test_bit(index, llh->llh_bitmap) != 0)
+ if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh)) != 0)
llog_cancel_rec(lpi->lpi_env, loghandle, index);
index++;
}
rc = 0;
}
- OBD_FREE(buf, LLOG_CHUNK_SIZE);
+ OBD_FREE_LARGE(buf, chunk_size);
lpi->lpi_rc = rc;
return 0;
}
struct llog_process_cat_data *cd = catdata;
void *buf;
int rc = 0, first_index = 1, index, idx;
+ __u32 chunk_size = llh->llh_hdr.lrh_len;
ENTRY;
- OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
- if (!buf)
- RETURN(-ENOMEM);
-
- if (cd != NULL)
- first_index = cd->lpcd_first_idx + 1;
- if (cd != NULL && cd->lpcd_last_idx)
- index = cd->lpcd_last_idx;
- else
- index = LLOG_BITMAP_BYTES * 8 - 1;
-
- while (rc == 0) {
- struct llog_rec_hdr *rec;
- struct llog_rec_tail *tail;
-
- /* skip records not set in bitmap */
- while (index >= first_index &&
- !ext2_test_bit(index, llh->llh_bitmap))
- --index;
-
- LASSERT(index >= first_index - 1);
- if (index == first_index - 1)
- break;
+ OBD_ALLOC_LARGE(buf, chunk_size);
+ if (buf == NULL)
+ RETURN(-ENOMEM);
- /* get the buf with our target record; avoid old garbage */
- memset(buf, 0, LLOG_CHUNK_SIZE);
- rc = llog_prev_block(env, loghandle, index, buf,
- LLOG_CHUNK_SIZE);
+ if (cd != NULL)
+ first_index = cd->lpcd_first_idx + 1;
+ if (cd != NULL && cd->lpcd_last_idx)
+ index = cd->lpcd_last_idx;
+ else
+ index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+
+ while (rc == 0) {
+ struct llog_rec_hdr *rec;
+ struct llog_rec_tail *tail;
+
+ /* skip records not set in bitmap */
+ while (index >= first_index &&
+ !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
+ --index;
+
+ LASSERT(index >= first_index - 1);
+ if (index == first_index - 1)
+ break;
+
+ /* get the buf with our target record; avoid old garbage */
+ memset(buf, 0, chunk_size);
+ rc = llog_prev_block(env, loghandle, index, buf, chunk_size);
if (rc)
GOTO(out, rc);
LASSERT(idx == index);
tail = (void *)rec + rec->lrh_len - sizeof(*tail);
- /* process records in buffer, starting where we found one */
- while ((void *)tail > buf) {
+ /* process records in buffer, starting where we found one */
+ while ((void *)tail > buf) {
if (tail->lrt_index == 0)
GOTO(out, rc = 0); /* no more records */
- /* if set, process the callback on this record */
- if (ext2_test_bit(index, llh->llh_bitmap)) {
+ /* if set, process the callback on this record */
+ if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
rec = (void *)tail - tail->lrt_len +
sizeof(*tail);
}
out:
- if (buf)
- OBD_FREE(buf, LLOG_CHUNK_SIZE);
+ if (buf != NULL)
+ OBD_FREE_LARGE(buf, chunk_size);
RETURN(rc);
}
EXPORT_SYMBOL(llog_reverse_process);
down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
llh = loghandle->lgh_hdr;
if (llh == NULL ||
- loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
+ loghandle->lgh_last_idx < LLOG_HDR_BITMAP_SIZE(llh) - 1) {
up_read(&cathandle->lgh_lock);
RETURN(loghandle);
} else {
down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
llh = loghandle->lgh_hdr;
LASSERT(llh);
- if (loghandle->lgh_last_idx < LLOG_BITMAP_SIZE(llh) - 1) {
+ if (loghandle->lgh_last_idx < LLOG_HDR_BITMAP_SIZE(llh) - 1) {
up_write(&cathandle->lgh_lock);
- RETURN(loghandle);
- } else {
+ RETURN(loghandle);
+ } else {
up_write(&loghandle->lgh_lock);
- }
+ }
}
CDEBUG(D_INODE, "use next log\n");
int rc;
ENTRY;
- LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
+ LASSERT(rec->lrh_len <= cathandle->lgh_ctxt->loc_chunk_size);
loghandle = llog_cat_current_log(cathandle, th);
LASSERT(!IS_ERR(loghandle));
static int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
{
- struct llog_log_hdr *llh = cathandle->lgh_hdr;
- int i, bitmap_size, idx;
- ENTRY;
+ struct llog_log_hdr *llh = cathandle->lgh_hdr;
+ int i, bitmap_size, idx;
+ ENTRY;
- bitmap_size = LLOG_BITMAP_SIZE(llh);
- if (llh->llh_cat_idx == (index - 1)) {
- idx = llh->llh_cat_idx + 1;
- llh->llh_cat_idx = idx;
- if (idx == cathandle->lgh_last_idx)
- goto out;
- for (i = (index + 1) % bitmap_size;
- i != cathandle->lgh_last_idx;
- i = (i + 1) % bitmap_size) {
- if (!ext2_test_bit(i, llh->llh_bitmap)) {
- idx = llh->llh_cat_idx + 1;
- llh->llh_cat_idx = idx;
- } else if (i == 0) {
- llh->llh_cat_idx = 0;
- } else {
- break;
- }
- }
+ bitmap_size = LLOG_HDR_BITMAP_SIZE(llh);
+ if (llh->llh_cat_idx == (index - 1)) {
+ idx = llh->llh_cat_idx + 1;
+ llh->llh_cat_idx = idx;
+ if (idx == cathandle->lgh_last_idx)
+ goto out;
+
+ for (i = (index + 1) % bitmap_size;
+ i != cathandle->lgh_last_idx;
+ i = (i + 1) % bitmap_size) {
+ if (!ext2_test_bit(i, LLOG_HDR_BITMAP(llh))) {
+ idx = llh->llh_cat_idx + 1;
+ llh->llh_cat_idx = idx;
+ } else if (i == 0) {
+ llh->llh_cat_idx = 0;
+ } else {
+ break;
+ }
+ }
out:
CDEBUG(D_RPCTRACE, "set catlog "DOSTID" first idx %u\n",
POSTID(&cathandle->lgh_id.lgl_oi), llh->llh_cat_idx);
int lpi_rc;
struct completion lpi_completion;
const struct lu_env *lpi_env;
-
};
struct llog_thread_info {
ctxt->loc_exp = class_export_get(obd->obd_self_export);
ctxt->loc_flags = LLOG_CTXT_FLAG_UNINITIALIZED;
+ ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE;
rc = llog_group_set_ctxt(olg, ctxt, index);
if (rc) {
* This function writes a padding record to the end of llog. That may
* be needed if llog contains records of variable size, e.g. config logs
* or changelogs.
- * The padding record just aligns llog to the LLOG_CHUNK_SIZE boundary if
+ * The padding record just aligns llog to the llog chunk_size boundary if
* the current record doesn't fit in the remaining space.
*
* It allocates full length to avoid two separate writes for header and tail.
struct llog_thread_info *lgi;
enum llog_flag flags;
int rc;
+ __u32 max_size = handle->lgh_hdr_size;
ENTRY;
- LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
-
o = handle->lgh_obj;
LASSERT(o);
lgi->lgi_off = 0;
lgi->lgi_buf.lb_buf = handle->lgh_hdr;
- lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE;
-
+ lgi->lgi_buf.lb_len = max_size;
rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
if (rc) {
CERROR("%s: error reading log header from "DFID": rc = %d\n",
PFID(lu_object_fid(&o->do_lu)),
llh_hdr->lrh_type, LLOG_HDR_MAGIC);
RETURN(-EIO);
- } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
+ } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
+ llh_hdr->lrh_len > max_size) {
CERROR("%s: incorrectly sized log %s "DFID" header: "
- "%#x (expected %#x)\n"
+ "%#x (expected at least %#x)\n"
"you may need to re-run lconf --write_conf.\n",
o->do_lu.lo_dev->ld_obd->obd_name,
handle->lgh_name ? handle->lgh_name : "",
PFID(lu_object_fid(&o->do_lu)),
- llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
+ llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE);
RETURN(-EIO);
}
handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
- handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
+ handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
RETURN(0);
}
LASSERT(th);
LASSERT(loghandle);
LASSERT(rec);
- LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
+ LASSERT(rec->lrh_len <= loghandle->lgh_ctxt->loc_chunk_size);
o = loghandle->lgh_obj;
LASSERT(o);
int index, rc;
struct llog_rec_tail *lrt;
struct dt_object *o;
+ __u32 chunk_size;
size_t left;
ENTRY;
LASSERT(o);
LASSERT(th);
+ chunk_size = llh->llh_hdr.lrh_len;
CDEBUG(D_OTHER, "new record %x to "DFID"\n",
rec->lrh_type, PFID(lu_object_fid(&o->do_lu)));
- /* record length should not bigger than LLOG_CHUNK_SIZE */
- if (reclen > LLOG_CHUNK_SIZE)
+ /* record length should not bigger than */
+ if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len)
RETURN(-E2BIG);
rc = dt_attr_get(env, o, &lgi->lgi_attr);
/* llog can be empty only when first record is being written */
LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0));
- if (!ext2_test_bit(idx, llh->llh_bitmap)) {
+ if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) {
CERROR("%s: modify unset record %u\n",
o->do_lu.lo_dev->ld_obd->obd_name, idx);
RETURN(-ENOENT);
if (idx == LLOG_HEADER_IDX) {
/* llog header update */
- LASSERT(reclen == sizeof(struct llog_log_hdr));
+ LASSERT(reclen >= sizeof(struct llog_log_hdr));
LASSERT(rec == &llh->llh_hdr);
lgi->lgi_off = 0;
*/
LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
lgi->lgi_off = lgi->lgi_attr.la_size;
- left = LLOG_CHUNK_SIZE - (lgi->lgi_off & (LLOG_CHUNK_SIZE - 1));
+ left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
/* NOTE: padding is a record, but no bit is set */
if (left != 0 && left != reclen &&
left < (reclen + LLOG_MIN_REC_SIZE)) {
loghandle->lgh_last_idx++; /* for pad rec */
}
/* if it's the last idx in log file, then return -ENOSPC */
- if (loghandle->lgh_last_idx >= LLOG_BITMAP_SIZE(llh) - 1)
+ if (loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1)
RETURN(-ENOSPC);
/* increment the last_idx along with llh_tail index, they should
* be equal for a llog lifetime */
loghandle->lgh_last_idx++;
index = loghandle->lgh_last_idx;
- llh->llh_tail.lrt_index = index;
+ LLOG_HDR_TAIL(llh)->lrt_index = index;
/**
* NB: the caller should make sure only 1 process access
* the lgh_last_idx, e.g. append should be exclusive.
* Otherwise it might hit the assert.
*/
- LASSERT(index < LLOG_BITMAP_SIZE(llh));
+ LASSERT(index < LLOG_HDR_BITMAP_SIZE(llh));
rec->lrh_index = index;
lrt = rec_tail(rec);
lrt->lrt_len = rec->lrh_len;
/* the lgh_hdr_lock protects llog header data from concurrent
* update/cancel, the llh_count and llh_bitmap are protected */
down_write(&loghandle->lgh_hdr_lock);
- if (ext2_set_bit(index, llh->llh_bitmap)) {
+ if (ext2_set_bit(index, LLOG_HDR_BITMAP(llh))) {
CERROR("%s: index %u already set in log bitmap\n",
o->do_lu.lo_dev->ld_obd->obd_name, index);
up_write(&loghandle->lgh_hdr_lock);
out:
/* cleanup llog for error case */
down_write(&loghandle->lgh_hdr_lock);
- ext2_clear_bit(index, llh->llh_bitmap);
+ ext2_clear_bit(index, LLOG_HDR_BITMAP(llh));
llh->llh_count--;
up_write(&loghandle->lgh_hdr_lock);
/* restore llog last_idx */
loghandle->lgh_last_idx--;
- llh->llh_tail.lrt_index = loghandle->lgh_last_idx;
+ LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
RETURN(rc);
}
* actual records are larger than minimum size) we just skip
* some more records.
*/
-static inline void llog_skip_over(__u64 *off, int curr, int goal)
+static inline void llog_skip_over(__u64 *off, int curr, int goal,
+ __u32 chunk_size)
{
if (goal <= curr)
return;
*off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE) &
- ~(LLOG_CHUNK_SIZE - 1);
+ ~(chunk_size - 1);
}
/**
* \param[in,out] cur_offset furtherst point read in the file
* \param[in] buf pointer to data buffer to fill
* \param[in] len required len to read, it is
- * LLOG_CHUNK_SIZE usually.
+ * usually llog chunk_size.
*
* \retval 0 on successful buffer read
* \retval negative value on error
struct dt_object *o;
struct dt_device *dt;
int rc;
+ __u32 chunk_size;
ENTRY;
LASSERT(env);
LASSERT(lgi);
- if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
+ chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
+ if (len == 0 || len & (chunk_size - 1))
RETURN(-EINVAL);
CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
struct llog_rec_hdr *rec, *last_rec;
struct llog_rec_tail *tail;
- llog_skip_over(cur_offset, *cur_idx, next_idx);
+ llog_skip_over(cur_offset, *cur_idx, next_idx, chunk_size);
- /* read up to next LLOG_CHUNK_SIZE block */
- lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE -
- (*cur_offset & (LLOG_CHUNK_SIZE - 1));
+ /* read up to next llog chunk_size block */
+ lgi->lgi_buf.lb_len = chunk_size -
+ (*cur_offset & (chunk_size - 1));
lgi->lgi_buf.lb_buf = buf;
rc = dt_read(env, o, &lgi->lgi_buf, cur_offset);
* \param[in] loghandle llog handle of the current llog
* \param[in] prev_idx target index to find
* \param[in] buf pointer to data buffer to fill
- * \param[in] len required len to read, it is LLOG_CHUNK_SIZE usually.
+ * \param[in] len required len to read, it is llog_chunk_size usually.
*
* \retval 0 on successful buffer read
* \retval negative value on error
struct dt_object *o;
struct dt_device *dt;
loff_t cur_offset;
+ __u32 chunk_size;
int rc;
ENTRY;
- if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
+ chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
+ if (len == 0 || len & (chunk_size - 1))
RETURN(-EINVAL);
CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx);
dt = lu2dt_dev(o->do_lu.lo_dev);
LASSERT(dt);
- cur_offset = LLOG_CHUNK_SIZE;
- llog_skip_over(&cur_offset, 0, prev_idx);
+ cur_offset = chunk_size;
+ llog_skip_over(&cur_offset, 0, prev_idx, chunk_size);
rc = dt_attr_get(env, o, &lgi->lgi_attr);
if (rc)
__swab64s(&lsr->lsr_valid);
tail = &lsr->lsr_tail;
break;
- }
- case OBD_CFG_REC:
- /* these are swabbed as they are consumed */
- break;
+ }
+ case OBD_CFG_REC:
+ /* these are swabbed as they are consumed */
+ break;
case LLOG_HDR_MAGIC:
{
- struct llog_log_hdr *llh = (struct llog_log_hdr *)rec;
-
- __swab64s(&llh->llh_timestamp);
- __swab32s(&llh->llh_count);
- __swab32s(&llh->llh_bitmap_offset);
- __swab32s(&llh->llh_flags);
- __swab32s(&llh->llh_size);
- __swab32s(&llh->llh_cat_idx);
- tail = &llh->llh_tail;
- break;
- }
+ struct llog_log_hdr *llh = (struct llog_log_hdr *)rec;
+
+ __swab64s(&llh->llh_timestamp);
+ __swab32s(&llh->llh_count);
+ __swab32s(&llh->llh_bitmap_offset);
+ __swab32s(&llh->llh_flags);
+ __swab32s(&llh->llh_size);
+ __swab32s(&llh->llh_cat_idx);
+ tail = LLOG_HDR_TAIL(llh);
+ break;
+ }
case LLOG_LOGID_MAGIC:
{
struct llog_logid_rec *lid = (struct llog_logid_rec *)rec;
static void print_llog_hdr(struct llog_log_hdr *h)
{
- CDEBUG(D_OTHER, "llog header: %p\n", h);
- CDEBUG(D_OTHER, "\tllh_hdr.lrh_index: %#x\n", h->llh_hdr.lrh_index);
- CDEBUG(D_OTHER, "\tllh_hdr.lrh_len: %#x\n", h->llh_hdr.lrh_len);
- CDEBUG(D_OTHER, "\tllh_hdr.lrh_type: %#x\n", h->llh_hdr.lrh_type);
- CDEBUG(D_OTHER, "\tllh_timestamp: "LPX64"\n", h->llh_timestamp);
- CDEBUG(D_OTHER, "\tllh_count: %#x\n", h->llh_count);
- CDEBUG(D_OTHER, "\tllh_bitmap_offset: %#x\n", h->llh_bitmap_offset);
- CDEBUG(D_OTHER, "\tllh_flags: %#x\n", h->llh_flags);
- CDEBUG(D_OTHER, "\tllh_size: %#x\n", h->llh_size);
- CDEBUG(D_OTHER, "\tllh_cat_idx: %#x\n", h->llh_cat_idx);
- CDEBUG(D_OTHER, "\tllh_tail.lrt_index: %#x\n", h->llh_tail.lrt_index);
- CDEBUG(D_OTHER, "\tllh_tail.lrt_len: %#x\n", h->llh_tail.lrt_len);
+ CDEBUG(D_OTHER, "llog header: %p\n", h);
+ CDEBUG(D_OTHER, "\tllh_hdr.lrh_index: %#x\n", h->llh_hdr.lrh_index);
+ CDEBUG(D_OTHER, "\tllh_hdr.lrh_len: %#x\n", h->llh_hdr.lrh_len);
+ CDEBUG(D_OTHER, "\tllh_hdr.lrh_type: %#x\n", h->llh_hdr.lrh_type);
+ CDEBUG(D_OTHER, "\tllh_timestamp: "LPX64"\n", h->llh_timestamp);
+ CDEBUG(D_OTHER, "\tllh_count: %#x\n", h->llh_count);
+ CDEBUG(D_OTHER, "\tllh_bitmap_offset: %#x\n", h->llh_bitmap_offset);
+ CDEBUG(D_OTHER, "\tllh_flags: %#x\n", h->llh_flags);
+ CDEBUG(D_OTHER, "\tllh_size: %#x\n", h->llh_size);
+ CDEBUG(D_OTHER, "\tllh_cat_idx: %#x\n", h->llh_cat_idx);
+ CDEBUG(D_OTHER, "\tllh_tail.lrt_index: %#x\n",
+ LLOG_HDR_TAIL(h)->lrt_index);
+ CDEBUG(D_OTHER, "\tllh_tail.lrt_len: %#x\n",
+ LLOG_HDR_TAIL(h)->lrt_len);
}
void lustre_swab_llog_hdr (struct llog_log_hdr *h)
/* This is slightly more than the number of records that can fit into a
* single llog file, because the llog_log_header takes up some of the
* space in the first block that cannot be used for the bitmap. */
-#define LLOG_TEST_RECNUM (LLOG_CHUNK_SIZE * 8)
+#define LLOG_TEST_RECNUM (LLOG_MIN_CHUNK_SIZE * 8)
static int llog_test_rand;
static struct obd_uuid uuid = { .uuid = "test_uuid" };
static int verify_handle(char *test, struct llog_handle *llh, int num_recs)
{
- int i;
- int last_idx = 0;
- int active_recs = 0;
-
- for (i = 0; i < LLOG_BITMAP_BYTES * 8; i++) {
- if (ext2_test_bit(i, llh->lgh_hdr->llh_bitmap)) {
- last_idx = i;
- active_recs++;
- }
- }
+ int i;
+ int last_idx = 0;
+ int active_recs = 0;
+
+ for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr); i++) {
+ if (ext2_test_bit(i, LLOG_HDR_BITMAP(llh->lgh_hdr))) {
+ last_idx = i;
+ active_recs++;
+ }
+ }
if (active_recs != num_recs) {
CERROR("%s: expected %d active recs after write, found %d\n",
CWARN("3d: write records with variable size until BITMAP_SIZE, "
"return -ENOSPC\n");
- for (i = 0; i < LLOG_BITMAP_SIZE(llh->lgh_hdr) + 1; i++) {
+ for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) + 1; i++) {
char buf[64];
struct llog_rec_hdr *hdr = (void *)&buf;
GOTO(out, rc);
CWARN("4e: add 5 large records, one record per block\n");
- buflen = LLOG_CHUNK_SIZE;
+ buflen = LLOG_MIN_CHUNK_SIZE;
OBD_ALLOC(buf, buflen);
if (buf == NULL)
GOTO(out, rc = -ENOMEM);
CERROR("7_sub: can't init llog handle: %d\n", rc);
GOTO(out_close, rc);
}
- for (i = 0; i < LLOG_BITMAP_SIZE(llh->lgh_hdr); i++) {
+ for (i = 0; i < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr); i++) {
rc = llog_write(env, llh, &llog_records.lrh, LLOG_NEXT_IDX);
if (rc == -ENOSPC) {
break;
CERROR("7_sub: verify handle failed: %d\n", rc);
GOTO(out_close, rc);
}
- if (num_recs < LLOG_BITMAP_SIZE(llh->lgh_hdr) - 1)
+ if (num_recs < LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1)
CWARN("7_sub: records are not aligned, written %d from %u\n",
- num_recs, LLOG_BITMAP_SIZE(llh->lgh_hdr) - 1);
+ num_recs, LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
plain_counter = 0;
rc = llog_process(env, llh, test_7_print_cb, "test 7", NULL);
#include <obd_class.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
-#include <lustre/lustre_idl.h> /* LLOG_CHUNK_SIZE definition */
+#include <lustre/lustre_idl.h> /* LLOG_MIN_CHUNK_SIZE definition */
#include "osd_internal.h"
/* XXX: we still miss for append declaration support in ZFS
* -1 means append which is used by llog mostly, llog
- * can grow upto LLOG_CHUNK_SIZE*8 records */
+ * can grow upto LLOG_MIN_CHUNK_SIZE*8 records */
if (pos == -1)
- pos = max_t(loff_t, 256 * 8 * LLOG_CHUNK_SIZE,
+ pos = max_t(loff_t, 256 * 8 * LLOG_MIN_CHUNK_SIZE,
obj->oo_attr.la_size + (2 << 20));
dmu_tx_hold_write(oh->ot_tx, oid, pos, buf->lb_len);
if (hdr == NULL)
GOTO(out, rc =-EFAULT);
- memcpy(handle->lgh_hdr, hdr, sizeof (*hdr));
- handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
-
- /* sanity checks */
- llh_hdr = &handle->lgh_hdr->llh_hdr;
- if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
- CERROR("bad log header magic: %#x (expecting %#x)\n",
- llh_hdr->lrh_type, LLOG_HDR_MAGIC);
- rc = -EIO;
- } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
- CERROR("incorrectly sized log header: %#x "
- "(expecting %#x)\n",
- llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
- CERROR("you may need to re-run lconf --write_conf.\n");
- rc = -EIO;
- }
- EXIT;
+ if (handle->lgh_hdr_size < hdr->llh_hdr.lrh_len)
+ GOTO(out, rc = -EFAULT);
+
+ memcpy(handle->lgh_hdr, hdr, hdr->llh_hdr.lrh_len);
+ handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
+
+ /* sanity checks */
+ llh_hdr = &handle->lgh_hdr->llh_hdr;
+ if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
+ CERROR("bad log header magic: %#x (expecting %#x)\n",
+ llh_hdr->lrh_type, LLOG_HDR_MAGIC);
+ rc = -EIO;
+ } else if (llh_hdr->lrh_len !=
+ LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len ||
+ (llh_hdr->lrh_len & (llh_hdr->lrh_len - 1)) != 0 ||
+ llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
+ llh_hdr->lrh_len > handle->lgh_hdr_size) {
+ CERROR("incorrectly sized log header: %#x, "
+ "expecting %#x (power of two > 8192)\n",
+ llh_hdr->lrh_len,
+ LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len);
+ CERROR("you may need to re-run lconf --write_conf.\n");
+ rc = -EIO;
+ }
+ EXIT;
out:
ptlrpc_req_finished(req);
err_exit:
RETURN(err_serious(-EFAULT));
req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
- LLOG_CHUNK_SIZE);
+ LLOG_MIN_CHUNK_SIZE);
rc = req_capsule_server_pack(&req->rq_pill);
if (rc)
RETURN(err_serious(-ENOMEM));
ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
&repbody->lgd_saved_index, repbody->lgd_index,
- &repbody->lgd_cur_offset, ptr, LLOG_CHUNK_SIZE);
+ &repbody->lgd_cur_offset, ptr,
+ LLOG_MIN_CHUNK_SIZE);
if (rc)
GOTO(out_close, rc);
EXIT;
RETURN(err_serious(-EFAULT));
req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
- LLOG_CHUNK_SIZE);
+ LLOG_MIN_CHUNK_SIZE);
rc = req_capsule_server_pack(&req->rq_pill);
if (rc)
RETURN(err_serious(-ENOMEM));
ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
- body->lgd_index, ptr, LLOG_CHUNK_SIZE);
+ body->lgd_index, ptr, LLOG_MIN_CHUNK_SIZE);
if (rc)
GOTO(out_close, rc);
lur->lur_update_rec.ur_master_transno = 0;
lur->lur_update_rec.ur_batchid = 0;
lur->lur_update_rec.ur_flags = 0;
- lur->lur_hdr.lrh_len = LLOG_CHUNK_SIZE;
+ lur->lur_hdr.lrh_len = LLOG_MIN_CHUNK_SIZE;
tur->tur_update_param_count = 0;
cur_rec = (struct llog_rec_hdr *)ptr;
idx = le32_to_cpu(cur_rec->lrh_index);
- recs_pr[i] = cur_rec;
-
- if (ext2_test_bit(idx, (*llog)->llh_bitmap)) {
- if (le32_to_cpu(cur_rec->lrh_type) != OBD_CFG_REC)
- printf("rec #%d type=%x len=%u\n", idx,
- cur_rec->lrh_type, cur_rec->lrh_len);
- } else {
- printf("Bit %d of %d not set\n", idx, recs_num);
- cur_rec->lrh_id = CANCELLED;
- /* The header counts only set records */
- i--;
- }
+ recs_pr[i] = cur_rec;
+
+ if (ext2_test_bit(idx, LLOG_HDR_BITMAP(*llog))) {
+ if (le32_to_cpu(cur_rec->lrh_type) != OBD_CFG_REC)
+ printf("rec #%d type=%x len=%u\n", idx,
+ cur_rec->lrh_type, cur_rec->lrh_len);
+ } else {
+ printf("Bit %d of %d not set\n", idx, recs_num);
+ cur_rec->lrh_id = CANCELLED;
+ /* The header counts only set records */
+ i--;
+ }
ptr += le32_to_cpu(cur_rec->lrh_len);
if ((ptr - file_buf) > file_size) {
CHECK_MEMBER(llog_log_hdr, llh_flags);
CHECK_MEMBER(llog_log_hdr, llh_cat_idx);
CHECK_MEMBER(llog_log_hdr, llh_tgtuuid);
- CHECK_MEMBER(llog_log_hdr, llh_reserved);
- CHECK_MEMBER(llog_log_hdr, llh_bitmap);
- CHECK_MEMBER(llog_log_hdr, llh_tail);
}
static void