A reorganization of obdfilter/ before I launch into the real work.
There should be no functional changes.
- removes unused 'niobuf_remote' argument from fsfilt_brw_start
- splits filter.c into filter.c, filter_io.c, filter_log.c, and filter_san.c
More of the same is on the way, but this is a good start.
--- /dev/null
+#ifndef _FILTER_INTERNAL_H
+#define _FILTER_INTERNAL_H
+
+
+#ifdef __KERNEL__
+# include <linux/spinlock.h>
+#endif
+#include <linux/lustre_handles.h>
+#include <linux/obd.h>
+
+#ifndef OBD_FILTER_DEVICENAME
+# define OBD_FILTER_DEVICENAME "obdfilter"
+#endif
+
+#ifndef OBD_FILTER_SAN_DEVICENAME
+# define OBD_FILTER_SAN_DEVICENAME "sanobdfilter"
+#endif
+
+#define FILTER_LR_SERVER_SIZE 512
+
+#define FILTER_LR_CLIENT_START 8192
+#define FILTER_LR_CLIENT_SIZE 128
+
+#define FILTER_SUBDIR_COUNT 32 /* set to zero for no subdirs */
+
+#define FILTER_MOUNT_RECOV 2
+#define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */
+
+/* Data stored per server at the head of the last_rcvd file. In le32 order. */
+struct filter_server_data {
+ __u8 fsd_uuid[37]; /* server UUID */
+ __u8 fsd_uuid_padding[3]; /* unused */
+ __u64 fsd_last_objid; /* last created object ID */
+ __u64 fsd_last_transno; /* last completed transaction ID */
+ __u64 fsd_mount_count; /* FILTER incarnation number */
+ __u32 fsd_feature_compat; /* compatible feature flags */
+ __u32 fsd_feature_rocompat;/* read-only compatible feature flags */
+ __u32 fsd_feature_incompat;/* incompatible feature flags */
+ __u32 fsd_server_size; /* size of server data area */
+ __u32 fsd_client_start; /* start of per-client data area */
+ __u16 fsd_client_size; /* size of per-client data area */
+ __u16 fsd_subdir_count; /* number of subdirectories for objects */
+ __u64 fsd_catalog_oid; /* recovery catalog object id */
+ __u32 fsd_catalog_ogen; /* recovery catalog inode generation */
+ __u8 fsd_peeruuid[37]; /* UUID of MDS associated with this OST */
+ __u8 peer_padding[3]; /* unused */
+ __u8 fsd_padding[FILTER_LR_SERVER_SIZE - 140];
+};
+
+/* Data stored per client in the last_rcvd file. In le32 order. */
+struct filter_client_data {
+ __u8 fcd_uuid[37]; /* client UUID */
+ __u8 fcd_uuid_padding[3]; /* unused */
+ __u64 fcd_last_rcvd; /* last completed transaction ID */
+ __u64 fcd_mount_count; /* FILTER incarnation number */
+ __u64 fcd_last_xid; /* client RPC xid for the last transaction */
+ __u8 fcd_padding[FILTER_LR_CLIENT_SIZE - 64];
+};
+
+/* file data for open files on OST */
+struct filter_file_data {
+ struct portals_handle ffd_handle;
+ atomic_t ffd_refcount;
+ struct list_head ffd_export_list; /* export open list - fed_lock */
+ struct file *ffd_file; /* file handle */
+};
+
+struct filter_dentry_data {
+ struct llog_cookie fdd_cookie;
+ obd_id fdd_objid;
+ __u32 fdd_magic;
+ atomic_t fdd_open_count;
+ int fdd_flags;
+};
+
+#define FILTER_DENTRY_MAGIC 0x9efba101
+#define FILTER_FLAG_DESTROY 0x0001 /* destroy dentry on last file close */
+
+enum {
+ LPROC_FILTER_READ_BYTES = 0,
+ LPROC_FILTER_WRITE_BYTES = 1,
+ LPROC_FILTER_LAST,
+};
+
+/* filter.c */
+struct dentry *filter_parent(struct obd_device *, obd_mode mode, obd_id objid);
+struct dentry *filter_parent_lock(struct obd_device *, obd_mode mode,
+ obd_id objid, ldlm_mode_t lock_mode,
+ struct lustre_handle *lockh);
+void f_dput(struct dentry *);
+struct dentry *filter_fid2dentry(struct obd_device *, struct dentry *dir,
+ obd_mode mode, obd_id id);
+int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc);
+__u64 filter_next_id(struct filter_obd *);
+int filter_update_server_data(struct file *, struct filter_server_data *);
+int filter_common_setup(struct obd_device *, obd_count len, void *buf,
+ char *option);
+
+/* filter_io.c */
+int filter_preprw(int cmd, struct obd_export *, struct obdo *, int objcount,
+ struct obd_ioobj *, int niocount, struct niobuf_remote *,
+ struct niobuf_local *, struct obd_trans_info *);
+int filter_commitrw(int cmd, struct obd_export *, int objcount,
+ struct obd_ioobj *, int niocount, struct niobuf_local *,
+ struct obd_trans_info *);
+int filter_brw(int cmd, struct lustre_handle *, struct lov_stripe_md *,
+ obd_count oa_bufs, struct brw_page *, struct obd_trans_info *);
+
+/* filter_log.c */
+int filter_log_cancel(struct lustre_handle *, struct lov_stripe_md *,
+ int num_cookies, struct llog_cookie *, int flags);
+int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid,
+ obd_id oid, obd_count ogen, struct llog_cookie *);
+int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid,
+ obd_count ogen, struct llog_cookie *);
+
+/* filter_san.c */
+int filter_san_setup(struct obd_device *obd, obd_count len, void *buf);
+int filter_san_preprw(int cmd, struct lustre_handle *, int objcount,
+ struct obd_ioobj *, int niocount, struct niobuf_remote *);
+
+#endif
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * linux/fs/obdfilter/filter_io.c
+ *
+ * Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/pagemap.h> // XXX kill me soon
+#include <linux/version.h>
+
+#include <linux/obd_class.h>
+#include <linux/lustre_fsfilt.h>
+#include "filter_internal.h"
+
+static inline void lustre_put_page(struct page *page)
+{
+ page_cache_release(page);
+}
+
+static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
+{
+ struct address_space *mapping = inode->i_mapping;
+ struct page *page;
+ unsigned long index = lnb->offset >> PAGE_SHIFT;
+ int rc;
+
+ page = grab_cache_page(mapping, index); /* locked page */
+ if (IS_ERR(page))
+ return lnb->rc = PTR_ERR(page);
+
+ lnb->page = page;
+
+ if (inode->i_size < lnb->offset + lnb->len - 1)
+ lnb->rc = inode->i_size - lnb->offset;
+ else
+ lnb->rc = lnb->len;
+
+ if (PageUptodate(page)) {
+ unlock_page(page);
+ return 0;
+ }
+
+ rc = mapping->a_ops->readpage(NULL, page);
+ if (rc < 0) {
+ CERROR("page index %lu, rc = %d\n", index, rc);
+ lnb->page = NULL;
+ lustre_put_page(page);
+ return lnb->rc = rc;
+ }
+
+ return 0;
+}
+
+static int filter_finish_page_read(struct niobuf_local *lnb)
+{
+ if (lnb->page == NULL)
+ return 0;
+
+ if (PageUptodate(lnb->page))
+ return 0;
+
+ wait_on_page(lnb->page);
+ if (!PageUptodate(lnb->page)) {
+ CERROR("page index %lu/offset "LPX64" not uptodate\n",
+ lnb->page->index, lnb->offset);
+ GOTO(err_page, lnb->rc = -EIO);
+ }
+ if (PageError(lnb->page)) {
+ CERROR("page index %lu/offset "LPX64" has error\n",
+ lnb->page->index, lnb->offset);
+ GOTO(err_page, lnb->rc = -EIO);
+ }
+
+ return 0;
+
+err_page:
+ lustre_put_page(lnb->page);
+ lnb->page = NULL;
+ return lnb->rc;
+}
+
+static struct page *lustre_get_page_write(struct inode *inode,
+ unsigned long index)
+{
+ struct address_space *mapping = inode->i_mapping;
+ struct page *page;
+ int rc;
+
+ page = grab_cache_page(mapping, index); /* locked page */
+
+ if (!IS_ERR(page)) {
+ /* Note: Called with "O" and "PAGE_SIZE" this is essentially
+ * a no-op for most filesystems, because we write the whole
+ * page. For partial-page I/O this will read in the page.
+ */
+ rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
+ if (rc) {
+ CERROR("page index %lu, rc = %d\n", index, rc);
+ if (rc != -ENOSPC)
+ LBUG();
+ GOTO(err_unlock, rc);
+ }
+ /* XXX not sure if we need this if we are overwriting page */
+ if (PageError(page)) {
+ CERROR("error on page index %lu, rc = %d\n", index, rc);
+ LBUG();
+ GOTO(err_unlock, rc = -EIO);
+ }
+ }
+ return page;
+
+err_unlock:
+ unlock_page(page);
+ lustre_put_page(page);
+ return ERR_PTR(rc);
+}
+
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+int waitfor_one_page(struct page *page)
+{
+ wait_on_page_locked(page);
+ return 0;
+}
+#endif
+
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+/* We should only change the file mtime (and not the ctime, like
+ * update_inode_times() in generic_file_write()) when we only change data.
+ */
+static inline void inode_update_time(struct inode *inode, int ctime_too)
+{
+ time_t now = CURRENT_TIME;
+ if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
+ return;
+ inode->i_mtime = now;
+ if (ctime_too)
+ inode->i_ctime = now;
+ mark_inode_dirty_sync(inode);
+}
+#endif
+
+static int lustre_commit_write(struct niobuf_local *lnb)
+{
+ struct page *page = lnb->page;
+ unsigned from = lnb->offset & ~PAGE_MASK;
+ unsigned to = from + lnb->len;
+ struct inode *inode = page->mapping->host;
+ int err;
+
+ LASSERT(to <= PAGE_SIZE);
+ err = page->mapping->a_ops->commit_write(NULL, page, from, to);
+ if (!err && IS_SYNC(inode))
+ err = waitfor_one_page(page);
+ //SetPageUptodate(page); // the client commit_write will do this
+
+ SetPageReferenced(page);
+ unlock_page(page);
+ lustre_put_page(page);
+ return err;
+}
+
+int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
+ int *pglocked)
+{
+ unsigned long index = lnb->offset >> PAGE_SHIFT;
+ struct address_space *mapping = inode->i_mapping;
+ struct page *page;
+ int rc;
+
+ //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
+ if (*pglocked)
+ page = grab_cache_page_nowait(mapping, index); /* locked page */
+ else
+ page = grab_cache_page(mapping, index); /* locked page */
+
+
+ /* This page is currently locked, so get a temporary page instead. */
+ if (!page) {
+ CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
+ page = alloc_pages(GFP_KERNEL, 0); /* locked page */
+ if (!page) {
+ CERROR("no memory for a temp page\n");
+ GOTO(err, rc = -ENOMEM);
+ }
+ page->index = index;
+ lnb->page = page;
+ lnb->flags |= N_LOCAL_TEMP_PAGE;
+ } else if (!IS_ERR(page)) {
+ (*pglocked)++;
+
+ rc = mapping->a_ops->prepare_write(NULL, page,
+ lnb->offset & ~PAGE_MASK,
+ lnb->len);
+ if (rc) {
+ if (rc != -ENOSPC)
+ CERROR("page index %lu, rc = %d\n", index, rc);
+ GOTO(err_unlock, rc);
+ }
+ /* XXX not sure if we need this if we are overwriting page */
+ if (PageError(page)) {
+ CERROR("error on page index %lu, rc = %d\n", index, rc);
+ LBUG();
+ GOTO(err_unlock, rc = -EIO);
+ }
+ lnb->page = page;
+ }
+
+ return 0;
+
+err_unlock:
+ unlock_page(page);
+ lustre_put_page(page);
+err:
+ return lnb->rc = rc;
+}
+
+/*
+ * We need to balance prepare_write() calls with commit_write() calls.
+ * If the page has been prepared, but we have no data for it, we don't
+ * want to overwrite valid data on disk, but we still need to zero out
+ * data for space which was newly allocated. Like part of what happens
+ * in __block_prepare_write() for newly allocated blocks.
+ *
+ * XXX currently __block_prepare_write() creates buffers for all the
+ * pages, and the filesystems mark these buffers as BH_New if they
+ * were newly allocated from disk. We use the BH_New flag similarly.
+ */
+static int filter_commit_write(struct niobuf_local *lnb, int err)
+{
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+ if (err) {
+ unsigned block_start, block_end;
+ struct buffer_head *bh, *head = lnb->page->buffers;
+ unsigned blocksize = head->b_size;
+
+ /* debugging: just seeing if this ever happens */
+ CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
+ "called for ino %lu:%lu on err %d\n",
+ lnb->page->mapping->host->i_ino, lnb->page->index, err);
+
+ /* Currently one buffer per page, but in the future... */
+ for (bh = head, block_start = 0; bh != head || !block_start;
+ block_start = block_end, bh = bh->b_this_page) {
+ block_end = block_start + blocksize;
+ if (buffer_new(bh)) {
+ memset(kmap(lnb->page) + block_start, 0,
+ blocksize);
+ kunmap(lnb->page);
+ }
+ }
+ }
+#endif
+ return lustre_commit_write(lnb);
+}
+
+int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
+ int objcount, struct obd_ioobj *obj,
+ int niocount, struct niobuf_remote *nb,
+ struct niobuf_local *res, struct obd_trans_info *oti)
+{
+ struct obd_run_ctxt saved;
+ struct obd_device *obd;
+ struct obd_ioobj *o;
+ struct niobuf_remote *rnb;
+ struct niobuf_local *lnb;
+ struct fsfilt_objinfo *fso;
+ struct dentry *dentry;
+ struct inode *inode;
+ int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
+ unsigned long now = jiffies;
+ ENTRY;
+
+ memset(res, 0, niocount * sizeof(*res));
+
+ obd = exp->exp_obd;
+ if (obd == NULL)
+ RETURN(-EINVAL);
+
+ // theoretically we support multi-obj BRW RPCs, but until then...
+ LASSERT(objcount == 1);
+
+ OBD_ALLOC(fso, objcount * sizeof(*fso));
+ if (!fso)
+ RETURN(-ENOMEM);
+
+ push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+
+ for (i = 0, o = obj; i < objcount; i++, o++) {
+ struct filter_dentry_data *fdd;
+
+ LASSERT(o->ioo_bufcnt);
+
+ dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
+
+ if (IS_ERR(dentry))
+ GOTO(out_objinfo, rc = PTR_ERR(dentry));
+
+ fso[i].fso_dentry = dentry;
+ fso[i].fso_bufcnt = o->ioo_bufcnt;
+
+ if (!dentry->d_inode) {
+ CERROR("trying to BRW to non-existent file "LPU64"\n",
+ o->ioo_id);
+ f_dput(dentry);
+ GOTO(out_objinfo, rc = -ENOENT);
+ }
+
+ /* If we ever start to support mutli-object BRW RPCs, we will
+ * need to get locks on mulitple inodes (in order) or use the
+ * DLM to do the locking for us (and use the same locking in
+ * filter_setattr() for truncate). That isn't all, because
+ * there still exists the possibility of a truncate starting
+ * a new transaction while holding the ext3 rwsem = write
+ * while some writes (which have started their transactions
+ * here) blocking on the ext3 rwsem = read => lock inversion.
+ *
+ * The handling gets very ugly when dealing with locked pages.
+ * It may be easier to just get rid of the locked page code
+ * (which has problems of its own) and either discover we do
+ * not need it anymore (i.e. it was a symptom of another bug)
+ * or ensure we get the page locks in an appropriate order.
+ */
+ if (cmd & OBD_BRW_WRITE)
+ down(&dentry->d_inode->i_sem);
+ fdd = dentry->d_fsdata;
+ if (!fdd || !atomic_read(&fdd->fdd_open_count))
+ CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
+ o->ioo_id);
+ }
+
+ if (time_after(jiffies, now + 15 * HZ))
+ CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
+
+ if (cmd & OBD_BRW_WRITE) {
+#warning "FIXME: we need inode->i_sem for each object to protect vs truncate"
+ LASSERT(oti);
+ /* Even worse, we need to get locks on mulitple inodes (in
+ * order) or use the DLM to do the locking for us (and use
+ * the same locking in filter_setattr() for truncate. The
+ * handling gets very ugly when dealing with locked pages.
+ * It may be easier to just get rid of the locked page code
+ * (which has problems of its own) and either discover we do
+ * not need it anymore (i.e. it was a symptom of another bug)
+ * or ensure we get the page locks in an appropriate order.
+ */
+ oti->oti_handle = fsfilt_brw_start(obd, objcount, fso, niocount,
+ oti->oti_handle);
+ if (IS_ERR(oti->oti_handle)) {
+ rc = PTR_ERR(oti->oti_handle);
+ CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+ "error starting transaction: rc = %d\n", rc);
+ oti->oti_handle = NULL;
+ GOTO(out_objinfo, rc);
+ }
+ }
+
+ for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
+ dentry = fso[i].fso_dentry;
+ inode = dentry->d_inode;
+
+ for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
+ if (j == 0)
+ lnb->dentry = dentry;
+ else
+ lnb->dentry = dget(dentry);
+
+ lnb->offset = rnb->offset;
+ lnb->len = rnb->len;
+ lnb->flags = rnb->flags;
+ lnb->start = jiffies;
+
+ if (cmd & OBD_BRW_WRITE) {
+ rc = filter_get_page_write(inode,lnb,&pglocked);
+ if (rc)
+ up(&dentry->d_inode->i_sem);
+ } else if (inode->i_size <= rnb->offset) {
+ /* If there's no more data, abort early.
+ * lnb->page == NULL and lnb->rc == 0, so it's
+ * easy to detect later. */
+ f_dput(dentry);
+ lnb->dentry = NULL;
+ break;
+ } else {
+ rc = filter_start_page_read(inode, lnb);
+ }
+
+ if (rc) {
+ CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+ "page err %u@"LPU64" %u/%u %p: rc %d\n",
+ lnb->len, lnb->offset, j, o->ioo_bufcnt,
+ dentry, rc);
+ f_dput(dentry);
+ GOTO(out_pages, rc);
+ }
+
+ tot_bytes += lnb->len;
+
+ if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
+ /* Likewise with a partial read */
+ break;
+ }
+ }
+ }
+
+ if (time_after(jiffies, now + 15 * HZ))
+ CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+
+ if (cmd & OBD_BRW_READ) {
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES,
+ tot_bytes);
+ while (lnb-- > res) {
+ rc = filter_finish_page_read(lnb);
+ if (rc) {
+ CERROR("error page %u@"LPU64" %u %p: rc %d\n",
+ lnb->len, lnb->offset, (int)(lnb - res),
+ lnb->dentry, rc);
+ f_dput(lnb->dentry);
+ GOTO(out_pages, rc);
+ }
+ }
+ } else
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
+ tot_bytes);
+
+ if (time_after(jiffies, now + 15 * HZ))
+ CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
+
+ EXIT;
+out:
+ OBD_FREE(fso, objcount * sizeof(*fso));
+ /* we saved the journal handle into oti->oti_handle instead */
+ current->journal_info = NULL;
+ pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ return rc;
+
+out_pages:
+ while (lnb-- > res) {
+ if (cmd & OBD_BRW_WRITE) {
+ filter_commit_write(lnb, rc);
+ up(&lnb->dentry->d_inode->i_sem);
+ } else {
+ lustre_put_page(lnb->page);
+ }
+ f_dput(lnb->dentry);
+ }
+ if (cmd & OBD_BRW_WRITE) {
+ filter_finish_transno(exp, oti, rc);
+ fsfilt_commit(obd,
+ filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
+ oti->oti_handle, 0);
+ }
+ goto out; /* dropped the dentry refs already (one per page) */
+
+out_objinfo:
+ for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
+ if (cmd & OBD_BRW_WRITE)
+ up(&fso[i].fso_dentry->d_inode->i_sem);
+ f_dput(fso[i].fso_dentry);
+ }
+ goto out;
+}
+
+static int filter_write_locked_page(struct niobuf_local *lnb)
+{
+ struct page *lpage;
+ void *lpage_addr;
+ void *lnb_addr;
+ int rc;
+ ENTRY;
+
+ lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
+ if (IS_ERR(lpage)) {
+ /* It is highly unlikely that we would ever get an error here.
+ * The page we want to get was previously locked, so it had to
+ * have already allocated the space, and we were just writing
+ * over the same data, so there would be no hole in the file.
+ *
+ * XXX: possibility of a race with truncate could exist, need
+ * to check that. There are no guarantees w.r.t.
+ * write order even on a local filesystem, although the
+ * normal response would be to return the number of bytes
+ * successfully written and leave the rest to the app.
+ */
+ rc = PTR_ERR(lpage);
+ CERROR("error getting locked page index %ld: rc = %d\n",
+ lnb->page->index, rc);
+ LBUG();
+ lustre_commit_write(lnb);
+ RETURN(rc);
+ }
+
+ /* 2 kmaps == vanishingly small deadlock opportunity */
+ lpage_addr = kmap(lpage);
+ lnb_addr = kmap(lnb->page);
+
+ memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
+
+ kunmap(lnb->page);
+ kunmap(lpage);
+
+ lustre_put_page(lnb->page);
+
+ lnb->page = lpage;
+ rc = lustre_commit_write(lnb);
+ if (rc)
+ CERROR("error committing locked page %ld: rc = %d\n",
+ lnb->page->index, rc);
+
+ RETURN(rc);
+}
+
+int filter_commitrw(int cmd, struct obd_export *exp,
+ int objcount, struct obd_ioobj *obj,
+ int niocount, struct niobuf_local *res,
+ struct obd_trans_info *oti)
+{
+ struct obd_run_ctxt saved;
+ struct obd_ioobj *o;
+ struct niobuf_local *lnb;
+ struct obd_device *obd = exp->exp_obd;
+ int found_locked = 0, rc = 0, i;
+ int nested_trans = current->journal_info != NULL;
+ unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
+ ENTRY;
+
+ push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+
+ if (cmd & OBD_BRW_WRITE) {
+ LASSERT(oti);
+ LASSERT(current->journal_info == NULL ||
+ current->journal_info == oti->oti_handle);
+ current->journal_info = oti->oti_handle;
+ }
+
+ for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
+ int j;
+
+ if (cmd & OBD_BRW_WRITE) {
+ inode_update_time(lnb->dentry->d_inode, 1);
+ up(&lnb->dentry->d_inode->i_sem);
+ }
+ for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
+ if (lnb->page == NULL) {
+ continue;
+ }
+
+ if (lnb->flags & N_LOCAL_TEMP_PAGE) {
+ found_locked++;
+ continue;
+ }
+
+ if (time_after(jiffies, lnb->start + 15 * HZ))
+ CERROR("slow commitrw %lus\n",
+ (jiffies - lnb->start) / HZ);
+
+ if (cmd & OBD_BRW_WRITE) {
+ int err = filter_commit_write(lnb, 0);
+
+ if (!rc)
+ rc = err;
+ } else {
+ lustre_put_page(lnb->page);
+ }
+
+ f_dput(lnb->dentry);
+ if (time_after(jiffies, lnb->start + 15 * HZ))
+ CERROR("slow commit_write %lus\n",
+ (jiffies - lnb->start) / HZ);
+ }
+ }
+
+ for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
+ i++, o++) {
+ int j;
+ for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
+ int err;
+ if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
+ continue;
+
+ if (time_after(jiffies, lnb->start + 15 * HZ))
+ CERROR("slow commitrw locked %lus\n",
+ (jiffies - lnb->start) / HZ);
+
+ err = filter_write_locked_page(lnb);
+ if (!rc)
+ rc = err;
+ f_dput(lnb->dentry);
+ found_locked--;
+
+ if (time_after(jiffies, lnb->start + 15 * HZ))
+ CERROR("slow commit_write locked %lus\n",
+ (jiffies - lnb->start) / HZ);
+ }
+ }
+
+ if (cmd & OBD_BRW_WRITE) {
+ /* We just want any dentry for the commit, for now */
+ struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
+ int err;
+
+ rc = filter_finish_transno(exp, oti, rc);
+ err = fsfilt_commit(obd, dparent->d_inode, oti->oti_handle,
+ obd_sync_filter);
+ if (err)
+ rc = err;
+ if (obd_sync_filter)
+ LASSERT(oti->oti_transno <= obd->obd_last_committed);
+ if (time_after(jiffies, now + 15 * HZ))
+ CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
+ }
+
+ LASSERT(nested_trans || current->journal_info == NULL);
+
+ pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ RETURN(rc);
+}
+
+int filter_brw(int cmd, struct lustre_handle *conn,
+ struct lov_stripe_md *lsm, obd_count oa_bufs,
+ struct brw_page *pga, struct obd_trans_info *oti)
+{
+ struct obd_export *exp = class_conn2export(conn);
+ struct obd_ioobj ioo;
+ struct niobuf_local *lnb;
+ struct niobuf_remote *rnb;
+ obd_count i;
+ int ret = 0;
+ ENTRY;
+
+ if (exp == NULL)
+ RETURN(-EINVAL);
+
+ OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
+ OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
+
+ if (lnb == NULL || rnb == NULL)
+ GOTO(out, ret = -ENOMEM);
+
+ for (i = 0; i < oa_bufs; i++) {
+ rnb[i].offset = pga[i].off;
+ rnb[i].len = pga[i].count;
+ }
+
+ ioo.ioo_id = lsm->lsm_object_id;
+ ioo.ioo_gr = 0;
+ ioo.ioo_type = S_IFREG;
+ ioo.ioo_bufcnt = oa_bufs;
+
+ ret = filter_preprw(cmd, exp, NULL, 1, &ioo, oa_bufs, rnb, lnb, oti);
+ if (ret != 0)
+ GOTO(out, ret);
+
+ for (i = 0; i < oa_bufs; i++) {
+ void *virt = kmap(pga[i].pg);
+ obd_off off = pga[i].off & ~PAGE_MASK;
+ void *addr = kmap(lnb[i].page);
+
+ /* 2 kmaps == vanishingly small deadlock opportunity */
+
+ if (cmd & OBD_BRW_WRITE)
+ memcpy(addr + off, virt + off, pga[i].count);
+ else
+ memcpy(virt + off, addr + off, pga[i].count);
+
+ kunmap(addr);
+ kunmap(virt);
+ }
+
+ ret = filter_commitrw(cmd, exp, 1, &ioo, oa_bufs, lnb, oti);
+
+out:
+ if (lnb)
+ OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
+ if (rnb)
+ OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
+ class_export_put(exp);
+ RETURN(ret);
+}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * linux/fs/obdfilter/filter_log.c
+ *
+ * Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/version.h>
+
+#include <portals/list.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_fsfilt.h>
+#include <linux/lustre_commit_confd.h>
+
+#include "filter_internal.h"
+
+static struct llog_handle *filter_log_create(struct obd_device *obd);
+
+/* This is a callback from the llog_* functions.
+ * Assumes caller has already pushed us into the kernel context. */
+static int filter_log_close(struct llog_handle *cathandle,
+ struct llog_handle *loghandle)
+{
+ struct llog_object_hdr *llh = loghandle->lgh_hdr;
+ struct file *file = loghandle->lgh_file;
+ struct dentry *dparent = NULL, *dchild = NULL;
+ struct lustre_handle parent_lockh;
+ struct llog_logid *lgl = &loghandle->lgh_cookie.lgc_lgl;
+ int rc;
+ ENTRY;
+
+ /* If we are going to delete this log, grab a ref before we close
+ * it so we don't have to immediately do another lookup. */
+ if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){
+ CDEBUG(D_INODE, "deleting log file "LPX64":%x\n",
+ lgl->lgl_oid, lgl->lgl_ogen);
+ dparent = filter_parent_lock(loghandle->lgh_obd, S_IFREG,
+ lgl->lgl_oid,LCK_PW,&parent_lockh);
+ if (IS_ERR(dparent)) {
+ rc = PTR_ERR(dparent);
+ CERROR("error locking parent, orphan log %*s: rc %d\n",
+ file->f_dentry->d_name.len,
+ file->f_dentry->d_name.name, rc);
+ RETURN(rc);
+ } else {
+ dchild = dget(file->f_dentry);
+ llog_delete_log(cathandle, loghandle);
+ }
+ } else {
+ CDEBUG(D_INODE, "closing log file "LPX64":%x\n",
+ lgl->lgl_oid, lgl->lgl_ogen);
+ }
+
+ rc = filp_close(file, 0);
+
+ llog_free_handle(loghandle); /* also removes loghandle from list */
+
+ if (dchild != NULL) {
+ int err = vfs_unlink(dparent->d_inode, dchild);
+ if (err) {
+ CERROR("error unlinking empty log %*s: rc %d\n",
+ dchild->d_name.len, dchild->d_name.name, err);
+ if (!rc)
+ rc = err;
+ }
+ f_dput(dchild);
+ ldlm_lock_decref(&parent_lockh, LCK_PW);
+ }
+ RETURN(rc);
+}
+
+/* This is a callback from the llog_* functions.
+ * Assumes caller has already pushed us into the kernel context. */
+static struct llog_handle *filter_log_open(struct obd_device *obd,
+ struct llog_cookie *logcookie)
+{
+ struct llog_logid *lgl = &logcookie->lgc_lgl;
+ struct llog_handle *loghandle;
+ struct dentry *dchild;
+ int rc;
+ ENTRY;
+
+ loghandle = llog_alloc_handle();
+ if (!loghandle)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ dchild = filter_fid2dentry(obd, NULL, S_IFREG, lgl->lgl_oid);
+ if (IS_ERR(dchild))
+ GOTO(out_handle, rc = PTR_ERR(dchild));
+
+ if (dchild->d_inode == NULL) {
+ CERROR("logcookie references non-existent object %*s\n",
+ dchild->d_name.len, dchild->d_name.name);
+ GOTO(out_dentry, rc = -ENOENT);
+ }
+
+ if (dchild->d_inode->i_generation != lgl->lgl_ogen) {
+ CERROR("logcookie for %*s had different generation %x != %x\n",
+ dchild->d_name.len, dchild->d_name.name,
+ dchild->d_inode->i_generation, lgl->lgl_ogen);
+ GOTO(out_dentry, rc = -ESTALE);
+ }
+
+ /* dentry_open does a dput(dchild) and mntput(mnt) on error */
+ mntget(obd->u.filter.fo_vfsmnt);
+ loghandle->lgh_file = dentry_open(dchild, obd->u.filter.fo_vfsmnt,
+ O_RDWR);
+ if (IS_ERR(loghandle->lgh_file)) {
+ rc = PTR_ERR(loghandle->lgh_file);
+ CERROR("error opening logfile %*s: rc %d\n",
+ dchild->d_name.len, dchild->d_name.name, rc);
+ GOTO(out_dentry, rc);
+ }
+ memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie));
+ loghandle->lgh_log_create = filter_log_create;
+ loghandle->lgh_log_open = filter_log_open;
+ loghandle->lgh_log_close = filter_log_close;
+ loghandle->lgh_obd = obd;
+ RETURN(loghandle);
+
+out_dentry:
+ f_dput(dchild);
+out_handle:
+ llog_free_handle(loghandle);
+ RETURN(ERR_PTR(rc));
+}
+
+/* This is a callback from the llog_* functions.
+ * Assumes caller has already pushed us into the kernel context. */
+static struct llog_handle *filter_log_create(struct obd_device *obd)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ struct lustre_handle parent_lockh;
+ struct dentry *dparent, *dchild;
+ struct llog_handle *loghandle;
+ struct file *file;
+ int err, rc;
+ obd_id id;
+ ENTRY;
+
+ loghandle = llog_alloc_handle();
+ if (!loghandle)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ retry:
+ id = filter_next_id(filter);
+
+ dparent = filter_parent_lock(obd, S_IFREG, id, LCK_PW, &parent_lockh);
+ if (IS_ERR(dparent))
+ GOTO(out_ctxt, rc = PTR_ERR(dparent));
+
+ dchild = filter_fid2dentry(obd, dparent, S_IFREG, id);
+ if (IS_ERR(dchild))
+ GOTO(out_lock, rc = PTR_ERR(dchild));
+
+ if (dchild->d_inode != NULL) {
+ /* This would only happen if lastobjid was bad on disk */
+ CERROR("Serious error: objid %*s already exists; is this "
+ "filesystem corrupt? I will try to work around it.\n",
+ dchild->d_name.len, dchild->d_name.name);
+ f_dput(dchild);
+ ldlm_lock_decref(&parent_lockh, LCK_PW);
+ goto retry;
+ }
+
+ rc = vfs_create(dparent->d_inode, dchild, S_IFREG);
+ if (rc) {
+ CERROR("log create failed rc = %d\n", rc);
+ GOTO(out_child, rc);
+ }
+
+ rc = filter_update_server_data(filter->fo_rcvd_filp, filter->fo_fsd);
+ if (rc) {
+ CERROR("can't write lastobjid but log created: rc %d\n",rc);
+ GOTO(out_destroy, rc);
+ }
+
+ /* dentry_open does a dput(dchild) and mntput(mnt) on error */
+ mntget(filter->fo_vfsmnt);
+ file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
+ if (IS_ERR(file)) {
+ rc = PTR_ERR(file);
+ CERROR("error opening log file "LPX64": rc %d\n", id, rc);
+ GOTO(out_destroy, rc);
+ }
+ ldlm_lock_decref(&parent_lockh, LCK_PW);
+
+ loghandle->lgh_file = file;
+ loghandle->lgh_cookie.lgc_lgl.lgl_oid = id;
+ loghandle->lgh_cookie.lgc_lgl.lgl_ogen = dchild->d_inode->i_generation;
+ loghandle->lgh_log_create = filter_log_create;
+ loghandle->lgh_log_open = filter_log_open;
+ loghandle->lgh_log_close = filter_log_close;
+ loghandle->lgh_obd = obd;
+
+ RETURN(loghandle);
+
+out_destroy:
+ err = vfs_unlink(dparent->d_inode, dchild);
+ if (err)
+ CERROR("error unlinking %*s on error: rc %d\n",
+ dchild->d_name.len, dchild->d_name.name, err);
+out_child:
+ f_dput(dchild);
+out_lock:
+ ldlm_lock_decref(&parent_lockh, LCK_PW);
+out_ctxt:
+ llog_free_handle(loghandle);
+ RETURN(ERR_PTR(rc));
+}
+
+/* This is called from filter_setup() and should be single threaded */
+static struct llog_handle *filter_get_catalog(struct obd_device *obd)
+{
+ struct filter_obd *filter = &obd->u.filter;
+ struct filter_server_data *fsd = filter->fo_fsd;
+ struct obd_run_ctxt saved;
+ struct llog_handle *cathandle = NULL;
+ int rc;
+ ENTRY;
+
+ push_ctxt(&saved, &filter->fo_ctxt, NULL);
+ if (fsd->fsd_catalog_oid) {
+ struct llog_cookie catcookie;
+
+ catcookie.lgc_lgl.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid);
+ catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen);
+ cathandle = filter_log_open(obd, &catcookie);
+ if (IS_ERR(cathandle)) {
+ CERROR("error opening catalog "LPX64":%x: rc %d\n",
+ catcookie.lgc_lgl.lgl_oid,
+ catcookie.lgc_lgl.lgl_ogen,
+ (int)PTR_ERR(cathandle));
+ fsd->fsd_catalog_oid = 0;
+ fsd->fsd_catalog_ogen = 0;
+ }
+ }
+
+ if (!fsd->fsd_catalog_oid) {
+ struct llog_logid *lgl;
+
+ cathandle = filter_log_create(obd);
+ if (IS_ERR(cathandle)) {
+ CERROR("error creating new catalog: rc %d\n",
+ (int)PTR_ERR(cathandle));
+ GOTO(out, cathandle);
+ }
+ lgl = &cathandle->lgh_cookie.lgc_lgl;
+ fsd->fsd_catalog_oid = cpu_to_le64(lgl->lgl_oid);
+ fsd->fsd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen);
+ rc = filter_update_server_data(filter->fo_rcvd_filp, fsd);
+ if (rc) {
+ CERROR("error writing new catalog to disk: rc %d\n",rc);
+ GOTO(out_handle, rc);
+ }
+ }
+
+ rc = llog_init_catalog(cathandle, &obd->u.filter.fo_mdc_uuid);
+ if (rc)
+ GOTO(out_handle, rc);
+out:
+ pop_ctxt(&saved, &filter->fo_ctxt, NULL);
+ RETURN(cathandle);
+
+out_handle:
+ filter_log_close(cathandle, cathandle);
+ cathandle = ERR_PTR(rc);
+ goto out;
+}
+
+static void filter_put_catalog(struct llog_handle *cathandle)
+{
+ struct llog_handle *loghandle, *n;
+ int rc;
+ ENTRY;
+
+ list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list)
+ filter_log_close(cathandle, loghandle);
+
+ rc = filp_close(cathandle->lgh_file, 0);
+ if (rc)
+ CERROR("error closing catalog: rc %d\n", rc);
+
+ llog_free_handle(cathandle);
+ EXIT;
+}
+
+int filter_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+ int num_cookies, struct llog_cookie *logcookies,
+ int flags)
+{
+ struct obd_device *obd = class_conn2obd(conn);
+ struct obd_run_ctxt saved;
+ int rc;
+ ENTRY;
+
+ push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ rc = llog_cancel_records(obd->u.filter.fo_catalog, num_cookies,
+ logcookies);
+ pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+
+ RETURN(rc);
+}
+
+int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid,
+ obd_id oid, obd_count ogen,
+ struct llog_cookie *logcookie)
+{
+ struct llog_create_rec *lcr;
+ int rc;
+ ENTRY;
+
+ OBD_ALLOC(lcr, sizeof(*lcr));
+ if (lcr == NULL)
+ RETURN(-ENOMEM);
+ lcr->lcr_hdr.lth_len = lcr->lcr_end_len = sizeof(*lcr);
+ lcr->lcr_hdr.lth_type = OST_CREATE_REC;
+ lcr->lcr_fid.id = mds_fid->id;
+ lcr->lcr_fid.generation = mds_fid->generation;
+ lcr->lcr_fid.f_type = mds_fid->f_type;
+ lcr->lcr_oid = oid;
+ lcr->lcr_ogen = ogen;
+
+ rc = llog_add_record(cathandle, &lcr->lcr_hdr, logcookie);
+ OBD_FREE(lcr, sizeof(*lcr));
+
+ if (rc > 0) {
+ LASSERT(rc == sizeof(*logcookie));
+ rc = 0;
+ }
+ RETURN(rc);
+}
+
+int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid,
+ obd_count ogen, struct llog_cookie *logcookie)
+{
+ struct llog_orphan_rec *lor;
+ int rc;
+ ENTRY;
+
+ OBD_ALLOC(lor, sizeof(*lor));
+ if (lor == NULL)
+ RETURN(-ENOMEM);
+ lor->lor_hdr.lth_len = lor->lor_end_len = sizeof(*lor);
+ lor->lor_hdr.lth_type = OST_ORPHAN_REC;
+ lor->lor_oid = oid;
+ lor->lor_ogen = ogen;
+
+ rc = llog_add_record(cathandle, &lor->lor_hdr, logcookie);
+
+ if (rc > 0) {
+ LASSERT(rc == sizeof(*logcookie));
+ rc = 0;
+ }
+ RETURN(rc);
+}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * linux/fs/obdfilter/filter_san.c
+ *
+ * Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Andreas Dilger <adilger@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/pagemap.h> // XXX kill me soon
+#include <linux/version.h>
+
+#include <linux/obd_class.h>
+#include <linux/lustre_fsfilt.h>
+#include "filter_internal.h"
+
+/* sanobd setup methods - use a specific mount option */
+int filter_san_setup(struct obd_device *obd, obd_count len, void *buf)
+{
+ struct obd_ioctl_data* data = buf;
+ char *option = NULL;
+
+ if (!data->ioc_inlbuf2)
+ RETURN(-EINVAL);
+
+ /* for extN/ext3 filesystem, we must mount it with 'writeback' mode */
+ if (!strcmp(data->ioc_inlbuf2, "extN"))
+ option = "data=writeback";
+ else if (!strcmp(data->ioc_inlbuf2, "ext3"))
+ option = "data=writeback,asyncdel";
+ else
+ LBUG(); /* just a reminder */
+
+ return filter_common_setup(obd, len, buf, option);
+}
+
+int filter_san_preprw(int cmd, struct lustre_handle *conn, int objcount,
+ struct obd_ioobj *obj, int niocount,
+ struct niobuf_remote *nb)
+{
+ struct obd_device *obd;
+ struct obd_ioobj *o = obj;
+ struct niobuf_remote *rnb = nb;
+ int rc = 0;
+ int i;
+ ENTRY;
+
+ obd = class_conn2obd(conn);
+ if (!obd) {
+ CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
+ conn->cookie);
+ RETURN(-EINVAL);
+ }
+
+ for (i = 0; i < objcount; i++, o++) {
+ struct dentry *dentry;
+ struct inode *inode;
+ int (*fs_bmap)(struct address_space *, long);
+ int j;
+
+ dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
+ if (IS_ERR(dentry))
+ GOTO(out, rc = PTR_ERR(dentry));
+ inode = dentry->d_inode;
+ if (!inode) {
+ CERROR("trying to BRW to non-existent file "LPU64"\n",
+ o->ioo_id);
+ f_dput(dentry);
+ GOTO(out, rc = -ENOENT);
+ }
+ fs_bmap = inode->i_mapping->a_ops->bmap;
+
+ for (j = 0; j < o->ioo_bufcnt; j++, rnb++) {
+ long block;
+
+ block = rnb->offset >> inode->i_blkbits;
+
+ if (cmd == OBD_BRW_READ) {
+ block = fs_bmap(inode->i_mapping, block);
+ } else {
+ loff_t newsize = rnb->offset + rnb->len;
+ /* fs_prep_san_write will also update inode
+ * size for us:
+ * (1) new alloced block
+ * (2) existed block but size extented
+ */
+ /* FIXME We could call fs_prep_san_write()
+ * only once for all the blocks allocation.
+ * Now call it once for each block, for
+ * simplicity. And if error happens, we
+ * probably need to release previous alloced
+ * block */
+ rc = fs_prep_san_write(obd, inode, &block,
+ 1, newsize);
+ if (rc)
+ break;
+ }
+
+ rnb->offset = block;
+ }
+ f_dput(dentry);
+ }
+out:
+ RETURN(rc);
+}
+