Whamcloud - gitweb
b=1141
authorpschwan <pschwan>
Sun, 6 Jul 2003 18:49:43 +0000 (18:49 +0000)
committerpschwan <pschwan>
Sun, 6 Jul 2003 18:49:43 +0000 (18:49 +0000)
A reorganization of obdfilter/ before I launch into the real work.
There should be no functional changes.
 - removes unused 'niobuf_remote' argument from fsfilt_brw_start
 - splits filter.c into filter.c, filter_io.c, filter_log.c, and filter_san.c
   More of the same is on the way, but this is a good start.

lustre/obdfilter/filter_internal.h [new file with mode: 0644]
lustre/obdfilter/filter_io.c [new file with mode: 0644]
lustre/obdfilter/filter_log.c [new file with mode: 0644]
lustre/obdfilter/filter_san.c [new file with mode: 0644]

diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h
new file mode 100644 (file)
index 0000000..94b5321
--- /dev/null
@@ -0,0 +1,122 @@
+#ifndef _FILTER_INTERNAL_H
+#define _FILTER_INTERNAL_H
+
+
+#ifdef __KERNEL__
+# include <linux/spinlock.h>
+#endif
+#include <linux/lustre_handles.h>
+#include <linux/obd.h>
+
+#ifndef OBD_FILTER_DEVICENAME
+# define OBD_FILTER_DEVICENAME "obdfilter"
+#endif
+
+#ifndef OBD_FILTER_SAN_DEVICENAME
+# define OBD_FILTER_SAN_DEVICENAME "sanobdfilter"
+#endif
+
+#define FILTER_LR_SERVER_SIZE    512
+
+#define FILTER_LR_CLIENT_START   8192
+#define FILTER_LR_CLIENT_SIZE    128
+
+#define FILTER_SUBDIR_COUNT      32            /* set to zero for no subdirs */
+
+#define FILTER_MOUNT_RECOV 2
+#define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */
+
+/* Data stored per server at the head of the last_rcvd file.  In le32 order. */
+struct filter_server_data {
+        __u8  fsd_uuid[37];        /* server UUID */
+        __u8  fsd_uuid_padding[3]; /* unused */
+        __u64 fsd_last_objid;      /* last created object ID */
+        __u64 fsd_last_transno;    /* last completed transaction ID */
+        __u64 fsd_mount_count;     /* FILTER incarnation number */
+        __u32 fsd_feature_compat;  /* compatible feature flags */
+        __u32 fsd_feature_rocompat;/* read-only compatible feature flags */
+        __u32 fsd_feature_incompat;/* incompatible feature flags */
+        __u32 fsd_server_size;     /* size of server data area */
+        __u32 fsd_client_start;    /* start of per-client data area */
+        __u16 fsd_client_size;     /* size of per-client data area */
+        __u16 fsd_subdir_count;    /* number of subdirectories for objects */
+        __u64 fsd_catalog_oid;     /* recovery catalog object id */
+        __u32 fsd_catalog_ogen;    /* recovery catalog inode generation */
+        __u8  fsd_peeruuid[37];    /* UUID of MDS associated with this OST */
+        __u8  peer_padding[3];     /* unused */
+        __u8  fsd_padding[FILTER_LR_SERVER_SIZE - 140];
+};
+
+/* Data stored per client in the last_rcvd file.  In le32 order. */
+struct filter_client_data {
+        __u8  fcd_uuid[37];        /* client UUID */
+        __u8  fcd_uuid_padding[3]; /* unused */
+        __u64 fcd_last_rcvd;       /* last completed transaction ID */
+        __u64 fcd_mount_count;     /* FILTER incarnation number */
+        __u64 fcd_last_xid;        /* client RPC xid for the last transaction */
+        __u8  fcd_padding[FILTER_LR_CLIENT_SIZE - 64];
+};
+
+/* file data for open files on OST */
+struct filter_file_data {
+        struct portals_handle ffd_handle;
+        atomic_t              ffd_refcount;
+        struct list_head      ffd_export_list; /* export open list - fed_lock */
+        struct file          *ffd_file;         /* file handle */
+};
+
+struct filter_dentry_data {
+        struct llog_cookie      fdd_cookie;
+        obd_id                  fdd_objid;
+        __u32                   fdd_magic;
+        atomic_t                fdd_open_count;
+        int                     fdd_flags;
+};
+
+#define FILTER_DENTRY_MAGIC 0x9efba101
+#define FILTER_FLAG_DESTROY 0x0001      /* destroy dentry on last file close */
+
+enum {
+        LPROC_FILTER_READ_BYTES = 0,
+        LPROC_FILTER_WRITE_BYTES = 1,
+        LPROC_FILTER_LAST,
+};
+
+/* filter.c */
+struct dentry *filter_parent(struct obd_device *, obd_mode mode, obd_id objid);
+struct dentry *filter_parent_lock(struct obd_device *, obd_mode mode,
+                                  obd_id objid, ldlm_mode_t lock_mode,
+                                  struct lustre_handle *lockh);
+void f_dput(struct dentry *);
+struct dentry *filter_fid2dentry(struct obd_device *, struct dentry *dir,
+                                 obd_mode mode, obd_id id);
+int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc);
+__u64 filter_next_id(struct filter_obd *);
+int filter_update_server_data(struct file *, struct filter_server_data *);
+int filter_common_setup(struct obd_device *, obd_count len, void *buf,
+                        char *option);
+
+/* filter_io.c */
+int filter_preprw(int cmd, struct obd_export *, struct obdo *, int objcount,
+                  struct obd_ioobj *, int niocount, struct niobuf_remote *,
+                  struct niobuf_local *, struct obd_trans_info *);
+int filter_commitrw(int cmd, struct obd_export *, int objcount,
+                    struct obd_ioobj *, int niocount, struct niobuf_local *,
+                    struct obd_trans_info *);
+int filter_brw(int cmd, struct lustre_handle *, struct lov_stripe_md *,
+               obd_count oa_bufs, struct brw_page *, struct obd_trans_info *);
+
+/* filter_log.c */
+int filter_log_cancel(struct lustre_handle *, struct lov_stripe_md *,
+                      int num_cookies, struct llog_cookie *, int flags);
+int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid,
+                         obd_id oid, obd_count ogen, struct llog_cookie *);
+int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid,
+                         obd_count ogen, struct llog_cookie *);
+
+/* filter_san.c */
+int filter_san_setup(struct obd_device *obd, obd_count len, void *buf);
+int filter_san_preprw(int cmd, struct lustre_handle *, int objcount,
+                      struct obd_ioobj *, int niocount, struct niobuf_remote *);
+
+#endif
diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c
new file mode 100644 (file)
index 0000000..47d8544
--- /dev/null
@@ -0,0 +1,701 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  linux/fs/obdfilter/filter_io.c
+ *
+ *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/pagemap.h> // XXX kill me soon
+#include <linux/version.h>
+
+#include <linux/obd_class.h>
+#include <linux/lustre_fsfilt.h>
+#include "filter_internal.h"
+
+static inline void lustre_put_page(struct page *page)
+{
+        page_cache_release(page);
+}
+
+static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
+{
+        struct address_space *mapping = inode->i_mapping;
+        struct page *page;
+        unsigned long index = lnb->offset >> PAGE_SHIFT;
+        int rc;
+
+        page = grab_cache_page(mapping, index); /* locked page */
+        if (IS_ERR(page))
+                return lnb->rc = PTR_ERR(page);
+
+        lnb->page = page;
+
+        if (inode->i_size < lnb->offset + lnb->len - 1)
+                lnb->rc = inode->i_size - lnb->offset;
+        else
+                lnb->rc = lnb->len;
+
+        if (PageUptodate(page)) {
+                unlock_page(page);
+                return 0;
+        }
+
+        rc = mapping->a_ops->readpage(NULL, page);
+        if (rc < 0) {
+                CERROR("page index %lu, rc = %d\n", index, rc);
+                lnb->page = NULL;
+                lustre_put_page(page);
+                return lnb->rc = rc;
+        }
+
+        return 0;
+}
+
+static int filter_finish_page_read(struct niobuf_local *lnb)
+{
+        if (lnb->page == NULL)
+                return 0;
+
+        if (PageUptodate(lnb->page))
+                return 0;
+
+        wait_on_page(lnb->page);
+        if (!PageUptodate(lnb->page)) {
+                CERROR("page index %lu/offset "LPX64" not uptodate\n",
+                       lnb->page->index, lnb->offset);
+                GOTO(err_page, lnb->rc = -EIO);
+        }
+        if (PageError(lnb->page)) {
+                CERROR("page index %lu/offset "LPX64" has error\n",
+                       lnb->page->index, lnb->offset);
+                GOTO(err_page, lnb->rc = -EIO);
+        }
+
+        return 0;
+
+err_page:
+        lustre_put_page(lnb->page);
+        lnb->page = NULL;
+        return lnb->rc;
+}
+
+static struct page *lustre_get_page_write(struct inode *inode,
+                                          unsigned long index)
+{
+        struct address_space *mapping = inode->i_mapping;
+        struct page *page;
+        int rc;
+
+        page = grab_cache_page(mapping, index); /* locked page */
+
+        if (!IS_ERR(page)) {
+                /* Note: Called with "O" and "PAGE_SIZE" this is essentially
+                 * a no-op for most filesystems, because we write the whole
+                 * page.  For partial-page I/O this will read in the page.
+                 */
+                rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
+                if (rc) {
+                        CERROR("page index %lu, rc = %d\n", index, rc);
+                        if (rc != -ENOSPC)
+                                LBUG();
+                        GOTO(err_unlock, rc);
+                }
+                /* XXX not sure if we need this if we are overwriting page */
+                if (PageError(page)) {
+                        CERROR("error on page index %lu, rc = %d\n", index, rc);
+                        LBUG();
+                        GOTO(err_unlock, rc = -EIO);
+                }
+        }
+        return page;
+
+err_unlock:
+        unlock_page(page);
+        lustre_put_page(page);
+        return ERR_PTR(rc);
+}
+
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+int waitfor_one_page(struct page *page)
+{
+        wait_on_page_locked(page);
+        return 0;
+}
+#endif
+
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+/* We should only change the file mtime (and not the ctime, like
+ * update_inode_times() in generic_file_write()) when we only change data.
+ */
+static inline void inode_update_time(struct inode *inode, int ctime_too)
+{
+        time_t now = CURRENT_TIME;
+        if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
+                return;
+        inode->i_mtime = now;
+        if (ctime_too)
+                inode->i_ctime = now;
+        mark_inode_dirty_sync(inode);
+}
+#endif
+
+static int lustre_commit_write(struct niobuf_local *lnb)
+{
+        struct page *page = lnb->page;
+        unsigned from = lnb->offset & ~PAGE_MASK;
+        unsigned to = from + lnb->len;
+        struct inode *inode = page->mapping->host;
+        int err;
+
+        LASSERT(to <= PAGE_SIZE);
+        err = page->mapping->a_ops->commit_write(NULL, page, from, to);
+        if (!err && IS_SYNC(inode))
+                err = waitfor_one_page(page);
+        //SetPageUptodate(page); // the client commit_write will do this
+
+        SetPageReferenced(page);
+        unlock_page(page);
+        lustre_put_page(page);
+        return err;
+}
+
+int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
+                          int *pglocked)
+{
+        unsigned long index = lnb->offset >> PAGE_SHIFT;
+        struct address_space *mapping = inode->i_mapping;
+        struct page *page;
+        int rc;
+
+        //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
+        if (*pglocked)
+                page = grab_cache_page_nowait(mapping, index); /* locked page */
+        else
+                page = grab_cache_page(mapping, index); /* locked page */
+
+
+        /* This page is currently locked, so get a temporary page instead. */
+        if (!page) {
+                CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
+                page = alloc_pages(GFP_KERNEL, 0); /* locked page */
+                if (!page) {
+                        CERROR("no memory for a temp page\n");
+                        GOTO(err, rc = -ENOMEM);
+                }
+                page->index = index;
+                lnb->page = page;
+                lnb->flags |= N_LOCAL_TEMP_PAGE;
+        } else if (!IS_ERR(page)) {
+                (*pglocked)++;
+
+                rc = mapping->a_ops->prepare_write(NULL, page,
+                                                   lnb->offset & ~PAGE_MASK,
+                                                   lnb->len);
+                if (rc) {
+                        if (rc != -ENOSPC)
+                                CERROR("page index %lu, rc = %d\n", index, rc);
+                        GOTO(err_unlock, rc);
+                }
+                /* XXX not sure if we need this if we are overwriting page */
+                if (PageError(page)) {
+                        CERROR("error on page index %lu, rc = %d\n", index, rc);
+                        LBUG();
+                        GOTO(err_unlock, rc = -EIO);
+                }
+                lnb->page = page;
+        }
+
+        return 0;
+
+err_unlock:
+        unlock_page(page);
+        lustre_put_page(page);
+err:
+        return lnb->rc = rc;
+}
+
+/*
+ * We need to balance prepare_write() calls with commit_write() calls.
+ * If the page has been prepared, but we have no data for it, we don't
+ * want to overwrite valid data on disk, but we still need to zero out
+ * data for space which was newly allocated.  Like part of what happens
+ * in __block_prepare_write() for newly allocated blocks.
+ *
+ * XXX currently __block_prepare_write() creates buffers for all the
+ *     pages, and the filesystems mark these buffers as BH_New if they
+ *     were newly allocated from disk. We use the BH_New flag similarly.
+ */
+static int filter_commit_write(struct niobuf_local *lnb, int err)
+{
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+        if (err) {
+                unsigned block_start, block_end;
+                struct buffer_head *bh, *head = lnb->page->buffers;
+                unsigned blocksize = head->b_size;
+
+                /* debugging: just seeing if this ever happens */
+                CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
+                       "called for ino %lu:%lu on err %d\n",
+                       lnb->page->mapping->host->i_ino, lnb->page->index, err);
+
+                /* Currently one buffer per page, but in the future... */
+                for (bh = head, block_start = 0; bh != head || !block_start;
+                     block_start = block_end, bh = bh->b_this_page) {
+                        block_end = block_start + blocksize;
+                        if (buffer_new(bh)) {
+                                memset(kmap(lnb->page) + block_start, 0,
+                                       blocksize);
+                                kunmap(lnb->page);
+                        }
+                }
+        }
+#endif
+        return lustre_commit_write(lnb);
+}
+
+int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
+                  int objcount, struct obd_ioobj *obj,
+                  int niocount, struct niobuf_remote *nb,
+                  struct niobuf_local *res, struct obd_trans_info *oti)
+{
+        struct obd_run_ctxt saved;
+        struct obd_device *obd;
+        struct obd_ioobj *o;
+        struct niobuf_remote *rnb;
+        struct niobuf_local *lnb;
+        struct fsfilt_objinfo *fso;
+        struct dentry *dentry;
+        struct inode *inode;
+        int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
+        unsigned long now = jiffies;
+        ENTRY;
+
+        memset(res, 0, niocount * sizeof(*res));
+
+        obd = exp->exp_obd;
+        if (obd == NULL)
+                RETURN(-EINVAL);
+
+        // theoretically we support multi-obj BRW RPCs, but until then...
+        LASSERT(objcount == 1);
+
+        OBD_ALLOC(fso, objcount * sizeof(*fso));
+        if (!fso)
+                RETURN(-ENOMEM);
+
+        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+
+        for (i = 0, o = obj; i < objcount; i++, o++) {
+                struct filter_dentry_data *fdd;
+
+                LASSERT(o->ioo_bufcnt);
+
+                dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
+
+                if (IS_ERR(dentry))
+                        GOTO(out_objinfo, rc = PTR_ERR(dentry));
+
+                fso[i].fso_dentry = dentry;
+                fso[i].fso_bufcnt = o->ioo_bufcnt;
+
+                if (!dentry->d_inode) {
+                        CERROR("trying to BRW to non-existent file "LPU64"\n",
+                               o->ioo_id);
+                        f_dput(dentry);
+                        GOTO(out_objinfo, rc = -ENOENT);
+                }
+
+                /* If we ever start to support mutli-object BRW RPCs, we will
+                 * need to get locks on mulitple inodes (in order) or use the
+                 * DLM to do the locking for us (and use the same locking in
+                 * filter_setattr() for truncate).  That isn't all, because
+                 * there still exists the possibility of a truncate starting
+                 * a new transaction while holding the ext3 rwsem = write
+                 * while some writes (which have started their transactions
+                 * here) blocking on the ext3 rwsem = read => lock inversion.
+                 *
+                 * The handling gets very ugly when dealing with locked pages.
+                 * It may be easier to just get rid of the locked page code
+                 * (which has problems of its own) and either discover we do
+                 * not need it anymore (i.e. it was a symptom of another bug)
+                 * or ensure we get the page locks in an appropriate order.
+                 */
+                if (cmd & OBD_BRW_WRITE)
+                        down(&dentry->d_inode->i_sem);
+                fdd = dentry->d_fsdata;
+                if (!fdd || !atomic_read(&fdd->fdd_open_count))
+                        CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
+                               o->ioo_id);
+        }
+
+        if (time_after(jiffies, now + 15 * HZ))
+                CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
+
+        if (cmd & OBD_BRW_WRITE) {
+#warning "FIXME: we need inode->i_sem for each object to protect vs truncate"
+                LASSERT(oti);
+                /* Even worse, we need to get locks on mulitple inodes (in
+                 * order) or use the DLM to do the locking for us (and use
+                 * the same locking in filter_setattr() for truncate.  The
+                 * handling gets very ugly when dealing with locked pages.
+                 * It may be easier to just get rid of the locked page code
+                 * (which has problems of its own) and either discover we do
+                 * not need it anymore (i.e. it was a symptom of another bug)
+                 * or ensure we get the page locks in an appropriate order.
+                 */
+                oti->oti_handle = fsfilt_brw_start(obd, objcount, fso, niocount,
+                                                   oti->oti_handle);
+                if (IS_ERR(oti->oti_handle)) {
+                        rc = PTR_ERR(oti->oti_handle);
+                        CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+                               "error starting transaction: rc = %d\n", rc);
+                        oti->oti_handle = NULL;
+                        GOTO(out_objinfo, rc);
+                }
+        }
+
+        for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
+                dentry = fso[i].fso_dentry;
+                inode = dentry->d_inode;
+
+                for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
+                        if (j == 0)
+                                lnb->dentry = dentry;
+                        else
+                                lnb->dentry = dget(dentry);
+
+                        lnb->offset = rnb->offset;
+                        lnb->len    = rnb->len;
+                        lnb->flags  = rnb->flags;
+                        lnb->start  = jiffies;
+
+                        if (cmd & OBD_BRW_WRITE) {
+                                rc = filter_get_page_write(inode,lnb,&pglocked);
+                                if (rc)
+                                        up(&dentry->d_inode->i_sem);
+                        } else if (inode->i_size <= rnb->offset) {
+                                /* If there's no more data, abort early.
+                                 * lnb->page == NULL and lnb->rc == 0, so it's
+                                 * easy to detect later. */
+                                f_dput(dentry);
+                                lnb->dentry = NULL;
+                                break;
+                        } else {
+                                rc = filter_start_page_read(inode, lnb);
+                        }
+
+                        if (rc) {
+                                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+                                       "page err %u@"LPU64" %u/%u %p: rc %d\n",
+                                       lnb->len, lnb->offset, j, o->ioo_bufcnt,
+                                       dentry, rc);
+                                f_dput(dentry);
+                                GOTO(out_pages, rc);
+                        }
+
+                        tot_bytes += lnb->len;
+
+                        if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
+                                /* Likewise with a partial read */
+                                break;
+                        }
+                }
+        }
+
+        if (time_after(jiffies, now + 15 * HZ))
+                CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+
+        if (cmd & OBD_BRW_READ) {
+                lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES,
+                                    tot_bytes);
+                while (lnb-- > res) {
+                        rc = filter_finish_page_read(lnb);
+                        if (rc) {
+                                CERROR("error page %u@"LPU64" %u %p: rc %d\n",
+                                       lnb->len, lnb->offset, (int)(lnb - res),
+                                       lnb->dentry, rc);
+                                f_dput(lnb->dentry);
+                                GOTO(out_pages, rc);
+                        }
+                }
+        } else
+                lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
+                                    tot_bytes);
+
+        if (time_after(jiffies, now + 15 * HZ))
+                CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
+
+        EXIT;
+out:
+        OBD_FREE(fso, objcount * sizeof(*fso));
+        /* we saved the journal handle into oti->oti_handle instead */
+        current->journal_info = NULL;
+        pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+        return rc;
+
+out_pages:
+        while (lnb-- > res) {
+                if (cmd & OBD_BRW_WRITE) {
+                        filter_commit_write(lnb, rc);
+                        up(&lnb->dentry->d_inode->i_sem);
+                } else {
+                        lustre_put_page(lnb->page);
+                }
+                f_dput(lnb->dentry);
+        }
+        if (cmd & OBD_BRW_WRITE) {
+                filter_finish_transno(exp, oti, rc);
+                fsfilt_commit(obd,
+                              filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
+                              oti->oti_handle, 0);
+        }
+        goto out; /* dropped the dentry refs already (one per page) */
+
+out_objinfo:
+        for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
+                if (cmd & OBD_BRW_WRITE)
+                        up(&fso[i].fso_dentry->d_inode->i_sem);
+                f_dput(fso[i].fso_dentry);
+        }
+        goto out;
+}
+
+static int filter_write_locked_page(struct niobuf_local *lnb)
+{
+        struct page *lpage;
+        void        *lpage_addr;
+        void        *lnb_addr;
+        int rc;
+        ENTRY;
+
+        lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
+        if (IS_ERR(lpage)) {
+                /* It is highly unlikely that we would ever get an error here.
+                 * The page we want to get was previously locked, so it had to
+                 * have already allocated the space, and we were just writing
+                 * over the same data, so there would be no hole in the file.
+                 *
+                 * XXX: possibility of a race with truncate could exist, need
+                 *      to check that.  There are no guarantees w.r.t.
+                 *      write order even on a local filesystem, although the
+                 *      normal response would be to return the number of bytes
+                 *      successfully written and leave the rest to the app.
+                 */
+                rc = PTR_ERR(lpage);
+                CERROR("error getting locked page index %ld: rc = %d\n",
+                       lnb->page->index, rc);
+                LBUG();
+                lustre_commit_write(lnb);
+                RETURN(rc);
+        }
+
+        /* 2 kmaps == vanishingly small deadlock opportunity */
+        lpage_addr = kmap(lpage);
+        lnb_addr = kmap(lnb->page);
+
+        memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
+
+        kunmap(lnb->page);
+        kunmap(lpage);
+
+        lustre_put_page(lnb->page);
+
+        lnb->page = lpage;
+        rc = lustre_commit_write(lnb);
+        if (rc)
+                CERROR("error committing locked page %ld: rc = %d\n",
+                       lnb->page->index, rc);
+
+        RETURN(rc);
+}
+
+int filter_commitrw(int cmd, struct obd_export *exp,
+                    int objcount, struct obd_ioobj *obj,
+                    int niocount, struct niobuf_local *res,
+                    struct obd_trans_info *oti)
+{
+        struct obd_run_ctxt saved;
+        struct obd_ioobj *o;
+        struct niobuf_local *lnb;
+        struct obd_device *obd = exp->exp_obd;
+        int found_locked = 0, rc = 0, i;
+        int nested_trans = current->journal_info != NULL;
+        unsigned long now = jiffies;  /* DEBUGGING OST TIMEOUTS */
+        ENTRY;
+
+        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+
+        if (cmd & OBD_BRW_WRITE) {
+                LASSERT(oti);
+                LASSERT(current->journal_info == NULL ||
+                        current->journal_info == oti->oti_handle);
+                current->journal_info = oti->oti_handle;
+        }
+
+        for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
+                int j;
+
+                if (cmd & OBD_BRW_WRITE) {
+                        inode_update_time(lnb->dentry->d_inode, 1);
+                        up(&lnb->dentry->d_inode->i_sem);
+                }
+                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
+                        if (lnb->page == NULL) {
+                                continue;
+                        }
+
+                        if (lnb->flags & N_LOCAL_TEMP_PAGE) {
+                                found_locked++;
+                                continue;
+                        }
+
+                        if (time_after(jiffies, lnb->start + 15 * HZ))
+                                CERROR("slow commitrw %lus\n",
+                                       (jiffies - lnb->start) / HZ);
+
+                        if (cmd & OBD_BRW_WRITE) {
+                                int err = filter_commit_write(lnb, 0);
+
+                                if (!rc)
+                                        rc = err;
+                        } else {
+                                lustre_put_page(lnb->page);
+                        }
+
+                        f_dput(lnb->dentry);
+                        if (time_after(jiffies, lnb->start + 15 * HZ))
+                                CERROR("slow commit_write %lus\n",
+                                       (jiffies - lnb->start) / HZ);
+                }
+        }
+
+        for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
+             i++, o++) {
+                int j;
+                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
+                        int err;
+                        if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
+                                continue;
+
+                        if (time_after(jiffies, lnb->start + 15 * HZ))
+                                CERROR("slow commitrw locked %lus\n",
+                                       (jiffies - lnb->start) / HZ);
+
+                        err = filter_write_locked_page(lnb);
+                        if (!rc)
+                                rc = err;
+                        f_dput(lnb->dentry);
+                        found_locked--;
+
+                        if (time_after(jiffies, lnb->start + 15 * HZ))
+                                CERROR("slow commit_write locked %lus\n",
+                                       (jiffies - lnb->start) / HZ);
+                }
+        }
+
+        if (cmd & OBD_BRW_WRITE) {
+                /* We just want any dentry for the commit, for now */
+                struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
+                int err;
+
+                rc = filter_finish_transno(exp, oti, rc);
+                err = fsfilt_commit(obd, dparent->d_inode, oti->oti_handle,
+                                    obd_sync_filter);
+                if (err)
+                        rc = err;
+                if (obd_sync_filter)
+                        LASSERT(oti->oti_transno <= obd->obd_last_committed);
+                if (time_after(jiffies, now + 15 * HZ))
+                        CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
+        }
+
+        LASSERT(nested_trans || current->journal_info == NULL);
+
+        pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+        RETURN(rc);
+}
+
+int filter_brw(int cmd, struct lustre_handle *conn,
+               struct lov_stripe_md *lsm, obd_count oa_bufs,
+               struct brw_page *pga, struct obd_trans_info *oti)
+{
+        struct obd_export      *exp = class_conn2export(conn);
+        struct obd_ioobj        ioo;
+        struct niobuf_local    *lnb;
+        struct niobuf_remote   *rnb;
+        obd_count               i;
+        int                     ret = 0;
+        ENTRY;
+
+        if (exp == NULL)
+                RETURN(-EINVAL);
+
+        OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
+        OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
+
+        if (lnb == NULL || rnb == NULL)
+                GOTO(out, ret = -ENOMEM);
+
+        for (i = 0; i < oa_bufs; i++) {
+                rnb[i].offset = pga[i].off;
+                rnb[i].len = pga[i].count;
+        }
+
+        ioo.ioo_id = lsm->lsm_object_id;
+        ioo.ioo_gr = 0;
+        ioo.ioo_type = S_IFREG;
+        ioo.ioo_bufcnt = oa_bufs;
+
+        ret = filter_preprw(cmd, exp, NULL, 1, &ioo, oa_bufs, rnb, lnb, oti);
+        if (ret != 0)
+                GOTO(out, ret);
+
+        for (i = 0; i < oa_bufs; i++) {
+                void *virt = kmap(pga[i].pg);
+                obd_off off = pga[i].off & ~PAGE_MASK;
+                void *addr = kmap(lnb[i].page);
+
+                /* 2 kmaps == vanishingly small deadlock opportunity */
+
+                if (cmd & OBD_BRW_WRITE)
+                        memcpy(addr + off, virt + off, pga[i].count);
+                else
+                        memcpy(virt + off, addr + off, pga[i].count);
+
+                kunmap(addr);
+                kunmap(virt);
+        }
+
+        ret = filter_commitrw(cmd, exp, 1, &ioo, oa_bufs, lnb, oti);
+
+out:
+        if (lnb)
+                OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
+        if (rnb)
+                OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
+        class_export_put(exp);
+        RETURN(ret);
+}
diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c
new file mode 100644 (file)
index 0000000..790659d
--- /dev/null
@@ -0,0 +1,379 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  linux/fs/obdfilter/filter_log.c
+ *
+ *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/version.h>
+
+#include <portals/list.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_fsfilt.h>
+#include <linux/lustre_commit_confd.h>
+
+#include "filter_internal.h"
+
+static struct llog_handle *filter_log_create(struct obd_device *obd);
+
+/* This is a callback from the llog_* functions.
+ * Assumes caller has already pushed us into the kernel context. */
+static int filter_log_close(struct llog_handle *cathandle,
+                            struct llog_handle *loghandle)
+{
+        struct llog_object_hdr *llh = loghandle->lgh_hdr;
+        struct file *file = loghandle->lgh_file;
+        struct dentry *dparent = NULL, *dchild = NULL;
+        struct lustre_handle parent_lockh;
+        struct llog_logid *lgl = &loghandle->lgh_cookie.lgc_lgl;
+        int rc;
+        ENTRY;
+
+        /* If we are going to delete this log, grab a ref before we close
+         * it so we don't have to immediately do another lookup. */
+        if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){
+                CDEBUG(D_INODE, "deleting log file "LPX64":%x\n",
+                       lgl->lgl_oid, lgl->lgl_ogen);
+                dparent = filter_parent_lock(loghandle->lgh_obd, S_IFREG,
+                                             lgl->lgl_oid,LCK_PW,&parent_lockh);
+                if (IS_ERR(dparent)) {
+                        rc = PTR_ERR(dparent);
+                        CERROR("error locking parent, orphan log %*s: rc %d\n",
+                               file->f_dentry->d_name.len,
+                               file->f_dentry->d_name.name, rc);
+                        RETURN(rc);
+                } else {
+                        dchild = dget(file->f_dentry);
+                        llog_delete_log(cathandle, loghandle);
+                }
+        } else {
+                CDEBUG(D_INODE, "closing log file "LPX64":%x\n",
+                       lgl->lgl_oid, lgl->lgl_ogen);
+        }
+
+        rc = filp_close(file, 0);
+
+        llog_free_handle(loghandle); /* also removes loghandle from list */
+
+        if (dchild != NULL) {
+                int err = vfs_unlink(dparent->d_inode, dchild);
+                if (err) {
+                        CERROR("error unlinking empty log %*s: rc %d\n",
+                               dchild->d_name.len, dchild->d_name.name, err);
+                        if (!rc)
+                                rc = err;
+                }
+                f_dput(dchild);
+                ldlm_lock_decref(&parent_lockh, LCK_PW);
+        }
+        RETURN(rc);
+}
+
+/* This is a callback from the llog_* functions.
+ * Assumes caller has already pushed us into the kernel context. */
+static struct llog_handle *filter_log_open(struct obd_device *obd,
+                                           struct llog_cookie *logcookie)
+{
+        struct llog_logid *lgl = &logcookie->lgc_lgl;
+        struct llog_handle *loghandle;
+        struct dentry *dchild;
+        int rc;
+        ENTRY;
+
+        loghandle = llog_alloc_handle();
+        if (!loghandle)
+                RETURN(ERR_PTR(-ENOMEM));
+
+        dchild = filter_fid2dentry(obd, NULL, S_IFREG, lgl->lgl_oid);
+        if (IS_ERR(dchild))
+                GOTO(out_handle, rc = PTR_ERR(dchild));
+
+        if (dchild->d_inode == NULL) {
+                CERROR("logcookie references non-existent object %*s\n",
+                       dchild->d_name.len, dchild->d_name.name);
+                GOTO(out_dentry, rc = -ENOENT);
+        }
+
+        if (dchild->d_inode->i_generation != lgl->lgl_ogen) {
+                CERROR("logcookie for %*s had different generation %x != %x\n",
+                       dchild->d_name.len, dchild->d_name.name,
+                       dchild->d_inode->i_generation, lgl->lgl_ogen);
+                GOTO(out_dentry, rc = -ESTALE);
+        }
+
+        /* dentry_open does a dput(dchild) and mntput(mnt) on error */
+        mntget(obd->u.filter.fo_vfsmnt);
+        loghandle->lgh_file = dentry_open(dchild, obd->u.filter.fo_vfsmnt,
+                                          O_RDWR);
+        if (IS_ERR(loghandle->lgh_file)) {
+                rc = PTR_ERR(loghandle->lgh_file);
+                CERROR("error opening logfile %*s: rc %d\n",
+                       dchild->d_name.len, dchild->d_name.name, rc);
+                GOTO(out_dentry, rc);
+        }
+        memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie));
+        loghandle->lgh_log_create = filter_log_create;
+        loghandle->lgh_log_open = filter_log_open;
+        loghandle->lgh_log_close = filter_log_close;
+        loghandle->lgh_obd = obd;
+        RETURN(loghandle);
+
+out_dentry:
+        f_dput(dchild);
+out_handle:
+        llog_free_handle(loghandle);
+        RETURN(ERR_PTR(rc));
+}
+
+/* This is a callback from the llog_* functions.
+ * Assumes caller has already pushed us into the kernel context. */
+static struct llog_handle *filter_log_create(struct obd_device *obd)
+{
+        struct filter_obd *filter = &obd->u.filter;
+        struct lustre_handle parent_lockh;
+        struct dentry *dparent, *dchild;
+        struct llog_handle *loghandle;
+        struct file *file;
+        int err, rc;
+        obd_id id;
+        ENTRY;
+
+        loghandle = llog_alloc_handle();
+        if (!loghandle)
+                RETURN(ERR_PTR(-ENOMEM));
+
+ retry:
+        id = filter_next_id(filter);
+
+        dparent = filter_parent_lock(obd, S_IFREG, id, LCK_PW, &parent_lockh);
+        if (IS_ERR(dparent))
+                GOTO(out_ctxt, rc = PTR_ERR(dparent));
+
+        dchild = filter_fid2dentry(obd, dparent, S_IFREG, id);
+        if (IS_ERR(dchild))
+                GOTO(out_lock, rc = PTR_ERR(dchild));
+
+        if (dchild->d_inode != NULL) {
+                /* This would only happen if lastobjid was bad on disk */
+                CERROR("Serious error: objid %*s already exists; is this "
+                       "filesystem corrupt?  I will try to work around it.\n",
+                       dchild->d_name.len, dchild->d_name.name);
+                f_dput(dchild);
+                ldlm_lock_decref(&parent_lockh, LCK_PW);
+                goto retry;
+        }
+
+        rc = vfs_create(dparent->d_inode, dchild, S_IFREG);
+        if (rc) {
+                CERROR("log create failed rc = %d\n", rc);
+                GOTO(out_child, rc);
+        }
+
+        rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
+        if (rc) {
+                CERROR("can't write lastobjid but log created: rc %d\n",rc);
+                GOTO(out_destroy, rc);
+        }
+
+        /* dentry_open does a dput(dchild) and mntput(mnt) on error */
+        mntget(filter->fo_vfsmnt);
+        file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
+        if (IS_ERR(file)) {
+                rc = PTR_ERR(file);
+                CERROR("error opening log file "LPX64": rc %d\n", id, rc);
+                GOTO(out_destroy, rc);
+        }
+        ldlm_lock_decref(&parent_lockh, LCK_PW);
+
+        loghandle->lgh_file = file;
+        loghandle->lgh_cookie.lgc_lgl.lgl_oid = id;
+        loghandle->lgh_cookie.lgc_lgl.lgl_ogen = dchild->d_inode->i_generation;
+        loghandle->lgh_log_create = filter_log_create;
+        loghandle->lgh_log_open = filter_log_open;
+        loghandle->lgh_log_close = filter_log_close;
+        loghandle->lgh_obd = obd;
+
+        RETURN(loghandle);
+
+out_destroy:
+        err = vfs_unlink(dparent->d_inode, dchild);
+        if (err)
+                CERROR("error unlinking %*s on error: rc %d\n",
+                       dchild->d_name.len, dchild->d_name.name, err);
+out_child:
+        f_dput(dchild);
+out_lock:
+        ldlm_lock_decref(&parent_lockh, LCK_PW);
+out_ctxt:
+        llog_free_handle(loghandle);
+        RETURN(ERR_PTR(rc));
+}
+
+/* This is called from filter_setup() and should be single threaded */
+static struct llog_handle *filter_get_catalog(struct obd_device *obd)
+{
+        struct filter_obd *filter = &obd->u.filter;
+        struct filter_server_data *fsd = filter->fo_fsd;
+        struct obd_run_ctxt saved;
+        struct llog_handle *cathandle = NULL;
+        int rc;
+        ENTRY;
+
+        push_ctxt(&saved, &filter->fo_ctxt, NULL);
+        if (fsd->fsd_catalog_oid) {
+                struct llog_cookie catcookie;
+
+                catcookie.lgc_lgl.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid);
+                catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen);
+                cathandle = filter_log_open(obd, &catcookie);
+                if (IS_ERR(cathandle)) {
+                        CERROR("error opening catalog "LPX64":%x: rc %d\n",
+                               catcookie.lgc_lgl.lgl_oid,
+                               catcookie.lgc_lgl.lgl_ogen,
+                               (int)PTR_ERR(cathandle));
+                        fsd->fsd_catalog_oid = 0;
+                        fsd->fsd_catalog_ogen = 0;
+                }
+        }
+
+        if (!fsd->fsd_catalog_oid) {
+                struct llog_logid *lgl;
+
+                cathandle = filter_log_create(obd);
+                if (IS_ERR(cathandle)) {
+                        CERROR("error creating new catalog: rc %d\n",
+                               (int)PTR_ERR(cathandle));
+                        GOTO(out, cathandle);
+                }
+                lgl = &cathandle->lgh_cookie.lgc_lgl;
+                fsd->fsd_catalog_oid = cpu_to_le64(lgl->lgl_oid);
+                fsd->fsd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen);
+                rc = filter_update_server_data(filter->fo_rcvd_filp, fsd);
+                if (rc) {
+                        CERROR("error writing new catalog to disk: rc %d\n",rc);
+                        GOTO(out_handle, rc);
+                }
+        }
+
+        rc = llog_init_catalog(cathandle, &obd->u.filter.fo_mdc_uuid);
+        if (rc)
+                GOTO(out_handle, rc);
+out:
+        pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+        RETURN(cathandle);
+
+out_handle:
+        filter_log_close(cathandle, cathandle);
+        cathandle = ERR_PTR(rc);
+        goto out;
+}
+
+static void filter_put_catalog(struct llog_handle *cathandle)
+{
+        struct llog_handle *loghandle, *n;
+        int rc;
+        ENTRY;
+
+        list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list)
+                filter_log_close(cathandle, loghandle);
+
+        rc = filp_close(cathandle->lgh_file, 0);
+        if (rc)
+                CERROR("error closing catalog: rc %d\n", rc);
+
+        llog_free_handle(cathandle);
+        EXIT;
+}
+
+int filter_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+                      int num_cookies, struct llog_cookie *logcookies,
+                      int flags)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+        struct obd_run_ctxt saved;
+        int rc;
+        ENTRY;
+
+        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+        rc = llog_cancel_records(obd->u.filter.fo_catalog, num_cookies,
+                                 logcookies);
+        pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+
+        RETURN(rc);
+}
+
+int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid,
+                         obd_id oid, obd_count ogen,
+                         struct llog_cookie *logcookie)
+{
+        struct llog_create_rec *lcr;
+        int rc;
+        ENTRY;
+
+        OBD_ALLOC(lcr, sizeof(*lcr));
+        if (lcr == NULL)
+                RETURN(-ENOMEM);
+        lcr->lcr_hdr.lth_len = lcr->lcr_end_len = sizeof(*lcr);
+        lcr->lcr_hdr.lth_type = OST_CREATE_REC;
+        lcr->lcr_fid.id = mds_fid->id;
+        lcr->lcr_fid.generation = mds_fid->generation;
+        lcr->lcr_fid.f_type = mds_fid->f_type;
+        lcr->lcr_oid = oid;
+        lcr->lcr_ogen = ogen;
+
+        rc = llog_add_record(cathandle, &lcr->lcr_hdr, logcookie);
+        OBD_FREE(lcr, sizeof(*lcr));
+
+        if (rc > 0) {
+                LASSERT(rc == sizeof(*logcookie));
+                rc = 0;
+        }
+        RETURN(rc);
+}
+
+int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid,
+                         obd_count ogen, struct llog_cookie *logcookie)
+{
+        struct llog_orphan_rec *lor;
+        int rc;
+        ENTRY;
+
+        OBD_ALLOC(lor, sizeof(*lor));
+        if (lor == NULL)
+                RETURN(-ENOMEM);
+        lor->lor_hdr.lth_len = lor->lor_end_len = sizeof(*lor);
+        lor->lor_hdr.lth_type = OST_ORPHAN_REC;
+        lor->lor_oid = oid;
+        lor->lor_ogen = ogen;
+
+        rc = llog_add_record(cathandle, &lor->lor_hdr, logcookie);
+
+        if (rc > 0) {
+                LASSERT(rc == sizeof(*logcookie));
+                rc = 0;
+        }
+        RETURN(rc);
+}
diff --git a/lustre/obdfilter/filter_san.c b/lustre/obdfilter/filter_san.c
new file mode 100644 (file)
index 0000000..c535ccc
--- /dev/null
@@ -0,0 +1,126 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  linux/fs/obdfilter/filter_san.c
+ *
+ *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/pagemap.h> // XXX kill me soon
+#include <linux/version.h>
+
+#include <linux/obd_class.h>
+#include <linux/lustre_fsfilt.h>
+#include "filter_internal.h"
+
+/* sanobd setup methods - use a specific mount option */
+int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
+{
+        struct obd_ioctl_data* data = buf;
+        char *option = NULL;
+
+        if (!data->ioc_inlbuf2)
+                RETURN(-EINVAL);
+
+        /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
+        if (!strcmp(data->ioc_inlbuf2, "extN"))
+                option = "data=writeback";
+        else if (!strcmp(data->ioc_inlbuf2, "ext3"))
+                option = "data=writeback,asyncdel";
+        else
+                LBUG(); /* just a reminder */
+
+        return filter_common_setup(obd, len, buf, option);
+}
+
+int filter_san_preprw(int cmd, struct lustre_handle *conn, int objcount,
+                      struct obd_ioobj *obj, int niocount,
+                      struct niobuf_remote *nb)
+{
+        struct obd_device *obd;
+        struct obd_ioobj *o = obj;
+        struct niobuf_remote *rnb = nb;
+        int rc = 0;
+        int i;
+        ENTRY;
+
+        obd = class_conn2obd(conn);
+        if (!obd) {
+                CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
+                       conn->cookie);
+                RETURN(-EINVAL);
+        }
+
+        for (i = 0; i < objcount; i++, o++) {
+                struct dentry *dentry;
+                struct inode *inode;
+                int (*fs_bmap)(struct address_space *, long);
+                int j;
+
+                dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
+                if (IS_ERR(dentry))
+                        GOTO(out, rc = PTR_ERR(dentry));
+                inode = dentry->d_inode;
+                if (!inode) {
+                        CERROR("trying to BRW to non-existent file "LPU64"\n",
+                               o->ioo_id);
+                        f_dput(dentry);
+                        GOTO(out, rc = -ENOENT);
+                }
+                fs_bmap = inode->i_mapping->a_ops->bmap;
+
+                for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
+                        long block;
+
+                        block = rnb->offset >> inode->i_blkbits;
+
+                        if (cmd == OBD_BRW_READ) {
+                                block = fs_bmap(inode->i_mapping, block);
+                        } else {
+                                loff_t newsize = rnb->offset + rnb->len;
+                                /* fs_prep_san_write will also update inode
+                                 * size for us:
+                                 * (1) new alloced block
+                                 * (2) existed block but size extented
+                                 */
+                                /* FIXME We could call fs_prep_san_write()
+                                 * only once for all the blocks allocation.
+                                 * Now call it once for each block, for
+                                 * simplicity. And if error happens, we
+                                 * probably need to release previous alloced
+                                 * block */
+                                rc = fs_prep_san_write(obd, inode, &block,
+                                                       1, newsize);
+                                if (rc)
+                                        break;
+                        }
+
+                        rnb->offset = block;
+                }
+                f_dput(dentry);
+        }
+out:
+        RETURN(rc);
+}
+