From: braam Date: Mon, 8 Sep 2003 02:28:43 +0000 (+0000) Subject: - reworking the logging api for better layering X-Git-Tag: v1_7_0_51~2^7~586 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=22fd0e6578d6b4991fc61c128e232f6d31e8474a;p=fs%2Flustre-release.git - reworking the logging api for better layering - beginning of Lustre vfs library lvfs --- diff --git a/lustre/include/linux/lvfs.h b/lustre/include/linux/lvfs.h new file mode 100644 index 0000000..17d876f --- /dev/null +++ b/lustre/include/linux/lvfs.h @@ -0,0 +1,94 @@ +#ifndef __LVFS_H__ +#define __LVFS_H__ + +#if defined __LINUX__ && defined __KERNEL__ +#include +#endif + +#ifdef LIBLUSTRE +#include +#endif + +/* simple.c */ +struct obd_ucred { + __u32 ouc_fsuid; + __u32 ouc_fsgid; + __u32 ouc_cap; + __u32 ouc_suppgid1; + __u32 ouc_suppgid2; +}; + +#define OBD_RUN_CTXT_MAGIC 0xC0FFEEAA +#define OBD_CTXT_DEBUG /* development-only debugging */ +struct obd_run_ctxt { + struct vfsmount *pwdmnt; + struct dentry *pwd; + mm_segment_t fs; + struct obd_ucred ouc; + int ngroups; +#ifdef OBD_CTXT_DEBUG + __u32 magic; +#endif +}; + + +#ifdef OBD_CTXT_DEBUG +#define OBD_SET_CTXT_MAGIC(ctxt) (ctxt)->magic = OBD_RUN_CTXT_MAGIC +#else +#define OBD_SET_CTXT_MAGIC(ctxt) do {} while(0) +#endif + +#ifdef __KERNEL__ + +void push_ctxt(struct obd_run_ctxt *save, struct obd_run_ctxt *new_ctx, + struct obd_ucred *cred); +void pop_ctxt(struct obd_run_ctxt *saved, struct obd_run_ctxt *new_ctx, + struct obd_ucred *cred); +struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode); +struct dentry *simple_mknod(struct dentry *dir, char *name, int mode); +int lustre_fread(struct file *file, void *buf, int len, loff_t *off); +int lustre_fwrite(struct file *file, const void *buf, int len, loff_t *off); +int lustre_fsync(struct file *file); + +static inline void l_dput(struct dentry *de) +{ + if (!de || IS_ERR(de)) + return; + //shrink_dcache_parent(de); + LASSERT(atomic_read(&de->d_count) > 0); + dput(de); +} + +/* We 
need to hold the inode semaphore over the dcache lookup itself, or we + * run the risk of entering the filesystem lookup path concurrently on SMP + * systems, and instantiating two inodes for the same entry. We still + * protect against concurrent addition/removal races with the DLM locking. + */ +static inline struct dentry *ll_lookup_one_len(char *fid_name, + struct dentry *dparent, + int fid_namelen) +{ + struct dentry *dchild; + + down(&dparent->d_inode->i_sem); + dchild = lookup_one_len(fid_name, dparent, fid_namelen); + up(&dparent->d_inode->i_sem); + + return dchild; +} + +static inline void ll_sleep(int t) +{ + set_current_state(TASK_INTERRUPTIBLE); + schedule_timeout(t * HZ); + set_current_state(TASK_RUNNING); +} +#endif + +#define LL_FID_NAMELEN (16 + 1 + 8 + 1) +static inline int ll_fid2str(char *str, __u64 id, __u32 generation) +{ + return sprintf(str, "%llx:%08x", (unsigned long long)id, generation); +} + +#endif diff --git a/lustre/include/linux/lvfs_linux.h b/lustre/include/linux/lvfs_linux.h new file mode 100644 index 0000000..0c17b70 --- /dev/null +++ b/lustre/include/linux/lvfs_linux.h @@ -0,0 +1,11 @@ +#ifndef __LVFS_LINUX_H__ +#define __LVFS_LINUX_H__ + +#define l_file file +#define l_dentry dentry +#define l_inode inode + +#define l_dentry_open dentry_open +#define l_filp_open filp_open + +#endif diff --git a/lustre/lvfs/.cvsignore b/lustre/lvfs/.cvsignore new file mode 100644 index 0000000..49c6100 --- /dev/null +++ b/lustre/lvfs/.cvsignore @@ -0,0 +1,9 @@ +.Xrefs +config.log +config.status +configure +Makefile +Makefile.in +.deps +TAGS +.*.cmd diff --git a/lustre/lvfs/Makefile.am b/lustre/lvfs/Makefile.am new file mode 100644 index 0000000..4c807b5 --- /dev/null +++ b/lustre/lvfs/Makefile.am @@ -0,0 +1,32 @@ +# Copyright (C) 2001 Cluster File Systems, Inc. +# +# This code is issued under the GNU General Public License. 
+# See the file COPYING in this distribution +DEFS= +MODULE = lvfs + + +if EXTN +FSMOD = fsfilt_extN +else +FSMOD = fsfilt_ext3 +endif + + +if LIBLUSTRE +lib_LIBRARIES = liblvfs.a +liblvfs_a_SOURCES = lvfs_user_fs.c + +#if MYSQL +#liblvfs_a_SOURCES += lvfs_user_mysql.c +#endif + +else +modulefs_DATA = lvfs.o $(FSMOD).o fsfilt_reiserfs.o + +EXTRA_PROGRAMS = lvfs $(FSMOD) fsfilt_reiserfs +lvfs_SOURCES = lvfs_linux.c fsfilt.c +endif + + +include $(top_srcdir)/Rules diff --git a/lustre/lvfs/fsfilt.c b/lustre/lvfs/fsfilt.c new file mode 100644 index 0000000..6b8837d --- /dev/null +++ b/lustre/lvfs/fsfilt.c @@ -0,0 +1,109 @@ +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif +#define DEBUG_SUBSYSTEM S_FILTER + +#include +#include +#include +#include +#include +#include +#include + +LIST_HEAD(fsfilt_types); + +static struct fsfilt_operations *fsfilt_search_type(const char *type) +{ + struct fsfilt_operations *found; + struct list_head *p; + + list_for_each(p, &fsfilt_types) { + found = list_entry(p, struct fsfilt_operations, fs_list); + if (!strcmp(found->fs_type, type)) { + return found; + } + } + return NULL; +} + +int fsfilt_register_ops(struct fsfilt_operations *fs_ops) +{ + struct fsfilt_operations *found; + + /* lock fsfilt_types list */ + if ((found = fsfilt_search_type(fs_ops->fs_type))) { + if (found != fs_ops) { + CERROR("different operations for type %s\n", + fs_ops->fs_type); + /* unlock fsfilt_types list */ + RETURN(-EEXIST); + } + } else { + PORTAL_MODULE_USE; + list_add(&fs_ops->fs_list, &fsfilt_types); + } + + /* unlock fsfilt_types list */ + return 0; +} + +void fsfilt_unregister_ops(struct fsfilt_operations *fs_ops) +{ + struct list_head *p; + + /* lock fsfilt_types list */ + list_for_each(p, &fsfilt_types) { + struct fsfilt_operations *found; + + found = list_entry(p, typeof(*found), fs_list); + if (found == fs_ops) { + list_del(p); + PORTAL_MODULE_UNUSE; + break; + } + } + /* unlock fsfilt_types list */ +} + +struct fsfilt_operations 
*fsfilt_get_ops(const char *type) +{ + struct fsfilt_operations *fs_ops; + + /* lock fsfilt_types list */ + if (!(fs_ops = fsfilt_search_type(type))) { + char name[32]; + int rc; + + snprintf(name, sizeof(name) - 1, "fsfilt_%s", type); + name[sizeof(name) - 1] = '\0'; + + if ((rc = request_module(name))) { + fs_ops = fsfilt_search_type(type); + CDEBUG(D_INFO, "Loaded module '%s'\n", name); + if (!fs_ops) + rc = -ENOENT; + } + + if (rc) { + CERROR("Can't find fsfilt_%s interface\n", name); + RETURN(ERR_PTR(rc)); + /* unlock fsfilt_types list */ + } + } + try_module_get(fs_ops->fs_owner); + /* unlock fsfilt_types list */ + + return fs_ops; +} + +void fsfilt_put_ops(struct fsfilt_operations *fs_ops) +{ + module_put(fs_ops->fs_owner); +} + + +EXPORT_SYMBOL(fsfilt_register_ops); +EXPORT_SYMBOL(fsfilt_unregister_ops); +EXPORT_SYMBOL(fsfilt_get_ops); +EXPORT_SYMBOL(fsfilt_put_ops); diff --git a/lustre/lvfs/fsfilt_ext3.c b/lustre/lvfs/fsfilt_ext3.c new file mode 100644 index 0000000..830bf68 --- /dev/null +++ b/lustre/lvfs/fsfilt_ext3.c @@ -0,0 +1,699 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/lib/fsfilt_ext3.c + * Lustre filesystem abstraction routines + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * Author: Andreas Dilger + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define DEBUG_SUBSYSTEM S_FILTER + +#include +#include +#include +#include +#include +#include +#include +#include +/* XXX ugh */ +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + #include +#else + #include +#endif +#include +#include +#include +#include +#include + +static kmem_cache_t *fcb_cache; +static atomic_t fcb_cache_count = ATOMIC_INIT(0); + +struct fsfilt_cb_data { + struct journal_callback cb_jcb; /* jbd private data - MUST BE FIRST */ + fsfilt_cb_t cb_func; /* MDS/OBD completion function */ + struct obd_device *cb_obd; /* MDS/OBD completion device */ + __u64 cb_last_rcvd; /* MDS/OST last committed operation */ + void *cb_data; /* MDS/OST completion function data */ +}; + +#define EXT3_XATTR_INDEX_LUSTRE 5 +#define XATTR_LUSTRE_MDS_OBJID "system.lustre_mds_objid" + +/* + * We don't currently need any additional blocks for rmdir and + * unlink transactions because we are storing the OST oa_id inside + * the inode (which we will be changing anyways as part of this + * transaction). 
+ */ +static void *fsfilt_ext3_start(struct inode *inode, int op, void *desc_private) +{ + /* For updates to the last recieved file */ + int nblocks = EXT3_DATA_TRANS_BLOCKS; + void *handle; + + LASSERT(current->journal_info == NULL); + + switch(op) { + case FSFILT_OP_CREATE_LOG: + nblocks += EXT3_INDEX_EXTRA_TRANS_BLOCKS+EXT3_DATA_TRANS_BLOCKS; + op = FSFILT_OP_CREATE; + break; + case FSFILT_OP_UNLINK_LOG: + nblocks += EXT3_INDEX_EXTRA_TRANS_BLOCKS+EXT3_DATA_TRANS_BLOCKS; + op = FSFILT_OP_UNLINK; + break; + } + + switch(op) { + case FSFILT_OP_RMDIR: + case FSFILT_OP_UNLINK: + nblocks += EXT3_DELETE_TRANS_BLOCKS; + break; + case FSFILT_OP_RENAME: + /* modify additional directory */ + nblocks += EXT3_DATA_TRANS_BLOCKS; + /* no break */ + case FSFILT_OP_SYMLINK: + /* additional block + block bitmap + GDT for long symlink */ + nblocks += 3; + /* no break */ + case FSFILT_OP_CREATE: + case FSFILT_OP_MKDIR: + case FSFILT_OP_MKNOD: + /* modify one inode + block bitmap + GDT */ + nblocks += 3; + /* no break */ + case FSFILT_OP_LINK: + /* modify parent directory */ + nblocks += EXT3_INDEX_EXTRA_TRANS_BLOCKS+EXT3_DATA_TRANS_BLOCKS; + break; + case FSFILT_OP_SETATTR: + /* Setattr on inode */ + nblocks += 1; + break; + default: CERROR("unknown transaction start op %d\n", op); + LBUG(); + } + + LASSERT(current->journal_info == desc_private); + lock_kernel(); + handle = journal_start(EXT3_JOURNAL(inode), nblocks); + unlock_kernel(); + + if (!IS_ERR(handle)) + LASSERT(current->journal_info == handle); + return handle; +} + +/* + * Calculate the number of buffer credits needed to write multiple pages in + * a single ext3 transaction. No, this shouldn't be here, but as yet ext3 + * doesn't have a nice API for calculating this sort of thing in advance. + * + * See comment above ext3_writepage_trans_blocks for details. We assume + * no data journaling is being done, but it does allow for all of the pages + * being non-contiguous. 
If we are guaranteed contiguous pages we could + * reduce the number of (d)indirect blocks a lot. + * + * With N blocks per page and P pages, for each inode we have at most: + * N*P indirect + * min(N*P, blocksize/4 + 1) dindirect blocks + * niocount tindirect + * + * For the entire filesystem, we have at most: + * min(sum(nindir + P), ngroups) bitmap blocks (from the above) + * min(sum(nindir + P), gdblocks) group descriptor blocks (from the above) + * objcount inode blocks + * 1 superblock + * 2 * EXT3_SINGLEDATA_TRANS_BLOCKS for the quota files + * + * 1 EXT3_DATA_TRANS_BLOCKS for the last_rcvd update. + */ +static int fsfilt_ext3_credits_needed(int objcount, struct fsfilt_objinfo *fso) +{ + struct super_block *sb = fso->fso_dentry->d_inode->i_sb; + int blockpp = 1 << (PAGE_CACHE_SHIFT - sb->s_blocksize_bits); + int addrpp = EXT3_ADDR_PER_BLOCK(sb) * blockpp; + int nbitmaps = 0; + int ngdblocks = 0; + int needed = objcount + 1; + int i; + + for (i = 0; i < objcount; i++, fso++) { + int nblocks = fso->fso_bufcnt * blockpp; + int ndindirect = min(nblocks, addrpp + 1); + int nindir = nblocks + ndindirect + 1; + + nbitmaps += nindir + nblocks; + ngdblocks += nindir + nblocks; + + needed += nindir; + } + + /* Assumes ext3 and ext3 have same sb_info layout at the start. */ + if (nbitmaps > EXT3_SB(sb)->s_groups_count) + nbitmaps = EXT3_SB(sb)->s_groups_count; + if (ngdblocks > EXT3_SB(sb)->s_gdb_count) + ngdblocks = EXT3_SB(sb)->s_gdb_count; + + needed += nbitmaps + ngdblocks; + + /* last_rcvd update */ + needed += EXT3_DATA_TRANS_BLOCKS; + +#ifdef CONFIG_QUOTA + /* We assume that there will be 1 bit set in s_dquot.flags for each + * quota file that is active. This is at least true for now. + */ + needed += hweight32(sb_any_quota_enabled(sb)) * + EXT3_SINGLEDATA_TRANS_BLOCKS; +#endif + + return needed; +} + +/* We have to start a huge journal transaction here to hold all of the + * metadata for the pages being written here. 
This is necessitated by + * the fact that we do lots of prepare_write operations before we do + * any of the matching commit_write operations, so even if we split + * up to use "smaller" transactions none of them could complete until + * all of them were opened. By having a single journal transaction, + * we eliminate duplicate reservations for common blocks like the + * superblock and group descriptors or bitmaps. + * + * We will start the transaction here, but each prepare_write will + * add a refcount to the transaction, and each commit_write will + * remove a refcount. The transaction will be closed when all of + * the pages have been written. + */ +static void *fsfilt_ext3_brw_start(int objcount, struct fsfilt_objinfo *fso, + int niocount, void *desc_private) +{ + journal_t *journal; + handle_t *handle; + int needed; + ENTRY; + + LASSERT(current->journal_info == desc_private); + journal = EXT3_SB(fso->fso_dentry->d_inode->i_sb)->s_journal; + needed = fsfilt_ext3_credits_needed(objcount, fso); + + /* The number of blocks we could _possibly_ dirty can very large. + * We reduce our request if it is absurd (and we couldn't get that + * many credits for a single handle anyways). + * + * At some point we have to limit the size of I/Os sent at one time, + * increase the size of the journal, or we have to calculate the + * actual journal requirements more carefully by checking all of + * the blocks instead of being maximally pessimistic. It remains to + * be seen if this is a real problem or not. 
+ */ + if (needed > journal->j_max_transaction_buffers) { + CERROR("want too many journal credits (%d) using %d instead\n", + needed, journal->j_max_transaction_buffers); + needed = journal->j_max_transaction_buffers; + } + + lock_kernel(); + handle = journal_start(journal, needed); + unlock_kernel(); + if (IS_ERR(handle)) { + CERROR("can't get handle for %d credits: rc = %ld\n", needed, + PTR_ERR(handle)); + } else { + LASSERT(handle->h_buffer_credits >= needed); + LASSERT(current->journal_info == handle); + } + + RETURN(handle); +} + +static int fsfilt_ext3_commit(struct inode *inode, void *h, int force_sync) +{ + int rc; + handle_t *handle = h; + + LASSERT(current->journal_info == handle); + if (force_sync) + handle->h_sync = 1; /* recovery likes this */ + + lock_kernel(); + rc = journal_stop(handle); + unlock_kernel(); + + LASSERT(current->journal_info == NULL); + return rc; +} + +static int fsfilt_ext3_setattr(struct dentry *dentry, void *handle, + struct iattr *iattr, int do_trunc) +{ + struct inode *inode = dentry->d_inode; + int rc; + + lock_kernel(); + + /* A _really_ horrible hack to avoid removing the data stored + * in the block pointers; this is really the "small" stripe MD data. + * We can avoid further hackery by virtue of the MDS file size being + * zero all the time (which doesn't invoke block truncate at unlink + * time), so we assert we never change the MDS file size from zero. 
*/ + if (iattr->ia_valid & ATTR_SIZE && !do_trunc) { + /* ATTR_SIZE would invoke truncate: clear it */ + iattr->ia_valid &= ~ATTR_SIZE; + EXT3_I(inode)->i_disksize = inode->i_size = iattr->ia_size; + + /* make sure _something_ gets set - so new inode + * goes to disk (probably won't work over XFS */ + if (!(iattr->ia_valid & (ATTR_MODE | ATTR_MTIME | ATTR_CTIME))){ + iattr->ia_valid |= ATTR_MODE; + iattr->ia_mode = inode->i_mode; + } + } + + /* Don't allow setattr to change file type */ + iattr->ia_mode = (inode->i_mode & S_IFMT)|(iattr->ia_mode & ~S_IFMT); + + if (inode->i_op->setattr) { + rc = inode->i_op->setattr(dentry, iattr); + } else { + rc = inode_change_ok(inode, iattr); + if (!rc) + rc = inode_setattr(inode, iattr); + } + + unlock_kernel(); + + return rc; +} + +static int fsfilt_ext3_set_md(struct inode *inode, void *handle, + void *lmm, int lmm_size) +{ + int rc; + + /* Nasty hack city - store stripe MD data in the block pointers if + * it will fit, because putting it in an EA currently kills the MDS + * performance. We'll fix this with "fast EAs" in the future. + */ + if (inode->i_blocks == 0 && lmm_size <= sizeof(EXT3_I(inode)->i_data) - + sizeof(EXT3_I(inode)->i_data[0])) { + /* XXX old_size is debugging only */ + int old_size = EXT3_I(inode)->i_data[0]; + if (old_size != 0) { + LASSERT(old_size < sizeof(EXT3_I(inode)->i_data)); + CERROR("setting EA on %lu again... 
interesting\n", + inode->i_ino); + } + + EXT3_I(inode)->i_data[0] = cpu_to_le32(lmm_size); + memcpy(&EXT3_I(inode)->i_data[1], lmm, lmm_size); + mark_inode_dirty(inode); + return 0; + } else { + down(&inode->i_sem); + lock_kernel(); +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + rc = ext3_xattr_set(handle, inode, EXT3_XATTR_INDEX_LUSTRE, + XATTR_LUSTRE_MDS_OBJID, lmm, lmm_size, 0); +#else + rc = ext3_xattr_set_handle(handle, inode, + EXT3_XATTR_INDEX_LUSTRE, + XATTR_LUSTRE_MDS_OBJID, lmm, + lmm_size, 0); +#endif + unlock_kernel(); + up(&inode->i_sem); + } + + if (rc) + CERROR("error adding MD data to inode %lu: rc = %d\n", + inode->i_ino, rc); + return rc; +} + +static int fsfilt_ext3_get_md(struct inode *inode, void *lmm, int lmm_size) +{ + int rc; + + if (inode->i_blocks == 0 && EXT3_I(inode)->i_data[0]) { + int size = le32_to_cpu(EXT3_I(inode)->i_data[0]); + LASSERT(size < sizeof(EXT3_I(inode)->i_data)); + if (lmm) { + if (size > lmm_size) + return -ERANGE; + memcpy(lmm, &EXT3_I(inode)->i_data[1], size); + } + return size; + } + + down(&inode->i_sem); + lock_kernel(); + rc = ext3_xattr_get(inode, EXT3_XATTR_INDEX_LUSTRE, + XATTR_LUSTRE_MDS_OBJID, lmm, lmm_size); + unlock_kernel(); + up(&inode->i_sem); + + /* This gives us the MD size */ + if (lmm == NULL) + return (rc == -ENODATA) ? 0 : rc; + + if (rc < 0) { + CDEBUG(D_INFO, "error getting EA %s from inode %lu: " + "rc = %d\n", XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc); + memset(lmm, 0, lmm_size); + return (rc == -ENODATA) ? 
0 : rc; + } + + return rc; +} + +static ssize_t fsfilt_ext3_readpage(struct file *file, char *buf, size_t count, + loff_t *off) +{ + struct inode *inode = file->f_dentry->d_inode; + int rc = 0; + + if (S_ISREG(inode->i_mode)) + rc = file->f_op->read(file, buf, count, off); + else { + const int blkbits = inode->i_sb->s_blocksize_bits; + const int blksize = inode->i_sb->s_blocksize; + + CDEBUG(D_EXT2, "reading "LPSZ" at dir %lu+%llu\n", + count, inode->i_ino, *off); + while (count > 0) { + struct buffer_head *bh; + + bh = NULL; + if (*off < inode->i_size) { + int err = 0; + + bh = ext3_bread(NULL, inode, *off >> blkbits, + 0, &err); + + CDEBUG(D_EXT2, "read %u@%llu\n", blksize, *off); + + if (bh) { + memcpy(buf, bh->b_data, blksize); + brelse(bh); + } else if (err) { + /* XXX in theory we should just fake + * this buffer and continue like ext3, + * especially if this is a partial read + */ + CERROR("error read dir %lu+%llu: %d\n", + inode->i_ino, *off, err); + RETURN(err); + } + } + if (!bh) { + struct ext3_dir_entry_2 *fake = (void *)buf; + + CDEBUG(D_EXT2, "fake %u@%llu\n", blksize, *off); + memset(fake, 0, sizeof(*fake)); + fake->rec_len = cpu_to_le32(blksize); + } + count -= blksize; + buf += blksize; + *off += blksize; + rc += blksize; + } + } + + return rc; +} + +static void fsfilt_ext3_cb_func(struct journal_callback *jcb, int error) +{ + struct fsfilt_cb_data *fcb = (struct fsfilt_cb_data *)jcb; + + fcb->cb_func(fcb->cb_obd, fcb->cb_last_rcvd, fcb->cb_data, error); + + OBD_SLAB_FREE(fcb, fcb_cache, sizeof *fcb); + atomic_dec(&fcb_cache_count); +} + +static int fsfilt_ext3_set_last_rcvd(struct obd_device *obd, __u64 last_rcvd, + void *handle, fsfilt_cb_t cb_func, + void *cb_data) +{ + struct fsfilt_cb_data *fcb; + + OBD_SLAB_ALLOC(fcb, fcb_cache, GFP_NOFS, sizeof *fcb); + if (fcb == NULL) + RETURN(-ENOMEM); + + atomic_inc(&fcb_cache_count); + fcb->cb_func = cb_func; + fcb->cb_obd = obd; + fcb->cb_last_rcvd = last_rcvd; + fcb->cb_data = cb_data; + + 
CDEBUG(D_EXT2, "set callback for last_rcvd: "LPD64"\n", last_rcvd); + lock_kernel(); + journal_callback_set(handle, fsfilt_ext3_cb_func, + (struct journal_callback *)fcb); + unlock_kernel(); + + return 0; +} + +static int fsfilt_ext3_journal_data(struct file *filp) +{ +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + /* bug 1576: enable data journaling on 2.5 when appropriate */ + struct inode *inode = filp->f_dentry->d_inode; + EXT3_I(inode)->i_flags |= EXT3_JOURNAL_DATA_FL; +#endif + return 0; +} + +/* + * We need to hack the return value for the free inode counts because + * the current EA code requires one filesystem block per inode with EAs, + * so it is possible to run out of blocks before we run out of inodes. + * + * This can be removed when the ext3 EA code is fixed. + */ +static int fsfilt_ext3_statfs(struct super_block *sb, struct obd_statfs *osfs) +{ + struct kstatfs sfs; + int rc = vfs_statfs(sb, &sfs); + + if (!rc && sfs.f_bfree < sfs.f_ffree) { + sfs.f_files = (sfs.f_files - sfs.f_ffree) + sfs.f_bfree; + sfs.f_ffree = sfs.f_bfree; + } + + statfs_pack(osfs, &sfs); + return rc; +} + +static int fsfilt_ext3_sync(struct super_block *sb) +{ + return ext3_force_commit(sb); +} + +extern int ext3_prep_san_write(struct inode *inode, long *blocks, + int nblocks, loff_t newsize); +static int fsfilt_ext3_prep_san_write(struct inode *inode, long *blocks, + int nblocks, loff_t newsize) +{ + return ext3_prep_san_write(inode, blocks, nblocks, newsize); +} + +static int fsfilt_ext3_read_record(struct file * file, void *buf, + int size, loff_t *offs) +{ + struct buffer_head *bh; + unsigned long block, boffs; + struct inode *inode = file->f_dentry->d_inode; + int err; + + if (inode->i_size < *offs + size) { + CERROR("file size %llu is too short for read %u@%llu\n", + inode->i_size, size, *offs); + return -EIO; + } + + block = *offs >> inode->i_blkbits; + bh = ext3_bread(NULL, inode, block, 0, &err); + if (!bh) { + CERROR("can't read block: %d\n", err); + return err; 
+ } + + boffs = (unsigned)*offs % bh->b_size; + if (boffs + size > bh->b_size) { + CERROR("request crosses block's border. offset %llu, size %u\n", + *offs, size); + brelse(bh); + return -EIO; + } + + memcpy(buf, bh->b_data + boffs, size); + brelse(bh); + *offs += size; + return 0; +} + +static int fsfilt_ext3_write_record(struct file *file, void *buf, int size, + loff_t *offs, int force_sync) +{ + struct buffer_head *bh; + unsigned long block, boffs; + struct inode *inode = file->f_dentry->d_inode; + loff_t old_size = inode->i_size; + journal_t *journal; + handle_t *handle; + int err; + + journal = EXT3_SB(inode->i_sb)->s_journal; + handle = journal_start(journal, EXT3_DATA_TRANS_BLOCKS + 2); + if (IS_ERR(handle)) { + CERROR("can't start transaction\n"); + return PTR_ERR(handle); + } + + block = *offs >> inode->i_blkbits; + if (*offs + size > inode->i_size) { + down(&inode->i_sem); + if (*offs + size > inode->i_size) + inode->i_size = ((loff_t)block + 1) << inode->i_blkbits; + up(&inode->i_sem); + } + + bh = ext3_bread(handle, inode, block, 1, &err); + if (!bh) { + CERROR("can't read/create block: %d\n", err); + goto out; + } + + /* This is a hack only needed because ext3_get_block_handle() updates + * i_disksize after marking the inode dirty in ext3_splice_branch(). + * We will fix that when we get a chance, as ext3_mark_inode_dirty() + * is not without cost, nor is it even exported. + */ + if (inode->i_size > old_size) + mark_inode_dirty(inode); + + boffs = (unsigned)*offs % bh->b_size; + if (boffs + size > bh->b_size) { + CERROR("request crosses block's border. 
offset %llu, size %u\n", + *offs, size); + err = -EIO; + goto out; + } + + err = ext3_journal_get_write_access(handle, bh); + if (err) { + CERROR("journal_get_write_access() returned error %d\n", err); + goto out; + } + memcpy(bh->b_data + boffs, buf, size); + err = ext3_journal_dirty_metadata(handle, bh); + if (err) { + CERROR("journal_dirty_metadata() returned error %d\n", err); + goto out; + } + + if (force_sync) + handle->h_sync = 1; /* recovery likes this */ +out: + if (bh) + brelse(bh); + journal_stop(handle); + if (err == 0) + *offs += size; + return err; +} + +static struct fsfilt_operations fsfilt_ext3_ops = { + fs_type: "ext3", + fs_owner: THIS_MODULE, + fs_start: fsfilt_ext3_start, + fs_brw_start: fsfilt_ext3_brw_start, + fs_commit: fsfilt_ext3_commit, + fs_setattr: fsfilt_ext3_setattr, + fs_set_md: fsfilt_ext3_set_md, + fs_get_md: fsfilt_ext3_get_md, + fs_readpage: fsfilt_ext3_readpage, + fs_journal_data: fsfilt_ext3_journal_data, + fs_set_last_rcvd: fsfilt_ext3_set_last_rcvd, + fs_statfs: fsfilt_ext3_statfs, + fs_sync: fsfilt_ext3_sync, + fs_prep_san_write: fsfilt_ext3_prep_san_write, + fs_write_record: fsfilt_ext3_write_record, + fs_read_record: fsfilt_ext3_read_record, +}; + +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + +#warning "fsfilt_ext3_init() and fsfilt_ext3_exit() aren't called on 2.6. 
MUST be fixed" + +static int __init fsfilt_ext3_init(void) +{ + int rc; + + //rc = ext3_xattr_register(); + fcb_cache = kmem_cache_create("fsfilt_ext3_fcb", + sizeof(struct fsfilt_cb_data), 0, + 0, NULL, NULL); + if (!fcb_cache) { + CERROR("error allocating fsfilt journal callback cache\n"); + GOTO(out, rc = -ENOMEM); + } + + rc = fsfilt_register_ops(&fsfilt_ext3_ops); + + if (rc) + kmem_cache_destroy(fcb_cache); +out: + return rc; +} + +static void __exit fsfilt_ext3_exit(void) +{ + int rc; + + fsfilt_unregister_ops(&fsfilt_ext3_ops); + rc = kmem_cache_destroy(fcb_cache); + + if (rc || atomic_read(&fcb_cache_count)) { + CERROR("can't free fsfilt callback cache: count %d, rc = %d\n", + atomic_read(&fcb_cache_count), rc); + } + + //rc = ext3_xattr_unregister(); +} + +MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_DESCRIPTION("Lustre ext3 Filesystem Helper v0.1"); +MODULE_LICENSE("GPL"); + +module_init(fsfilt_ext3_init); +module_exit(fsfilt_ext3_exit); + +#endif + diff --git a/lustre/lvfs/fsfilt_extN.c b/lustre/lvfs/fsfilt_extN.c new file mode 100644 index 0000000..d5adb5a --- /dev/null +++ b/lustre/lvfs/fsfilt_extN.c @@ -0,0 +1,678 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/lib/fsfilt_extN.c + * Lustre filesystem abstraction routines + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * Author: Andreas Dilger + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define DEBUG_SUBSYSTEM S_FILTER + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static kmem_cache_t *fcb_cache; +static atomic_t fcb_cache_count = ATOMIC_INIT(0); + +struct fsfilt_cb_data { + struct journal_callback cb_jcb; /* jbd private data - MUST BE FIRST */ + fsfilt_cb_t cb_func; /* MDS/OBD completion function */ + struct obd_device *cb_obd; /* MDS/OBD completion device */ + __u64 cb_last_rcvd; /* MDS/OST last committed operation */ + void *cb_data; /* MDS/OST completion function data */ +}; + +#define EXTN_XATTR_INDEX_LUSTRE 5 +#define XATTR_LUSTRE_MDS_OBJID "system.lustre_mds_objid" + +/* + * We don't currently need any additional blocks for rmdir and + * unlink transactions because we are storing the OST oa_id inside + * the inode (which we will be changing anyways as part of this + * transaction). 
+ */ +static void *fsfilt_extN_start(struct inode *inode, int op, void *desc_private) +{ + /* For updates to the last recieved file */ + int nblocks = EXTN_DATA_TRANS_BLOCKS; + void *handle; + + LASSERT(current->journal_info == NULL); + + switch(op) { + case FSFILT_OP_CREATE_LOG: + nblocks += EXTN_INDEX_EXTRA_TRANS_BLOCKS+EXTN_DATA_TRANS_BLOCKS; + op = FSFILT_OP_CREATE; + break; + case FSFILT_OP_UNLINK_LOG: + nblocks += EXTN_INDEX_EXTRA_TRANS_BLOCKS+EXTN_DATA_TRANS_BLOCKS; + op = FSFILT_OP_UNLINK; + break; + } + + switch(op) { + case FSFILT_OP_RMDIR: + case FSFILT_OP_UNLINK: + nblocks += EXTN_DELETE_TRANS_BLOCKS; + break; + case FSFILT_OP_RENAME: + /* modify additional directory */ + nblocks += EXTN_DATA_TRANS_BLOCKS; + /* no break */ + case FSFILT_OP_SYMLINK: + /* additional block + block bitmap + GDT for long symlink */ + nblocks += 3; + /* no break */ + case FSFILT_OP_CREATE: + case FSFILT_OP_MKDIR: + case FSFILT_OP_MKNOD: + /* modify one inode + block bitmap + GDT */ + nblocks += 3; + /* no break */ + case FSFILT_OP_LINK: + /* modify parent directory */ + nblocks += EXTN_INDEX_EXTRA_TRANS_BLOCKS+EXTN_DATA_TRANS_BLOCKS; + break; + case FSFILT_OP_SETATTR: + /* Setattr on inode */ + nblocks += 1; + break; + default: CERROR("unknown transaction start op %d\n", op); + LBUG(); + } + + LASSERT(current->journal_info == desc_private); + lock_kernel(); + handle = journal_start(EXTN_JOURNAL(inode), nblocks); + unlock_kernel(); + + if (!IS_ERR(handle)) + LASSERT(current->journal_info == handle); + return handle; +} + +/* + * Calculate the number of buffer credits needed to write multiple pages in + * a single extN transaction. No, this shouldn't be here, but as yet extN + * doesn't have a nice API for calculating this sort of thing in advance. + * + * See comment above extN_writepage_trans_blocks for details. We assume + * no data journaling is being done, but it does allow for all of the pages + * being non-contiguous. 
If we are guaranteed contiguous pages we could + * reduce the number of (d)indirect blocks a lot. + * + * With N blocks per page and P pages, for each inode we have at most: + * N*P indirect + * min(N*P, blocksize/4 + 1) dindirect blocks + * niocount tindirect + * + * For the entire filesystem, we have at most: + * min(sum(nindir + P), ngroups) bitmap blocks (from the above) + * min(sum(nindir + P), gdblocks) group descriptor blocks (from the above) + * objcount inode blocks + * 1 superblock + * 2 * EXTN_SINGLEDATA_TRANS_BLOCKS for the quota files + * + * 1 EXTN_DATA_TRANS_BLOCKS for the last_rcvd update. + */ +static int fsfilt_extN_credits_needed(int objcount, struct fsfilt_objinfo *fso) +{ + struct super_block *sb = fso->fso_dentry->d_inode->i_sb; + int blockpp = 1 << (PAGE_CACHE_SHIFT - sb->s_blocksize_bits); + int addrpp = EXTN_ADDR_PER_BLOCK(sb) * blockpp; + int nbitmaps = 0; + int ngdblocks = 0; + int needed = objcount + 1; + int i; + + for (i = 0; i < objcount; i++, fso++) { + int nblocks = fso->fso_bufcnt * blockpp; + int ndindirect = min(nblocks, addrpp + 1); + int nindir = nblocks + ndindirect + 1; + + nbitmaps += nindir + nblocks; + ngdblocks += nindir + nblocks; + + needed += nindir; + } + + /* Assumes extN and extN have same sb_info layout at the start. */ + if (nbitmaps > EXTN_SB(sb)->s_groups_count) + nbitmaps = EXTN_SB(sb)->s_groups_count; + if (ngdblocks > EXTN_SB(sb)->s_gdb_count) + ngdblocks = EXTN_SB(sb)->s_gdb_count; + + needed += nbitmaps + ngdblocks; + + /* last_rcvd update */ + needed += EXTN_DATA_TRANS_BLOCKS; + +#ifdef CONFIG_QUOTA + /* We assume that there will be 1 bit set in s_dquot.flags for each + * quota file that is active. This is at least true for now. + */ + needed += hweight32(sb_any_quota_enabled(sb)) * + EXTN_SINGLEDATA_TRANS_BLOCKS; +#endif + + return needed; +} + +/* We have to start a huge journal transaction here to hold all of the + * metadata for the pages being written here. 
This is necessitated by + * the fact that we do lots of prepare_write operations before we do + * any of the matching commit_write operations, so even if we split + * up to use "smaller" transactions none of them could complete until + * all of them were opened. By having a single journal transaction, + * we eliminate duplicate reservations for common blocks like the + * superblock and group descriptors or bitmaps. + * + * We will start the transaction here, but each prepare_write will + * add a refcount to the transaction, and each commit_write will + * remove a refcount. The transaction will be closed when all of + * the pages have been written. + */ +static void *fsfilt_extN_brw_start(int objcount, struct fsfilt_objinfo *fso, + int niocount, void *desc_private) +{ + journal_t *journal; + handle_t *handle; + int needed; + ENTRY; + + LASSERT(current->journal_info == desc_private); + journal = EXTN_SB(fso->fso_dentry->d_inode->i_sb)->s_journal; + needed = fsfilt_extN_credits_needed(objcount, fso); + + /* The number of blocks we could _possibly_ dirty can very large. + * We reduce our request if it is absurd (and we couldn't get that + * many credits for a single handle anyways). + * + * At some point we have to limit the size of I/Os sent at one time, + * increase the size of the journal, or we have to calculate the + * actual journal requirements more carefully by checking all of + * the blocks instead of being maximally pessimistic. It remains to + * be seen if this is a real problem or not. 
+ */ + if (needed > journal->j_max_transaction_buffers) { + CERROR("want too many journal credits (%d) using %d instead\n", + needed, journal->j_max_transaction_buffers); + needed = journal->j_max_transaction_buffers; + } + + lock_kernel(); + handle = journal_start(journal, needed); + unlock_kernel(); + if (IS_ERR(handle)) { + CERROR("can't get handle for %d credits: rc = %ld\n", needed, + PTR_ERR(handle)); + } else { + LASSERT(handle->h_buffer_credits >= needed); + LASSERT(current->journal_info == handle); + } + + RETURN(handle); +} + +static int fsfilt_extN_commit(struct inode *inode, void *h, int force_sync) +{ + int rc; + handle_t *handle = h; + + LASSERT(current->journal_info == handle); + if (force_sync) + handle->h_sync = 1; /* recovery likes this */ + + lock_kernel(); + rc = journal_stop(handle); + unlock_kernel(); + + LASSERT(current->journal_info == NULL); + return rc; +} + +static int fsfilt_extN_setattr(struct dentry *dentry, void *handle, + struct iattr *iattr, int do_trunc) +{ + struct inode *inode = dentry->d_inode; + int rc; + + lock_kernel(); + + /* A _really_ horrible hack to avoid removing the data stored + * in the block pointers; this is really the "small" stripe MD data. + * We can avoid further hackery by virtue of the MDS file size being + * zero all the time (which doesn't invoke block truncate at unlink + * time), so we assert we never change the MDS file size from zero. 
*/ + if (iattr->ia_valid & ATTR_SIZE && !do_trunc) { + /* ATTR_SIZE would invoke truncate: clear it */ + iattr->ia_valid &= ~ATTR_SIZE; + EXTN_I(inode)->i_disksize = inode->i_size = iattr->ia_size; + + /* make sure _something_ gets set - so new inode + * goes to disk (probably won't work over XFS */ + if (!(iattr->ia_valid & (ATTR_MODE | ATTR_MTIME | ATTR_CTIME))){ + iattr->ia_valid |= ATTR_MODE; + iattr->ia_mode = inode->i_mode; + } + } + + /* Don't allow setattr to change file type */ + iattr->ia_mode = (inode->i_mode & S_IFMT)|(iattr->ia_mode & ~S_IFMT); + + if (inode->i_op->setattr) { + rc = inode->i_op->setattr(dentry, iattr); + } else { + rc = inode_change_ok(inode, iattr); + if (!rc) + rc = inode_setattr(inode, iattr); + } + + unlock_kernel(); + + return rc; +} + +static int fsfilt_extN_set_md(struct inode *inode, void *handle, + void *lmm, int lmm_size) +{ + int rc; + + /* Nasty hack city - store stripe MD data in the block pointers if + * it will fit, because putting it in an EA currently kills the MDS + * performance. We'll fix this with "fast EAs" in the future. + */ + if (inode->i_blocks == 0 && lmm_size <= sizeof(EXTN_I(inode)->i_data) - + sizeof(EXTN_I(inode)->i_data[0])) { + /* XXX old_size is debugging only */ + int old_size = EXTN_I(inode)->i_data[0]; + if (old_size != 0) { + LASSERT(old_size < sizeof(EXTN_I(inode)->i_data)); + CERROR("setting EA on %lu again... 
interesting\n", + inode->i_ino); + } + + EXTN_I(inode)->i_data[0] = cpu_to_le32(lmm_size); + memcpy(&EXTN_I(inode)->i_data[1], lmm, lmm_size); + mark_inode_dirty(inode); + return 0; + } else { + down(&inode->i_sem); + lock_kernel(); + rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE, + XATTR_LUSTRE_MDS_OBJID, lmm, lmm_size, 0); + unlock_kernel(); + up(&inode->i_sem); + } + + if (rc) + CERROR("error adding MD data to inode %lu: rc = %d\n", + inode->i_ino, rc); + return rc; +} + +static int fsfilt_extN_get_md(struct inode *inode, void *lmm, int lmm_size) +{ + int rc; + + if (inode->i_blocks == 0 && EXTN_I(inode)->i_data[0]) { + int size = le32_to_cpu(EXTN_I(inode)->i_data[0]); + LASSERT(size < sizeof(EXTN_I(inode)->i_data)); + if (lmm) { + if (size > lmm_size) + return -ERANGE; + memcpy(lmm, &EXTN_I(inode)->i_data[1], size); + } + return size; + } + + down(&inode->i_sem); + lock_kernel(); + rc = extN_xattr_get(inode, EXTN_XATTR_INDEX_LUSTRE, + XATTR_LUSTRE_MDS_OBJID, lmm, lmm_size); + unlock_kernel(); + up(&inode->i_sem); + + /* This gives us the MD size */ + if (lmm == NULL) + return (rc == -ENODATA) ? 0 : rc; + + if (rc < 0) { + CDEBUG(D_INFO, "error getting EA %s from inode %lu: " + "rc = %d\n", XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc); + memset(lmm, 0, lmm_size); + return (rc == -ENODATA) ? 
0 : rc; + } + + return rc; +} + +static ssize_t fsfilt_extN_readpage(struct file *file, char *buf, size_t count, + loff_t *off) +{ + struct inode *inode = file->f_dentry->d_inode; + int rc = 0; + + if (S_ISREG(inode->i_mode)) + rc = file->f_op->read(file, buf, count, off); + else { + const int blkbits = inode->i_sb->s_blocksize_bits; + const int blksize = inode->i_sb->s_blocksize; + + CDEBUG(D_EXT2, "reading "LPSZ" at dir %lu+%llu\n", + count, inode->i_ino, *off); + while (count > 0) { + struct buffer_head *bh; + + bh = NULL; + if (*off < inode->i_size) { + int err = 0; + + bh = extN_bread(NULL, inode, *off >> blkbits, + 0, &err); + + CDEBUG(D_EXT2, "read %u@%llu\n", blksize, *off); + + if (bh) { + memcpy(buf, bh->b_data, blksize); + brelse(bh); + } else if (err) { + /* XXX in theory we should just fake + * this buffer and continue like extN, + * especially if this is a partial read + */ + CERROR("error read dir %lu+%llu: %d\n", + inode->i_ino, *off, err); + RETURN(err); + } + } + if (!bh) { + struct extN_dir_entry_2 *fake = (void *)buf; + + CDEBUG(D_EXT2, "fake %u@%llu\n", blksize, *off); + memset(fake, 0, sizeof(*fake)); + fake->rec_len = cpu_to_le32(blksize); + } + count -= blksize; + buf += blksize; + *off += blksize; + rc += blksize; + } + } + + return rc; +} + +static void fsfilt_extN_cb_func(struct journal_callback *jcb, int error) +{ + struct fsfilt_cb_data *fcb = (struct fsfilt_cb_data *)jcb; + + fcb->cb_func(fcb->cb_obd, fcb->cb_last_rcvd, fcb->cb_data, error); + + OBD_SLAB_FREE(fcb, fcb_cache, sizeof *fcb); + atomic_dec(&fcb_cache_count); +} + +static int fsfilt_extN_set_last_rcvd(struct obd_device *obd, __u64 last_rcvd, + void *handle, fsfilt_cb_t cb_func, + void *cb_data) +{ + struct fsfilt_cb_data *fcb; + + OBD_SLAB_ALLOC(fcb, fcb_cache, GFP_NOFS, sizeof *fcb); + if (fcb == NULL) + RETURN(-ENOMEM); + + atomic_inc(&fcb_cache_count); + fcb->cb_func = cb_func; + fcb->cb_obd = obd; + fcb->cb_last_rcvd = last_rcvd; + fcb->cb_data = cb_data; + + 
CDEBUG(D_EXT2, "set callback for last_rcvd: "LPD64"\n", last_rcvd); + lock_kernel(); + journal_callback_set(handle, fsfilt_extN_cb_func, + (struct journal_callback *)fcb); + unlock_kernel(); + + return 0; +} + +static int fsfilt_extN_journal_data(struct file *filp) +{ + struct inode *inode = filp->f_dentry->d_inode; + + EXTN_I(inode)->i_flags |= EXTN_JOURNAL_DATA_FL; + + return 0; +} + +/* + * We need to hack the return value for the free inode counts because + * the current EA code requires one filesystem block per inode with EAs, + * so it is possible to run out of blocks before we run out of inodes. + * + * This can be removed when the extN EA code is fixed. + */ +static int fsfilt_extN_statfs(struct super_block *sb, struct obd_statfs *osfs) +{ + struct kstatfs sfs; + int rc = vfs_statfs(sb, &sfs); + + if (!rc && sfs.f_bfree < sfs.f_ffree) { + sfs.f_files = (sfs.f_files - sfs.f_ffree) + sfs.f_bfree; + sfs.f_ffree = sfs.f_bfree; + } + + statfs_pack(osfs, &sfs); + return rc; +} + +static int fsfilt_extN_sync(struct super_block *sb) +{ + return extN_force_commit(sb); +} + +extern int extN_prep_san_write(struct inode *inode, long *blocks, + int nblocks, loff_t newsize); +static int fsfilt_extN_prep_san_write(struct inode *inode, long *blocks, + int nblocks, loff_t newsize) +{ + return extN_prep_san_write(inode, blocks, nblocks, newsize); +} + +static int fsfilt_extN_read_record(struct file * file, void *buf, + int size, loff_t *offs) +{ + struct buffer_head *bh; + unsigned long block, boffs; + struct inode *inode = file->f_dentry->d_inode; + int err; + + if (inode->i_size < *offs + size) { + CERROR("file size %llu is too short for read %u@%llu\n", + inode->i_size, size, *offs); + return -EIO; + } + + block = *offs >> inode->i_blkbits; + bh = extN_bread(NULL, inode, block, 0, &err); + if (!bh) { + CERROR("can't read block: %d\n", err); + return err; + } + + boffs = (unsigned)*offs % bh->b_size; + if (boffs + size > bh->b_size) { + CERROR("request crosses block's 
border. offset %llu, size %u\n", + *offs, size); + brelse(bh); + return -EIO; + } + + memcpy(buf, bh->b_data + boffs, size); + brelse(bh); + *offs += size; + return 0; +} + +static int fsfilt_extN_write_record(struct file *file, void *buf, int size, + loff_t *offs, int force_sync) +{ + struct buffer_head *bh; + unsigned long block, boffs; + struct inode *inode = file->f_dentry->d_inode; + loff_t old_size = inode->i_size; + journal_t *journal; + handle_t *handle; + int err; + + journal = EXTN_SB(inode->i_sb)->s_journal; + handle = journal_start(journal, EXTN_DATA_TRANS_BLOCKS + 2); + if (IS_ERR(handle)) { + CERROR("can't start transaction\n"); + return PTR_ERR(handle); + } + + block = *offs >> inode->i_blkbits; + if (*offs + size > inode->i_size) { + down(&inode->i_sem); + if (*offs + size > inode->i_size) + inode->i_size = ((loff_t)block + 1) << inode->i_blkbits; + up(&inode->i_sem); + } + + bh = extN_bread(handle, inode, block, 1, &err); + if (!bh) { + CERROR("can't read/create block: %d\n", err); + goto out; + } + + /* This is a hack only needed because extN_get_block_handle() updates + * i_disksize after marking the inode dirty in extN_splice_branch(). + * We will fix that when we get a chance, as extN_mark_inode_dirty() + * is not without cost, nor is it even exported. + */ + if (inode->i_size > old_size) + mark_inode_dirty(inode); + + boffs = (unsigned)*offs % bh->b_size; + if (boffs + size > bh->b_size) { + CERROR("request crosses block's border. 
offset %llu, size %u\n", + *offs, size); + err = -EIO; + goto out; + } + + err = extN_journal_get_write_access(handle, bh); + if (err) { + CERROR("journal_get_write_access() returned error %d\n", err); + goto out; + } + memcpy(bh->b_data + boffs, buf, size); + err = extN_journal_dirty_metadata(handle, bh); + if (err) { + CERROR("journal_dirty_metadata() returned error %d\n", err); + goto out; + } + + if (force_sync) + handle->h_sync = 1; /* recovery likes this */ +out: + if (bh) + brelse(bh); + journal_stop(handle); + if (err == 0) + *offs += size; + return err; +} + +static struct fsfilt_operations fsfilt_extN_ops = { + fs_type: "extN", + fs_owner: THIS_MODULE, + fs_start: fsfilt_extN_start, + fs_brw_start: fsfilt_extN_brw_start, + fs_commit: fsfilt_extN_commit, + fs_setattr: fsfilt_extN_setattr, + fs_set_md: fsfilt_extN_set_md, + fs_get_md: fsfilt_extN_get_md, + fs_readpage: fsfilt_extN_readpage, + fs_journal_data: fsfilt_extN_journal_data, + fs_set_last_rcvd: fsfilt_extN_set_last_rcvd, + fs_statfs: fsfilt_extN_statfs, + fs_sync: fsfilt_extN_sync, + fs_prep_san_write: fsfilt_extN_prep_san_write, + fs_write_record: fsfilt_extN_write_record, + fs_read_record: fsfilt_extN_read_record, +}; + +static int __init fsfilt_extN_init(void) +{ + int rc; + + //rc = extN_xattr_register(); + fcb_cache = kmem_cache_create("fsfilt_extN_fcb", + sizeof(struct fsfilt_cb_data), 0, + 0, NULL, NULL); + if (!fcb_cache) { + CERROR("error allocating fsfilt journal callback cache\n"); + GOTO(out, rc = -ENOMEM); + } + + rc = fsfilt_register_ops(&fsfilt_extN_ops); + + if (rc) + kmem_cache_destroy(fcb_cache); +out: + return rc; +} + +static void __exit fsfilt_extN_exit(void) +{ + int rc; + + fsfilt_unregister_ops(&fsfilt_extN_ops); + rc = kmem_cache_destroy(fcb_cache); + + if (rc || atomic_read(&fcb_cache_count)) { + CERROR("can't free fsfilt callback cache: count %d, rc = %d\n", + atomic_read(&fcb_cache_count), rc); + } + + //rc = extN_xattr_unregister(); +} + +MODULE_AUTHOR("Cluster File 
Systems, Inc. "); +MODULE_DESCRIPTION("Lustre extN Filesystem Helper v0.1"); +MODULE_LICENSE("GPL"); + +module_init(fsfilt_extN_init); +module_exit(fsfilt_extN_exit); diff --git a/lustre/lvfs/fsfilt_reiserfs.c b/lustre/lvfs/fsfilt_reiserfs.c new file mode 100644 index 0000000..3d118fc --- /dev/null +++ b/lustre/lvfs/fsfilt_reiserfs.c @@ -0,0 +1,203 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/lib/fsfilt_reiserfs.c + * Lustre filesystem abstraction routines + * + * Copyright (C) 2002 Cluster File Systems, Inc. + * Author: Andreas Dilger + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +/* + * NOTE - According to Hans Reiser, this could actually be implemented more + * efficiently than creating a directory and putting ASCII objids in it. + * Instead, we should return the reiserfs object ID as the lustre objid + * (although I'm not sure what impact that would have on backup/restore). 
+ */ + +#define DEBUG_SUBSYSTEM S_FILTER + +#include +#include +#include +#include +#include +#include +#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) +#include +#include +#endif +#include +#include +#include +#include +#include + +static void *fsfilt_reiserfs_start(struct inode *inode, int op, + void *desc_private) +{ + return (void *)0xf00f00be; +} + +static void *fsfilt_reiserfs_brw_start(int objcount, struct fsfilt_objinfo *fso, + int niocount, void *desc_private) +{ + return (void *)0xf00f00be; +} + +static int fsfilt_reiserfs_commit(struct inode *inode, void *handle, + int force_sync) +{ + if (handle != (void *)0xf00f00be) { + CERROR("bad handle %p", handle); + return -EINVAL; + } + + return 0; +} + +static int fsfilt_reiserfs_setattr(struct dentry *dentry, void *handle, + struct iattr *iattr, int do_trunc) +{ + struct inode *inode = dentry->d_inode; + int rc; + + lock_kernel(); + + /* A _really_ horrible hack to avoid removing the data stored + * in the block pointers; this is really the "small" stripe MD data. + * We can avoid further hackery by virtue of the MDS file size being + * zero all the time (which doesn't invoke block truncate at unlink + * time), so we assert we never change the MDS file size from zero. 
+ */ + if (iattr->ia_valid & ATTR_SIZE && !do_trunc) { + /* ATTR_SIZE would invoke truncate: clear it */ + iattr->ia_valid &= ~ATTR_SIZE; + inode->i_size = iattr->ia_size; + + /* make sure _something_ gets set - so new inode + * goes to disk (probably won't work over XFS + */ + if (!iattr->ia_valid & ATTR_MODE) { + iattr->ia_valid |= ATTR_MODE; + iattr->ia_mode = inode->i_mode; + } + } + if (inode->i_op->setattr) + rc = inode->i_op->setattr(dentry, iattr); + else + rc = inode_setattr(inode, iattr); + + unlock_kernel(); + + return rc; +} + +static int fsfilt_reiserfs_set_md(struct inode *inode, void *handle, + void *lmm, int lmm_size) +{ + /* XXX write stripe data into MDS file itself */ + CERROR("not implemented yet\n"); + + return -ENOSYS; +} + +static int fsfilt_reiserfs_get_md(struct inode *inode, void *lmm, int lmm_size) +{ + if (lmm == NULL) + return inode->i_size; + + CERROR("not implemented yet\n"); + return -ENOSYS; +} + +static ssize_t fsfilt_reiserfs_readpage(struct file *file, char *buf, size_t count, + loff_t *offset) +{ + return file->f_op->read(file, buf, count, offset); +} + +static int fsfilt_reiserfs_set_last_rcvd(struct obd_device *obd, + __u64 last_rcvd, void *handle, + fsfilt_cb_t cb_func, void *cb_data) +{ + static long next = 0; + + if (time_after(jiffies, next)) { + CERROR("no journal callback kernel patch, faking it...\n"); + next = jiffies + 300 * HZ; + } + + cb_func(obd, last_rcvd, cb_data, 0); + + return 0; +} + +static int fsfilt_reiserfs_journal_data(struct file *filp) +{ + CERROR("not implemented yet\n"); + return 0; +} + +static int fsfilt_reiserfs_statfs(struct super_block *sb, struct obd_statfs *osfs) +{ + struct statfs sfs; + int rc = vfs_statfs(sb, &sfs); + + statfs_pack(osfs, &sfs); + return rc; +} + +static int fsfilt_reiserfs_sync(struct super_block *sb) +{ + CERROR("not implemented yet\n"); + return -ENOSYS; +} + +static struct fsfilt_operations fsfilt_reiserfs_ops = { + fs_type: "reiserfs", + fs_owner: THIS_MODULE, + 
fs_start: fsfilt_reiserfs_start, + fs_brw_start: fsfilt_reiserfs_brw_start, + fs_commit: fsfilt_reiserfs_commit, + fs_setattr: fsfilt_reiserfs_setattr, + fs_set_md: fsfilt_reiserfs_set_md, + fs_get_md: fsfilt_reiserfs_get_md, + fs_readpage: fsfilt_reiserfs_readpage, + fs_journal_data: fsfilt_reiserfs_journal_data, + fs_set_last_rcvd: fsfilt_reiserfs_set_last_rcvd, + fs_statfs: fsfilt_reiserfs_statfs, + fs_sync: fsfilt_reiserfs_sync, +}; + +static int __init fsfilt_reiserfs_init(void) +{ + return fsfilt_register_ops(&fsfilt_reiserfs_ops); +} + +static void __exit fsfilt_reiserfs_exit(void) +{ + fsfilt_unregister_ops(&fsfilt_reiserfs_ops); +} + +MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_DESCRIPTION("Lustre reiserfs Filesystem Helper v0.1"); +MODULE_LICENSE("GPL"); + +module_init(fsfilt_reiserfs_init); +module_exit(fsfilt_reiserfs_exit); diff --git a/lustre/lvfs/lvfs_internal.h b/lustre/lvfs/lvfs_internal.h new file mode 100644 index 0000000..4d68116 --- /dev/null +++ b/lustre/lvfs/lvfs_internal.h @@ -0,0 +1,8 @@ +int fsfilt_ext3_init(void); +void fsfilt_ext3_exit(void); + +int fsfilt_extN_init(void); +void fsfilt_extN_exit(void); + +int fsfilt_reiser_init(void); +void fsfilt_reiser_exit(void); diff --git a/lustre/lvfs/lvfs_linux.c b/lustre/lvfs/lvfs_linux.c new file mode 100644 index 0000000..fb09c74 --- /dev/null +++ b/lustre/lvfs/lvfs_linux.c @@ -0,0 +1,335 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * lustre/lib/fsfilt_ext3.c + * Lustre filesystem abstraction routines + * + * Copyright (C) 2002, 2003 Cluster File Systems, Inc. + * Author: Andreas Dilger + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. 
+ * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif + +#define DEBUG_SUBSYSTEM S_FILTER + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +/* XXX ugh */ +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + #include +#else + #include +#endif +#include +#include +#include +#include +#include +#include +#include +#include "lvfs_internal.h" + +#include +#include + +/* Debugging check only needed during development */ +#ifdef OBD_CTXT_DEBUG +# define ASSERT_CTXT_MAGIC(magic) LASSERT((magic) == OBD_RUN_CTXT_MAGIC) +# define ASSERT_NOT_KERNEL_CTXT(msg) LASSERT(!segment_eq(get_fs(), get_ds())) +# define ASSERT_KERNEL_CTXT(msg) LASSERT(segment_eq(get_fs(), get_ds())) +#else +# define ASSERT_CTXT_MAGIC(magic) do {} while(0) +# define ASSERT_NOT_KERNEL_CTXT(msg) do {} while(0) +# define ASSERT_KERNEL_CTXT(msg) do {} while(0) +#endif + +/* push / pop to root of obd store */ +void push_ctxt(struct obd_run_ctxt *save, struct obd_run_ctxt *new_ctx, + struct obd_ucred *uc) +{ + //ASSERT_NOT_KERNEL_CTXT("already in kernel context!\n"); + ASSERT_CTXT_MAGIC(new_ctx->magic); + OBD_SET_CTXT_MAGIC(save); + + /* + CDEBUG(D_INFO, + "= push %p->%p = cur fs %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n", + save, current, current->fs, current->fs->pwd, + atomic_read(¤t->fs->pwd->d_count), + atomic_read(¤t->fs->pwd->d_inode->i_count), + current->fs->pwd->d_name.len, current->fs->pwd->d_name.name, + current->fs->pwdmnt, + atomic_read(¤t->fs->pwdmnt->mnt_count)); + */ + + save->fs = get_fs(); + 
LASSERT(atomic_read(¤t->fs->pwd->d_count)); + LASSERT(atomic_read(&new_ctx->pwd->d_count)); + save->pwd = dget(current->fs->pwd); + save->pwdmnt = mntget(current->fs->pwdmnt); + save->ngroups = current->ngroups; + + LASSERT(save->pwd); + LASSERT(save->pwdmnt); + LASSERT(new_ctx->pwd); + LASSERT(new_ctx->pwdmnt); + + if (uc) { + save->ouc.ouc_fsuid = current->fsuid; + save->ouc.ouc_fsgid = current->fsgid; + save->ouc.ouc_cap = current->cap_effective; + save->ouc.ouc_suppgid1 = current->groups[0]; + save->ouc.ouc_suppgid2 = current->groups[1]; + + current->fsuid = uc->ouc_fsuid; + current->fsgid = uc->ouc_fsgid; + current->cap_effective = uc->ouc_cap; + current->ngroups = 0; + + if (uc->ouc_suppgid1 != -1) + current->groups[current->ngroups++] = uc->ouc_suppgid1; + if (uc->ouc_suppgid2 != -1) + current->groups[current->ngroups++] = uc->ouc_suppgid2; + } + set_fs(new_ctx->fs); + set_fs_pwd(current->fs, new_ctx->pwdmnt, new_ctx->pwd); + + /* + CDEBUG(D_INFO, + "= push %p->%p = cur fs %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n", + new_ctx, current, current->fs, current->fs->pwd, + atomic_read(¤t->fs->pwd->d_count), + atomic_read(¤t->fs->pwd->d_inode->i_count), + current->fs->pwd->d_name.len, current->fs->pwd->d_name.name, + current->fs->pwdmnt, + atomic_read(¤t->fs->pwdmnt->mnt_count)); + */ +} +EXPORT_SYMBOL(push_ctxt); + +void pop_ctxt(struct obd_run_ctxt *saved, struct obd_run_ctxt *new_ctx, + struct obd_ucred *uc) +{ + //printk("pc0"); + ASSERT_CTXT_MAGIC(saved->magic); + //printk("pc1"); + ASSERT_KERNEL_CTXT("popping non-kernel context!\n"); + + /* + CDEBUG(D_INFO, + " = pop %p==%p = cur %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n", + new_ctx, current, current->fs, current->fs->pwd, + atomic_read(¤t->fs->pwd->d_count), + atomic_read(¤t->fs->pwd->d_inode->i_count), + current->fs->pwd->d_name.len, current->fs->pwd->d_name.name, + current->fs->pwdmnt, + atomic_read(¤t->fs->pwdmnt->mnt_count)); + */ + + LASSERT(current->fs->pwd == new_ctx->pwd); + LASSERT(current->fs->pwdmnt 
== new_ctx->pwdmnt); + + set_fs(saved->fs); + set_fs_pwd(current->fs, saved->pwdmnt, saved->pwd); + + dput(saved->pwd); + mntput(saved->pwdmnt); + if (uc) { + current->fsuid = saved->ouc.ouc_fsuid; + current->fsgid = saved->ouc.ouc_fsgid; + current->cap_effective = saved->ouc.ouc_cap; + current->ngroups = saved->ngroups; + current->groups[0] = saved->ouc.ouc_suppgid1; + current->groups[1] = saved->ouc.ouc_suppgid2; + } + + /* + CDEBUG(D_INFO, + "= pop %p->%p = cur fs %p pwd %p:d%d:i%d (%*s), pwdmnt %p:%d\n", + saved, current, current->fs, current->fs->pwd, + atomic_read(¤t->fs->pwd->d_count), + atomic_read(¤t->fs->pwd->d_inode->i_count), + current->fs->pwd->d_name.len, current->fs->pwd->d_name.name, + current->fs->pwdmnt, + atomic_read(¤t->fs->pwdmnt->mnt_count)); + */ +} +EXPORT_SYMBOL(pop_ctxt); + +/* utility to make a file */ +struct dentry *simple_mknod(struct dentry *dir, char *name, int mode) +{ + struct dentry *dchild; + int err = 0; + ENTRY; + + ASSERT_KERNEL_CTXT("kernel doing mknod outside kernel context\n"); + CDEBUG(D_INODE, "creating file %*s\n", (int)strlen(name), name); + + dchild = ll_lookup_one_len(name, dir, strlen(name)); + if (IS_ERR(dchild)) + GOTO(out_up, dchild); + + if (dchild->d_inode) { + if (!S_ISREG(dchild->d_inode->i_mode)) + GOTO(out_err, err = -EEXIST); + + GOTO(out_up, dchild); + } + + err = ll_vfs_create(dir->d_inode, dchild, (mode & ~S_IFMT) | S_IFREG, NULL); + if (err) + GOTO(out_err, err); + + RETURN(dchild); + +out_err: + dput(dchild); + dchild = ERR_PTR(err); +out_up: + return dchild; +} +EXPORT_SYMBOL(simple_mknod); + +/* utility to make a directory */ +struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode) +{ + struct dentry *dchild; + int err = 0; + ENTRY; + + ASSERT_KERNEL_CTXT("kernel doing mkdir outside kernel context\n"); + CDEBUG(D_INODE, "creating directory %*s\n", (int)strlen(name), name); + dchild = ll_lookup_one_len(name, dir, strlen(name)); + if (IS_ERR(dchild)) + GOTO(out_up, dchild); + + if 
(dchild->d_inode) { + if (!S_ISDIR(dchild->d_inode->i_mode)) + GOTO(out_err, err = -ENOTDIR); + + GOTO(out_up, dchild); + } + + err = vfs_mkdir(dir->d_inode, dchild, mode); + if (err) + GOTO(out_err, err); + + RETURN(dchild); + +out_err: + dput(dchild); + dchild = ERR_PTR(err); +out_up: + return dchild; +} +EXPORT_SYMBOL(simple_mkdir); + +/* + * Read a file from within kernel context. Prior to calling this + * function we should already have done a push_ctxt(). + */ +int lustre_fread(struct file *file, void *buf, int len, loff_t *off) +{ + ASSERT_KERNEL_CTXT("kernel doing read outside kernel context\n"); + if (!file || !file->f_op || !file->f_op->read || !off) + RETURN(-ENOSYS); + + return file->f_op->read(file, buf, len, off); +} +EXPORT_SYMBOL(lustre_fread); + +/* + * Write a file from within kernel context. Prior to calling this + * function we should already have done a push_ctxt(). + */ +int lustre_fwrite(struct file *file, const void *buf, int len, loff_t *off) +{ + ENTRY; + ASSERT_KERNEL_CTXT("kernel doing write outside kernel context\n"); + if (!file) + RETURN(-ENOENT); + if (!file->f_op) + RETURN(-ENOSYS); + if (!off) + RETURN(-EINVAL); + + if (!file->f_op->write) + RETURN(-EROFS); + + RETURN(file->f_op->write(file, buf, len, off)); +} +EXPORT_SYMBOL(lustre_fwrite); + +/* + * Sync a file from within kernel context. Prior to calling this + * function we should already have done a push_ctxt(). + */ +int lustre_fsync(struct file *file) +{ + ENTRY; + ASSERT_KERNEL_CTXT("kernel doing sync outside kernel context\n"); + if (!file || !file->f_op || !file->f_op->fsync) + RETURN(-ENOSYS); + + RETURN(file->f_op->fsync(file, file->f_dentry, 0)); +} +EXPORT_SYMBOL(lustre_fsync); + + + + + + + +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + +static int __init lvfs_linux_init(void) +{ + RETURN(0); +} + +static void __exit lvfs_linux_exit(void) +{ + + return; +} + +MODULE_AUTHOR("Cluster File Systems, Inc. 
"); +MODULE_DESCRIPTION("Lustre VFS Filesystem Helper v0.1"); +MODULE_LICENSE("GPL"); + +module_init(lvfs_linux_init); +module_exit(lvfs_linux_exit); + +#else + +#warning "lvfs_linux_init() and fsfilt_ext3_exit() aren't called on 2.6. MUST be fixed" + + +#endif diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c new file mode 100644 index 0000000..deee5f9 --- /dev/null +++ b/lustre/obdclass/llog.c @@ -0,0 +1,196 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001-2003 Cluster File Systems, Inc. + * Author: Andreas Dilger + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * OST<->MDS recovery logging infrastructure. 
+ * + * Invariants in implementation: + * - we do not share logs among different OST<->MDS connections, so that + * if an OST or MDS fails it need only look at log(s) relevant to itself + */ + +#define DEBUG_SUBSYSTEM S_LOG + +#ifndef EXPORT_SYMTAB +#define EXPORT_SYMTAB +#endif + +#include +#include +#include +#include + +/* Allocate a new log or catalog handle */ +struct llog_handle *llog_alloc_handle(void) +{ + struct llog_handle *loghandle; + ENTRY; + + OBD_ALLOC(loghandle, sizeof(*loghandle)); + if (loghandle == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + OBD_ALLOC(loghandle->lgh_hdr, LLOG_CHUNK_SIZE); + if (loghandle->lgh_hdr == NULL) { + OBD_FREE(loghandle, sizeof(*loghandle)); + RETURN(ERR_PTR(-ENOMEM)); + } + + INIT_LIST_HEAD(&loghandle->lgh_list); + sema_init(&loghandle->lgh_lock, 1); + + RETURN(loghandle); +} +EXPORT_SYMBOL(llog_alloc_handle); + +void llog_free_handle(struct llog_handle *loghandle) +{ + if (!loghandle) + return; + + list_del_init(&loghandle->lgh_list); + OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE); + OBD_FREE(loghandle, sizeof(*loghandle)); +} +EXPORT_SYMBOL(llog_free_handle); + +int llog_buf2reclen(int len) +{ + int size; + + size = sizeof(struct llog_rec_hdr) + size_round(len) + sizeof(__u32); + return size; +} + + + + +/* Remove a log entry from the catalog. + * Assumes caller has already pushed us into the kernel context and is locking. 
+ */ +int llog_delete_log(struct llog_handle *cathandle,struct llog_handle *loghandle) +{ + struct llog_cookie *lgc = &loghandle->lgh_cookie; + int catindex = lgc->lgc_index; + struct llog_log_hdr *llh = cathandle->lgh_hdr; + loff_t offset = 0; + int rc = 0; + ENTRY; + + CDEBUG(D_HA, "log "LPX64":%x empty, closing\n", + lgc->lgc_lgl.lgl_oid, lgc->lgc_lgl.lgl_ogen); + + if (!ext2_clear_bit(catindex, llh->llh_bitmap)) { + CERROR("catalog index %u already clear?\n", catindex); + LBUG(); + } else { + rc = lustre_fwrite(cathandle->lgh_file, llh, sizeof(*llh), + &offset); + + if (rc != sizeof(*llh)) { + CERROR("log %u cancel error: rc %d\n", catindex, rc); + if (rc >= 0) + rc = -EIO; + } else + rc = 0; + } + RETURN(rc); +} +EXPORT_SYMBOL(llog_delete_log); + +int llog_process_log(struct llog_handle *loghandle, llog_cb_t cb, void *data) +{ + struct llog_log_hdr *llh = loghandle->lgh_hdr; + void *buf; + __u64 cur_offset = LLOG_CHUNK_SIZE; + int rc = 0, index = 0; + ENTRY; + + OBD_ALLOC(buf, PAGE_SIZE); + if (!buf) + RETURN(-ENOMEM); + + while (rc == 0) { + struct llog_rec_hdr *rec; + + /* there is likely a more efficient way than this */ + while (index < LLOG_BITMAP_BYTES * 8 && + !ext2_test_bit(index, llh->llh_bitmap)) + ++index; + + if (index >= LLOG_BITMAP_BYTES * 8) + break; + + rc = llog_next_block(loghandle, 0, index, + &cur_offset, buf, PAGE_SIZE); + if (rc) + RETURN(rc); + + rec = buf; + + /* skip records in buffer until we are at the one we want */ + while (rec->lrh_index < index) { + if (rec->lrh_index == 0) + RETURN(0); /* no more records */ + + cur_offset += rec->lrh_len; + rec = ((void *)rec + rec->lrh_len); + + if ((void *)rec > buf + PAGE_SIZE) { + CERROR("log index %u not in log @ "LPU64"\n", + index, cur_offset); + LBUG(); /* record not in this buffer? 
*/ + } + + rc = cb(loghandle, rec, data); + ++index; + } + } + + RETURN(rc); +} +EXPORT_SYMBOL(llog_process_log); + + +int llog_write_header(struct llog_handle *loghandle, int size) +{ + struct llog_log_hdr *llh; + + LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE); + + if (loghandle->lgh_file->f_dentry->d_inode->i_size) + RETURN(-EBUSY); + + llh = loghandle->lgh_hdr; + llh->llh_size = size; + llh->llh_hdr.lrh_type = LLOG_OBJECT_MAGIC; + llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = sizeof(*llh); + llh->llh_timestamp = LTIME_S(CURRENT_TIME); + llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap); + memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid)); + loghandle->lgh_tgtuuid = &llh->llh_tgtuuid; + + /* write the header record in the log */ + rc = llog_write_record(loghandle, &llh, NULL, NULL, 0); + if (rc > 0) + rc = 0; + RETURN(rc); +} +EXPORT_SYMBOL(llog_write_header); diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c new file mode 100644 index 0000000..4180ab2 --- /dev/null +++ b/lustre/obdclass/llog_cat.c @@ -0,0 +1,327 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001-2003 Cluster File Systems, Inc. + * Author: Andreas Dilger + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
+ * + * OST<->MDS recovery logging infrastructure. + * + * Invariants in implementation: + * - we do not share logs among different OST<->MDS connections, so that + * if an OST or MDS fails it need only look at log(s) relevant to itself + */ + +#define DEBUG_SUBSYSTEM S_LOG + +#ifndef EXPORT_SYMTAB +#define EXPORT_SYMTAB +#endif + +#include +#include +#include +#include + +/* Create a new log handle and add it to the open list. + * This log handle will be closed when all of the records in it are removed. + * + * Assumes caller has already pushed us into the kernel context and is locking. + */ +struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle, + struct obd_uuid *tgtuuid) +{ + struct llog_handle *loghandle; + struct llog_log_hdr *llh; + struct llog_logid_rec rec; + loff_t offset; + int rc, index, bitmap_size, i; + ENTRY; + + /* does this need a tgt uuid */ + rc = llog_create(cathandle->lgh_obd, &loghandle, NULL); + if (rc) + RETURN(ERR_PTR(rc)); + + + llh = cathandle->lgh_hdr; + bitmap_size = sizeof(llh->llh_bitmap) * 8; + /* This should basically always find the first entry free */ + for (i = 0, index = llh->llh_count; i < bitmap_size; i++, index++) { + index %= bitmap_size; + if (ext2_set_bit(index, llh->llh_bitmap)) { + /* XXX This should trigger log clean up or similar */ + CERROR("catalog index %d is still in use\n", index); + } else { + llh->llh_count = (index + 1) % bitmap_size; + break; + } + } + if (i == bitmap_size) { + CERROR("no free catalog slots for log...\n"); + GOTO(out_destroy, rc = -ENOSPC); + } + + CDEBUG(D_HA, "new recovery log "LPX64":%x catalog index %u\n", + loghandle->lgh_cookie.lgc_lgl.lgl_oid, + loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index); + loghandle->lgh_cookie.lgc_index = index; + + rec.lid_hdr.lrh_len = sizeof(rec); + rec.lid_hdr.lrh_index = index; + rec.lid_hdr.lrh_type = LLOG_OBJECT_MAGIC; + rec.lid_id = loghandle->lgh_id; + rec.lid_tail.lrt_len = sizeof(rec); + rec.lid_tail.lrt_index = index; + + rc = 
llog_write_record(cathandle, &rec, loghandle->lgh_my_cat_cookie, + index); + if (rc < 0) { + GOTO(out_destroy, rc); + } + + rc = llog_write_record(loghandle, ) + + + + cathandle->lgh_current = loghandle; + list_add_tail(&loghandle->lgh_list, &cathandle->lgh_list); + + out_destroy: + llog_destroy(loghandle); + + RETURN(loghandle); +} +EXPORT_SYMBOL(llog_cat_new_log); + +/* Assumes caller has already pushed us into the kernel context and is locking. + * We return a lock on the handle to ensure nobody yanks it from us. + */ +int llog_cat_id2handle(struct llog_handle *cathandle, + struct llog_handle **res, + struct llog_logid *logid) +{ + struct llog_handle *loghandle; + int rc = 0; + ENTRY; + + if (cathandle == NULL) + RETURN(-EBADF); + + list_for_each_entry(loghandle, &cathandle->lgh_list, lgh_list) { + struct llog_logid *cgl = &loghandle->lgh_cookie.lgc_lgl; + if (cgl->lgl_oid == logid->lgl_oid) { + if (cgl->lgl_ogen != logid->lgl_ogen) { + CERROR("log "LPX64" generation %x != %x\n", + logid->lgl_oid, cgl->lgl_ogen, + logid->lgl_ogen); + continue; + } + GOTO(out, rc = 0); + } + } + + rc = llog_open(cathandle->lgh_obd, &loghandle, logid); + if (rc) { + CERROR("error opening log id "LPX64":%x: rc %d\n", + logid->lgl_oid, logid->lgl_ogen, rc); + } else { + list_add(&loghandle->lgh_list, &cathandle->lgh_list); + } + +out: + *res = loghandle; + RETURN(rc); +} + +/* Assumes caller has already pushed us into the kernel context. 
*/ +int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid) +{ + struct llog_log_hdr *llh; + loff_t offset = 0; + int rc = 0; + ENTRY; + + LASSERT(sizeof(*llh) == LLOG_CHUNK_SIZE); + + down(&cathandle->lgh_lock); + llh = cathandle->lgh_hdr; + + if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) { + llog_write_header(cathandle, LLOG_HDR_FL_FIXED_SZ); + +write_hdr: llh->llh_hdr.lrh_type = LLOG_CATALOG_MAGIC; + llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE; + llh->llh_timestamp = LTIME_S(CURRENT_TIME); + llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap); + memcpy(&llh->llh_tgtuuid, tgtuuid, sizeof(llh->llh_tgtuuid)); + rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE, + &offset); + if (rc != LLOG_CHUNK_SIZE) { + CERROR("error writing catalog header: rc %d\n", rc); + OBD_FREE(llh, sizeof(*llh)); + if (rc >= 0) + rc = -ENOSPC; + } else + rc = 0; + } else { + rc = lustre_fread(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE, + &offset); + if (rc != LLOG_CHUNK_SIZE) { + CERROR("error reading catalog header: rc %d\n", rc); + /* Can we do much else if the header is bad? */ + goto write_hdr; + } else + rc = 0; + } + + cathandle->lgh_tgtuuid = &llh->llh_tgtuuid; + up(&cathandle->lgh_lock); + RETURN(rc); +} +EXPORT_SYMBOL(llog_cat_init); + +/* Return the currently active log handle. If the current log handle doesn't + * have enough space left for the current record, start a new one. + * + * If reclen is 0, we only want to know what the currently active log is, + * otherwise we get a lock on this log so nobody can steal our space. + * + * Assumes caller has already pushed us into the kernel context and is locking. 
+ */ +static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, + int reclen) +{ + struct llog_handle *loghandle = NULL; + ENTRY; + + loghandle = cathandle->lgh_current; + if (loghandle) { + struct llog_log_hdr *llh = loghandle->lgh_hdr; + if (llh->llh_count < sizeof(llh->llh_bitmap) * 8) + RETURN(loghandle); + } + + if (reclen) + loghandle = llog_new_log(cathandle, cathandle->lgh_tgtuuid); + RETURN(loghandle); +} + +/* Add a single record to the recovery log(s) using a catalog + * Returns as llog_write_record + * + * Assumes caller has already pushed us into the kernel context. + */ +int llog_cat_add_record(struct llog_handle *cathandle, struct llog_rec_hdr *rec, + struct llog_cookie *reccookie, void *buf) +{ + struct llog_handle *loghandle; + int reclen = rec->lrh_len; + int rc; + ENTRY; + + LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE); + down(&cathandle->lgh_lock); + loghandle = llog_cat_current_log(cathandle, reclen); + if (IS_ERR(loghandle)) { + up(&cathandle->lgh_lock); + RETURN(PTR_ERR(loghandle)); + } + down(&loghandle->lgh_lock); + up(&cathandle->lgh_lock); + + rc = llog_write_record(loghandle, rec, reccookie, buf); + + up(&loghandle->lgh_lock); + RETURN(rc); +} +EXPORT_SYMBOL(llog_cat_add_record); + +/* For each cookie in the cookie array, we clear the log in-use bit and either: + * - the log is empty, so mark it free in the catalog header and delete it + * - the log is not empty, just write out the log header + * + * The cookies may be in different log files, so we need to get new logs + * each time. + * + * Assumes caller has already pushed us into the kernel context. 
+ */
+int llog_cancel_records(struct llog_handle *cathandle, int count,
+                        struct llog_cookie *cookies)
+{
+        int i, rc = 0;
+        ENTRY;
+
+        down(&cathandle->lgh_lock);
+        for (i = 0; i < count; i++, cookies++) {
+                struct llog_handle *loghandle;
+                struct llog_log_hdr *llh;
+                struct llog_logid *lgl = &cookies->lgc_lgl;
+
+                rc = llog_cat_id2handle(cathandle, &loghandle, lgl);
+                if (rc) {       /* was 'if (res)': res was declared but never
+                                 * assigned, so an uninitialized value decided
+                                 * whether to bail out */
+                        CERROR("Cannot find log "LPX64"\n", lgl->lgl_oid);
+                        break;
+                }
+
+                down(&loghandle->lgh_lock);
+                llh = loghandle->lgh_hdr;
+                CDEBUG(D_HA, "cancelling "LPX64" index %u: %u\n",
+                       lgl->lgl_oid, cookies->lgc_index,
+                       ext2_test_bit(cookies->lgc_index, llh->llh_bitmap));
+                if (!ext2_clear_bit(cookies->lgc_index, llh->llh_bitmap)) {
+                        CERROR("log index %u in "LPX64":%x already clear?\n",
+                               cookies->lgc_index, lgl->lgl_oid, lgl->lgl_ogen);
+                } else if (--llh->llh_count == 0 &&
+                           loghandle != llog_cat_current_log(cathandle, 0)) {
+                        /* last record gone and not the active log: delete it */
+                        rc = llog_close_log(cathandle, loghandle);
+                } else {
+                        /* still live records: persist the updated bitmap */
+                        loff_t offset = 0;
+                        int ret = lustre_fwrite(loghandle->lgh_file, llh,
+                                                sizeof(*llh), &offset);
+
+                        if (ret != sizeof(*llh)) {
+                                CERROR("error cancelling index %u: rc %d\n",
+                                       cookies->lgc_index, ret);
+                                /* XXX mark handle bad?
*/ + if (!rc) + rc = ret; + } + } + up(&loghandle->lgh_lock); + } + up(&cathandle->lgh_lock); + + RETURN(rc); +} +EXPORT_SYMBOL(llog_cancel_records); + +void llog_cat_put(struct obd_device *obd, struct llog_handle *cathandle) +{ + struct llog_handle *loghandle, *n; + struct obd_run_ctxt saved; + int rc; + ENTRY; + + push_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL); + list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list) + llog_cat_close(cathandle, loghandle); + pop_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL); + + EXIT; +} diff --git a/lustre/obdclass/llog_lvfs.c b/lustre/obdclass/llog_lvfs.c new file mode 100644 index 0000000..7a96643 --- /dev/null +++ b/lustre/obdclass/llog_lvfs.c @@ -0,0 +1,814 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001-2003 Cluster File Systems, Inc. + * Author: Andreas Dilger + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * OST<->MDS recovery logging infrastructure. 
+ * + * Invariants in implementation: + * - we do not share logs among different OST<->MDS connections, so that + * if an OST or MDS fails it need only look at log(s) relevant to itself + */ + +#define DEBUG_SUBSYSTEM S_LOG + +#ifndef EXPORT_SYMTAB +#define EXPORT_SYMTAB +#endif + +#include +#include +#include +#include + + +static int llog_lvfs_pad(struct l_file *file, int len, int index) +{ + struct llog_rec_hdr rec; + struct llog_rec_tail tail; + ENTRY; + + LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0xf) == 0); + + tail.lrt_len = rec.lrh_len = len; + tail.lrt_index = rec.lrh_index = index; + rec.lrh_type = 0; + + rc = lustre_fwrite(file, &rec, sizeof(rec), &file->f_pos); + if (rc != sizeof(rec)) { + CERROR("error writing padding record: rc %d\n", rc); + GOTO(out, rc < 0 ? rc : rc = -EIO); + } + + file->f_pos += len - sizeof(rec) - sizeof(tail); + rc = lustre_fwrite(file, &tail, sizeof(tail), &file->f_pos); + if (rc != sizeof(tail)) { + CERROR("error writing padding record: rc %d\n", rc); + GOTO(out, rc < 0 ? rc : rc = -EIO); + } + rc = 0; + out: + RETURN(rc); +} + +static int llog_vfs_write_blob(struct l_file *file, struct llog_rec_hdr *rec, + void *buf, loff_t off) +{ + int rc; + struct llog_rec_tail end; + loff_t saved_off = file->f_pos; + + ENTRY; + file->f_pos = off; + + if (!buf) { + rc = lustre_fwrite(file, rec, rec->lrh_len, &file->f_pos); + if (rc != rec->lhr_len) { + CERROR("error writing log record: rc %d\n", rc); + GOTO(out, rc < 0 ? rc : rc = -ENOSPC); + } + GOTO(out, rc = 0); + } + + /* the buf case */ + buflen = rec->lrh_len; + rec->lrh_len = sizeof(*rec) + size_round(buflen) + sizeof(*end); + rc = lustre_fwrite(file, rec, sizeof(*rec), &file->f_pos); + if (rc != sizeof(*rec)) { + CERROR("error writing log transhdr: rc %d\n", rc); + GOTO(out, rc < 0 ? rc : rc = -ENOSPC); + } + + rc = lustre_fwrite(file, buf, buflen, &file->f_pos); + if (rc != buflen) { + CERROR("error writing log buffer: rc %d\n", rc); + GOTO(out, rc < 0 ? 
rc : rc = -ENOSPC); + } + + loghandle->lgh_file->f_pos += size_round(buflen) - buflen; + end.lrt_len = rec->lrh_len; + end.lrt_index = rec->lrh_index; + rc = lustre_fwrite(file, &end, sizeof(end), &file->f_pos); + if (rc != sizeof(end)) { + CERROR("error writing log tail: rc %d\n", rc); + GOTO(out, rc < 0 ? rc : rc = -ENOSPC); + } + + rc = 0; + out: + if (saved_off > file->f_pos) + file->f_pos = saved_off; + LASSERT(rc <= 0); + RETURN(rc); +} + +/* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */ +/* appends if idx == -1, otherwise overwrites record idx. */ +int llog_lvfs_write_record(struct llog_handle *loghandle, struct llog_rec_hdr *rec, + struct llog_cookie *reccookie, void *buf, int idx) +{ + struct llog_log_hdr *llh; + int reclen = rec->lrh_len; + struct file *file; + loff_t offset; + size_t left; + int index; + int rc; + int buflen; + ENTRY; + + llh = loghandle->lgh_hdr; + file = loghandle->lgh_file; + + if (idx != -1) { + loff_t saved_offset; + + /* no header: only allowed to insert record 0 */ + if (idx != 0 && !file->f_dentry->d_inode->i_size) { + CERROR("idx != -1 in empty log ", ); + LBUG(); + } + + if (!loghandle->lgh_hdr->llh_size != rec->lrh_len) + RETURN(-EINVAL); + + rc = llog_lvfs_write_blob(file, llh, NULL, 0); + if (rc) + RETURN(rc); + + saved_offset = sizeof(*llh) + idx * rec->lrh_len; + rc = llog_lvfs_write_blob(file, rec, buf, saved_offset); + if (rc) + RETURN(rc); + } + + /* Make sure that records don't cross a chunk boundary, so we can + * process them page-at-a-time if needed. If it will cross a chunk + * boundary, write in a fake (but referenced) entry to pad the chunk. + * + * We know that llog_current_log() will return a loghandle that is + * big enough to hold reclen, so all we care about is padding here. 
+ */ + left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1)); + + if (left != 0 && left <= reclen) { + loghandle->lgh_index++; + rc = llog_lvfs_pad(file, len, loghandle->lgh_index); + if (rc) + RETURN(rc); + } + + index = loghandle->lgh_index++; + rec->lrh_index = index; + if (ext2_set_bit(index, llh->llh_bitmap)) { + CERROR("argh, index %u already set in log bitmap?\n", index); + LBUG(); /* should never happen */ + } + llh->llh_count++; + + + offset = 0; + rc = llog_lvfs_write_blob(file, llh, NULL, 0); + if (rc) + RETURN(rc); + + rc = llog_lvfs_write_blob(file, rec, buf, file->f_pos); + if (rc) + RETURN(rc); + + out: + CDEBUG(D_HA, "added record "LPX64":%x+%u, %u bytes\n", + loghandle->lgh_cookie.lgc_lgl.lgl_oid, + loghandle->lgh_cookie.lgc_lgl.lgl_ogen, index, rec->lrh_len); + if (rc == 0 && reccookie) { + reccookie->lgc_lgl = loghandle->lgh_id; + reccookie->lgc_index = index; + rc = 1; + } + RETURN(rc); +} +EXPORT_SYMBOL(llog_vfs_write_record); + +int llog_lvfs_next_block(struct llog_handle *loghandle, int cur_idx, int next_idx, + __u64 *cur_offset, void *buf, int len) +{ + int rc; + ENTRY; + + if (len == 0 || len & (LLOG_CHUNK_SIZE - 1)) + RETURN(-EINVAL); + + CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64"\n", + next_idx, cur_idx, *cur_offset); + + /* We can skip reading at least as many log blocks as the number of + * minimum sized log records we are skipping. If it turns out that we + * are not far enough along the log (because the actual records are + * larger than minimum size) we just skip some more records. 
*/ + while ((*cur_offset = (*cur_offset + + (next_idx - cur_idx) * LLOG_MIN_REC_SIZE) & + ~(LLOG_CHUNK_SIZE - 1)) < + loghandle->lgh_file->f_dentry->d_inode->i_size) { + struct llog_rec_hdr *rec; + + rc = fsfilt_read_record(loghandle->lgh_obd, loghandle->lgh_file, + buf, LLOG_CHUNK_SIZE, *cur_offset); + if (rc) + RETURN(rc); + + rec = buf; + /* sanity check that the start of the new buffer is no farther + * than the record that we wanted. This shouldn't happen. */ + if (rec->lrh_index > next_idx) { + CERROR("missed desired record? %u > %u\n", + rec->lrh_index, next_idx); + RETURN(-ENOENT); + } + + /* Check if last record in this buffer is higher than what we + * are looking for, or is zero (implying that this is the last + * buffer in the log). In conjunction with the previous test, + * this means that the record we are looking for is in the + * current buffer, or the client asked for a record beyond the + * end of the log, which is the client's problem. */ + rec = buf + LLOG_CHUNK_SIZE - sizeof(__u32); + if (rec->lrh_index == 0) + RETURN(0); + + cur_idx = rec->lrh_index; + if (cur_idx >= next_idx) { + while (rc == 0 && (len -= LLOG_CHUNK_SIZE) > 0) { + buf += LLOG_CHUNK_SIZE; + *cur_offset += LLOG_CHUNK_SIZE; + + rc = fsfilt_read_record(loghandle->lgh_obd, + loghandle->lgh_file, + buf, LLOG_CHUNK_SIZE, + *cur_offset); + } + + RETURN(rc); + } + } + + RETURN(-ENOENT); +} +EXPORT_SYMBOL(llog_lvfs_next_block); + + +/* This is a callback from the llog_* functions. + * Assumes caller has already pushed us into the kernel context. 
*/ +int llog_lvfs_create(struct obd_device *obd, + struct llog_handle **res, char *name) +{ + char logname[24]; + struct llog_handle *loghandle; + int rc, open_flags = O_RDWR | O_CREAT | O_LARGEFILE; + ENTRY; + + loghandle = llog_alloc_handle(); + if (!loghandle) + RETURN(-ENOMEM); + *res = loghandle; + + if (name) { + sprintf(logname, "LOGS/%s", name); + + loghandle->lgh_file = l_filp_open(logname, open_flags, 0644); + if (IS_ERR(loghandle->lgh_file)) { + rc = PTR_ERR(loghandle->lgh_file); + CERROR(D_HA, "logfile creation %s: %d\n", logname, rc); + obd->u.mds.mds_catalog->lgh_index++; + GOTO(out_handle, rc); + } + loghandle->lgh_cookie.lgc_lgl.lgl_oid = + loghandle->lgh_file->f_dentry->d_inode->i_ino; + loghandle->lgh_cookie.lgc_lgl.lgl_ogen = + loghandle->lgh_file->f_dentry->d_inode->i_generation; + } else { + struct obdo *oa; + struct l_dentry *de; + oa = obdo_alloc(); + if (!oa) + GOTO(out, rc = -ENOMEM); + /* XXX */ + oa->o_gr = 1; + oa->o_valid = OBD_MD_FLGROUP; + rc = obd_create(obd->obd_log_exp, oa, NULL, NULL); + if (rc) + GOTO(out, rc); + de = lvfs_fid2dentry(loghandle->lgh_obd = obd, oa); + if (IS_ERR(de)) + GOTO(out, rc = PTR_ERR(de)); + loghandle->lgh_file = l_dentry_open(de, open_flags); + if (IS_ERR(loghandle->lgh_file)) + GOTO(out, rc = PTR_ERR(loghandle->lgh_file)); + loghandle->lgh_cookie.lgc_lgl.lgl_oid = oa->o_id; + loghandle->lgh_cookie.lgc_lgl.lgl_ogr = oa->o_gr; + + } + + RETURN(loghandle); + +out_handle: + obdo_free(oa); + llog_free_handle(loghandle); + return rc; +} + + +int llog_lvfs_close(struct llog_handle *handle) +{ + int rc; + ENTRY; + + rc = filp_close(handle->lgh_file, 0); + if (rc) + CERROR("error closing log: rc %d\n", rc); + + llog_free_handle(handle); + RETURN(rc); +} + +/* This is a callback from the llog_* functions. + * Assumes caller has already pushed us into the kernel context. 
*/ +int mds_log_close(struct llog_handle *cathandle, struct llog_handle *loghandle) +{ + struct llog_log_hdr *llh = loghandle->lgh_hdr; + struct mds_obd *mds = &cathandle->lgh_obd->u.mds; + struct dentry *dchild = NULL; + int rc; + ENTRY; + + /* If we are going to delete this log, grab a ref before we close + * it so we don't have to immediately do another lookup. + */ + if (llh->llh_hdr.lrh_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){ + CDEBUG(D_INODE, "deleting log file "LPX64":%x\n", + loghandle->lgh_cookie.lgc_lgl.lgl_oid, + loghandle->lgh_cookie.lgc_lgl.lgl_ogen); + down(&mds->mds_logs_dir->d_inode->i_sem); + dchild = dget(loghandle->lgh_file->f_dentry); + llog_delete_log(cathandle, loghandle); + } else { + CDEBUG(D_INODE, "closing log file "LPX64":%x\n", + loghandle->lgh_cookie.lgc_lgl.lgl_oid, + loghandle->lgh_cookie.lgc_lgl.lgl_ogen); + } + + rc = filp_close(loghandle->lgh_file, 0); + + llog_free_handle(loghandle); /* also removes loghandle from list */ + + if (dchild) { + int err = vfs_unlink(mds->mds_logs_dir->d_inode, dchild); + if (err) { + CERROR("error unlinking empty log %*s: rc %d\n", + dchild->d_name.len, dchild->d_name.name, err); + if (!rc) + rc = err; + } + l_dput(dchild); + up(&mds->mds_logs_dir->d_inode->i_sem); + } + RETURN(rc); +} + +struct llog_handle *mds_log_open(struct obd_device *obd, + struct llog_cookie *logcookie); + +/* This is a callback from the llog_* functions. + * Assumes caller has already pushed us into the kernel context. 
*/ +static struct llog_handle *filter_log_create(struct obd_device *obd) +{ + struct filter_obd *filter = &obd->u.filter; + struct lustre_handle parent_lockh; + struct dentry *dparent, *dchild; + struct llog_handle *loghandle; + struct file *file; + struct obdo obdo; + int err, rc; + obd_id id; + ENTRY; + + loghandle = llog_alloc_handle(); + if (!loghandle) + RETURN(ERR_PTR(-ENOMEM)); + + memset(&obdo, 0, sizeof(obdo)); + obdo.o_valid = OBD_MD_FLGROUP; + obdo.o_gr = 1; /* FIXME: object groups */ + retry: + id = filter_next_id(filter, &obdo); + + dparent = filter_parent_lock(obd, obdo.o_gr, id, LCK_PW, &parent_lockh); + if (IS_ERR(dparent)) + GOTO(out_ctxt, rc = PTR_ERR(dparent)); + + dchild = filter_fid2dentry(obd, dparent, obdo.o_gr, id); + if (IS_ERR(dchild)) + GOTO(out_lock, rc = PTR_ERR(dchild)); + + if (dchild->d_inode != NULL) { + /* This would only happen if lastobjid was bad on disk */ + CERROR("Serious error: objid %*s already exists; is this " + "filesystem corrupt? I will try to work around it.\n", + dchild->d_name.len, dchild->d_name.name); + f_dput(dchild); + ldlm_lock_decref(&parent_lockh, LCK_PW); + goto retry; + } + + rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL); + if (rc) { + CERROR("log create failed rc = %d\n", rc); + GOTO(out_child, rc); + } + + rc = filter_update_last_objid(obd, obdo.o_gr, 0); + if (rc) { + CERROR("can't write lastobjid but log created: rc %d\n",rc); + GOTO(out_destroy, rc); + } + + /* dentry_open does a dput(dchild) and mntput(mnt) on error */ + mntget(filter->fo_vfsmnt); + file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE); + if (IS_ERR(file)) { + rc = PTR_ERR(file); + CERROR("error opening log file "LPX64": rc %d\n", id, rc); + GOTO(out_destroy, rc); + } + ldlm_lock_decref(&parent_lockh, LCK_PW); + + loghandle->lgh_file = file; + loghandle->lgh_cookie.lgc_lgl.lgl_oid = id; + loghandle->lgh_cookie.lgc_lgl.lgl_ogen = dchild->d_inode->i_generation; + loghandle->lgh_log_create = filter_log_create; 
+ loghandle->lgh_log_open = filter_log_open; + loghandle->lgh_log_close = filter_log_close; + loghandle->lgh_obd = obd; + + RETURN(loghandle); + +out_destroy: + err = vfs_unlink(dparent->d_inode, dchild); + if (err) + CERROR("error unlinking %*s on error: rc %d\n", + dchild->d_name.len, dchild->d_name.name, err); +out_child: + f_dput(dchild); +out_lock: + ldlm_lock_decref(&parent_lockh, LCK_PW); +out_ctxt: + llog_free_handle(loghandle); + RETURN(ERR_PTR(rc)); +} + +/* This is a callback from the llog_* functions. + * Assumes caller has already pushed us into the kernel context. */ +struct llog_handle *mds_log_open(struct obd_device *obd, + struct llog_cookie *logcookie) +{ + struct ll_fid fid = { .id = logcookie->lgc_lgl.lgl_oid, + .generation = logcookie->lgc_lgl.lgl_ogen, + .f_type = S_IFREG }; + struct llog_handle *loghandle; + struct dentry *dchild; + int rc; + ENTRY; + + loghandle = llog_alloc_handle(); + if (loghandle == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + down(&obd->u.mds.mds_logs_dir->d_inode->i_sem); + dchild = mds_fid2dentry(&obd->u.mds, &fid, NULL); + up(&obd->u.mds.mds_logs_dir->d_inode->i_sem); + if (IS_ERR(dchild)) { + rc = PTR_ERR(dchild); + CERROR("error looking up log file "LPX64":%x: rc %d\n", + fid.id, fid.generation, rc); + GOTO(out, rc); + } + + if (dchild->d_inode == NULL) { + rc = -ENOENT; + CERROR("nonexistent log file "LPX64":%x: rc %d\n", + fid.id, fid.generation, rc); + GOTO(out_put, rc); + } + + /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */ + mntget(obd->u.mds.mds_vfsmnt); + loghandle->lgh_file = dentry_open(dchild, obd->u.mds.mds_vfsmnt, + O_RDWR | O_LARGEFILE); + if (IS_ERR(loghandle->lgh_file)) { + rc = PTR_ERR(loghandle->lgh_file); + CERROR("error opening logfile "LPX64":%x: rc %d\n", + fid.id, fid.generation, rc); + GOTO(out, rc); + } + memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie)); + loghandle->lgh_log_create = mds_log_create; + loghandle->lgh_log_open = mds_log_open; + 
loghandle->lgh_log_close = mds_log_close; + loghandle->lgh_obd = obd; + + RETURN(loghandle); + +out_put: + l_dput(dchild); +out: + llog_free_handle(loghandle); + return ERR_PTR(rc); +} + + + +struct llog_handle *mds_get_catalog(struct obd_device *obd) +{ + struct mds_server_data *msd = obd->u.mds.mds_server_data; + struct obd_run_ctxt saved; + struct llog_handle *cathandle = NULL; + int rc = 0; + ENTRY; + + push_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL); + + if (msd->msd_catalog_oid) { + struct llog_cookie catcookie; + + catcookie.lgc_lgl.lgl_oid = le64_to_cpu(msd->msd_catalog_oid); + catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(msd->msd_catalog_ogen); + cathandle = mds_log_open(obd, &catcookie); + if (IS_ERR(cathandle)) { + CERROR("error opening catalog "LPX64":%x: rc %d\n", + catcookie.lgc_lgl.lgl_oid, + catcookie.lgc_lgl.lgl_ogen, + (int)PTR_ERR(cathandle)); + msd->msd_catalog_oid = 0; + msd->msd_catalog_ogen = 0; + } + /* ORPHANS FIXME: compare catalog UUID to msd_peeruuid */ + } + + if (!msd->msd_catalog_oid) { + struct llog_logid *lgl; + + cathandle = mds_log_create(obd, "LOGS/catalog"); + if (IS_ERR(cathandle)) { + CERROR("error creating new catalog: rc %d\n", + (int)PTR_ERR(cathandle)); + GOTO(out, cathandle); + } + lgl = &cathandle->lgh_cookie.lgc_lgl; + msd->msd_catalog_oid = cpu_to_le64(lgl->lgl_oid); + msd->msd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen); + rc = mds_update_server_data(obd, 1); + if (rc) { + CERROR("error writing new catalog to disk: rc %d\n",rc); + GOTO(out_handle, rc); + } + } + + rc = llog_init_catalog(cathandle, &obd->u.mds.mds_lov_name); + +out: + pop_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL); + RETURN(cathandle); + +out_handle: + mds_log_close(cathandle, cathandle); + cathandle = ERR_PTR(rc); + goto out; + +} + +static struct llog_handle *filter_log_create(struct obd_device *obd); + +/* This is a callback from the llog_* functions. + * Assumes caller has already pushed us into the kernel context. 
*/ +static int filter_log_close(struct llog_handle *cathandle, + struct llog_handle *loghandle) +{ + struct llog_object_hdr *llh = loghandle->lgh_hdr; + struct file *file = loghandle->lgh_file; + struct dentry *dparent = NULL, *dchild = NULL; + struct lustre_handle parent_lockh; + struct llog_logid *lgl = &loghandle->lgh_cookie.lgc_lgl; + int rc; + ENTRY; + + /* If we are going to delete this log, grab a ref before we close + * it so we don't have to immediately do another lookup. */ + if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){ + CDEBUG(D_INODE, "deleting log file "LPX64":%x\n", + lgl->lgl_oid, lgl->lgl_ogen); + dparent = filter_parent_lock(loghandle->lgh_obd, S_IFREG, + lgl->lgl_oid,LCK_PW,&parent_lockh); + if (IS_ERR(dparent)) { + rc = PTR_ERR(dparent); + CERROR("error locking parent, orphan log %*s: rc %d\n", + file->f_dentry->d_name.len, + file->f_dentry->d_name.name, rc); + RETURN(rc); + } else { + dchild = dget(file->f_dentry); + llog_delete_log(cathandle, loghandle); + } + } else { + CDEBUG(D_INODE, "closing log file "LPX64":%x\n", + lgl->lgl_oid, lgl->lgl_ogen); + } + + rc = filp_close(file, 0); + + llog_free_handle(loghandle); /* also removes loghandle from list */ + + if (dchild != NULL) { + int err = vfs_unlink(dparent->d_inode, dchild); + if (err) { + CERROR("error unlinking empty log %*s: rc %d\n", + dchild->d_name.len, dchild->d_name.name, err); + if (!rc) + rc = err; + } + f_dput(dchild); + ldlm_lock_decref(&parent_lockh, LCK_PW); + } + RETURN(rc); +} + +/* This is a callback from the llog_* functions. + * Assumes caller has already pushed us into the kernel context. 
*/ +static struct llog_handle *filter_log_open(struct obd_device *obd, + struct llog_cookie *logcookie) +{ + struct llog_logid *lgl = &logcookie->lgc_lgl; + struct llog_handle *loghandle; + struct dentry *dchild; + int rc; + ENTRY; + + loghandle = llog_alloc_handle(); + if (!loghandle) + RETURN(ERR_PTR(-ENOMEM)); + + dchild = filter_fid2dentry(obd, NULL, S_IFREG, lgl->lgl_oid); + if (IS_ERR(dchild)) + GOTO(out_handle, rc = PTR_ERR(dchild)); + + if (dchild->d_inode == NULL) { + CERROR("logcookie references non-existent object %*s\n", + dchild->d_name.len, dchild->d_name.name); + GOTO(out_dentry, rc = -ENOENT); + } + + if (dchild->d_inode->i_generation != lgl->lgl_ogen) { + CERROR("logcookie for %*s had different generation %x != %x\n", + dchild->d_name.len, dchild->d_name.name, + dchild->d_inode->i_generation, lgl->lgl_ogen); + GOTO(out_dentry, rc = -ESTALE); + } + + /* dentry_open does a dput(dchild) and mntput(mnt) on error */ + mntget(obd->u.filter.fo_vfsmnt); + loghandle->lgh_file = dentry_open(dchild, obd->u.filter.fo_vfsmnt, + O_RDWR); + if (IS_ERR(loghandle->lgh_file)) { + rc = PTR_ERR(loghandle->lgh_file); + CERROR("error opening logfile %*s: rc %d\n", + dchild->d_name.len, dchild->d_name.name, rc); + GOTO(out_dentry, rc); + } + memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie)); + loghandle->lgh_obd = obd; + RETURN(loghandle); + +out_dentry: + f_dput(dchild); +out_handle: + llog_free_handle(loghandle); + RETURN(ERR_PTR(rc)); +} + + +/* This is called from filter_setup() and should be single threaded */ +struct llog_handle *filter_get_catalog(struct obd_device *obd) +{ + struct filter_obd *filter = &obd->u.filter; + struct filter_server_data *fsd = filter->fo_fsd; + struct obd_run_ctxt saved; + struct llog_handle *cathandle = NULL; + int rc; + ENTRY; + + push_ctxt(&saved, &filter->fo_ctxt, NULL); + if (fsd->fsd_catalog_oid) { + struct llog_cookie catcookie; + + catcookie.lgc_lgl.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid); + 
catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen); + cathandle = filter_log_open(obd, &catcookie); + if (IS_ERR(cathandle)) { + CERROR("error opening catalog "LPX64":%x: rc %d\n", + catcookie.lgc_lgl.lgl_oid, + catcookie.lgc_lgl.lgl_ogen, + (int)PTR_ERR(cathandle)); + fsd->fsd_catalog_oid = 0; + fsd->fsd_catalog_ogen = 0; + } + } + + if (!fsd->fsd_catalog_oid) { + struct llog_logid *lgl; + + cathandle = filter_log_create(obd); + if (IS_ERR(cathandle)) { + CERROR("error creating new catalog: rc %d\n", + (int)PTR_ERR(cathandle)); + GOTO(out, cathandle); + } + lgl = &cathandle->lgh_cookie.lgc_lgl; + fsd->fsd_catalog_oid = cpu_to_le64(lgl->lgl_oid); + fsd->fsd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen); + rc = filter_update_server_data(obd, filter->fo_rcvd_filp,fsd,0); + if (rc) { + CERROR("error writing new catalog to disk: rc %d\n",rc); + GOTO(out_handle, rc); + } + } + + rc = llog_cat_init(cathandle, &obd->u.filter.fo_mdc_uuid); + if (rc) + GOTO(out_handle, rc); +out: + pop_ctxt(&saved, &filter->fo_ctxt, NULL); + RETURN(cathandle); + +out_handle: + filter_log_close(cathandle, cathandle); + cathandle = ERR_PTR(rc); + goto out; +} + +void filter_put_catalog(struct llog_handle *cathandle) +{ + struct llog_handle *loghandle, *n; + int rc; + ENTRY; + + list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list) + filter_log_close(cathandle, loghandle); + + rc = filp_close(cathandle->lgh_file, 0); + if (rc) + CERROR("error closing catalog: rc %d\n", rc); + + llog_free_handle(cathandle); + EXIT; +} + +int filter_log_cancel(struct obd_export *exp, struct lov_stripe_md *lsm, + int num_cookies, struct llog_cookie *logcookies, + int flags) +{ + struct obd_device *obd = exp->exp_obd; + struct obd_run_ctxt saved; + int rc; + ENTRY; + + push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); + rc = llog_cancel_records(obd->u.filter.fo_catalog, num_cookies, + logcookies); + pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL); + + RETURN(rc); +} + +struct 
llog_operations llog_lvfs_ops = { + lop_write_rec: llog_lvfs_write_rec; + lop_next_block: llog_lvfs_next_block; + lop_open: llog_lvfs_open; + lop_cancel: llog_lvfs_cancel; + lop_create:llog_lvfs_create; + lop_close:llog_lvfs_close; +} +EXPORT_SYMBOL(llog_lvfs_ops);