Whamcloud - gitweb
Revert "LU-2158 lvfs: remove llog_lvfs.c and other lvfs code from llog" 71/7371/2
authorOleg Drokin <oleg.drokin@intel.com>
Sat, 17 Aug 2013 06:11:41 +0000 (06:11 +0000)
committerOleg Drokin <oleg.drokin@intel.com>
Sat, 17 Aug 2013 06:11:57 +0000 (06:11 +0000)
Somehow this causes compile failures. E.g. see here: http://build.whamcloud.com/job/lustre-master/arch=x86_64,build_type=server,distro=el6,ib_stack=inkernel/1623/changes

This reverts commit fc587387bacf232b0464f0913b967500aae09511

Change-Id: I2fa0eedadc8b5e511b24a9cb0a39cedd8dfc5cf8
Reviewed-on: http://review.whamcloud.com/7371
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Tested-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/linux/lustre_fsfilt.h
lustre/include/linux/lvfs.h
lustre/include/lustre_log.h
lustre/include/obd_class.h
lustre/lvfs/fsfilt_ext3.c
lustre/obdclass/Makefile.in
lustre/obdclass/autoMakefile.am
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_lvfs.c [new file with mode: 0644]

index 8b7fe26..e320817 100644 (file)
 #include <obd.h>
 #include <obd_class.h>
 
+typedef void (*fsfilt_cb_t)(struct obd_device *obd, __u64 last_rcvd,
+                            void *data, int error);
+
 struct fsfilt_operations {
         cfs_list_t fs_list;
         cfs_module_t *fs_owner;
         char   *fs_type;
+        char   *(* fs_getlabel)(struct super_block *sb);
+        void   *(* fs_start)(struct inode *inode, int op, void *desc_private,
+                             int logs);
+        int     (* fs_commit)(struct inode *inode, void *handle,int force_sync);
         int     (* fs_map_inode_pages)(struct inode *inode, struct page **page,
                                       int pages, unsigned long *blocks,
                                       int create, struct mutex *sem);
+        int     (* fs_write_record)(struct file *, void *, int size, loff_t *,
+                                    int force_sync);
+        int     (* fs_read_record)(struct file *, void *, int size, loff_t *);
+        int     (* fs_setup)(struct super_block *sb);
 };
 
 extern int fsfilt_register_ops(struct fsfilt_operations *fs_ops);
@@ -64,6 +75,20 @@ extern void fsfilt_unregister_ops(struct fsfilt_operations *fs_ops);
 extern struct fsfilt_operations *fsfilt_get_ops(const char *type);
 extern void fsfilt_put_ops(struct fsfilt_operations *fs_ops);
 
+/*
+ * Return the label of @sb as reported by the fs_getlabel method of
+ * @obd's filesystem operations.
+ *
+ * Returns NULL when the method is not provided, when it returns NULL, or
+ * when the label is an empty string.  The method is invoked exactly once,
+ * so the emptiness check and the returned pointer refer to the same result.
+ */
+static inline char *fsfilt_get_label(struct obd_device *obd,
+                                     struct super_block *sb)
+{
+        char *label;
+
+        if (obd->obd_fsops->fs_getlabel == NULL)
+                return NULL;
+
+        label = obd->obd_fsops->fs_getlabel(sb);
+        if (label == NULL || label[0] == '\0')
+                return NULL;
+
+        return label;
+}
+
+#define FSFILT_OP_UNLINK                1
+#define FSFILT_OP_CANCEL_UNLINK         10
+
 #define __fsfilt_check_slow(obd, start, msg)                              \
 do {                                                                      \
         if (cfs_time_before(jiffies, start + 15 * CFS_HZ))                \
@@ -85,6 +110,64 @@ do {                                                    \
         start = jiffies;                                \
 } while (0)
 
+/*
+ * Start a transaction for @op on @inode through the fs_start method.
+ *
+ * When @oti is supplied, its oti_handle is passed to fs_start as the
+ * private descriptor.  If @oti carried no handle yet, the new handle is
+ * stored in it; if it did, fs_start must hand back that same handle,
+ * otherwise the mismatch is logged and LBUG() is raised.
+ *
+ * Returns whatever fs_start returned.
+ */
+static inline void *fsfilt_start_log(struct obd_device *obd,
+                                     struct inode *inode, int op,
+                                     struct obd_trans_info *oti, int logs)
+{
+        unsigned long begin = jiffies;
+        void *parent = NULL;
+        void *handle;
+
+        if (oti != NULL)
+                parent = oti->oti_handle;
+
+        handle = obd->obd_fsops->fs_start(inode, op, parent, logs);
+        CDEBUG(D_INFO, "started handle %p (%p)\n", handle, parent);
+
+        if (oti != NULL && parent == NULL) {
+                /* first start for this oti: remember the handle */
+                oti->oti_handle = handle;
+        } else if (oti != NULL && handle != parent) {
+                CERROR("mismatch: parent %p, handle %p, oti %p\n",
+                       parent, handle, oti);
+                LBUG();
+        }
+
+        fsfilt_check_slow(obd, begin, "journal start");
+        return handle;
+}
+
+/*
+ * Commit the transaction @handle on @inode through the fs_commit method.
+ *
+ * @force_sync is passed through to fs_commit unchanged.  The call is timed
+ * and reported via fsfilt_check_slow() under the label "journal commit".
+ *
+ * Returns the result of fs_commit.
+ */
+static inline int fsfilt_commit(struct obd_device *obd, struct inode *inode,
+                                void *handle, int force_sync)
+{
+        unsigned long now = jiffies;
+        int rc;
+
+        /* log before the call: the message says "committing", not "committed" */
+        CDEBUG(D_INFO, "committing handle %p\n", handle);
+        rc = obd->obd_fsops->fs_commit(inode, handle, force_sync);
+
+        fsfilt_check_slow(obd, now, "journal commit");
+
+        return rc;
+}
+
+/*
+ * Read @size bytes from @file at *@offs into @buf through the
+ * fs_read_record method and return its result.
+ */
+static inline int fsfilt_read_record(struct obd_device *obd, struct file *file,
+                                     void *buf, loff_t size, loff_t *offs)
+{
+        int rc;
+
+        rc = obd->obd_fsops->fs_read_record(file, buf, size, offs);
+        return rc;
+}
+
+/*
+ * Write @size bytes from @buf to @file at *@offs through the
+ * fs_write_record method, forwarding @force_sync, and return its result.
+ */
+static inline int fsfilt_write_record(struct obd_device *obd, struct file *file,
+                                      void *buf, loff_t size, loff_t *offs,
+                                      int force_sync)
+{
+        int rc;
+
+        rc = obd->obd_fsops->fs_write_record(file, buf, size, offs,
+                                             force_sync);
+        return rc;
+}
+
+/*
+ * Run the optional fs_setup method on @fs.
+ * Returns 0 when the method is not provided, otherwise its result.
+ */
+static inline int fsfilt_setup(struct obd_device *obd, struct super_block *fs)
+{
+        if (obd->obd_fsops->fs_setup == NULL)
+                return 0;
+
+        return obd->obd_fsops->fs_setup(fs);
+}
+
+
+
 #endif /* __KERNEL__ */
 
 #endif
index ee3073f..652d8d2 100644 (file)
@@ -68,6 +68,10 @@ struct lvfs_ucred {
        struct md_identity     *luc_identity;
 };
 
+/* Callback table embedded in struct lvfs_run_ctxt as its cb_ops member. */
+struct lvfs_callback_ops {
+        /*
+         * Look up a dentry.  obd_lvfs_fid2dentry() invokes this with the
+         * object id, the generation, the sequence and the obd_device as
+         * @data.
+         */
+        struct dentry *(*l_fid2dentry)(__u64 id_ino, __u32 gen, __u64 gr, void *data);
+};
+
 #define OBD_RUN_CTXT_MAGIC      0xC0FFEEAA
 #define OBD_CTXT_DEBUG          /* development-only debugging */
 struct lvfs_run_ctxt {
@@ -76,6 +80,7 @@ struct lvfs_run_ctxt {
         mm_segment_t             fs;
         struct lvfs_ucred        luc;
         int                      ngroups;
+        struct lvfs_callback_ops cb_ops;
         struct group_info       *group_info;
        struct dt_device        *dt;
 #ifdef OBD_CTXT_DEBUG
index 011811a..708d7c8 100644 (file)
@@ -317,6 +317,7 @@ struct llog_handle {
        spinlock_t               lgh_hdr_lock; /* protect lgh_hdr data */
        struct llog_logid        lgh_id; /* id of this log */
        struct llog_log_hdr     *lgh_hdr;
+       struct file             *lgh_file;
        struct dt_object        *lgh_obj;
        int                      lgh_last_idx;
        int                      lgh_cur_idx; /* used during llog_process */
@@ -332,6 +333,9 @@ struct llog_handle {
        cfs_atomic_t             lgh_refcount;
 };
 
+/* llog_lvfs.c */
+extern struct llog_operations llog_lvfs_ops;
+
 /* llog_osd.c */
 extern struct llog_operations llog_osd_ops;
 int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
index 8c33fc5..5e91034 100644 (file)
@@ -1207,6 +1207,16 @@ static inline int obd_extent_calc(struct obd_export *exp,
         RETURN(rc);
 }
 
+/*
+ * Resolve the object identified by @oi and @gen to a dentry through the
+ * l_fid2dentry callback of the export's obd lvfs context.
+ *
+ * The callback receives the object id, @gen, the object sequence and the
+ * obd_device itself as its private data; its result is returned as is.
+ */
+static inline struct dentry *
+obd_lvfs_fid2dentry(struct obd_export *exp, struct ost_id *oi, __u32 gen)
+{
+       struct lvfs_run_ctxt *ctxt;
+
+       /* check exp_obd before it is used to form the context address */
+       LASSERT(exp->exp_obd);
+       ctxt = &exp->exp_obd->obd_lvfs_ctxt;
+
+       return ctxt->cb_ops.l_fid2dentry(ostid_id(oi), gen, ostid_seq(oi),
+                                        exp->exp_obd);
+}
+
 /* @max_age is the oldest time in jiffies that we accept using a cached data.
  * If the cache is older than @max_age we will get a new value from the
  * target.  Use a value of "cfs_time_current() + HZ" to guarantee freshness. */
index 8ca8f19..ae23b27 100644 (file)
 #endif
 
 /* for kernels 2.6.18 and later */
+#define FSFILT_SINGLEDATA_TRANS_BLOCKS(sb) EXT3_SINGLEDATA_TRANS_BLOCKS(sb)
+
 #define fsfilt_ext3_ext_insert_extent(handle, inode, path, newext, flag) \
                ext3_ext_insert_extent(handle, inode, path, newext, flag)
 
 #define ext3_mb_discard_inode_preallocations(inode) \
                  ext3_discard_preallocations(inode)
 
+#define fsfilt_log_start_commit(journal, tid) jbd2_log_start_commit(journal, tid)
+#define fsfilt_log_wait_commit(journal, tid) jbd2_log_wait_commit(journal, tid)
+
+static struct kmem_cache *fcb_cache;
+
+struct fsfilt_cb_data {
+       struct ext4_journal_cb_entry cb_jcb; /* private data - MUST BE FIRST */
+       fsfilt_cb_t cb_func;            /* MDS/OBD completion function */
+       struct obd_device *cb_obd;      /* MDS/OBD completion device */
+       __u64 cb_last_rcvd;             /* MDS/OST last committed operation */
+       void *cb_data;                  /* MDS/OST completion function data */
+};
+
+/*
+ * fs_getlabel method: return the s_volume_name field of the superblock
+ * structure reached through EXT3_SB(sb)->s_es.  The pointer refers to that
+ * field directly; no copy is made.
+ */
+static char *fsfilt_ext3_get_label(struct super_block *sb)
+{
+        return EXT3_SB(sb)->s_es->s_volume_name;
+}
+
+/* kernel has ext4_blocks_for_truncate since linux-3.1.1 */
+#ifdef HAVE_BLOCKS_FOR_TRUNCATE
+# include <ext4/truncate.h>
+#else
+/*
+ * Fallback used when HAVE_BLOCKS_FOR_TRUNCATE is not defined.
+ *
+ * Converts inode->i_blocks to filesystem blocks, forces the result to be at
+ * least 2 and at most EXT4_MAX_TRANS_DATA, and adds
+ * EXT4_DATA_TRANS_BLOCKS() for the inode's superblock.
+ */
+static inline unsigned long ext4_blocks_for_truncate(struct inode *inode)
+{
+       ext4_lblk_t blocks;
+
+       blocks = inode->i_blocks >> (inode->i_sb->s_blocksize_bits - 9);
+
+       /* lower bound first, then upper bound */
+       if (blocks < 2)
+               blocks = 2;
+       if (blocks > EXT4_MAX_TRANS_DATA)
+               blocks = EXT4_MAX_TRANS_DATA;
+
+       return EXT4_DATA_TRANS_BLOCKS(inode->i_sb) + blocks;
+}
+#endif
+
+/*
+ * fs_start method: open a journal handle for @op on @inode.
+ *
+ * No additional blocks are currently reserved for rmdir and unlink because
+ * the OST oa_id is stored inside the inode, which is changed anyway as part
+ * of the transaction.
+ *
+ * If the current task already has journal_info set, no per-operation
+ * estimate is made and only the base single-data reservation is requested.
+ * Otherwise the credits are computed from @op and @logs and capped at the
+ * journal's j_max_transaction_buffers.  An unknown @op is logged and hits
+ * LBUG().
+ *
+ * Returns the value of ext3_journal_start(), which may be an ERR_PTR.
+ */
+static void *fsfilt_ext3_start(struct inode *inode, int op, void *desc_private,
+                               int logs)
+{
+        /* For updates to the last received file */
+        int nblocks = FSFILT_SINGLEDATA_TRANS_BLOCKS(inode->i_sb);
+        void *handle;
+
+        if (current->journal_info != NULL) {
+                CDEBUG(D_INODE, "increasing refcount on %p\n",
+                       current->journal_info);
+        } else {
+                journal_t *journal;
+
+                switch (op) {
+                case FSFILT_OP_UNLINK:
+                        /* delete one file + create/update logs for each
+                         * stripe */
+                        nblocks += EXT3_DELETE_TRANS_BLOCKS(inode->i_sb);
+                        nblocks += (EXT3_INDEX_EXTRA_TRANS_BLOCKS +
+                                    FSFILT_SINGLEDATA_TRANS_BLOCKS(inode->i_sb)) *
+                                   logs;
+                        break;
+                case FSFILT_OP_CANCEL_UNLINK:
+                        LASSERT(logs == 1);
+
+                        /* blocks for log header bitmap update OR
+                         * blocks for catalog header bitmap update + unlink
+                         * of logs + blocks for delete the inode (include
+                         * blocks truncating). */
+                        nblocks = (LLOG_CHUNK_SIZE >> inode->i_blkbits) +
+                                  EXT3_DELETE_TRANS_BLOCKS(inode->i_sb) +
+                                  ext4_blocks_for_truncate(inode) + 3;
+                        break;
+                default:
+                        CERROR("unknown transaction start op %d\n", op);
+                        LBUG();
+                }
+
+                LASSERT(current->journal_info == desc_private);
+                journal = EXT3_SB(inode->i_sb)->s_journal;
+                if (nblocks > journal->j_max_transaction_buffers) {
+                        CWARN("too many credits %d for op %ux%u using %d instead\n",
+                               nblocks, op, logs,
+                               journal->j_max_transaction_buffers);
+                        nblocks = journal->j_max_transaction_buffers;
+                }
+        }
+
+        LASSERTF(nblocks > 0, "can't start %d credit transaction\n", nblocks);
+        handle = ext3_journal_start(inode, nblocks);
+
+        if (IS_ERR(handle))
+                CERROR("error starting handle for op %u (%u credits): rc %ld\n",
+                       op, nblocks, PTR_ERR(handle));
+        else
+                LASSERT(current->journal_info == handle);
+
+        return handle;
+}
+
+/*
+ * fs_commit method: stop the journal handle @h.
+ *
+ * @h must be the handle recorded in current->journal_info.  A non-zero
+ * @force_sync sets h_sync on the handle before it is stopped.
+ *
+ * Returns the result of ext3_journal_stop().
+ */
+static int fsfilt_ext3_commit(struct inode *inode, void *h, int force_sync)
+{
+        handle_t *handle = h;
+
+        LASSERT(current->journal_info == handle);
+
+        if (force_sync)
+                handle->h_sync = 1; /* recovery likes this */
+
+        return ext3_journal_stop(handle);
+}
+
 #ifndef EXT3_EXTENTS_FL
 #define EXT3_EXTENTS_FL                 0x00080000 /* Inode uses extents */
 #endif
@@ -444,20 +556,211 @@ int fsfilt_ext3_map_inode_pages(struct inode *inode, struct page **page,
         return rc;
 }
 
+/*
+ * Copy data of @inode starting at *@offs into @buf, one block at a time via
+ * ext3_bread(), advancing *@offs by the number of bytes copied.
+ *
+ * A request reaching past i_size is shortened to end at i_size.  In that
+ * case a negative remainder yields -EBADR and a zero remainder yields 0.
+ *
+ * Returns the originally requested @size once the loop completes (also when
+ * the request was shortened), or the error reported by ext3_bread() when a
+ * block cannot be read.
+ */
+int fsfilt_ext3_read(struct inode *inode, void *buf, int size, loff_t *offs)
+{
+        struct buffer_head *bh;
+        unsigned long block;
+        int blocksize, boffs, csize, err;
+        int osize = size;
+        int clipped = 0;
+
+        /* prevent reading after eof */
+        spin_lock(&inode->i_lock);
+        if (i_size_read(inode) < *offs + size) {
+                size = i_size_read(inode) - *offs;
+                clipped = 1;
+        }
+        spin_unlock(&inode->i_lock);
+
+        /* the short/empty checks apply only to a shortened request */
+        if (clipped) {
+                if (size < 0) {
+                        CDEBUG(D_EXT2, "size %llu is too short for read @%llu\n",
+                               i_size_read(inode), *offs);
+                        return -EBADR;
+                }
+                if (size == 0)
+                        return 0;
+        }
+
+        blocksize = 1 << inode->i_blkbits;
+
+        for (; size > 0; size -= csize) {
+                block = *offs >> inode->i_blkbits;
+                boffs = *offs & (blocksize - 1);
+                csize = min(blocksize - boffs, size);
+
+                bh = ext3_bread(NULL, inode, block, 0, &err);
+                if (!bh) {
+                        CERROR("can't read block: %d\n", err);
+                        return err;
+                }
+
+                memcpy(buf, bh->b_data + boffs, csize);
+                brelse(bh);
+
+                *offs += csize;
+                buf += csize;
+        }
+
+        return osize;
+}
+EXPORT_SYMBOL(fsfilt_ext3_read);
+
+/*
+ * fs_read_record method: read @size bytes of @file's inode at *@offs into
+ * @buf using fsfilt_ext3_read().
+ *
+ * A positive result of fsfilt_ext3_read() is reported as 0; zero and
+ * negative results are returned unchanged.
+ */
+static int fsfilt_ext3_read_record(struct file * file, void *buf,
+                                   int size, loff_t *offs)
+{
+        int rc = fsfilt_ext3_read(file->f_dentry->d_inode, buf, size, offs);
+
+        return rc > 0 ? 0 : rc;
+}
+
+/*
+ * Write @bufsize bytes from @buf into @inode starting at *@offs, inside the
+ * already started journal transaction @handle.
+ *
+ * Data is copied block by block: each block is obtained with ext3_bread()
+ * (create flag set), write access is requested from the journal, the bytes
+ * are copied in and the buffer is marked as dirty metadata.  The loop stops
+ * at the first failure.
+ *
+ * After the loop the in-core size and i_disksize are raised to cover the
+ * furthest byte written; this step runs whether or not an error occurred.
+ *
+ * Returns 0 on success, in which case *@offs is advanced past the written
+ * data, or the error from the failing step, in which case *@offs is left
+ * unchanged.
+ */
+int fsfilt_ext3_write_handle(struct inode *inode, void *buf, int bufsize,
+                                loff_t *offs, handle_t *handle)
+{
+        struct buffer_head *bh = NULL;
+        loff_t old_size = i_size_read(inode), offset = *offs;
+        loff_t new_size = i_size_read(inode);
+        unsigned long block;
+        int err = 0, blocksize = 1 << inode->i_blkbits, size, boffs;
+
+        while (bufsize > 0) {
+                /* drop the buffer left over from the previous iteration */
+                if (bh != NULL)
+                        brelse(bh);
+
+                block = offset >> inode->i_blkbits;
+                boffs = offset & (blocksize - 1);
+                /* bytes that fit in this block from boffs onwards */
+                size = min(blocksize - boffs, bufsize);
+                bh = ext3_bread(handle, inode, block, 1, &err);
+                if (!bh) {
+                        CERROR("can't read/create block: %d\n", err);
+                        break;
+                }
+
+                err = ext3_journal_get_write_access(handle, bh);
+                if (err) {
+                        CERROR("journal_get_write_access() returned error %d\n",
+                               err);
+                        break;
+                }
+                LASSERT(bh->b_data + boffs + size <= bh->b_data + bh->b_size);
+                memcpy(bh->b_data + boffs, buf, size);
+                err = ext3_journal_dirty_metadata(handle, bh);
+                if (err) {
+                        CERROR("journal_dirty_metadata() returned error %d\n",
+                               err);
+                        break;
+                }
+                /* track the furthest byte successfully written */
+                if (offset + size > new_size)
+                        new_size = offset + size;
+                offset += size;
+                bufsize -= size;
+                buf += size;
+        }
+        /* release the last buffer, whether the loop finished or broke out */
+        if (bh)
+                brelse(bh);
+
+        /* correct in-core and on-disk sizes */
+        if (new_size > i_size_read(inode)) {
+               spin_lock(&inode->i_lock);
+               /* re-check under i_lock before changing the size */
+               if (new_size > i_size_read(inode))
+                       i_size_write(inode, new_size);
+               if (i_size_read(inode) > EXT3_I(inode)->i_disksize)
+                       EXT3_I(inode)->i_disksize = i_size_read(inode);
+               /* mark_inode_dirty() is called only after i_lock is dropped */
+               if (i_size_read(inode) > old_size) {
+                       spin_unlock(&inode->i_lock);
+                       mark_inode_dirty(inode);
+               } else {
+                       spin_unlock(&inode->i_lock);
+               }
+        }
+
+        /* the caller's offset moves only on complete success */
+        if (err == 0)
+                *offs = offset;
+        return err;
+}
+EXPORT_SYMBOL(fsfilt_ext3_write_handle);
+
+/*
+ * fs_write_record method: write @bufsize bytes from @buf to @file at *@offs
+ * inside a journal transaction of its own.
+ *
+ * Credits are reserved for every block the write touches, taking the
+ * offset within the first block into account.  With a non-zero @force_sync
+ * and a successful write, h_sync is set on the handle before it is stopped.
+ *
+ * Returns 0 on success, the PTR_ERR of a failed ext3_journal_start(), the
+ * error of fsfilt_ext3_write_handle(), or - when the write itself
+ * succeeded - the error returned by ext3_journal_stop().
+ */
+static int fsfilt_ext3_write_record(struct file *file, void *buf, int bufsize,
+                                    loff_t *offs, int force_sync)
+{
+        struct inode *inode = file->f_dentry->d_inode;
+        handle_t *handle;
+        int err, stop_err, block_count = 0, blocksize, credits;
+
+        /* Determine how many transaction credits are needed */
+        blocksize = 1 << inode->i_blkbits;
+        block_count = (*offs & (blocksize - 1)) + bufsize;
+        block_count = (block_count + blocksize - 1) >> inode->i_blkbits;
+        credits = block_count * EXT3_DATA_TRANS_BLOCKS(inode->i_sb) + 2;
+
+        handle = ext3_journal_start(inode, credits);
+        if (IS_ERR(handle)) {
+                CERROR("can't start transaction for %d blocks (%d bytes)\n",
+                       credits, bufsize);
+                return PTR_ERR(handle);
+        }
+
+        err = fsfilt_ext3_write_handle(inode, buf, bufsize, offs, handle);
+
+        if (!err && force_sync)
+                handle->h_sync = 1; /* recovery likes this */
+
+        /* a failed stop must not be reported as a successful write */
+        stop_err = ext3_journal_stop(handle);
+        if (err == 0)
+                err = stop_err;
+
+        return err;
+}
+
+static int fsfilt_ext3_setup(struct super_block *sb)
+{
+        if (!EXT3_HAS_COMPAT_FEATURE(sb,
+                                EXT3_FEATURE_COMPAT_HAS_JOURNAL)) {
+                CERROR("ext3 mounted without journal\n");
+                return -EINVAL;
+        }
+
+#ifdef S_PDIROPS
+        CWARN("Enabling PDIROPS\n");
+        set_opt(EXT3_SB(sb)->s_mount_opt, PDIROPS);
+        sb->s_flags |= S_PDIROPS;
+#endif
+        if (!EXT3_HAS_COMPAT_FEATURE(sb, EXT3_FEATURE_COMPAT_DIR_INDEX))
+                CWARN("filesystem doesn't have dir_index feature enabled\n");
+        return 0;
+}
 static struct fsfilt_operations fsfilt_ext3_ops = {
-       .fs_type                = "ext3",
-       .fs_owner               = THIS_MODULE,
-       .fs_map_inode_pages     = fsfilt_ext3_map_inode_pages,
+        .fs_type                = "ext3",
+        .fs_owner               = THIS_MODULE,
+        .fs_getlabel            = fsfilt_ext3_get_label,
+        .fs_start               = fsfilt_ext3_start,
+        .fs_commit              = fsfilt_ext3_commit,
+        .fs_map_inode_pages     = fsfilt_ext3_map_inode_pages,
+        .fs_write_record        = fsfilt_ext3_write_record,
+        .fs_read_record         = fsfilt_ext3_read_record,
+        .fs_setup               = fsfilt_ext3_setup,
 };
 
 static int __init fsfilt_ext3_init(void)
 {
-       return fsfilt_register_ops(&fsfilt_ext3_ops);
+       int rc;
+
+       fcb_cache = kmem_cache_create("fsfilt_ext3_fcb",
+                                     sizeof(struct fsfilt_cb_data),
+                                     0, 0, NULL);
+       if (!fcb_cache) {
+               CERROR("error allocating fsfilt journal callback cache\n");
+               GOTO(out, rc = -ENOMEM);
+       }
+
+       rc = fsfilt_register_ops(&fsfilt_ext3_ops);
+
+       if (rc)
+               kmem_cache_destroy(fcb_cache);
+out:
+       return rc;
 }
 
 static void __exit fsfilt_ext3_exit(void)
 {
        fsfilt_unregister_ops(&fsfilt_ext3_ops);
+       kmem_cache_destroy(fcb_cache);
 }
 
 module_init(fsfilt_ext3_init);
index 1351b9b..371c8f4 100644 (file)
@@ -8,6 +8,7 @@ default: all
 sources:
 
 obdclass-all-objs := llog.o llog_cat.o llog_obd.o llog_swab.o
+@LDISKFS_ENABLED_TRUE@ obdclass-all-objs += llog_lvfs.o
 obdclass-all-objs += class_obd.o debug.o genops.o uuid.o llog_ioctl.o
 obdclass-all-objs += lprocfs_status.o lustre_handles.o lustre_peer.o
 obdclass-all-objs += llog_osd.o local_storage.o
index 1d53b5a..ba8b75a 100644 (file)
@@ -9,8 +9,8 @@ INCLUDES = -I$(SYSIO)/include
 noinst_LIBRARIES = liblustreclass.a
 liblustreclass_a_SOURCES = class_obd.c debug.c genops.c statfs_pack.c mea.c uuid.c
 liblustreclass_a_SOURCES += lustre_handles.c lustre_peer.c lprocfs_status.c
-liblustreclass_a_SOURCES += obdo.c obd_config.c llog.c llog_obd.c llog_cat.c
-liblustreclass_a_SOURCES += llog_swab.c capa.c
+liblustreclass_a_SOURCES += obdo.c obd_config.c llog.c llog_obd.c llog_cat.c 
+liblustreclass_a_SOURCES += llog_lvfs.c llog_swab.c capa.c
 liblustreclass_a_SOURCES += lu_object.c cl_object.c lu_ref.c
 liblustreclass_a_SOURCES += cl_page.c cl_lock.c cl_io.c
 liblustreclass_a_SOURCES += #llog_ioctl.c rbtree.c
@@ -35,7 +35,7 @@ obdclass_SOURCES =                                            \
        class_obd.c genops.c lprocfs_status.c                   \
        lustre_handles.c lustre_peer.c obd_config.c             \
        obdo.c debug.c llog_ioctl.c uuid.c                      \
-       llog_swab.c llog_obd.c llog.c llog_cat.c                \
+       llog_swab.c llog_obd.c llog.c llog_cat.c llog_lvfs.c    \
        mea.c lu_object.c dt_object.c lu_ref.c
 
 obdclass_CFLAGS := $(EXTRA_KCFLAGS)
index 78e92d4..c8c83e4 100644 (file)
@@ -457,23 +457,31 @@ int llog_cat_add(const struct lu_env *env, struct llog_handle *cathandle,
        LASSERT(ctxt);
        LASSERT(ctxt->loc_exp);
 
-       LASSERT(cathandle->lgh_obj != NULL);
-       dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
+       if (cathandle->lgh_obj != NULL) {
+               dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
+               LASSERT(dt);
 
-       th = dt_trans_create(env, dt);
-       if (IS_ERR(th))
-               RETURN(PTR_ERR(th));
+               th = dt_trans_create(env, dt);
+               if (IS_ERR(th))
+                       RETURN(PTR_ERR(th));
 
-       rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
-       if (rc)
-               GOTO(out_trans, rc);
+               rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
+               if (rc)
+                       GOTO(out_trans, rc);
 
-       rc = dt_trans_start_local(env, dt, th);
-       if (rc)
-               GOTO(out_trans, rc);
-       rc = llog_cat_add_rec(env, cathandle, rec, reccookie, buf, th);
+               rc = dt_trans_start_local(env, dt, th);
+               if (rc)
+                       GOTO(out_trans, rc);
+               rc = llog_cat_add_rec(env, cathandle, rec, reccookie, buf, th);
 out_trans:
-       dt_trans_stop(env, dt, th);
+               dt_trans_stop(env, dt, th);
+       } else { /* lvfs compat code */
+               LASSERT(cathandle->lgh_file != NULL);
+               rc = llog_cat_declare_add_rec(env, cathandle, rec, th);
+               if (rc == 0)
+                       rc = llog_cat_add_rec(env, cathandle, rec, reccookie,
+                                             buf, th);
+       }
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_add);
diff --git a/lustre/obdclass/llog_lvfs.c b/lustre/obdclass/llog_lvfs.c
new file mode 100644 (file)
index 0000000..a83cb6f
--- /dev/null
@@ -0,0 +1,865 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, 2013, Intel Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/obdclass/llog_lvfs.c
+ *
+ * OST<->MDS recovery logging infrastructure.
+ * Invariants in implementation:
+ * - we do not share logs among different OST<->MDS connections, so that
+ *   if an OST or MDS fails it need only look at log(s) relevant to itself
+ *
+ * Author: Andreas Dilger <adilger@clusterfs.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_LOG
+
+#ifndef __KERNEL__
+#include <liblustre.h>
+#endif
+
+#include <obd.h>
+#include <obd_class.h>
+#include <lustre_log.h>
+#include <obd_ost.h>
+#include <libcfs/list.h>
+#include <lvfs.h>
+#include <lustre_fsfilt.h>
+#include <lustre_disk.h>
+#include "llog_internal.h"
+
+#if defined(__KERNEL__) && defined(LLOG_LVFS)
+
+/*
+ * Write a padding record of @len bytes with index @index at the current
+ * position of @file.
+ *
+ * Only the record header and the record tail are written; f_pos is moved
+ * over the space in between.  @len must be at least LLOG_MIN_REC_SIZE and a
+ * multiple of 8.
+ *
+ * Returns 0 on success or the error of the failing fsfilt_write_record().
+ */
+static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
+                                int len, int index)
+{
+        struct llog_rec_hdr rec = { 0 };
+        struct llog_rec_tail tail;
+        int rc;
+        ENTRY;
+
+        LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
+
+        rec.lrh_len = len;
+        rec.lrh_index = index;
+        rec.lrh_type = LLOG_PAD_MAGIC;
+        /* the tail mirrors the values stored in the header */
+        tail.lrt_len = rec.lrh_len;
+        tail.lrt_index = rec.lrh_index;
+
+        rc = fsfilt_write_record(obd, file, &rec, sizeof(rec), &file->f_pos, 0);
+        if (rc == 0) {
+                /* skip the unused body between header and tail */
+                file->f_pos += len - sizeof(rec) - sizeof(tail);
+                rc = fsfilt_write_record(obd, file, &tail, sizeof(tail),
+                                         &file->f_pos, 0);
+        }
+
+        if (rc)
+                CERROR("error writing padding record: rc %d\n", rc);
+
+        RETURN(rc);
+}
+
+/*
+ * Write one log record to @file at offset @off.
+ *
+ * Without @buf, @rec is written as is, using rec->lrh_len as the length.
+ * With @buf, rec->lrh_len is taken as the payload length and is rewritten
+ * to cover header + payload + tail; the header, the payload from @buf and a
+ * freshly built tail are then written one after another.
+ *
+ * On exit f_pos is put back to its value on entry if that value lies
+ * beyond the position reached by the write.
+ *
+ * Returns 0 on success or the error of the failing fsfilt_write_record().
+ */
+static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
+                                struct llog_rec_hdr *rec, void *buf, loff_t off)
+{
+        struct llog_rec_tail end;
+        loff_t saved_off = file->f_pos;
+        int buflen = rec->lrh_len;
+        int rc;
+
+        ENTRY;
+
+        file->f_pos = off;
+
+        if (buflen == 0)
+                CWARN("0-length record\n");
+
+        if (buf == NULL) {
+                rc = fsfilt_write_record(obd, file, rec, buflen,
+                                         &file->f_pos, 0);
+                if (rc != 0) {
+                        CERROR("error writing log record: rc %d\n", rc);
+                        goto out;
+                }
+                GOTO(out, rc = 0);
+        }
+
+        /* the buf case */
+        rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
+        end.lrt_len = rec->lrh_len;
+        end.lrt_index = rec->lrh_index;
+
+        rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
+        if (rc != 0) {
+                CERROR("error writing log hdr: rc %d\n", rc);
+                goto out;
+        }
+
+        rc = fsfilt_write_record(obd, file, buf, buflen, &file->f_pos, 0);
+        if (rc != 0) {
+                CERROR("error writing log buffer: rc %d\n", rc);
+                goto out;
+        }
+
+        rc = fsfilt_write_record(obd, file, &end, sizeof(end), &file->f_pos, 0);
+        if (rc != 0) {
+                CERROR("error writing log tail: rc %d\n", rc);
+                goto out;
+        }
+
+ out:
+        if (saved_off > file->f_pos)
+                file->f_pos = saved_off;
+        LASSERT(rc <= 0);
+        RETURN(rc);
+}
+
+/*
+ * Read @size bytes from @file at offset @off into @buf.
+ *
+ * The read works on a private copy of the offset, so nothing is reported
+ * back to the caller about the position reached.
+ *
+ * Returns 0 on success or the error of fsfilt_read_record().
+ */
+static int llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file,
+                                void *buf, int size, loff_t off)
+{
+        loff_t pos = off;
+        int rc;
+        ENTRY;
+
+        rc = fsfilt_read_record(obd, file, buf, size, &pos);
+        if (rc == 0)
+                RETURN(0);
+
+        CERROR("error reading log record: rc %d\n", rc);
+        RETURN(rc);
+}
+
+/*
+ * Load the log header of @handle from its backing file into lgh_hdr.
+ *
+ * Returns LLOG_EEMPTY for a zero-length file without reading anything.
+ * Otherwise the header is read, byte-swapped if required, and checked for
+ * the LLOG_HDR_MAGIC type and the LLOG_CHUNK_SIZE length; a failed check
+ * yields -EIO and a failed read yields the read error.
+ *
+ * Unless the file was empty, lgh_last_idx is taken from the header tail and
+ * f_pos is set to the file size, also when an error is returned.
+ */
+static int llog_lvfs_read_header(const struct lu_env *env,
+                                struct llog_handle *handle)
+{
+        struct llog_rec_hdr *llh_hdr;
+        struct obd_device *obd;
+        struct file *file;
+        int rc;
+        ENTRY;
+
+        LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
+
+        obd = handle->lgh_ctxt->loc_exp->exp_obd;
+        file = handle->lgh_file;
+
+        if (i_size_read(file->f_dentry->d_inode) == 0) {
+                CDEBUG(D_HA, "not reading header from 0-byte log\n");
+                RETURN(LLOG_EEMPTY);
+        }
+
+        rc = llog_lvfs_read_blob(obd, file, handle->lgh_hdr,
+                                 LLOG_CHUNK_SIZE, 0);
+        if (rc) {
+                CERROR("error reading log header from %.*s\n",
+                       file->f_dentry->d_name.len,
+                       file->f_dentry->d_name.name);
+                goto finish;
+        }
+
+        llh_hdr = &handle->lgh_hdr->llh_hdr;
+        if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
+                lustre_swab_llog_hdr(handle->lgh_hdr);
+
+        if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
+                CERROR("bad log %.*s header magic: %#x (expected %#x)\n",
+                       file->f_dentry->d_name.len,
+                       file->f_dentry->d_name.name,
+                       llh_hdr->lrh_type, LLOG_HDR_MAGIC);
+                rc = -EIO;
+        } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) {
+                CERROR("incorrectly sized log %.*s header: %#x "
+                       "(expected %#x)\n",
+                       file->f_dentry->d_name.len,
+                       file->f_dentry->d_name.name,
+                       llh_hdr->lrh_len, LLOG_CHUNK_SIZE);
+                CERROR("you may need to re-run lconf --write_conf.\n");
+                rc = -EIO;
+        }
+
+ finish:
+        /* done on the error paths as well */
+        handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
+        file->f_pos = i_size_read(file->f_dentry->d_inode);
+
+        RETURN(rc);
+}
+
+/* Write one record to the file behind loghandle->lgh_file.
+ * returns negative on error; 0 if success && reccookie == 0; 1 otherwise
+ * (the append path also returns 1 for LLOG_GEN_REC records).
+ * When buf is non-NULL, rec->lrh_len is treated as the length of the body
+ * only and the on-disk record length is header + lrh_len + tail; when buf is
+ * NULL, rec is the complete record and, on the append path, its tail is
+ * filled in here.  idx == 0 rewrites only the log header.
+ * env and th are not used in this function. */
+/* appends if idx == -1, otherwise overwrites record idx. */
+static int llog_lvfs_write_rec(const struct lu_env *env,
+                              struct llog_handle *loghandle,
+                              struct llog_rec_hdr *rec,
+                              struct llog_cookie *reccookie, int cookiecount,
+                              void *buf, int idx, struct thandle *th)
+{
+        struct llog_log_hdr *llh;
+        int reclen = rec->lrh_len, index, rc;
+        struct llog_rec_tail *lrt;
+        struct obd_device *obd;
+        struct file *file;
+        size_t left;
+        ENTRY;
+
+        llh = loghandle->lgh_hdr;
+        file = loghandle->lgh_file;
+        obd = loghandle->lgh_ctxt->loc_exp->exp_obd;
+
+        /* record length must not exceed LLOG_CHUNK_SIZE; with a separate
+         * buf the header and tail are added on top of lrh_len, so room for
+         * both has to be left inside the chunk */
+        if (buf)
+                rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) -
+                      sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
+        else
+                rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
+        if (rc)
+                RETURN(rc);
+
+        if (buf)
+                /* write_blob adds header and tail to lrh_len. */
+                reclen = sizeof(*rec) + rec->lrh_len +
+                         sizeof(struct llog_rec_tail);
+
+        if (idx != -1) { /* overwrite path: modify an existing record */
+                loff_t saved_offset;
+
+                /* no header: only allowed to insert record 1 */
+                if (idx != 1 && !i_size_read(file->f_dentry->d_inode)) {
+                        CERROR("idx != -1 in empty log\n");
+                        LBUG();
+                }
+
+                if (idx && llh->llh_size && llh->llh_size != rec->lrh_len)
+                        RETURN(-EINVAL);
+
+                if (!ext2_test_bit(idx, llh->llh_bitmap))
+                        CERROR("Modify unset record %u\n", idx);
+                if (idx != rec->lrh_index)
+                        CERROR("Index mismatch %d %u\n", idx, rec->lrh_index);
+
+                rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
+                /* we are done if we only write the header or on error */
+                if (rc || idx == 0)
+                        RETURN(rc);
+
+                if (buf) {
+                        /* We assume that caller has set lgh_cur_*
+                         * NOTE(review): the mismatch message below prints
+                         * idx although the check compares rec->lrh_index */
+                        saved_offset = loghandle->lgh_cur_offset;
+                        CDEBUG(D_OTHER,
+                               "modify record "DOSTID": idx:%d/%u/%d, len:%u "
+                               "offset %llu\n",
+                               POSTID(&loghandle->lgh_id.lgl_oi), idx, rec->lrh_index,
+                               loghandle->lgh_cur_idx, rec->lrh_len,
+                               (long long)(saved_offset - sizeof(*llh)));
+                        if (rec->lrh_index != loghandle->lgh_cur_idx) {
+                                CERROR("modify idx mismatch %u/%d\n",
+                                       idx, loghandle->lgh_cur_idx);
+                                RETURN(-EFAULT);
+                        }
+               } else {
+                       /* Assumes constant lrh_len: record idx is located at
+                        * a fixed stride of reclen after the log header */
+                       saved_offset = sizeof(*llh) + (idx - 1) * reclen;
+               }
+
+                rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
+                if (rc == 0 && reccookie) {
+                        reccookie->lgc_lgl = loghandle->lgh_id;
+                        reccookie->lgc_index = idx;
+                        rc = 1;
+                }
+                RETURN(rc);
+        }
+
+        /* Make sure that records don't cross a chunk boundary, so we can
+         * process them page-at-a-time if needed.  If it will cross a chunk
+         * boundary, write in a fake (but referenced) entry to pad the chunk.
+         *
+         * We know that llog_current_log() will return a loghandle that is
+         * big enough to hold reclen, so all we care about is padding here.
+         */
+        left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
+
+        /* NOTE: padding is a record, but no bit is set */
+        if (left != 0 && left != reclen &&
+            left < (reclen + LLOG_MIN_REC_SIZE)) {
+                 index = loghandle->lgh_last_idx + 1;
+                 rc = llog_lvfs_pad(obd, file, left, index);
+                 if (rc)
+                         RETURN(rc);
+                 loghandle->lgh_last_idx++; /* for pad rec */
+         }
+         /* if it's the last idx in log file, then return -ENOSPC */
+         if (loghandle->lgh_last_idx >= LLOG_BITMAP_SIZE(llh) - 1)
+                 RETURN(-ENOSPC);
+        loghandle->lgh_last_idx++;
+        index = loghandle->lgh_last_idx;
+        LASSERT(index < LLOG_BITMAP_SIZE(llh));
+        rec->lrh_index = index;
+        if (buf == NULL) {
+                lrt = (struct llog_rec_tail *)
+                        ((char *)rec + rec->lrh_len - sizeof(*lrt));
+                lrt->lrt_len = rec->lrh_len;
+                lrt->lrt_index = rec->lrh_index;
+        }
+        /* The caller should make sure only one process accesses
+         * lgh_last_idx, otherwise it might hit the assert (which repeats
+         * the check made right after index was assigned above). */
+        LASSERT(index < LLOG_BITMAP_SIZE(llh));
+       spin_lock(&loghandle->lgh_hdr_lock);
+       if (ext2_set_bit(index, llh->llh_bitmap)) {
+               CERROR("argh, index %u already set in log bitmap?\n", index);
+               spin_unlock(&loghandle->lgh_hdr_lock);
+               LBUG(); /* should never happen */
+       }
+       llh->llh_count++;
+       spin_unlock(&loghandle->lgh_hdr_lock);
+        llh->llh_tail.lrt_index = index; /* updated outside lgh_hdr_lock */
+
+        rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
+        if (rc)
+                RETURN(rc);
+
+        rc = llog_lvfs_write_blob(obd, file, rec, buf, file->f_pos);
+        if (rc)
+                RETURN(rc);
+
+       CDEBUG(D_RPCTRACE, "added record "DOSTID": idx: %u, %u \n",
+              POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len);
+        if (rc == 0 && reccookie) { /* rc is always 0 at this point */
+                reccookie->lgc_lgl = loghandle->lgh_id;
+                reccookie->lgc_index = index;
+                if ((rec->lrh_type == MDS_UNLINK_REC) ||
+                    (rec->lrh_type == MDS_SETATTR64_REC))
+                        reccookie->lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
+                else if (rec->lrh_type == OST_SZ_REC)
+                        reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
+                else
+                        reccookie->lgc_subsys = -1;
+                rc = 1;
+        }
+        if (rc == 0 && rec->lrh_type == LLOG_GEN_REC)
+                rc = 1;
+
+        RETURN(rc);
+}
+
+/*
+ * Fast-forward a file offset towards record \a goal.
+ *
+ * Every record occupies at least LLOG_MIN_REC_SIZE bytes, so when the caller
+ * is at record \a curr the wanted record cannot start before
+ * (goal - curr - 1) minimum-sized records further on.  The offset is moved
+ * by that amount and then rounded down to the start of its chunk.  If the
+ * records turn out to be larger than the minimum, the caller simply ends up
+ * skipping some more on its next pass.
+ *
+ * Nothing is changed when \a goal is not beyond \a curr.
+ */
+static void llog_skip_over(__u64 *off, int curr, int goal)
+{
+        if (goal > curr) {
+                __u64 advanced;
+
+                advanced = *off + (goal - curr - 1) * LLOG_MIN_REC_SIZE;
+                *off = advanced & ~(LLOG_CHUNK_SIZE - 1);
+        }
+}
+
+
+/*
+ * Read forward through the log file until the block holding record
+ * \a next_idx has been loaded into \a buf.
+ *
+ * sets:
+ *  - cur_offset to the furthest point read in the log file
+ *  - cur_idx to the log index preceding cur_offset
+ *
+ * \a len is the size of \a buf and must be a non-zero multiple of
+ * LLOG_CHUNK_SIZE.  At most one chunk is read per iteration; the unread
+ * remainder of \a buf is zeroed so the consumer can find the end of the
+ * valid data.
+ *
+ * returns 0 on success (including a read that hit end of file without
+ * returning any data), the negative error from the read, -EINVAL for a bad
+ * \a len or a block whose tail is inconsistent, -ENOENT if the block starts
+ * beyond \a next_idx, and -EIO if the end of the file is reached first.
+ */
+static int llog_lvfs_next_block(const struct lu_env *env,
+                                struct llog_handle *loghandle, int *cur_idx,
+                                int next_idx, __u64 *cur_offset, void *buf,
+                                int len)
+{
+        int rc;
+        ENTRY;
+
+        if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
+                RETURN(-EINVAL);
+
+        CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
+               next_idx, *cur_idx, *cur_offset);
+
+        while (*cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
+                struct llog_rec_hdr *rec, *last_rec;
+                struct llog_rec_tail *tail;
+                loff_t ppos;
+                __u32 tail_len;
+                int llen;
+
+                llog_skip_over(cur_offset, *cur_idx, next_idx);
+
+                /* read up to next LLOG_CHUNK_SIZE block */
+                ppos = *cur_offset;
+                llen = LLOG_CHUNK_SIZE - (*cur_offset & (LLOG_CHUNK_SIZE - 1));
+                rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
+                                        loghandle->lgh_file, buf, llen,
+                                        cur_offset);
+                if (rc < 0) {
+                        CERROR("Cant read llog block at log id "DOSTID
+                               "/%u offset "LPU64"\n",
+                               POSTID(&loghandle->lgh_id.lgl_oi),
+                               loghandle->lgh_id.lgl_ogen,
+                               *cur_offset);
+                        RETURN(rc);
+                }
+
+                /* put number of bytes read into rc to make code simpler */
+                rc = *cur_offset - ppos;
+                if (rc < len) {
+                        /* signal the end of the valid buffer to llog_process */
+                        memset(buf + rc, 0, len - rc);
+                }
+
+                if (rc == 0) /* end of file, nothing to do */
+                        RETURN(0);
+
+                if (rc < sizeof(*tail)) {
+                        CERROR("Invalid llog block at log id "DOSTID"/%u offset"
+                               LPU64"\n", POSTID(&loghandle->lgh_id.lgl_oi),
+                               loghandle->lgh_id.lgl_ogen, *cur_offset);
+                        RETURN(-EINVAL);
+                }
+
+                rec = buf;
+                if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
+                        lustre_swab_llog_rec(rec);
+
+                tail = (struct llog_rec_tail *)(buf + rc -
+                                                sizeof(struct llog_rec_tail));
+
+                /* The tail length comes straight from disk and is used to
+                 * locate the last record, so it must describe a record that
+                 * lies entirely inside the bytes just read; otherwise
+                 * last_rec would point outside of buf. */
+                tail_len = le32_to_cpu(tail->lrt_len);
+                if (tail_len < sizeof(*last_rec) + sizeof(*tail) ||
+                    tail_len > (__u32)rc) {
+                        CERROR("Invalid llog tail length %u at log id "DOSTID
+                               "/%u offset "LPU64"\n", tail_len,
+                               POSTID(&loghandle->lgh_id.lgl_oi),
+                               loghandle->lgh_id.lgl_ogen, *cur_offset);
+                        RETURN(-EINVAL);
+                }
+
+                /* get the last record in block */
+                last_rec = (struct llog_rec_hdr *)(buf + rc - tail_len);
+
+                if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
+                        lustre_swab_llog_rec(last_rec);
+
+                /* on-disk corruption must not bring the node down: report
+                 * a header/tail index mismatch as an error, not an LBUG */
+                if (last_rec->lrh_index != tail->lrt_index) {
+                        CERROR("Invalid llog tail at log id "DOSTID"/%u offset "
+                               LPU64": last rec idx %u, tail idx %u\n",
+                               POSTID(&loghandle->lgh_id.lgl_oi),
+                               loghandle->lgh_id.lgl_ogen, *cur_offset,
+                               last_rec->lrh_index, tail->lrt_index);
+                        RETURN(-EINVAL);
+                }
+
+                *cur_idx = tail->lrt_index;
+
+                /* this shouldn't happen */
+                if (tail->lrt_index == 0) {
+                        CERROR("Invalid llog tail at log id "DOSTID"/%u offset "
+                               LPU64"\n", POSTID(&loghandle->lgh_id.lgl_oi),
+                               loghandle->lgh_id.lgl_ogen, *cur_offset);
+                        RETURN(-EINVAL);
+                }
+                if (tail->lrt_index < next_idx)
+                        continue;
+
+                /* sanity check that the start of the new buffer is no farther
+                 * than the record that we wanted.  This shouldn't happen. */
+                if (rec->lrh_index > next_idx) {
+                        CERROR("missed desired record? %u > %u\n",
+                               rec->lrh_index, next_idx);
+                        RETURN(-ENOENT);
+                }
+                RETURN(0);
+        }
+        RETURN(-EIO);
+}
+
+/*
+ * Load into \a buf the block of the log file that holds record \a prev_idx.
+ *
+ * The search starts at offset LLOG_CHUNK_SIZE, advanced by llog_skip_over()
+ * according to \a prev_idx, and reads \a len bytes at a time until a block
+ * whose tail index is not below \a prev_idx is found.  \a len must be a
+ * non-zero multiple of LLOG_CHUNK_SIZE.
+ *
+ * returns 0 on success (including a read that hit end of file without
+ * returning any data), the negative error from the read, -EINVAL for a bad
+ * \a len or a block whose tail is inconsistent, -ENOENT if the block starts
+ * beyond \a prev_idx, and -EIO if the end of the file is reached first.
+ */
+static int llog_lvfs_prev_block(const struct lu_env *env,
+                                struct llog_handle *loghandle,
+                                int prev_idx, void *buf, int len)
+{
+        __u64 cur_offset;
+        int rc;
+        ENTRY;
+
+        if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
+                RETURN(-EINVAL);
+
+        CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx);
+
+        cur_offset = LLOG_CHUNK_SIZE;
+        llog_skip_over(&cur_offset, 0, prev_idx);
+
+        while (cur_offset < i_size_read(loghandle->lgh_file->f_dentry->d_inode)) {
+                struct llog_rec_hdr *rec, *last_rec;
+                struct llog_rec_tail *tail;
+                loff_t ppos = cur_offset;
+                __u32 tail_len;
+
+                rc = fsfilt_read_record(loghandle->lgh_ctxt->loc_exp->exp_obd,
+                                        loghandle->lgh_file, buf, len,
+                                        &cur_offset);
+                if (rc < 0) {
+                        CERROR("Cant read llog block at log id "DOSTID
+                               "/%u offset "LPU64"\n",
+                               POSTID(&loghandle->lgh_id.lgl_oi),
+                               loghandle->lgh_id.lgl_ogen,
+                               cur_offset);
+                        RETURN(rc);
+                }
+
+                /* put number of bytes read into rc to make code simpler */
+                rc = cur_offset - ppos;
+
+                if (rc == 0) /* end of file, nothing to do */
+                        RETURN(0);
+
+                if (rc < sizeof(*tail)) {
+                        CERROR("Invalid llog block at log id "DOSTID"/%u offset"
+                               LPU64"\n", POSTID(&loghandle->lgh_id.lgl_oi),
+                               loghandle->lgh_id.lgl_ogen, cur_offset);
+                        RETURN(-EINVAL);
+                }
+
+                rec = buf;
+                if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
+                        lustre_swab_llog_rec(rec);
+
+                tail = (struct llog_rec_tail *)(buf + rc -
+                                                sizeof(struct llog_rec_tail));
+
+                /* The tail length comes straight from disk and is used to
+                 * locate the last record, so it must describe a record that
+                 * lies entirely inside the bytes just read; otherwise
+                 * last_rec would point outside of buf. */
+                tail_len = le32_to_cpu(tail->lrt_len);
+                if (tail_len < sizeof(*last_rec) + sizeof(*tail) ||
+                    tail_len > (__u32)rc) {
+                        CERROR("Invalid llog tail length %u at log id "DOSTID
+                               "/%u offset "LPU64"\n", tail_len,
+                               POSTID(&loghandle->lgh_id.lgl_oi),
+                               loghandle->lgh_id.lgl_ogen, cur_offset);
+                        RETURN(-EINVAL);
+                }
+
+                /* get the last record in block */
+                last_rec = (struct llog_rec_hdr *)(buf + rc - tail_len);
+
+                if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
+                        lustre_swab_llog_rec(last_rec);
+
+                /* on-disk corruption must not bring the node down: report
+                 * a header/tail index mismatch as an error, not an LBUG */
+                if (last_rec->lrh_index != tail->lrt_index) {
+                        CERROR("Invalid llog tail at log id "DOSTID"/%u offset "
+                               LPU64": last rec idx %u, tail idx %u\n",
+                               POSTID(&loghandle->lgh_id.lgl_oi),
+                               loghandle->lgh_id.lgl_ogen, cur_offset,
+                               last_rec->lrh_index, tail->lrt_index);
+                        RETURN(-EINVAL);
+                }
+
+                /* this shouldn't happen */
+                if (tail->lrt_index == 0) {
+                        CERROR("Invalid llog tail at log id "DOSTID"/%u offset"
+                               LPU64"\n", POSTID(&loghandle->lgh_id.lgl_oi),
+                               loghandle->lgh_id.lgl_ogen, cur_offset);
+                        RETURN(-EINVAL);
+                }
+                if (tail->lrt_index < prev_idx)
+                        continue;
+
+                /* sanity check that the start of the new buffer is no farther
+                 * than the record that we wanted.  This shouldn't happen. */
+                if (rec->lrh_index > prev_idx) {
+                        CERROR("missed desired record? %u > %u\n",
+                               rec->lrh_index, prev_idx);
+                        RETURN(-ENOENT);
+                }
+                RETURN(0);
+        }
+        RETURN(-EIO);
+}
+
+/*
+ * Open the file "<dir>/<name>" with the given open flags and mode.
+ *
+ * The path is assembled in a temporary PATH_MAX buffer that is released
+ * before returning.  Returns the opened file, or an ERR_PTR():
+ * -ENOMEM if the buffer cannot be allocated, -ENAMETOOLONG if the path does
+ * not fit, or whatever l_filp_open() reported.  Open failures other than
+ * -ENOENT are logged.
+ */
+static struct file *llog_filp_open(char *dir, char *name, int flags, int mode)
+{
+        struct file *result;
+        char *path;
+        int n;
+
+        OBD_ALLOC(path, PATH_MAX);
+        if (path == NULL)
+                return ERR_PTR(-ENOMEM);
+
+        n = snprintf(path, PATH_MAX, "%s/%s", dir, name);
+        if (n >= PATH_MAX - 1) {
+                result = ERR_PTR(-ENAMETOOLONG);
+                goto out_free;
+        }
+
+        result = l_filp_open(path, flags, mode);
+        if (IS_ERR(result) && PTR_ERR(result) != -ENOENT)
+                CERROR("logfile creation %s: %ld\n", path, PTR_ERR(result));
+
+out_free:
+        OBD_FREE(path, PATH_MAX);
+        return result;
+}
+
+/*
+ * Attach \a handle to an existing llog file, or prepare it for creation.
+ *
+ * - \a logid given: the log is looked up by id and opened; it must exist.
+ * - otherwise \a name given: "<MOUNT_CONFIGS_DIR>/<name>" is opened.  If it
+ *   does not exist and \a open_param is LLOG_OPEN_NEW, a copy of the name is
+ *   kept in handle->lgh_name and lgh_file stays NULL.
+ * - neither given: only valid with LLOG_OPEN_NEW; lgh_file is set to NULL.
+ *
+ * returns 0 on success, -ENOENT if the log does not exist and no new log
+ * was requested, -ENOMEM if the name cannot be saved, or the error reported
+ * by the lookup/open.
+ */
+static int llog_lvfs_open(const struct lu_env *env,  struct llog_handle *handle,
+                          struct llog_logid *logid, char *name,
+                          enum llog_open_param open_param)
+{
+        struct llog_ctxt        *ctxt;
+        struct l_dentry         *dchild = NULL;
+        struct obd_device       *obd;
+        int                      rc = 0;
+
+        ENTRY;
+
+        /* check the handle before it is dereferenced, not afterwards */
+        LASSERT(handle);
+        ctxt = handle->lgh_ctxt;
+        LASSERT(ctxt);
+        LASSERT(ctxt->loc_exp);
+        LASSERT(ctxt->loc_exp->exp_obd);
+        obd = ctxt->loc_exp->exp_obd;
+
+        if (logid != NULL) {
+                dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, &logid->lgl_oi,
+                                             logid->lgl_ogen);
+                if (IS_ERR(dchild)) {
+                        rc = PTR_ERR(dchild);
+                        CERROR("%s: error looking up logfile #"DOSTID "#%08x:"
+                               " rc = %d\n", ctxt->loc_obd->obd_name,
+                               POSTID(&logid->lgl_oi), logid->lgl_ogen, rc);
+                        GOTO(out, rc);
+                }
+                if (dchild->d_inode == NULL) {
+                        l_dput(dchild);
+                        rc = -ENOENT;
+                        CERROR("%s: nonexistent llog #"DOSTID"#%08x:"
+                               "rc = %d\n", ctxt->loc_obd->obd_name,
+                               POSTID(&logid->lgl_oi), logid->lgl_ogen, rc);
+                        GOTO(out, rc);
+                }
+                handle->lgh_file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
+                                                 O_RDWR | O_LARGEFILE);
+                l_dput(dchild);
+                if (IS_ERR(handle->lgh_file)) {
+                        rc = PTR_ERR(handle->lgh_file);
+                        handle->lgh_file = NULL;
+                        CERROR("%s: error opening llog #"DOSTID"#%08x:"
+                               "rc = %d\n", ctxt->loc_obd->obd_name,
+                               POSTID(&logid->lgl_oi), logid->lgl_ogen, rc);
+                        GOTO(out, rc);
+                }
+                handle->lgh_id = *logid;
+        } else if (name) {
+                handle->lgh_file = llog_filp_open(MOUNT_CONFIGS_DIR, name,
+                                                  O_RDWR | O_LARGEFILE, 0644);
+                if (IS_ERR(handle->lgh_file)) {
+                        rc = PTR_ERR(handle->lgh_file);
+                        handle->lgh_file = NULL;
+                        if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) {
+                                /* remember the name so that the create
+                                 * method can make the file later */
+                                OBD_ALLOC(handle->lgh_name, strlen(name) + 1);
+                                if (handle->lgh_name)
+                                        strcpy(handle->lgh_name, name);
+                                else
+                                        GOTO(out, rc = -ENOMEM);
+                                rc = 0;
+                        } else {
+                                GOTO(out, rc);
+                        }
+                } else {
+                        lustre_build_llog_lvfs_oid(&handle->lgh_id,
+                            handle->lgh_file->f_dentry->d_inode->i_ino,
+                            handle->lgh_file->f_dentry->d_inode->i_generation);
+                }
+        } else {
+                LASSERTF(open_param == LLOG_OPEN_NEW, "%#x\n", open_param);
+                handle->lgh_file = NULL;
+        }
+
+        /* No new llog is expected but doesn't exist */
+        if (open_param != LLOG_OPEN_NEW && handle->lgh_file == NULL)
+                GOTO(out_name, rc = -ENOENT);
+
+        RETURN(0);
+out_name:
+        if (handle->lgh_name != NULL) {
+                /* size the free from the stored string (name may be NULL
+                 * here) and clear the pointer so that a later close does
+                 * not free it a second time */
+                OBD_FREE(handle->lgh_name, strlen(handle->lgh_name) + 1);
+                handle->lgh_name = NULL;
+        }
+out:
+        RETURN(rc);
+}
+
+/* Tell whether \a handle has an open backing file: 1 if so, 0 if not. */
+static int llog_lvfs_exist(struct llog_handle *handle)
+{
+        if (handle->lgh_file == NULL)
+                return 0;
+
+        return 1;
+}
+
+/* This is a callback from the llog_* functions.
+ * Assumes caller has already pushed us into the kernel context. */
+static int llog_lvfs_create(const struct lu_env *env,
+                           struct llog_handle *handle,
+                           struct thandle *th)
+{
+       struct llog_ctxt        *ctxt = handle->lgh_ctxt;
+       struct obd_device       *obd;
+       struct l_dentry         *dchild = NULL;
+       struct file             *file;
+       struct obdo             *oa = NULL;
+       int                      rc = 0;
+       int                      open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
+
+       ENTRY;
+
+       LASSERT(ctxt);
+       LASSERT(ctxt->loc_exp);
+       obd = ctxt->loc_exp->exp_obd;
+       LASSERT(handle->lgh_file == NULL);
+
+       if (handle->lgh_name) {
+               file = llog_filp_open(MOUNT_CONFIGS_DIR, handle->lgh_name,
+                                     open_flags, 0644);
+               if (IS_ERR(file))
+                       RETURN(PTR_ERR(file));
+
+               lustre_build_llog_lvfs_oid(&handle->lgh_id,
+                               file->f_dentry->d_inode->i_ino,
+                               file->f_dentry->d_inode->i_generation);
+               handle->lgh_file = file;
+       } else {
+               OBDO_ALLOC(oa);
+               if (oa == NULL)
+                       RETURN(-ENOMEM);
+
+               ostid_set_seq_llog(&oa->o_oi);
+               oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+
+               rc = obd_create(NULL, ctxt->loc_exp, oa, NULL, NULL);
+               if (rc)
+                       GOTO(out, rc);
+
+               /* FIXME: rationalize the misuse of o_generation in
+                *        this API along with mds_obd_{create,destroy}.
+                *        Hopefully it is only an internal API issue. */
+#define o_generation o_parent_oid
+               dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, &oa->o_oi,
+                                            oa->o_generation);
+               if (IS_ERR(dchild))
+                       GOTO(out, rc = PTR_ERR(dchild));
+
+               file = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
+               l_dput(dchild);
+               if (IS_ERR(file))
+                       GOTO(out, rc = PTR_ERR(file));
+               handle->lgh_id.lgl_oi = oa->o_oi;
+               handle->lgh_id.lgl_ogen = oa->o_generation;
+               handle->lgh_file = file;
+out:
+               OBDO_FREE(oa);
+       }
+       RETURN(rc);
+}
+
+/*
+ * Close the file behind \a handle and release the saved log name.
+ *
+ * A handle without an open file is left untouched and 0 is returned.
+ * Otherwise the file pointer is cleared and the name freed even when
+ * filp_close() fails; its result is logged on failure and returned.
+ */
+static int llog_lvfs_close(const struct lu_env *env,
+                           struct llog_handle *handle)
+{
+        struct file *filp = handle->lgh_file;
+        int rc;
+
+        ENTRY;
+
+        if (filp == NULL)
+                RETURN(0);
+
+        rc = filp_close(filp, 0);
+        if (rc != 0)
+                CERROR("%s: error closing llog #"DOSTID"#%08x: "
+                       "rc = %d\n", handle->lgh_ctxt->loc_obd->obd_name,
+                       POSTID(&handle->lgh_id.lgl_oi),
+                       handle->lgh_id.lgl_ogen, rc);
+        handle->lgh_file = NULL;
+
+        if (handle->lgh_name != NULL) {
+                OBD_FREE(handle->lgh_name, strlen(handle->lgh_name) + 1);
+                handle->lgh_name = NULL;
+        }
+
+        RETURN(rc);
+}
+/*
+ * Close the log file behind \a handle and remove it.  The handle must have
+ * an open file.
+ *
+ * If the name of the file's parent directory equals MOUNT_CONFIGS_DIR, the
+ * file is closed and then unlinked with ll_vfs_unlink() under the parent's
+ * i_mutex, all inside the obd's lvfs context.  Otherwise the file is closed
+ * and the object named by the handle's log id is removed with obd_destroy()
+ * between fsfilt_start_log() and fsfilt_commit().
+ *
+ * returns 0 on success, -ENOMEM if the obdo cannot be allocated, or the
+ * first error met while closing, starting the transaction, destroying or
+ * committing.
+ *
+ * NOTE(review): inode is the parent directory inode taken from the dentry
+ * before the file is closed; the object path keeps using it afterwards
+ * without taking a reference of its own here - confirm this is safe.
+ */
+static int llog_lvfs_destroy(const struct lu_env *env,
+                            struct llog_handle *handle)
+{
+        struct dentry *fdentry;
+        struct obdo *oa;
+        struct obd_device *obd = handle->lgh_ctxt->loc_exp->exp_obd;
+        char *dir;
+        void *th;
+        struct inode *inode;
+        int rc, rc1;
+        ENTRY;
+
+        dir = MOUNT_CONFIGS_DIR;
+
+       LASSERT(handle->lgh_file);
+        fdentry = handle->lgh_file->f_dentry;
+        inode = fdentry->d_parent->d_inode; /* inode of the parent directory */
+        if (strcmp(fdentry->d_parent->d_name.name, dir) == 0) { /* named log */
+                struct lvfs_run_ctxt saved;
+                struct vfsmount *mnt = mntget(handle->lgh_file->f_vfsmnt);
+
+                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                dget(fdentry); /* extra reference, dropped by dput() below */
+               rc = llog_lvfs_close(env, handle);
+               if (rc == 0) { /* unlink only after a clean close */
+                       mutex_lock_nested(&inode->i_mutex, I_MUTEX_PARENT);
+                       rc = ll_vfs_unlink(inode, fdentry, mnt);
+                       mutex_unlock(&inode->i_mutex);
+               }
+               mntput(mnt);
+
+                dput(fdentry);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                RETURN(rc);
+        }
+
+        OBDO_ALLOC(oa);
+        if (oa == NULL)
+                RETURN(-ENOMEM);
+
+       oa->o_oi = handle->lgh_id.lgl_oi;
+       oa->o_generation = handle->lgh_id.lgl_ogen;
+#undef o_generation /* ends the o_generation -> o_parent_oid alias */
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
+
+       rc = llog_lvfs_close(env, handle);
+        if (rc)
+                GOTO(out, rc);
+
+        th = fsfilt_start_log(obd, inode, FSFILT_OP_UNLINK, NULL, 1);
+        if (IS_ERR(th)) {
+                CERROR("fsfilt_start failed: %ld\n", PTR_ERR(th));
+                GOTO(out, rc = PTR_ERR(th));
+        }
+
+        rc = obd_destroy(NULL, handle->lgh_ctxt->loc_exp, oa,
+                         NULL, NULL, NULL, NULL);
+
+        rc1 = fsfilt_commit(obd, inode, th, 0); /* commit even if destroy failed */
+        if (rc == 0 && rc1 != 0) /* a destroy error wins over a commit error */
+                rc = rc1;
+ out:
+        OBDO_FREE(oa);
+        RETURN(rc);
+}
+
+/*
+ * Declaration step for creating a log.  This implementation does no work
+ * here and always reports success.
+ */
+static int llog_lvfs_declare_create(const struct lu_env *env,
+                                    struct llog_handle *res,
+                                    struct thandle *th)
+{
+        const int rc = 0;
+
+        return rc;
+}
+
+/*
+ * Declaration step for writing a record.  This implementation does no work
+ * here and always reports success.
+ */
+static int llog_lvfs_declare_write_rec(const struct lu_env *env,
+                                       struct llog_handle *loghandle,
+                                       struct llog_rec_hdr *rec,
+                                       int idx, struct thandle *th)
+{
+        const int rc = 0;
+
+        return rc;
+}
+
+/* Method table wiring the llog operations to the llog_lvfs_* functions. */
+struct llog_operations llog_lvfs_ops = {
+        /* handle life cycle */
+        .lop_open               = llog_lvfs_open,
+        .lop_exist              = llog_lvfs_exist,
+        .lop_declare_create     = llog_lvfs_declare_create,
+        .lop_create             = llog_lvfs_create,
+        .lop_close              = llog_lvfs_close,
+        .lop_destroy            = llog_lvfs_destroy,
+        /* reading */
+        .lop_read_header        = llog_lvfs_read_header,
+        .lop_next_block         = llog_lvfs_next_block,
+        .lop_prev_block         = llog_lvfs_prev_block,
+        /* writing */
+        .lop_declare_write_rec  = llog_lvfs_declare_write_rec,
+        .lop_write_rec          = llog_lvfs_write_rec,
+};
+EXPORT_SYMBOL(llog_lvfs_ops);
+#else /* !__KERNEL__ */
+struct llog_operations llog_lvfs_ops = {}; /* !__KERNEL__ build: empty method table */
+#endif