#define LLOG_HEADER_SIZE (4096) /* <= PAGE_SIZE */
#define LLOG_HDR_RSVD_U32 (16)
#define LLOG_HDR_DATA_SIZE (LLOG_HDR_RSVD_U32 * sizeof(__u32))
-#define LLOG_BITMAP_SIZE (LLOG_HEADER_SIZE - LLOG_HDR_DATA_SIZE)
+#define LLOG_BITMAP_BYTES (LLOG_HEADER_SIZE - LLOG_HDR_DATA_SIZE)
-#define LLOG_LOGLIST_MAGIC 0x6d50e67d
-struct llog_catalog_header {
+#define LLOG_CATALOG_MAGIC 0x6d50e67d
+struct llog_catalog_hdr {
__u32 lch_size;
__u32 lch_magic;
__u32 lch_numrec;
__u32 lch_reserved[LLOG_HDR_RSVD_U32 - 4];
- __u32 lch_bitmap[LLOG_BITMAP_SIZE / sizeof(__u32)];
+ __u32 lch_bitmap[LLOG_BITMAP_BYTES / sizeof(__u32)];
__u32 lch_size_end;
- struct llog_logid lch_logs[0];
};
/* This first chunk should be exactly 4096 bytes in size */
__u32 loh_size;
__u32 loh_magic;
- __u32 loh_numrec;
+ __u32 loh_maxrec;
__u32 loh_reserved[LLOG_HDR_RSVD_U32 - 4];
- __u32 loh_bitmap[LLOG_BITMAP_SIZE / sizeof(__u32)];
+ __u32 loh_bitmap[LLOG_BITMAP_BYTES / sizeof(__u32)];
__u32 loh_size_end;
-
- struct llog_trans_rec loh_records[0];
};
-static inline llog_log_swabbed(struct llog_object_hdr *hdr)
+static inline int llog_log_swabbed(struct llog_object_hdr *hdr)
{
if (hdr->loh_magic == __swab32(LLOG_OBJECT_MAGIC))
return 1;
return -EINVAL;
}
-/* In-memory descriptor for a log object */
+/* In-memory descriptor for a log object or log catalog */
struct llog_handle {
struct list_head lgh_list;
struct llog_logid lgh_lid;
struct brw_page lgh_pga[2];
+ struct obdo *lgh_oa;
struct lov_stripe_md *lgh_lsm;
};
};
/* exported api prototypes */
-int llog_add_record(struct llog_handle **, void *recbuf, int reclen,
- struct llog_cookie *cookie);
-int llog_clear_records(int count, struct llog_cookie **cookies);
-int llog_clear_record(struct llog_handle *handle, __u32 recno);
-int llog_delete(struct llog_logid *id);
+extern int llog_add_record(struct lustre_handle *conn,
+ struct llog_trans_hdr *rec,
+ struct llog_cookie *logcookie,
+ struct obd_trans_info *oti);
+extern int llog_clear_records(struct lustre_handle *conn, int count,
+ struct llog_cookie *cookies);
+extern int llog_clear_record(struct lustre_handle *conn, __u32 recno);
+extern int llog_delete(struct lustre_handle *conn, struct llog_logid *id);
/* internal api */
-int llog_id2handle(struct llog_logid *logid);
+extern struct llog_handle *llog_id2handle(struct lustre_handle *conn,
+ struct llog_logid *logid);
#endif
#include <linux/obd.h>
#include <linux/lustre_log.h>
-/* Create a new log handle and add it to the open list.
- * This log handle will be closed when all of the records in it are removed.
- */
-static int llog_new_log(struct lustre_handle *conn, struct list_head *loglist,
- void *transhandle)
+/* Create a new log or catalog handle */
+static struct log_handle *llog_new_handle(struct lustre_handle *conn,
+ struct obd_trans_info *oti)
{
- struct obd_device *obd = class_conn2obd(conn);
struct llog_handle *loghandle;
- struct llog_object_hdr *loh;
- struct obdo *oa;
- void *addr;
+ int rc;
ENTRY;
- if (list_empty(loglist)) {
- XXX do stuff to allocate log_catalog;
- }
-
OBD_ALLOC(loghandle, sizeof(*loghandle));
if (loghandle == NULL)
- RETURN(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
loghandle->lgh_pga[0].pg = alloc_page(GFP_KERNEL);
if (loghandle->lgh_pga[0].pg == NULL)
GOTO(out_handle, rc = -ENOMEM);
loghandle->lgh_pga[0].count = LLOG_HEADER_SIZE;
- loh = kmap(loghandle->lgh_pga[0].pg);
- clear_page(loh);
- loh->loh_size = loh->loh_size_end = LLOG_HEADER_SIZE;
- loh->loh_magic = LLOG_OBJECT_MAGIC;
- kunmap(loghandle->lgh_pga[0].pg);
-
loghandle->lgh_pga[1].pg = alloc_page(GFP_KERNEL);
if (loghandle->lgh_pga[1].pg == NULL)
GOTO(out_pga1, rc = -ENOMEM);
loghandle->lgh_pga[0].off = LLOG_HEADER_SIZE;
- obdo_alloc(oa);
- rc = obd_create(conn, oa, &loghandle->lsm, NULL)
+ obdo_alloc(loghandle->lgh_oa);
+ if (!loghandle->lgh_oa)
+ GOTO(out_pga2, rc = -ENOMEM);
+
+ rc = obd_create(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti)
if (rc) {
- obdo_free(oa);
- GOTO(out_pga2, rc);
+ CERROR("couldn't create new log object: rc %d\n", rc);
+ GOTO(out_oa, rc);
}
-retry:
- lch = kmap(obd->u.
- index = ext2_find_first_zero_bit(lch->lch_bitmap, LLOG_BITMAP_SIZE * 8);
- if (ext2_set_bit(index, lch->lch_bitmap)) {
- CERROR("log catalog bit %u changed under us!\n", index);
- goto retry;
+ rc = obd_open(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti, NULL);
+ if (rc) {
+ CERROR("couldn't open new log object "LPX64": rc %d\n",
+ loghandle->lgh_oa->o_id, rc);
+ GOTO(out_destroy, rc);
}
- if (index > lch->lch_numrec
- rc = obd_brw(OBD_BRW_WRITE, conn,
- list_add_tail(&loghandle->lgh_list, loglist);
+ LIST_HEAD_INIT(&loghandle->lgh_list);
loghandle->lgh_lid.lid_oid = oa->o_id;
//loghandle->lgh_lid.lid_bootcount = ????;
+ RETURN(loghandle);
+
+out_destroy:
+ obd_destroy(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti);
+out_oa:
+ obd_free(loghandle->lgh_oa);
out_pga2:
__free_page(loghandle->lgh_pga[1].pg);
out_pga1:
__free_page(loghandle->lgh_pga[0].pg);
+out:
+ RETURN(ERR_PTR(rc));
+}
+
+/* Create a new log handle and add it to the open list.
+ * This log handle will be closed when all of the records in it are removed.
+ */
+static struct llog_handle *llog_new_log(struct lustre_handle *conn,
+ struct obd_trans_info *oti)
+{
+ struct obd_device *obd = class_conn2obd(conn);
+ struct llog_handle *loghandle, *cathandle;
+ struct llog_object_hdr *loh;
+ struct llog_logid *lid;
+ int num_pga = 2;
+ ENTRY;
+
+ cathandle = obd->obd_catalog;
+ loghandle = llog_new_handle(conn, oti);
+ if (IS_ERR(loghandle))
+ RETURN(loghandle);
+
+ loh = kmap(loghandle->lgh_pga[0].pg);
+ clear_page(loh);
+ loh->loh_size = loh->loh_size_end = LLOG_HEADER_SIZE;
+ loh->loh_magic = LLOG_OBJECT_MAGIC;
+ kunmap(loghandle->lgh_pga[0].pg);
+
+ lch = kmap(cathandle->lgh_pga[0].pg);
+retry:
+ index = ext2_find_first_zero_bit(lch->lch_bitmap, LLOG_BITMAP_BYTES*8);
+ /* Not much we can do here - we already leaked a few thousandd logs */
+ LASSERT(index < LLOG_BITMAP_BYTES*8);
+
+ if (ext2_set_bit(index, lch->lch_bitmap)) {
+ CERROR("log catalog bit %u changed under us!!?\n", index);
+ goto retry;
+ }
+ if (index >= lch->lch_maxrec)
+ lch->lch_maxrec = index + 1;
+
+ offset = LLOG_HEADER_SIZE + index * sizeof(*loh->loh_lid);
+#if PAGE_SIZE > LLOG_HEADER_SIZE
+ if (offset + sizeof(*loh->loh_lid) < PAGE_SIZE) {
+ num_pga = 1;
+ lid = (void *)lch + offset;
+ *lid = loghandle->lgh_lid;
+ cathandle->lgh_pga[0].len = offset+sizeof(lch->lch_lids[index]);
+ kunmap(lch);
+ } else
+#endif
+ {
+ void *addr;
+
+#if PAGE_SIZE > LLOG_HEADER_SIZE
+ cathandle->lgh_pga[0].len = LLOG_HEADER_SIZE;
+#endif
+ kunmap(lch);
+
+ cathandle->lgh_pga[1].off = offset;
+ cathandle->lgh_pga[1].len = sizeof(*lid);
+ addr = kmap(cathandle->lgh_pga[1].pg);
+ lid = addr + (offset & ~PAGE_MASK);
+ *lid = loghandle->lgh_lid;
+ kunmap(cathandle->lgh_pga[1].pg);
+ }
+
+ rc = obd_brw(OBD_BRW_WRITE, conn, cathandle->lgh_lsm, num_pga,
+ cathandle->lgh_pga, NULL, oti);
+ if (rc) {
+ list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list);
+
+ RETURN(0);
+
out_handle:
OBD_FREE(loghandle, sizeof(*loghandle));
RETURN(rc);
}
+int llog_init_catalog(struct lustre_handle *conn, struct obd_trans_info *oti)
+{
+ struct obd_device *obd = class_conn2obd(conn);
+ struct llog_handle *cathandle;
+ struct llog_catalog_hdr *lch;
+ ENTRY;
+
+ if (obd->obd_catalog != NULL)
+ RETURN(0);
+
+ cathandle = llog_new_handle(conn, oti);
+ if (IS_ERR(cathandle))
+ RETURN(ERR_PTR(cathandle));
+ obd->obd_catalog = cathandle;
+
+ lch = kmap(cathandle->lgh_pga[0].pg);
+ clear_page(lch);
+ lch->lch_size = lch->lch_size_end = LLOG_HEADER_SIZE;
+ lcg->lcg_magic = LLOG_CATALOG_MAGIC;
+ kunmap(cathandle->lgh_pga[0].pg);
+
+ RETURN(0);
+}
+
/* We start a new log object here if needed, either because no log has been
* started, or because the current log cannot fit the new record.
*/
-int llog_get_log(conn, struct list_head *loglist, int reclen, void *transhandle)
+int llog_get_log(struct lustre_handle *conn, int reclen,
+ struct obd_trans_info *oti)
{
+ struct obd_device *obd = class_conn2obd(conn);
+ struct list_head *loglist = &obd->obd_catalog->lgh_list;
+
if (list_empty(loglist)) {
- loghandle = llog_new_log(conn, loglist, transhandle);
+ loghandle = llog_new_log(conn, oti);
if (IS_ERR(loghandle))
RETURN(rc = PTR_ERR(loghandle));
} else {
if (loghandle->lgh_pga[1].off + reclen >= LLOG_MAX_LOG_SIZE) {
__free_page(loghandle->lgh_pga[1].pg);
loghandle->lgh_pga[1].pg = NULL;
- loghandle = llog_new_log(conn, loglist, transhandle);
+ loghandle = llog_new_log(conn, oti);
if (IS_ERR(loghandle))
RETURN(rc = PTR_ERR(loghandle));
}
}
/* Add a single record to the recovery log. */
-int llog_add_record(struct lustre_handle *conn, struct list_head *loglist,
- llog_trans_hdr *rec, struct llog_cookie *logcookie,
- void *transhandle)
+int llog_add_record(struct lustre_handle *conn, struct llog_trans_hdr *rec,
+ struct llog_cookie *logcookie, struct obd_trans_info *oti)
{
struct llog_handle *loghandle;
struct llog_object_hdr *loh;
int reclen = rec->lgh_len;
+ int offset;
+ int index;
int num_pga = 2;
int rc;
ENTRY;
- loghandle = llog_get_log(conn, loglist, reclen, transhandle);
+ loghandle = llog_get_log(conn, reclen, transhandle);
+
+ offset = loghandle->lgh_pga[1].off;
+
+ loh = kmap(loghandle->lgc_pga[0].pg);
+ index = loh->loh_numrec++;
+ ext2_set_bit(index, loh->loh_bitmap);
#if PAGE_SIZE > LLOG_HEADER_SIZE
/* It is possible we are still writing in the first page */
- if (loghandle->lgh_pga[1].off < PAGE_SIZE) {
- memcpy(kmap(loghandle->lgh_pga[0]->page) +
- loghandle->lgh_pga[1].off, rec, reclen);
- loghandle->lgh_pga[0].count = loghandle->lgh_pga[1].off+reclen;
+ if (offset < PAGE_SIZE) {
+ memcpy(loh + offset, rec, reclen);
+ loghandle->lgh_pga[0].count = offset + reclen;
+ kunmap(loghandle->lgh_pga[0]->pg);
num_pga = 1;
} else
#endif
{
- memcpy(kmap(loghandle->lgh_pga[1]->page) +
- loghandle->lgh_pga[1].off, rec, reclen);
#if PAGE_SIZE > LLOG_HEADER_SIZE
loghandle->lgh_pga[0].count = LLOG_HEADER_SIZE;
#endif
+ kunmap(loghandle->lgh_pga[0]->pg);
+
+ memcpy(kmap(loghandle->lgh_pga[1]->pg) + (offset & ~PAGE_MASK),
+ rec, reclen);
loghandle->lgh_pga[1].count = reclen;
+ kunmap(loghandle->lgh_pga[1]->pg);
}
- kunmap(loghandle->lgh_pga->page);
rc = obd_brw(OBD_BRW_WRITE, conn, loghandle->lgh_lsm, num_pga,
- loghandle->lgh_pga, NULL, NULL);
-
+ loghandle->lgh_pga, NULL, oti);
if (rc)
RETURN(rc);
- loh = kmap(logcookie->lgc_pga[0].pg);
- logcookie->lgc_lid = loghandle->lgh_lid;
- logcookie->lgc_offset = loghandle->lgh_pga[1].off;
- logcookie->lgc_index = loh->loh_numrec++;
- ext2_set_bit(logcookie->lgc_index, loh->loh_bitmap);
- kunmap(logcookie->lgc_pga[0].pg);
-
loghandle->lgh_pga[1].off += reclen;
+ logcookie->lgc_lid = loghandle->lgh_lid;
+ logcookie->lgc_index = index;
+ logcookie->lgc_offset = offset;
+
RETURN(0);
}