From: adilger Date: Thu, 24 Apr 2003 08:06:39 +0000 (+0000) Subject: More work on the logging code. X-Git-Tag: v1_7_100~1^92~34 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=5a1da399750d1fbeed26028f7613cfe6645c6f70;p=fs%2Flustre-release.git More work on the logging code. Move "desc_private" journal handle into obd_trans_info (already passed to many of the OBD API methods) so that we can do nested transactions when calling OBD API methods. --- diff --git a/lustre/include/linux/lustre_log.h b/lustre/include/linux/lustre_log.h index 49a2d17..fb24084 100644 --- a/lustre/include/linux/lustre_log.h +++ b/lustre/include/linux/lustre_log.h @@ -53,17 +53,16 @@ struct llog_logid { #define LLOG_HEADER_SIZE (4096) /* <= PAGE_SIZE */ #define LLOG_HDR_RSVD_U32 (16) #define LLOG_HDR_DATA_SIZE (LLOG_HDR_RSVD_U32 * sizeof(__u32)) -#define LLOG_BITMAP_SIZE (LLOG_HEADER_SIZE - LLOG_HDR_DATA_SIZE) +#define LLOG_BITMAP_BYTES (LLOG_HEADER_SIZE - LLOG_HDR_DATA_SIZE) -#define LLOG_LOGLIST_MAGIC 0x6d50e67d -struct llog_catalog_header { +#define LLOG_CATALOG_MAGIC 0x6d50e67d +struct llog_catalog_hdr { __u32 lch_size; __u32 lch_magic; __u32 lch_numrec; __u32 lch_reserved[LLOG_HDR_RSVD_U32 - 4]; - __u32 lch_bitmap[LLOG_BITMAP_SIZE / sizeof(__u32)]; + __u32 lch_bitmap[LLOG_BITMAP_BYTES / sizeof(__u32)]; __u32 lch_size_end; - struct llog_logid lch_logs[0]; }; @@ -109,15 +108,13 @@ struct llog_object_hdr { /* This first chunk should be exactly 4096 bytes in size */ __u32 loh_size; __u32 loh_magic; - __u32 loh_numrec; + __u32 loh_maxrec; __u32 loh_reserved[LLOG_HDR_RSVD_U32 - 4]; - __u32 loh_bitmap[LLOG_BITMAP_SIZE / sizeof(__u32)]; + __u32 loh_bitmap[LLOG_BITMAP_BYTES / sizeof(__u32)]; __u32 loh_size_end; - - struct llog_trans_rec loh_records[0]; }; -static inline llog_log_swabbed(struct llog_object_hdr *hdr) +static inline int llog_log_swabbed(struct llog_object_hdr *hdr) { if (hdr->loh_magic == __swab32(LLOG_OBJECT_MAGIC)) return 1; @@ -126,11 +123,12 @@ static inline llog_log_swabbed(struct llog_object_hdr *hdr) return -EINVAL; } -/* In-memory descriptor for a log object */ +/* In-memory descriptor for a log object or log catalog */ struct llog_handle { struct list_head lgh_list; struct llog_logid lgh_lid; struct brw_page lgh_pga[2]; + struct obdo *lgh_oa; struct lov_stripe_md *lgh_lsm; }; @@ -142,14 +140,18 @@ struct llog_cookie { }; /* exported api prototypes */ -int llog_add_record(struct llog_handle **, void *recbuf, int reclen, - struct llog_cookie *cookie); -int llog_clear_records(int count, struct llog_cookie **cookies); -int llog_clear_record(struct llog_handle *handle, __u32 recno); -int llog_delete(struct llog_logid *id); +extern int llog_add_record(struct lustre_handle *conn, + struct llog_trans_hdr *rec, + struct llog_cookie *logcookie, + struct obd_trans_info *oti); +extern int llog_clear_records(struct lustre_handle *conn, int count, + struct llog_cookie *cookies); +extern int llog_clear_record(struct lustre_handle *conn, __u32 recno); +extern int llog_delete(struct lustre_handle *conn, struct llog_logid *id); /* internal api */ -int llog_id2handle(struct llog_logid *logid); +extern struct llog_handle *llog_id2handle(struct lustre_handle *conn, + struct llog_logid *logid); #endif diff --git a/lustre/lib/recov_log.c b/lustre/lib/recov_log.c index b06eb00..de95ee2 100644 --- a/lustre/lib/recov_log.c +++ b/lustre/lib/recov_log.c @@ -25,80 +25,173 @@ #include #include -/* Create a new log handle and add it to the open list. - * This log handle will be closed when all of the records in it are removed. - */ -static int llog_new_log(struct lustre_handle *conn, struct list_head *loglist, - void *transhandle) +/* Create a new log or catalog handle */ +static struct log_handle *llog_new_handle(struct lustre_handle *conn, + struct obd_trans_info *oti) { - struct obd_device *obd = class_conn2obd(conn); struct llog_handle *loghandle; - struct llog_object_hdr *loh; - struct obdo *oa; - void *addr; + int rc; ENTRY; - if (list_empty(loglist)) { - XXX do stuff to allocate log_catalog; - } - OBD_ALLOC(loghandle, sizeof(*loghandle)); if (loghandle == NULL) - RETURN(-ENOMEM); + GOTO(out, rc = -ENOMEM); loghandle->lgh_pga[0].pg = alloc_page(GFP_KERNEL); if (loghandle->lgh_pga[0].pg == NULL) GOTO(out_handle, rc = -ENOMEM); loghandle->lgh_pga[0].count = LLOG_HEADER_SIZE; - loh = kmap(loghandle->lgh_pga[0].pg); - clear_page(loh); - loh->loh_size = loh->loh_size_end = LLOG_HEADER_SIZE; - loh->loh_magic = LLOG_OBJECT_MAGIC; - kunmap(loghandle->lgh_pga[0].pg); - loghandle->lgh_pga[1].pg = alloc_page(GFP_KERNEL); if (loghandle->lgh_pga[1].pg == NULL) GOTO(out_pga1, rc = -ENOMEM); loghandle->lgh_pga[0].off = LLOG_HEADER_SIZE; - obdo_alloc(oa); - rc = obd_create(conn, oa, &loghandle->lsm, NULL) + obdo_alloc(loghandle->lgh_oa); + if (!loghandle->lgh_oa) + GOTO(out_pga2, rc = -ENOMEM); + + rc = obd_create(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti) if (rc) { - obdo_free(oa); - GOTO(out_pga2, rc); + CERROR("couldn't create new log object: rc %d\n", rc); + GOTO(out_oa, rc); } -retry: - lch = kmap(obd->u. - index = ext2_find_first_zero_bit(lch->lch_bitmap, LLOG_BITMAP_SIZE * 8); - if (ext2_set_bit(index, lch->lch_bitmap)) { - CERROR("log catalog bit %u changed under us!\n", index); - goto retry; + rc = obd_open(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti, NULL); + if (rc) { + CERROR("couldn't open new log object "LPX64": rc %d\n", + loghandle->lgh_oa->o_id, rc); + GOTO(out_destroy, rc); } - if (index > lch->lch_numrec - rc = obd_brw(OBD_BRW_WRITE, conn, - list_add_tail(&loghandle->lgh_list, loglist); + LIST_HEAD_INIT(&loghandle->lgh_list); loghandle->lgh_lid.lid_oid = oa->o_id; //loghandle->lgh_lid.lid_bootcount = ????; + RETURN(loghandle); + +out_destroy: + obd_destroy(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti); +out_oa: + obd_free(loghandle->lgh_oa); out_pga2: __free_page(loghandle->lgh_pga[1].pg); out_pga1: __free_page(loghandle->lgh_pga[0].pg); +out: + RETURN(ERR_PTR(rc)); +} + +/* Create a new log handle and add it to the open list. + * This log handle will be closed when all of the records in it are removed. + */ +static struct llog_handle *llog_new_log(struct lustre_handle *conn, + struct obd_trans_info *oti) +{ + struct obd_device *obd = class_conn2obd(conn); + struct llog_handle *loghandle, *cathandle; + struct llog_object_hdr *loh; + struct llog_logid *lid; + int num_pga = 2; + ENTRY; + + cathandle = obd->obd_catalog; + loghandle = llog_new_handle(conn, oti); + if (IS_ERR(loghandle)) + RETURN(loghandle); + + loh = kmap(loghandle->lgh_pga[0].pg); + clear_page(loh); + loh->loh_size = loh->loh_size_end = LLOG_HEADER_SIZE; + loh->loh_magic = LLOG_OBJECT_MAGIC; + kunmap(loghandle->lgh_pga[0].pg); + + lch = kmap(cathandle->lgh_pga[0].pg); +retry: + index = ext2_find_first_zero_bit(lch->lch_bitmap, LLOG_BITMAP_BYTES*8); + /* Not much we can do here - we already leaked a few thousandd logs */ + LASSERT(index < LLOG_BITMAP_BYTES*8); + + if (ext2_set_bit(index, lch->lch_bitmap)) { + CERROR("log catalog bit %u changed under us!!?\n", index); + goto retry; + } + if (index >= lch->lch_maxrec) + lch->lch_maxrec = index + 1; + + offset = LLOG_HEADER_SIZE + index * sizeof(*loh->loh_lid); +#if PAGE_SIZE > LLOG_HEADER_SIZE + if (offset + sizeof(*loh->loh_lid) < PAGE_SIZE) { + num_pga = 1; + lid = (void *)lch + offset; + *lid = loghandle->lgh_lid; + cathandle->lgh_pga[0].len = offset+sizeof(lch->lch_lids[index]); + kunmap(lch); + } else +#endif + { + void *addr; + +#if PAGE_SIZE > LLOG_HEADER_SIZE + cathandle->lgh_pga[0].len = LLOG_HEADER_SIZE; +#endif + kunmap(lch); + + cathandle->lgh_pga[1].off = offset; + cathandle->lgh_pga[1].len = sizeof(*lid); + addr = kmap(cathandle->lgh_pga[1].pg); + lid = addr + (offset & ~PAGE_MASK); + *lid = loghandle->lgh_lid; + kunmap(cathandle->lgh_pga[1].pg); + } + + rc = obd_brw(OBD_BRW_WRITE, conn, cathandle->lgh_lsm, num_pga, + cathandle->lgh_pga, NULL, oti); + if (rc) { + list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list); + + RETURN(0); + out_handle: OBD_FREE(loghandle, sizeof(*loghandle)); RETURN(rc); } +int llog_init_catalog(struct lustre_handle *conn, struct obd_trans_info *oti) +{ + struct obd_device *obd = class_conn2obd(conn); + struct llog_handle *cathandle; + struct llog_catalog_hdr *lch; + ENTRY; + + if (obd->obd_catalog != NULL) + RETURN(0); + + cathandle = llog_new_handle(conn, oti); + if (IS_ERR(cathandle)) + RETURN(ERR_PTR(cathandle)); + obd->obd_catalog = cathandle; + + lch = kmap(cathandle->lgh_pga[0].pg); + clear_page(lch); + lch->lch_size = lch->lch_size_end = LLOG_HEADER_SIZE; + lcg->lcg_magic = LLOG_CATALOG_MAGIC; + kunmap(cathandle->lgh_pga[0].pg); + + RETURN(0); +} + /* We start a new log object here if needed, either because no log has been * started, or because the current log cannot fit the new record. */ -int llog_get_log(conn, struct list_head *loglist, int reclen, void *transhandle) +int llog_get_log(struct lustre_handle *conn, int reclen, + struct obd_trans_info *oti) { + struct obd_device *obd = class_conn2obd(conn); + struct list_head *loglist = &obd->obd_catalog->lgh_list; + if (list_empty(loglist)) { - loghandle = llog_new_log(conn, loglist, transhandle); + loghandle = llog_new_log(conn, oti); if (IS_ERR(loghandle)) RETURN(rc = PTR_ERR(loghandle)); } else { @@ -107,7 +200,7 @@ int llog_get_log(conn, struct list_head *loglist, int reclen, void *transhandle) if (loghandle->lgh_pga[1].off + reclen >= LLOG_MAX_LOG_SIZE) { __free_page(loghandle->lgh_pga[1].pg); loghandle->lgh_pga[1].pg = NULL; - loghandle = llog_new_log(conn, loglist, transhandle); + loghandle = llog_new_log(conn, oti); if (IS_ERR(loghandle)) RETURN(rc = PTR_ERR(loghandle)); } @@ -115,52 +208,57 @@ int llog_get_log(conn, struct list_head *loglist, int reclen, void *transhandle) } /* Add a single record to the recovery log. */ -int llog_add_record(struct lustre_handle *conn, struct list_head *loglist, - llog_trans_hdr *rec, struct llog_cookie *logcookie, - void *transhandle) +int llog_add_record(struct lustre_handle *conn, struct llog_trans_hdr *rec, + struct llog_cookie *logcookie, struct obd_trans_info *oti) { struct llog_handle *loghandle; struct llog_object_hdr *loh; int reclen = rec->lgh_len; + int offset; + int index; int num_pga = 2; int rc; ENTRY; - loghandle = llog_get_log(conn, loglist, reclen, transhandle); + loghandle = llog_get_log(conn, reclen, transhandle); + + offset = loghandle->lgh_pga[1].off; + + loh = kmap(loghandle->lgc_pga[0].pg); + index = loh->loh_numrec++; + ext2_set_bit(index, loh->loh_bitmap); #if PAGE_SIZE > LLOG_HEADER_SIZE /* It is possible we are still writing in the first page */ - if (loghandle->lgh_pga[1].off < PAGE_SIZE) { - memcpy(kmap(loghandle->lgh_pga[0]->page) + - loghandle->lgh_pga[1].off, rec, reclen); - loghandle->lgh_pga[0].count = loghandle->lgh_pga[1].off+reclen; + if (offset < PAGE_SIZE) { + memcpy(loh + offset, rec, reclen); + loghandle->lgh_pga[0].count = offset + reclen; + kunmap(loghandle->lgh_pga[0]->pg); num_pga = 1; } else #endif { - memcpy(kmap(loghandle->lgh_pga[1]->page) + - loghandle->lgh_pga[1].off, rec, reclen); #if PAGE_SIZE > LLOG_HEADER_SIZE loghandle->lgh_pga[0].count = LLOG_HEADER_SIZE; #endif + kunmap(loghandle->lgh_pga[0]->pg); + + memcpy(kmap(loghandle->lgh_pga[1]->pg) + (offset & ~PAGE_MASK), + rec, reclen); loghandle->lgh_pga[1].count = reclen; + kunmap(loghandle->lgh_pga[1]->pg); } - kunmap(loghandle->lgh_pga->page); rc = obd_brw(OBD_BRW_WRITE, conn, loghandle->lgh_lsm, num_pga, - loghandle->lgh_pga, NULL, NULL); - + loghandle->lgh_pga, NULL, oti); if (rc) RETURN(rc); - loh = kmap(logcookie->lgc_pga[0].pg); - logcookie->lgc_lid = loghandle->lgh_lid; - logcookie->lgc_offset = loghandle->lgh_pga[1].off; - logcookie->lgc_index = loh->loh_numrec++; - ext2_set_bit(logcookie->lgc_index, loh->loh_bitmap); - kunmap(logcookie->lgc_pga[0].pg); - loghandle->lgh_pga[1].off += reclen; + logcookie->lgc_lid = loghandle->lgh_lid; + logcookie->lgc_index = index; + logcookie->lgc_offset = offset; + RETURN(0); }