Whamcloud - gitweb
Work-in-progress commit of the IO path refactoring.
authorzab <zab>
Tue, 22 Jul 2003 06:38:59 +0000 (06:38 +0000)
committerzab <zab>
Tue, 22 Jul 2003 06:38:59 +0000 (06:38 +0000)
- initiate brw on pages by pushing down through the obd api
- add some data structures for passing and collecting the pages
- batch the pages into rpcs in the osc
- refactor ptlrpc a little to export some ptlrpc_set_wait internals
- temporarily build a
- add a kernel patch that adds page->private to 2.4
- split up some page cache stuff into 2.4 and 2.6 files
- make some progress in tearing out the 2.5 thread brw stuff

lustre/kernel_patches/patches/add_page_private.patch [new file with mode: 0644]
lustre/kernel_patches/pc/add_page_private.pc [new file with mode: 0644]
lustre/llite/rw24.c [new file with mode: 0644]
lustre/llite/rw26.c [new file with mode: 0644]

diff --git a/lustre/kernel_patches/patches/add_page_private.patch b/lustre/kernel_patches/patches/add_page_private.patch
new file mode 100644 (file)
index 0000000..f82fb92
--- /dev/null
@@ -0,0 +1,15 @@
+ include/linux/mm.h |    1 +
+ 1 files changed, 1 insertion(+)
+
+--- linux-2.4.20-b_llpio-l21/include/linux/mm.h~add_page_private       2003-07-21 21:42:50.000000000 -0700
++++ linux-2.4.20-b_llpio-l21-zab/include/linux/mm.h    2003-07-21 21:44:16.000000000 -0700
+@@ -162,6 +162,7 @@ typedef struct page {
+                                          protected by pagemap_lru_lock !! */
+       struct page **pprev_hash;       /* Complement to *next_hash. */
+       struct buffer_head * buffers;   /* Buffer maps us to a disk block. */
++      unsigned long private;
+       /*
+        * On machines where all RAM is mapped into kernel address space,
+
+_
diff --git a/lustre/kernel_patches/pc/add_page_private.pc b/lustre/kernel_patches/pc/add_page_private.pc
new file mode 100644 (file)
index 0000000..476581c
--- /dev/null
@@ -0,0 +1 @@
+include/linux/mm.h
diff --git a/lustre/llite/rw24.c b/lustre/llite/rw24.c
new file mode 100644 (file)
index 0000000..924b8d2
--- /dev/null
@@ -0,0 +1,384 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Lustre Lite I/O page cache for the 2.4 kernel generation
+ *
+ *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <linux/config.h>
+#include <linux/kernel.h>
+#include <linux/mm.h>
+#include <linux/string.h>
+#include <linux/stat.h>
+#include <linux/errno.h>
+#include <linux/smp_lock.h>
+#include <linux/unistd.h>
+#include <linux/version.h>
+#include <asm/system.h>
+#include <asm/uaccess.h>
+
+#include <linux/fs.h>
+#include <linux/iobuf.h>
+#include <linux/stat.h>
+#include <asm/uaccess.h>
+#include <asm/segment.h>
+#include <linux/mm.h>
+#include <linux/pagemap.h>
+#include <linux/smp_lock.h>
+
+#define DEBUG_SUBSYSTEM S_LLITE
+
+#include <linux/lustre_mds.h>
+#include <linux/lustre_lite.h>
+#include "llite_internal.h"
+#include <linux/lustre_compat25.h>
+
+/*
+ * we were asked to read a single page but we're going to try and read a batch
+ * of pages all at once.  this vaguely simulates client-side read-ahead that
+ * is done via ->readpages in 2.5.
+ *
+ * Batches up to one filesystem blocksize worth of pages (bounded by the read
+ * extent registered for the current task), issues one async brw read over
+ * them, waits for completion, then unlocks and releases every batched page.
+ * Returns 0 on success or a negative errno.
+ */
+static int ll_readpage_24(struct file *file, struct page *first_page)
+{
+        struct inode *inode = first_page->mapping->host;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct page *page = first_page;
+        struct list_head *pos;
+        struct brw_page *pgs;
+        struct obdo *oa;
+        unsigned long end_index, extent_end = 0;
+        struct ptlrpc_request_set *set;
+        int npgs = 0, rc = 0, max_pages;
+        ENTRY;
+
+        LASSERT(PageLocked(page));
+        LASSERT(!PageUptodate(page));
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset="LPX64"\n",
+               inode->i_ino, inode->i_generation, inode,
+               (((obd_off)page->index) << PAGE_SHIFT));
+        LASSERT(atomic_read(&file->f_dentry->d_inode->i_count) > 0);
+
+        /* a read entirely beyond EOF hands back a zero-filled page */
+        if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
+                CERROR("reading beyond EOF\n");
+                memset(kmap(page), 0, PAGE_SIZE);
+                kunmap(page);
+                SetPageUptodate(page);
+                unlock_page(page);
+                RETURN(rc);
+        }
+
+        /* try to read the file's preferred block size in a one-er */
+        end_index = first_page->index +
+                (inode->i_blksize >> PAGE_CACHE_SHIFT);
+        if (end_index > (inode->i_size >> PAGE_CACHE_SHIFT))
+                end_index = inode->i_size >> PAGE_CACHE_SHIFT;
+
+        max_pages = ((end_index - first_page->index) << PAGE_CACHE_SHIFT) >>
+                PAGE_SHIFT;
+        pgs = kmalloc(max_pages * sizeof(*pgs), GFP_USER);
+        if (pgs == NULL)
+                RETURN(-ENOMEM);
+
+        /*
+         * find how far we're allowed to read under the extent ll_file_read
+         * is passing us..  only the extent registered by the current task
+         * counts; extents belonging to other readers are skipped.
+         */
+        spin_lock(&lli->lli_read_extent_lock);
+        list_for_each(pos, &lli->lli_read_extents) {
+                struct ll_read_extent *rextent;
+                rextent = list_entry(pos, struct ll_read_extent, re_lli_item);
+                if (rextent->re_task != current)
+                        continue;
+
+                if (rextent->re_extent.end + PAGE_SIZE < rextent->re_extent.end)
+                        /* extent wrapping */
+                        extent_end = ~0;
+                else {
+                        extent_end = (rextent->re_extent.end + PAGE_SIZE)
+                                                        << PAGE_CACHE_SHIFT;
+                        /* 32bit indexes, 64bit extents.. */
+                        if (((u64)extent_end >> PAGE_CACHE_SHIFT) <
+                                        rextent->re_extent.end)
+                                extent_end = ~0;
+                }
+                break;
+        }
+        spin_unlock(&lli->lli_read_extent_lock);
+
+        /* no extent registered (presumably a fault via mmap -- TODO confirm):
+         * fall back to reading just the one page */
+        if (extent_end == 0) {
+                static unsigned long next_print;
+                if (time_after(jiffies, next_print)) {
+                        next_print = jiffies + 30 * HZ;
+                        CDEBUG(D_INODE, "mmap readpage - check locks\n");
+                }
+                end_index = page->index + 1;
+        } else if (extent_end < end_index)
+                end_index = extent_end;
+
+        /* to balance the find_get_page ref the other pages get that is
+         * decrefed on teardown.. */
+        page_cache_get(page);
+        do {
+                unsigned long index ;
+
+                pgs[npgs].pg = page;
+                pgs[npgs].off = ((obd_off)page->index) << PAGE_CACHE_SHIFT;
+                pgs[npgs].flag = 0;
+                pgs[npgs].count = PAGE_SIZE;
+                /* XXX Workaround for BA OSTs returning short reads at EOF.
+                 * The linux OST will return the full page, zero-filled at the
+                 * end, which will just overwrite the data we set here.  Bug
+                 * 593 relates to fixing this properly.
+                 */
+                if (inode->i_size < pgs[npgs].off + PAGE_SIZE) {
+                        int count = inode->i_size - pgs[npgs].off;
+                        void *addr = kmap(page);
+                        pgs[npgs].count = count;
+                        //POISON(addr, 0x7c, count);
+                        memset(addr + count, 0, PAGE_SIZE - count);
+                        kunmap(page);
+                }
+
+                npgs++;
+                if (npgs == max_pages)
+                        break;
+
+                /*
+                 * find pages ahead of us that we can read in.
+                 * grab_cache_page waits on pages that are locked so
+                 * we first try find_get_page, which doesn't.  this stops
+                 * the worst case behaviour of racing threads waiting on
+                 * each other, but doesn't remove it entirely.
+                 */
+                for (index = page->index + 1, page = NULL;
+                     page == NULL && index < end_index; index++) {
+
+                        /* see if the page already exists and needs updating */
+                        page = find_get_page(inode->i_mapping, index);
+                        if (page) {
+                                /* up to date or locked by someone else:
+                                 * stop extending the batch */
+                                if (Page_Uptodate(page) || TryLockPage(page))
+                                        goto out_release;
+                                /* re-check now that we hold the page lock */
+                                if (!page->mapping || Page_Uptodate(page))
+                                        goto out_unlock;
+                        } else {
+                                /* ok, we have to create it.. */
+                                page = grab_cache_page(inode->i_mapping, index);
+                                if (page == NULL)
+                                        continue;
+                                if (Page_Uptodate(page))
+                                        goto out_unlock;
+                        }
+
+                        break;
+
+                out_unlock:
+                        unlock_page(page);
+                out_release:
+                        page_cache_release(page);
+                        page = NULL;
+                }
+
+        } while (page);
+
+        /* issue a single async brw over the whole batch and wait for it */
+        if ((oa = obdo_alloc()) == NULL) {
+                CERROR("ENOMEM allocing obdo\n");
+                rc = -ENOMEM;
+        } else if ((set = ptlrpc_prep_set()) == NULL) {
+                CERROR("ENOMEM allocing request set\n");
+                obdo_free(oa);
+                rc = -ENOMEM;
+        } else {
+                struct ll_file_data *fd = file->private_data;
+
+                oa->o_id = lli->lli_smd->lsm_object_id;
+                memcpy(obdo_handle(oa), &fd->fd_ost_och.och_fh,
+                       sizeof(fd->fd_ost_och.och_fh));
+                oa->o_valid = OBD_MD_FLID | OBD_MD_FLHANDLE;
+                obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME);
+
+                rc = obd_brw_async(OBD_BRW_READ, ll_i2obdconn(inode), oa,
+                                   ll_i2info(inode)->lli_smd, npgs, pgs,
+                                   set, NULL);
+                if (rc == 0)
+                        rc = ptlrpc_set_wait(set);
+                ptlrpc_set_destroy(set);
+                if (rc == 0)
+                        obdo_refresh_inode(inode, oa, oa->o_valid);
+                if (rc && rc != -EIO)
+                        CERROR("error from obd_brw_async: rc = %d\n", rc);
+                obdo_free(oa);
+        }
+
+        /* on success mark every batched page up to date; in all cases drop
+         * the page lock and the reference taken while batching */
+        while (npgs-- > 0) {
+                page = pgs[npgs].pg;
+
+                if (rc == 0)
+                        SetPageUptodate(page);
+                unlock_page(page);
+                page_cache_release(page);
+        }
+
+        kfree(pgs);
+        RETURN(rc);
+}
+
+/*
+ * brw completion callback for a page queued by ll_writepage_24(): frees the
+ * obd_client_page stored in page->private and unlocks the page.  The brw
+ * result 'rc' is currently unused here.
+ */
+void ll_complete_writepage_24(struct obd_client_page *ocp, int rc)
+{
+        struct page *page = ocp->ocp_page;
+
+        LASSERT(page->private == (unsigned long)ocp);
+        LASSERT(PageLocked(page));
+
+#if 0
+        /* NOTE(review): this disabled block references 'inode', which is not
+         * in scope in this function -- it would need to be derived from the
+         * page before this could be re-enabled. */
+        rc = ll_clear_dirty_pages(ll_i2obdconn(inode),
+                                  ll_i2info(inode)->lli_smd,
+                                  page->index, page->index);
+        LASSERT(rc == 0);
+#endif
+        ll_ocp_free(page);
+
+        unlock_page(page);
+}
+
+/*
+ * ->writepage for the 2.4 kernel: attach an obd_client_page to the locked
+ * page, queue an async brw write on the superblock's request set, then queue
+ * a barrier on the same set.  ll_complete_writepage_24() runs on completion
+ * and unlocks the page.  Returns 0 on success or a negative errno.
+ */
+static int ll_writepage_24(struct page *page)
+{
+        struct inode *inode = page->mapping->host;
+        struct obdo oa;
+        struct obd_export *exp;
+        struct obd_client_page *ocp;
+        int rc;
+        ENTRY;
+
+        CDEBUG(D_CACHE, "page %p [lau %d] inode %p\n", page,
+               PageLaunder(page), inode);
+        LASSERT(PageLocked(page));
+
+        exp = ll_i2obdexp(inode);
+        if (exp == NULL)
+                RETURN(-EINVAL);
+
+        oa.o_id = ll_i2info(inode)->lli_smd->lsm_object_id;
+        oa.o_valid = OBD_MD_FLID;
+        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+
+        /* NOTE(review): on this error path the page is returned still
+         * locked -- confirm callers expect that on failure */
+        ocp = ll_ocp_alloc(page);
+        if (IS_ERR(ocp)) 
+                GOTO(out, rc = PTR_ERR(ocp));
+
+        ocp->ocp_callback = ll_complete_writepage_24;
+        ocp->ocp_flag = OBD_BRW_CREATE|OBD_BRW_FROM_GRANT;
+
+        rc = obd_brw_async_ocp(OBD_BRW_WRITE, exp, &oa, 
+                               ll_i2info(inode)->lli_smd, ocp,
+                               ll_i2sbi(inode)->ll_lc.lc_set, NULL);
+        if (rc == 0)
+                rc = obd_brw_async_barrier(OBD_BRW_WRITE, exp, 
+                                           ll_i2info(inode)->lli_smd,
+                                           ll_i2sbi(inode)->ll_lc.lc_set);
+out:
+        RETURN(rc);
+}
+
+/*
+ * ->direct_IO for the 2.4 kernel: translate the kiobuf's pages into an array
+ * of brw_pages and issue them as one synchronous-looking brw (async issue +
+ * ptlrpc_set_wait).  The transfer must be blocksize-aligned in both offset
+ * and length.  Returns the number of bytes transferred (iobuf->length) on
+ * success, or a negative errno.
+ */
+static int ll_direct_IO_24(int rw, struct inode *inode, struct kiobuf *iobuf,
+                           unsigned long blocknr, int blocksize)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct lov_stripe_md *lsm = lli->lli_smd;
+        struct brw_page *pga;
+        struct ptlrpc_request_set *set;
+        struct obdo oa;
+        int length, i, flags, rc = 0;
+        loff_t offset;
+        ENTRY;
+
+        /* no stripe metadata: the file was never opened for I/O */
+        if (!lsm || !lsm->lsm_object_id)
+                RETURN(-EBADF);
+
+        if ((iobuf->offset & (blocksize - 1)) ||
+            (iobuf->length & (blocksize - 1)))
+                RETURN(-EINVAL);
+
+        set = ptlrpc_prep_set();
+        if (set == NULL)
+                RETURN(-ENOMEM);
+
+        OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages);
+        if (!pga) {
+                ptlrpc_set_destroy(set);
+                RETURN(-ENOMEM);
+        }
+
+        flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */;
+        offset = ((obd_off)blocknr << inode->i_blkbits);
+        length = iobuf->length;
+
+        /* one brw_page per kiobuf page; each entry covers up to the end of
+         * its page or the remaining length, whichever is smaller */
+        for (i = 0, length = iobuf->length; length > 0;
+             length -= pga[i].count, offset += pga[i].count, i++) { /*i last!*/
+                pga[i].pg = iobuf->maplist[i];
+                pga[i].off = offset;
+                /* To the end of the page, or the length, whatever is less */
+                pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK),
+                                     length);
+                pga[i].flag = flags;
+                if (rw == READ) {
+                        //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE);
+                        //kunmap(iobuf->maplist[i]);
+                }
+        }
+
+        oa.o_id = lsm->lsm_object_id;
+        oa.o_valid = OBD_MD_FLID;
+        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+
+        if (rw == WRITE)
+                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
+                                    LPROC_LL_DIRECT_WRITE, iobuf->length);
+        else
+                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
+                                    LPROC_LL_DIRECT_READ, iobuf->length);
+        rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
+                           ll_i2obdconn(inode), &oa, lsm, iobuf->nr_pages, pga,
+                           set, NULL);
+        if (rc) {
+                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+                       "error from obd_brw_async: rc = %d\n", rc);
+        } else {
+                rc = ptlrpc_set_wait(set);
+                if (rc)
+                        CERROR("error from callback: rc = %d\n", rc);
+        }
+        ptlrpc_set_destroy(set);
+        if (rc == 0)
+                rc = iobuf->length;
+
+        OBD_FREE(pga, sizeof(*pga) * iobuf->nr_pages);
+        RETURN(rc);
+}
+
+/* address_space operations wired up for the 2.4 page cache generation */
+struct address_space_operations ll_aops = {
+        readpage: ll_readpage_24,
+        direct_IO: ll_direct_IO_24,
+        writepage: ll_writepage_24,
+        sync_page: block_sync_page, /* XXX what's this? */
+        prepare_write: ll_prepare_write,
+        commit_write: ll_commit_write,
+        bmap: NULL
+};
diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c
new file mode 100644 (file)
index 0000000..20d3c52
--- /dev/null
@@ -0,0 +1,226 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Lustre Lite I/O page cache routines for the 2.5/2.6 kernel generation
+ *
+ *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <linux/config.h>
+#include <linux/kernel.h>
+#include <linux/mm.h>
+#include <linux/string.h>
+#include <linux/stat.h>
+#include <linux/errno.h>
+#include <linux/smp_lock.h>
+#include <linux/unistd.h>
+#include <linux/version.h>
+#include <asm/system.h>
+#include <asm/uaccess.h>
+
+#include <linux/fs.h>
+#include <linux/buffer_head.h>
+#include <linux/mpage.h>
+#include <linux/writeback.h>
+#include <linux/stat.h>
+#include <asm/uaccess.h>
+#include <asm/segment.h>
+#include <linux/mm.h>
+#include <linux/pagemap.h>
+#include <linux/smp_lock.h>
+
+#define DEBUG_SUBSYSTEM S_LLITE
+
+#include <linux/lustre_mds.h>
+#include <linux/lustre_lite.h>
+#include "llite_internal.h"
+#include <linux/lustre_compat25.h>
+
+/* in 2.5 we hope that significant read traffic will come through
+ * readpages and will be nicely batched by read-ahead, this is just
+ * to pick up the rest.  The page reference taken here is handed to
+ * lliod along with the page; presumably lliod drops it and unlocks
+ * the page when the read completes -- TODO confirm.  Always returns 0. */
+static int ll_readpage_26(struct file *file, struct page *page)
+{
+        ENTRY;
+
+        CDEBUG(D_CACHE, "page %p ind %lu inode %p\n", page, page->index,
+                        page->mapping->host);
+
+        LASSERT(PageLocked(page));
+        LASSERT(!PageUptodate(page));
+        LASSERT(page->private == 0);
+
+        /* put it in the list that lliod will use */
+        page_cache_get(page);
+        lliod_give_page(page->mapping->host, page, OBD_BRW_READ);
+        lliod_wakeup(page->mapping->host);
+
+        RETURN(0);
+}
+
+/*
+ * finish writeback of a single page: clear its dirty accounting with the
+ * obd layer and then clear the page's writeback bit, waking any waiters.
+ */
+void ll_end_writeback_26(struct inode *inode, struct page *page)
+{
+        int rc;
+        ENTRY;
+        LASSERT(PageWriteback(page));
+        rc = ll_clear_dirty_pages(ll_i2obdconn(inode),
+                                  ll_i2info(inode)->lli_smd,
+                                  page->index, page->index);
+        LASSERT(rc == 0);
+        end_page_writeback(page);
+        EXIT;
+}
+
+/*
+ * ->writepage for 2.5/2.6: mark the page under writeback, hand it to lliod,
+ * and wake lliod either immediately (no writepages batch in flight) or once
+ * enough pages have accumulated to fill a blocksize.  Always returns 0; the
+ * page is unlocked here and completion clears PG_writeback.
+ */
+static int ll_writepage_26(struct page *page, struct writeback_control *wbc)
+{
+        struct inode *inode = page->mapping->host;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        ENTRY;
+
+        LASSERT(PageLocked(page));
+        LASSERT(!PageWriteback(page));
+        LASSERT(page->private == 0);
+
+        CDEBUG(D_CACHE, "page %p [wb %d] inode %p\n", page,
+               PageWriteback(page), inode);
+
+        /* tell the vm that we're busy with the page */
+        SetPageWriteback(page);
+        unlock_page(page);
+
+        /* put it in the list that lliod will use; the reference is dropped
+         * by lliod when it is done with the page -- TODO confirm */
+        page_cache_get(page);
+
+        lliod_give_page(inode, page, OBD_BRW_WRITE);
+        
+        /* wake lliod right away unless ll_writepages() is batching for us
+         * and the pending list is still smaller than a blocksize */
+        if ((atomic_read(&lli->lli_in_writepages) == 0) || 
+            ((lli->lli_pl_write.pl_num << PAGE_SHIFT) > inode->i_blksize) )
+                lliod_wakeup(inode);
+
+        RETURN(0);
+}
+
+/*
+ * ->writepages: let mpage_writepages() drive ll_writepage_26() for each
+ * dirty page while lli_in_writepages suppresses per-page lliod wakeups,
+ * then wake lliod once when the last concurrent writepages call finishes.
+ */
+static int ll_writepages(struct address_space *mapping, 
+                         struct writeback_control *wbc)
+{
+        struct ll_inode_info *lli = ll_i2info(mapping->host);
+        int rc;
+        ENTRY;
+
+        atomic_inc(&lli->lli_in_writepages);
+
+        rc = mpage_writepages(mapping, wbc, NULL);
+
+        if (atomic_dec_and_test(&lli->lli_in_writepages)) 
+                lliod_wakeup(mapping->host);
+
+        RETURN(rc);
+}
+
+#if 0 /* XXX need to complete this */
+/*
+ * NOTE(review): disabled copy of ll_direct_IO_24() from rw24.c -- it still
+ * takes a 2.4-style kiobuf, which does not exist in 2.5/2.6, so it cannot
+ * compile here as-is.  Kept as a starting point for the 2.6 direct I/O path.
+ */
+static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
+                        unsigned long blocknr, int blocksize)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct lov_stripe_md *lsm = lli->lli_smd;
+        struct brw_page *pga;
+        struct ptlrpc_request_set *set;
+        struct obdo oa;
+        int length, i, flags, rc = 0;
+        loff_t offset;
+        ENTRY;
+
+        if (!lsm || !lsm->lsm_object_id)
+                RETURN(-EBADF);
+
+        if ((iobuf->offset & (blocksize - 1)) ||
+            (iobuf->length & (blocksize - 1)))
+                RETURN(-EINVAL);
+
+        set = ptlrpc_prep_set();
+        if (set == NULL)
+                RETURN(-ENOMEM);
+
+        OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages);
+        if (!pga) {
+                ptlrpc_set_destroy(set);
+                RETURN(-ENOMEM);
+        }
+
+        flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */;
+        offset = ((obd_off)blocknr << inode->i_blkbits);
+        length = iobuf->length;
+
+        for (i = 0, length = iobuf->length; length > 0;
+             length -= pga[i].count, offset += pga[i].count, i++) { /*i last!*/
+                pga[i].pg = iobuf->maplist[i];
+                pga[i].off = offset;
+                /* To the end of the page, or the length, whatever is less */
+                pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK),
+                                     length);
+                pga[i].flag = flags;
+                if (rw == READ) {
+                        //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE);
+                        //kunmap(iobuf->maplist[i]);
+                }
+        }
+
+        oa.o_id = lsm->lsm_object_id;
+        oa.o_valid = OBD_MD_FLID;
+        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+
+        if (rw == WRITE)
+                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
+                                    LPROC_LL_DIRECT_WRITE, iobuf->length);
+        else
+                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
+                                    LPROC_LL_DIRECT_READ, iobuf->length);
+        rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
+                           ll_i2obdconn(inode), &oa, lsm, iobuf->nr_pages, pga,
+                           set, NULL);
+        if (rc) {
+                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+                       "error from obd_brw_async: rc = %d\n", rc);
+        } else {
+                rc = ptlrpc_set_wait(set);
+                if (rc)
+                        CERROR("error from callback: rc = %d\n", rc);
+        }
+        ptlrpc_set_destroy(set);
+        if (rc == 0)
+                rc = iobuf->length;
+
+        OBD_FREE(pga, sizeof(*pga) * iobuf->nr_pages);
+        RETURN(rc);
+}
+#endif
+
+/* address_space operations wired up for the 2.5/2.6 page cache generation;
+ * direct_IO stays disabled until the kiobuf-based draft above is ported */
+struct address_space_operations ll_aops = {
+        readpage: ll_readpage_26,
+#if 0
+        direct_IO: ll_direct_IO_26,
+#endif
+        writepage: ll_writepage_26,
+        writepages: ll_writepages,
+        set_page_dirty: __set_page_dirty_nobuffers,
+        sync_page: block_sync_page,
+        prepare_write: ll_prepare_write,
+        commit_write: ll_commit_write,
+        bmap: NULL
+};