From 8c5986377ee2a0cf5e46d06b108f1d7e5c8f7f7e Mon Sep 17 00:00:00 2001 From: adilger Date: Fri, 2 May 2003 00:00:54 +0000 Subject: [PATCH] Another not-yet-working-but-checking-in-not-to-lose-data landing of b_orphan. Will do another merge with b_devel, but really dreading when b_proto lands... --- lustre/include/linux/lustre_log.h | 128 ++--------- lustre/lib/recov_log.c | 461 +++++++++++++++++++++----------------- 2 files changed, 281 insertions(+), 308 deletions(-) diff --git a/lustre/include/linux/lustre_log.h b/lustre/include/linux/lustre_log.h index 1e206d2..71e3d8b 100644 --- a/lustre/include/linux/lustre_log.h +++ b/lustre/include/linux/lustre_log.h @@ -39,120 +39,38 @@ #include #include -/* catalog of log objects */ - -#define LLOG_MAX_OBJ (64 << 10) - -/* Identifier for a single log object */ -struct llog_logid { - __u64 lgl_oid; -// __u64 lgl_bootcount; -}; - -/* On-disk header structure of catalog of available log object (internal) */ -#define LLOG_HEADER_SIZE (4096) /* <= PAGE_SIZE */ -#define LLOG_HDR_RSVD_U32 (16) -#define LLOG_HDR_DATA_SIZE (LLOG_HDR_RSVD_U32 * sizeof(__u32)) -#define LLOG_BITMAP_BYTES (LLOG_HEADER_SIZE - LLOG_HDR_DATA_SIZE) - -#define LLOG_CATALOG_MAGIC 0x6d50e67d -struct llog_catalog_hdr { - __u32 lch_size; - __u32 lch_magic; - __u32 lch_numrec; - __u32 lch_reserved[LLOG_HDR_RSVD_U32 - 4]; - __u32 lch_bitmap[LLOG_BITMAP_BYTES / sizeof(__u32)]; - __u32 lch_size_end; -}; - - -/* Log data records */ -typedef enum { - OST_CREATE_REC = 1, -} llog_op_type; - -/* Log record header - stored in originating host endian order (use magic to - * check order). - * Each record must start with this and be a multiple of 64 bits in size. - */ -struct llog_trans_hdr { - __u32 lth_len; - llog_op_type lth_op; -}; - -struct llog_create_rec { - struct llog_trans_hdr lcr_hdr; - struct ll_fid lcr_fid; - obd_id lcr_oid; - obd_count lcr_ogener; - __u32 lcr_end_len; -} __attribute__((packed)); - -struct llog_unlink_rec { - struct llog_trans_hdr lur_hdr; - obd_id lur_oid; - obd_count lur_ogener; - __u32 lur_end_len; -} __attribute__((packed)); - -/* On-disk header structure of each log object - stored in creating host - * endian order, with the exception of the bitmap - stored in little endian - * order so that we can use ext2_{clear,set,test}_bit() for optimized - * little-endian handling of bitmaps. - */ -#define LLOG_MAX_LOG_SIZE (64 << 10) /* == PTL_MD_MAX_IOV */ -#define LLOG_MIN_REC_SIZE (16) - -#define LLOG_OBJECT_MAGIC 0xffb45539 -struct llog_object_hdr { - /* This first chunk should be exactly 4096 bytes in size */ - __u32 loh_size; - __u32 loh_magic; - __u32 loh_maxrec; - __u32 loh_reserved[LLOG_HDR_RSVD_U32 - 4]; - __u32 loh_bitmap[LLOG_BITMAP_BYTES / sizeof(__u32)]; - __u32 loh_size_end; -}; - -static inline int llog_log_swabbed(struct llog_object_hdr *hdr) -{ - if (hdr->loh_magic == __swab32(LLOG_OBJECT_MAGIC)) - return 1; - if (hdr->loh_magic == LLOG_OBJECT_MAGIC) - return 0; - return -EINVAL; -} - /* In-memory descriptor for a log object or log catalog */ struct llog_handle { struct list_head lgh_list; - struct llog_logid lgh_lid; - struct brw_page lgh_pga[2]; - struct obdo *lgh_oa; - struct lov_stripe_md *lgh_lsm; -}; - -/* cookie to find a log record back in a specific log object */ -struct llog_cookie { - struct llog_logid lgc_lid; - __u32 lgc_index; - __u32 lgc_offset; + struct llog_cookie lgh_cookie; + struct obd_device *lgh_obd; + void *lgh_hdr; + struct file *lgh_file; + struct llog_handle *(*lgh_log_create)(struct llog_handle *loghandle, + struct obd_trans_info *oti); + struct llog_handle *(*lgh_log_open)(struct llog_handle *cathandle, + struct llog_cookie *logcookie); + int (*lgh_log_close)(struct llog_handle *cathandle, + struct llog_handle *loghandle); + int lgh_index; }; /* exported api prototypes */ -extern int llog_add_record(struct lustre_handle *conn, +extern int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec, - struct llog_cookie *logcookie, - struct obd_trans_info *oti); -extern int llog_clear_records(struct lustre_handle *conn, int count, - struct llog_cookie *cookies); -extern int llog_clear_record(struct lustre_handle *conn, __u32 recno); -extern int llog_delete(struct lustre_handle *conn, struct llog_logid *id); + struct lov_mds_md *lmm, + struct obd_trans_info *oti, + struct llog_cookie *logcookies); + +extern int llog_cancel_records(struct llog_handle *cathandle, int count, + struct llog_cookie *cookies); /* internal api */ -extern int llog_init_catalog(struct lustre_handle *conn); -extern struct llog_handle *llog_id2handle(struct lustre_handle *conn, - struct llog_logid *logid); +extern struct llog_handle *llog_alloc_handle(void); +extern void llog_free_handle(struct llog_handle *handle); +extern int llog_init_catalog(struct llog_handle *cathandle); +extern struct llog_handle *llog_id2handle(struct llog_handle *cathandle, + struct llog_cookie *cookie); #endif diff --git a/lustre/lib/recov_log.c b/lustre/lib/recov_log.c index c7f5100..928e22c 100644 --- a/lustre/lib/recov_log.c +++ b/lustre/lib/recov_log.c @@ -20,45 +20,32 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * * OST<->MDS recovery logging infrastructure. + * + * Invariants in implementation: + * - 1 log file for each OST<->MDS connection, so that if an OST fails it + * need only look at logs relevant to itself */ -#include +#define DEBUG_SUBSYSTEM S_LOG + +#include +#include #include +#include /* Allocate a new log or catalog handle */ -struct log_handle *llog_alloc_handle(void) +struct llog_handle *llog_alloc_handle(void) { struct llog_handle *loghandle; - int rc; ENTRY; OBD_ALLOC(loghandle, sizeof(*loghandle)); if (loghandle == NULL) - GOTO(out, rc = -ENOMEM); - - loghandle->lgh_pga[0].pg = alloc_page(GFP_KERNEL); - if (loghandle->lgh_pga[0].pg == NULL) - GOTO(out_handle, rc = -ENOMEM); - loghandle->lgh_pga[0].count = LLOG_HEADER_SIZE; + RETURN(ERR_PTR(-ENOMEM)); - loghandle->lgh_pga[1].pg = alloc_page(GFP_KERNEL); - if (loghandle->lgh_pga[1].pg == NULL) - GOTO(out_pga1, rc = -ENOMEM); - loghandle->lgh_pga[0].off = LLOG_HEADER_SIZE; - LIST_HEAD_INIT(&loghandle->lgh_list); - - obdo_alloc(loghandle->lgh_oa); - if (!loghandle->lgh_oa) - GOTO(out_pga2, rc = -ENOMEM); + INIT_LIST_HEAD(&loghandle->lgh_list); RETURN(loghandle); - -out_pga2: - __free_page(loghandle->lgh_pga[1].pg); -out_pga1: - __free_page(loghandle->lgh_pga[0].pg); -out: - RETURN(ERR_PTR(rc)); } void llog_free_handle(struct llog_handle *loghandle) @@ -67,131 +54,118 @@ void llog_free_handle(struct llog_handle *loghandle) return; list_del_init(&loghandle->lgh_list); - obdo_free(loghandle->lgh_oa); - __free_page(loghandle->lgh_pga[1].pg); - __free_page(loghandle->lgh_pga[0].pg); + OBD_FREE(loghandle, sizeof(*loghandle)); } /* Create a new log handle and add it to the open list. * This log handle will be closed when all of the records in it are removed. */ -static struct llog_handle *llog_new_log(struct lustre_handle *conn, +static struct llog_handle *llog_new_log(struct llog_handle *cathandle, struct obd_trans_info *oti) { - struct obd_device *obd = class_conn2obd(conn); - struct llog_handle *loghandle, *cathandle; + struct llog_handle *loghandle; + struct llog_catalog_hdr *lch; struct llog_object_hdr *loh; - struct llog_logid *lid; - int num_pga = 2; + loff_t offset; + int rc, index, bitmap_size, i; ENTRY; - cathandle = obd->obd_catalog; - loghandle = llog_new_handle(conn, oti); + LASSERT(sizeof(*loh) == LLOG_CHUNK_SIZE); + + loghandle = cathandle->lgh_log_create(cathandle, oti); if (IS_ERR(loghandle)) RETURN(loghandle); - rc = obd_create(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti) - if (rc) { - CERROR("couldn't create new log object: rc %d\n", rc); - GOTO(out_handle, rc); - } - - rc = obd_open(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti, NULL); - if (rc) { - CERROR("couldn't open new log object "LPX64": rc %d\n", - loghandle->lgh_oa->o_id, rc); - GOTO(out_destroy, rc); - } - loghandle->lgh_lid.lid_oid = oa->o_id; - //loghandle->lgh_lid.lid_bootcount = ????; - - loh = kmap(loghandle->lgh_pga[0].pg); - clear_page(loh); - loh->loh_size = loh->loh_size_end = LLOG_HEADER_SIZE; + OBD_ALLOC(loh, sizeof(*loh)); + if (!loh) + GOTO(out_handle, rc = -ENOMEM); loh->loh_magic = LLOG_OBJECT_MAGIC; - kunmap(loghandle->lgh_pga[0].pg); - - lch = kmap(cathandle->lgh_pga[0].pg); -retry: - index = ext2_find_first_zero_bit(lch->lch_bitmap, LLOG_BITMAP_BYTES*8); - /* Not much we can do here - we already leaked a few thousandd logs */ - LASSERT(index < LLOG_BITMAP_BYTES*8); - - if (ext2_set_bit(index, lch->lch_bitmap)) { - CERROR("log catalog bit %u changed under us!!?\n", index); - goto retry; - } - if (index >= lch->lch_maxrec) - lch->lch_maxrec = index + 1; - - offset = LLOG_HEADER_SIZE + index * sizeof(*loh->loh_lid); -#if PAGE_SIZE > LLOG_HEADER_SIZE - if (offset + sizeof(*loh->loh_lid) < PAGE_SIZE) { - num_pga = 1; - lid = (void *)lch + offset; - *lid = loghandle->lgh_lid; - cathandle->lgh_pga[0].len = offset+sizeof(lch->lch_lids[index]); - kunmap(lch); - } else -#endif - { - void *addr; - -#if PAGE_SIZE > LLOG_HEADER_SIZE - cathandle->lgh_pga[0].len = LLOG_HEADER_SIZE; -#endif - kunmap(lch); - - cathandle->lgh_pga[1].off = offset; - cathandle->lgh_pga[1].len = sizeof(*lid); - addr = kmap(cathandle->lgh_pga[1].pg); - lid = addr + (offset & ~PAGE_MASK); - *lid = loghandle->lgh_lid; - kunmap(cathandle->lgh_pga[1].pg); + loh->loh_size = loh->loh_size_end = sizeof(*loh); + loghandle->lgh_hdr = loh; + + lch = cathandle->lgh_hdr; + bitmap_size = sizeof(lch->lch_bitmap) * 8; + CERROR("bitmap size = %d\n", bitmap_size); + /* This should basically always find the first entry free */ + for (i = 0, index = lch->lch_index; i < bitmap_size; i++, index++) { + index %= bitmap_size; + if (ext2_set_bit(index, lch->lch_bitmap)) + CERROR("catalog index %d is still in use\n", index); + else { + lch->lch_index = (index + 1) % bitmap_size; + break; + } } - - rc = obd_brw(OBD_BRW_WRITE, conn, cathandle->lgh_lsm, num_pga, - cathandle->lgh_pga, NULL, oti); + if (i == index) + CERROR("no free catalog slots for log...\n"); + + loghandle->lgh_cookie.lgc_index = index; + + offset = sizeof(*lch) + index * sizeof(loghandle->lgh_cookie); + + /* XXX Hmm, what to do if the catalog update fails? Under normal + * operations we would clean this handle up anyways, and at + * worst we leak some objects. + * + * We don't want to mark a catalog in-use if it wasn't written. + * The only danger is if the OST crashes - the log is lost. + */ + rc = lustre_fwrite(cathandle->lgh_file, &loghandle->lgh_cookie, + sizeof(loghandle->lgh_cookie), &offset); if (rc) { CERROR("error adding log "LPX64" to catalog: rc %d\n", - loghandle->lgh_lid.lid_oid, rc); - /* XXX Hmm, what to do? Under normal ops we would clean this - * handle up anyways, and at worst we leak some objects. - * The only danger is if the OST crashes - the log is lost. - */ + loghandle->lgh_cookie.lgc_lgl.lgl_oid, rc); + } else { + offset = 0; + rc = lustre_fwrite(cathandle->lgh_file, lch, sizeof(*lch), + &offset); + if (rc) + CERROR("error marking catalog entry %d in use: rc %d\n", + index, rc); } + loghandle->lgh_log_create = cathandle->lgh_log_create; + loghandle->lgh_log_open = cathandle->lgh_log_open; + loghandle->lgh_log_close = cathandle->lgh_log_close; + loghandle->lgh_obd = cathandle->lgh_obd; list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list); - RETURN(0); + RETURN(loghandle); -out_destroy: - obd_destroy(conn, loghandle->lgh_oa, loghandle->lgh_lsm, oti); out_handle: - llog_free(handle); + llog_free_handle(loghandle); - RETURN(rc); + RETURN(ERR_PTR(rc)); } -int llog_init_catalog(struct lustre_handle *conn, struct obd_trans_info *oti) +int llog_init_catalog(struct llog_handle *cathandle) { - struct obd_device *obd = class_conn2obd(conn); - struct llog_handle *cathandle; struct llog_catalog_hdr *lch; + struct file *file = cathandle->lgh_file; + loff_t offset = 0; + int rc = 0; ENTRY; - if (obd->obd_catalog != NULL) - RETURN(0); + LASSERT(sizeof(*lch) == LLOG_CHUNK_SIZE); + + OBD_ALLOC(lch, sizeof(*lch)); + if (!lch) + RETURN(-ENOMEM); - cathandle = llog_new_handle(conn, oti); - if (IS_ERR(cathandle)) - RETURN(ERR_PTR(cathandle)); - obd->obd_catalog = cathandle; + cathandle->lgh_hdr = lch; - lch = kmap(cathandle->lgh_pga[0].pg); - clear_page(lch); - lch->lch_size = lch->lch_size_end = LLOG_HEADER_SIZE; - lcg->lcg_magic = LLOG_CATALOG_MAGIC; - kunmap(cathandle->lgh_pga[0].pg); + if (file->f_dentry->d_inode->i_size == 0) { + lch->lch_magic = LLOG_CATALOG_MAGIC; + lch->lch_size = lch->lch_size_end = LLOG_CHUNK_SIZE; + rc = lustre_fwrite(file, lch, sizeof(*lch), &offset); + } else { + rc = lustre_fread(file, lch, sizeof(*lch), &offset); + } + + if (rc != sizeof(*lch)) { + CERROR("error getting catalog header: rc %d\n", rc); + OBD_FREE(lch, sizeof(*lch)); + RETURN(rc < 0 ? rc : -EIO); + } RETURN(0); } @@ -199,124 +173,205 @@ int llog_init_catalog(struct lustre_handle *conn, struct obd_trans_info *oti) /* We start a new log object here if needed, either because no log has been * started, or because the current log cannot fit the new record. */ -int llog_current_log(struct lustre_handle *conn, int reclen, - struct obd_trans_info *oti) +struct llog_handle *llog_current_log(struct llog_handle *cathandle, int reclen, + struct obd_trans_info *oti) { - struct obd_device *obd = class_conn2obd(conn); - struct list_head *loglist = &obd->obd_catalog->lgh_list; + struct list_head *loglist = &cathandle->lgh_list; + ENTRY; + + if (!list_empty(loglist)) { + struct llog_handle *loghandle; + struct llog_object_hdr *loh; - if (list_empty(loglist)) { - loghandle = llog_new_log(conn, oti); - if (IS_ERR(loghandle)) - RETURN(rc = PTR_ERR(loghandle)); - } else { loghandle = list_entry(loglist->prev, struct llog_handle, lgh_list); - if (loghandle->lgh_pga[1].off + reclen >= LLOG_MAX_LOG_SIZE) { - __free_page(loghandle->lgh_pga[1].pg); - loghandle->lgh_pga[1].pg = NULL; - loghandle = llog_new_log(conn, oti); - if (IS_ERR(loghandle)) - RETURN(rc = PTR_ERR(loghandle)); - } + loh = loghandle->lgh_hdr; + if (LLOG_MAX_LOG_SIZE - loghandle->lgh_file->f_pos >= reclen) + RETURN(loghandle); } + + RETURN(llog_new_log(cathandle, oti)); } -/* Add a single record to the recovery log. */ -int llog_add_record(struct lustre_handle *conn, struct llog_trans_hdr *rec, - struct llog_cookie *logcookie, struct obd_trans_info *oti) +/* Add a single record to the recovery log(s). + * Returns number of bytes in returned logcookies, or negative error code. + */ +int llog_add_record(struct llog_handle *cathandle, struct llog_trans_hdr *rec, + struct lov_mds_md *lmm, struct obd_trans_info *oti, + struct llog_cookie *logcookies) { struct llog_handle *loghandle; struct llog_object_hdr *loh; - int reclen = rec->lgh_len; - int offset; + int reclen = rec->lth_len; + struct file *file; + loff_t offset; + int left; int index; - int num_pga = 2; int rc; ENTRY; - loghandle = llog_current_log(conn, reclen, oti); - - offset = loghandle->lgh_pga[1].off; - - loh = kmap(loghandle->lgc_pga[0].pg); - index = loh->loh_numrec++; - ext2_set_bit(index, loh->loh_bitmap); - -#if PAGE_SIZE > LLOG_HEADER_SIZE - /* It is possible we are still writing in the first page */ - if (offset < PAGE_SIZE) { - memcpy(loh + offset, rec, reclen); - loghandle->lgh_pga[0].count = offset + reclen; - kunmap(loghandle->lgh_pga[0]->pg); - num_pga = 1; - } else -#endif - { -#if PAGE_SIZE > LLOG_HEADER_SIZE - loghandle->lgh_pga[0].count = LLOG_HEADER_SIZE; -#endif - kunmap(loghandle->lgh_pga[0]->pg); - - memcpy(kmap(loghandle->lgh_pga[1]->pg) + (offset & ~PAGE_MASK), - rec, reclen); - loghandle->lgh_pga[1].count = reclen; - kunmap(loghandle->lgh_pga[1]->pg); + loghandle = llog_current_log(cathandle, reclen, oti); + if (IS_ERR(loghandle)) + RETURN(PTR_ERR(loghandle)); + + loh = loghandle->lgh_hdr; + file = loghandle->lgh_file; + + /* Make sure that records don't cross a chunk boundary, so we can + * process them page-at-a-time if needed. If it will cross a chunk + * boundary, write in a fake (but referenced) entry to pad the chunk. + * + * We know that llog_current_log() will return a loghandle that is + * big enough to hold reclen, so all we care about is padding here. + */ + left = file->f_pos & (LLOG_CHUNK_SIZE - 1); + if (left != reclen && left < reclen + LLOG_MIN_REC_SIZE && + file->f_pos + left < LLOG_MAX_LOG_SIZE) { + struct llog_null_trans { + struct llog_trans_hdr hdr; + __u32 padding; + __u32 end_size; + } pad; + rc = lustre_fwrite(loghandle->lgh_file, &pad, sizeof(pad), + &loghandle->lgh_file->f_pos); + if (rc != sizeof(pad)) { + CERROR("error writing padding record: rc %d\n", rc); + RETURN(rc < 0 ? rc : -EIO); + } + + loghandle->lgh_index++; } - rc = obd_brw(OBD_BRW_WRITE, conn, loghandle->lgh_lsm, num_pga, - loghandle->lgh_pga, NULL, oti); - if (rc) - RETURN(rc); - loghandle->lgh_pga[1].off += reclen; + index = loghandle->lgh_index++; + if (ext2_set_bit(index, loh->loh_bitmap)) { + CERROR("argh, index %u already set in log bitmap?\n", index); + LBUG(); /* should never happen */ + } + loh->loh_numrec++; - logcookie->lgc_lid = loghandle->lgh_lid; - logcookie->lgc_index = index; - logcookie->lgc_offset = offset; + offset = 0; + rc = lustre_fwrite(loghandle->lgh_file, loh, sizeof(*loh), &offset); + if (rc != sizeof(*loh)) { + CERROR("error writing log header: rc %d\n", rc); + RETURN(rc < 0 ? rc : -EIO); + } - RETURN(0); + rc = lustre_fwrite(loghandle->lgh_file, rec, reclen, + &loghandle->lgh_file->f_pos); + if (rc != reclen) { + CERROR("error writing log record: rc %d\n", rc); + RETURN(rc < 0 ? rc : -EIO); + } + + *logcookies = loghandle->lgh_cookie; + logcookies->lgc_index = index; + + RETURN(sizeof(*logcookies)); } -struct loghandle *llog_id2handle(struct lustre_handle *conn, - struct llog_logid *lid) +struct llog_handle *llog_id2handle(struct llog_handle *cathandle, + struct llog_cookie *logcookie) { - struct obd_device *obd = class_conn2obd(conn); - struct llog_handle *cathandle = obd->obd_catalog; - struct llog_handle *loghandle = NULL, *tmp; + struct llog_handle *loghandle; + struct llog_logid *lgl = &logcookie->lgc_lgl; ENTRY; if (cathandle == NULL) - RETURN(NULL); - - list_for_each_entry(tmp, &cathandle->lgh_list, lgh_list) { - if (tmp->lgh_lid == lid) { - loghandle = tmp; - break; + RETURN(ERR_PTR(-EBADF)); + + list_for_each_entry(loghandle, &cathandle->lgh_list, lgh_list) { + struct llog_logid *cgl = &loghandle->lgh_cookie.lgc_lgl; + if (cgl->lgl_oid == lgl->lgl_oid) { + if (cgl->lgl_ogener != lgl->lgl_ogener) { + CERROR("log "LPX64" generation %x != %x\n", + lgl->lgl_oid, + cgl->lgl_ogener, lgl->lgl_ogener); + continue; + } + RETURN(loghandle); } } - if (loghandle == NULL) { - obd_open( + loghandle = cathandle->lgh_log_open(cathandle, logcookie); + if (IS_ERR(loghandle)) { + CERROR("error opening log id "LPX64":%x: rc %d\n", + lgl->lgl_oid, lgl->lgl_ogener, (int)PTR_ERR(loghandle)); + } else { + loghandle->lgh_log_create = cathandle->lgh_log_create; + loghandle->lgh_log_open = cathandle->lgh_log_open; + loghandle->lgh_log_close = cathandle->lgh_log_close; + loghandle->lgh_obd = cathandle->lgh_obd; + list_add(&loghandle->lgh_list, &cathandle->lgh_list); + } + RETURN(loghandle); } -int llog_clear_records(struct lustre_handle *conn, int count, - struct llog_cookie *cookies) +/* For each cookie in the cookie array, we clear the log in-use bit and either: + * - the log is empty, so mark it free in the catalog header and delete it + * - the log is not empty, just write out the log header + * + * The cookies may be in different log files, so we need to get new logs + * each time. + */ +int llog_cancel_records(struct llog_handle *cathandle, int count, + struct llog_cookie *cookies) { + struct llog_catalog_hdr *lch = cathandle->lgh_hdr; + int rc = 0; int i; - for (i = 0; i < count; i++) { + for (i = 0; i < count; i++, cookies++) { struct llog_handle *loghandle; - loghandle = llog_id2handle(conn, cookies[i].lgc_lid); - -} + struct llog_object_hdr *loh; + struct llog_logid *lgl = &cookies->lgc_lgl; + + loghandle = llog_id2handle(cathandle, cookies); + if (IS_ERR(loghandle)) { + if (!rc) + rc = PTR_ERR(loghandle); + continue; + } -int llog_clear_record(struct llog_handle *handle, __u32 recno) -{ - return 0; -} + loh = loghandle->lgh_hdr; + if (!ext2_clear_bit(cookies->lgc_index, loh)) + CERROR("log index %u in "LPX64":%x already clear?\n", + cookies->lgc_index, lgl->lgl_oid, + lgl->lgl_ogener); + else if (--loh->loh_numrec == 0) { + int catindex = loghandle->lgh_cookie.lgc_index; + loff_t offset = 0; + + if (ext2_clear_bit(catindex, lch->lch_bitmap)) { + CERROR("catalog index %u already clear?\n", + catindex); + } else { + int ret = lustre_fwrite(cathandle->lgh_file, + lch, sizeof(*lch), + &offset); + + if (ret != sizeof(*lch)) { + CERROR("log %u cancel error: rc %d\n", + catindex, ret); + if (!rc) + rc = ret; + } + } + loghandle->lgh_log_close(cathandle, loghandle); + } else { + loff_t offset = 0; + int ret = lustre_fwrite(loghandle->lgh_file, loh, + sizeof(*loh), &offset); + + if (ret != sizeof(*loh)) { + CERROR("error cancelling index %u: rc %d\n", + cookies->lgc_index, ret); + /* XXX mark handle bad? */ + if (!rc) + rc = ret; + } + } + } -int llog_delete(struct llog_logid *id) -{ - return 0; + RETURN(rc); } -- 1.8.3.1