From 26135435298c6f78e288e5381165f3417152b673 Mon Sep 17 00:00:00 2001 From: phil Date: Mon, 8 Sep 2003 07:51:02 +0000 Subject: [PATCH] - steal from the reserved bits instead of the bitmap bits, another piece of sage Andreas advice - add an llh_flags field to the header - rename confusing lgh_index to lgh_last_idx - if the log is marked ZAP_WHEN_EMPTY, the log is full, and all of the records are cancelled, call llog_destroy from llog_cancel_rec - extend llog_open to also support reopen-by-name - add llog_vfs_read_blob and llog_vfs_read_header - removed a bunch of unnecessary EXPORT_SYMBOLs - add llog_lvfs_open() --- lustre/obdclass/llog.c | 22 ++++-- lustre/obdclass/llog_cat.c | 7 +- lustre/obdclass/llog_lvfs.c | 167 +++++++++++++++++++++++++++++++++++++------- 3 files changed, 161 insertions(+), 35 deletions(-) diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index 356fd52..fd8d939 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -85,14 +85,28 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index) if (!ext2_clear_bit(index, llh->llh_bitmap)) { CERROR("catalog index %u already clear?\n", index); LBUG(); - } else { - rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0); - if (rc) - CERROR("failure re-writing header %d\n", rc); + } + + llh->llh_count--; + + if (llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY && + llh->llh_count == 1 && + loghandle->lgh_last_idx == LLOG_BITMAP_BYTES * 8) { + rc = llog_destroy(loghandle); + if (rc) + CERROR("failure destroying log after last cancel: %d\n", + rc); LASSERT(rc == 0); + RETURN(rc); } + + rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0); + if (rc) + CERROR("failure re-writing header %d\n", rc); + LASSERT(rc == 0); RETURN(rc); } +EXPORT_SYMBOL(llog_cancel_rec); #if 0 int filter_log_cancel(struct obd_export *exp, struct lov_stripe_md *lsm, diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index 073dea0..9708180 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -106,9 +106,8 @@ EXPORT_SYMBOL(llog_cat_new_log); /* Assumes caller has already pushed us into the kernel context and is locking. * We return a lock on the handle to ensure nobody yanks it from us. */ -int llog_cat_id2handle(struct llog_handle *cathandle, - struct llog_handle **res, - struct llog_logid *logid) +int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res, + struct llog_logid *logid) { struct llog_handle *loghandle; int rc = 0; @@ -130,7 +129,7 @@ int llog_cat_id2handle(struct llog_handle *cathandle, } } - rc = llog_open(cathandle->lgh_obd, &loghandle, logid); + rc = llog_open(cathandle->lgh_obd, &loghandle, logid, NULL); if (rc) { CERROR("error opening log id "LPX64":%x: rc %d\n", logid->lgl_oid, logid->lgl_ogen, rc); diff --git a/lustre/obdclass/llog_lvfs.c b/lustre/obdclass/llog_lvfs.c index d2ac439..bcfcd2e 100644 --- a/lustre/obdclass/llog_lvfs.c +++ b/lustre/obdclass/llog_lvfs.c @@ -69,7 +69,7 @@ static int llog_lvfs_pad(struct l_file *file, int len, int index) } static int llog_lvfs_write_blob(struct l_file *file, struct llog_rec_hdr *rec, - void *buf, loff_t off) + void *buf, loff_t off) { int rc; struct llog_rec_tail end; @@ -120,12 +120,56 @@ static int llog_lvfs_write_blob(struct l_file *file, struct llog_rec_hdr *rec, RETURN(rc); } +static int llog_lvfs_read_blob(struct l_file *file, void *buf, int size, + loff_t off) +{ + loff_t offset = off; + int rc; + ENTRY; + + rc = lustre_fread(file, buf, size, &offset); + if (rc != size) { + CERROR("error reading log record: rc %d\n", rc); + RETURN(-EIO); + } + RETURN(0); +} + +static int llog_lvfs_read_header(struct llog_handle *handle) +{ + struct llog_rec_tail tail; + int rc; + ENTRY; + + LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE); + + if (handle->lgh_file->f_dentry->d_inode->i_size == 0) { + CERROR("not reading header from 0-byte log\n"); + RETURN(0); + } + + rc = llog_lvfs_read_blob(handle->lgh_file, handle->lgh_hdr, + LLOG_CHUNK_SIZE, 0); + if (rc) + CERROR("error reading log header\n"); + + rc = llog_lvfs_read_blob(handle->lgh_file, &tail, sizeof(tail), + handle->lgh_file->f_dentry->d_inode->i_size - + sizeof(tail)); + if (rc) + CERROR("error reading log tail\n"); + + handle->lgh_last_idx = tail.lrt_index; + + RETURN(rc); +} + /* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */ /* appends if idx == -1, otherwise overwrites record idx. */ -int llog_lvfs_write_rec(struct llog_handle *loghandle, - struct llog_rec_hdr *rec, - struct llog_cookie *reccookie, int cookiecount, - void *buf, int idx) +static int llog_lvfs_write_rec(struct llog_handle *loghandle, + struct llog_rec_hdr *rec, + struct llog_cookie *reccookie, int cookiecount, + void *buf, int idx) { struct llog_log_hdr *llh; int reclen = rec->lrh_len, index, rc; @@ -142,7 +186,7 @@ int llog_lvfs_write_rec(struct llog_handle *loghandle, /* no header: only allowed to insert record 0 */ if (idx != 0 && !file->f_dentry->d_inode->i_size) { - CERROR("idx != -1 in empty log "); + CERROR("idx != -1 in empty log\n"); LBUG(); } @@ -173,13 +217,13 @@ int llog_lvfs_write_rec(struct llog_handle *loghandle, sizeof(struct llog_rec_tail); if (left != 0 && left <= reclen) { - loghandle->lgh_index++; - rc = llog_lvfs_pad(file, left, loghandle->lgh_index); + loghandle->lgh_last_idx++; + rc = llog_lvfs_pad(file, left, loghandle->lgh_last_idx); if (rc) RETURN(rc); } - index = loghandle->lgh_index++; + index = loghandle->lgh_last_idx++; rec->lrh_index = index; if (ext2_set_bit(index, llh->llh_bitmap)) { CERROR("argh, index %u already set in log bitmap?\n", index); @@ -205,10 +249,10 @@ int llog_lvfs_write_rec(struct llog_handle *loghandle, } RETURN(rc); } -EXPORT_SYMBOL(llog_lvfs_write_rec); -int llog_lvfs_next_block(struct llog_handle *loghandle, int cur_idx, - int next_idx, __u64 *cur_offset, void *buf, int len) +static int llog_lvfs_next_block(struct llog_handle *loghandle, int cur_idx, + int next_idx, __u64 *cur_offset, void *buf, + int len) { int rc; ENTRY; @@ -270,13 +314,11 @@ int llog_lvfs_next_block(struct llog_handle *loghandle, int cur_idx, RETURN(-ENOENT); } -EXPORT_SYMBOL(llog_lvfs_next_block); - /* This is a callback from the llog_* functions. * Assumes caller has already pushed us into the kernel context. */ -int llog_lvfs_create(struct obd_device *obd, - struct llog_handle **res, char *name) +static int llog_lvfs_create(struct obd_device *obd, struct llog_handle **res, + char *name) { char logname[24]; struct llog_handle *handle; @@ -290,6 +332,7 @@ int llog_lvfs_create(struct obd_device *obd, *res = handle; if (name) { + LASSERT(strlen(name) <= 18); sprintf(logname, "LOGS/%s", name); handle->lgh_file = l_filp_open(logname, open_flags, 0644); @@ -333,7 +376,79 @@ out_handle: return rc; } -int llog_lvfs_close(struct llog_handle *handle) +static int llog_lvfs_open(struct obd_device *obd, struct llog_handle **res, + struct llog_logid *logid, char *name) +{ + struct llog_handle *loghandle; + struct l_dentry *dchild = NULL; + int rc, cleanup_phase = 1; + ENTRY; + + *res = loghandle = llog_alloc_handle(); + if (loghandle == NULL) + RETURN(-ENOMEM); + loghandle->lgh_obd = obd; + + if (logid != NULL) { + dchild = obd_lvfs_fid2dentry(obd->obd_log_exp, logid->lgl_oid, + logid->lgl_ogr); + if (IS_ERR(dchild)) { + rc = PTR_ERR(dchild); + CERROR("error looking up log file "LPX64":"LPX64 + ": rc %d\n", + logid->lgl_oid, logid->lgl_ogr, rc); + GOTO(cleanup, rc); + } + + cleanup_phase = 2; + if (dchild->d_inode == NULL) { + rc = -ENOENT; + CERROR("nonexistent log file "LPX64":"LPX64": rc %d\n", + logid->lgl_oid, logid->lgl_ogr, rc); + GOTO(cleanup, rc); + } + + loghandle->lgh_file = l_dentry_open(&obd->obd_ctxt, dchild, + O_RDWR | O_LARGEFILE); + if (IS_ERR(loghandle->lgh_file)) { + rc = PTR_ERR(loghandle->lgh_file); + CERROR("error opening logfile "LPX64":"LPX64": rc %d\n", + logid->lgl_oid, logid->lgl_ogr, rc); + GOTO(cleanup, rc); + } + } else if (name != NULL) { + char logname[24]; + int open_flags = O_RDWR | O_LARGEFILE; + LASSERT(strlen(name) <= 18); + sprintf(logname, "LOGS/%s", name); + + loghandle->lgh_file = l_filp_open(logname, open_flags, 0644); + if (IS_ERR(loghandle->lgh_file)) { + rc = PTR_ERR(loghandle->lgh_file); + CERROR("logfile open by name %s: %d\n", logname, rc); + GOTO(cleanup, rc); + } + } else { + LBUG(); + } + + loghandle->lgh_id.lgl_oid = + loghandle->lgh_file->f_dentry->d_inode->i_ino; + loghandle->lgh_id.lgl_ogen = + loghandle->lgh_file->f_dentry->d_inode->i_generation; + + RETURN(0); + cleanup: + switch (cleanup_phase) { + case 2: + l_dput(dchild); + case 1: + llog_free_handle(loghandle); + } + return rc; +} + +static int llog_lvfs_close(struct llog_handle *handle) { int rc; ENTRY; @@ -346,7 +461,7 @@ int llog_lvfs_close(struct llog_handle *handle) RETURN(rc); } -int llog_lvfs_destroy(struct llog_handle *handle) +static int llog_lvfs_destroy(struct llog_handle *handle) { struct obdo *oa; int rc; @@ -416,10 +531,6 @@ int mds_log_close(struct llog_handle *cathandle, struct llog_handle *loghandle) RETURN(rc); } -struct llog_handle *mds_log_open(struct obd_device *obd, - struct llog_cookie *logcookie); - - /* This is a callback from the llog_* functions. * Assumes caller has already pushed us into the kernel context. */ struct llog_handle *mds_log_open(struct obd_device *obd, @@ -651,12 +762,14 @@ out_handle: struct llog_operations llog_lvfs_ops = { - lop_write_rec: llog_lvfs_write_rec, - lop_next_block: llog_lvfs_next_block, - // lop_open: llog_lvfs_open, + lop_write_rec: llog_lvfs_write_rec, + lop_next_block: llog_lvfs_next_block, + lop_open: llog_lvfs_open, + lop_read_header: llog_lvfs_read_header, + lop_create: llog_lvfs_create, + lop_destroy: llog_lvfs_destroy, + lop_close: llog_lvfs_close, // lop_cancel: llog_lvfs_cancel, - lop_create:llog_lvfs_create, - lop_close:llog_lvfs_close }; EXPORT_SYMBOL(llog_lvfs_ops); -- 1.8.3.1