b=22310 temporary fix: align readahead window end to 1M rpc boundary.
[fs/lustre-release.git] / lustre / llite / rw.c
index 806618b..df40ae6 100644
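The change referenced in the subject line lands in ll_readahead() below: before read-ahead is issued, the end of the window is rounded down to a PTLRPC_MAX_BRW_PAGES boundary so that the resulting read RPCs stay full-sized. A minimal standalone sketch of that arithmetic, assuming 4 KB pages (so PTLRPC_MAX_BRW_PAGES would be 256 pages, i.e. 1M per RPC) and a window that spans at least one full RPC:

#include <stdio.h>

#define PTLRPC_MAX_BRW_PAGES 256UL   /* assumed: 1M RPC / 4 KB pages */

/* Mirror of the window-end alignment done in ll_readahead() below. */
static unsigned long align_ra_end(unsigned long start, unsigned long end)
{
        /* Round (end + 1) down to an RPC boundary, then step back one page
         * so the window ends on the last page of a full 1M chunk. */
        unsigned long tmp_end = ((end + 1) & ~(PTLRPC_MAX_BRW_PAGES - 1)) - 1;

        /* Use the aligned end only if the window still reaches past start. */
        return tmp_end > start ? tmp_end : end;
}

int main(void)
{
        /* Window [100, 700]: the aligned end is 511, the last page of the
         * second 1M chunk, so only whole-RPC read-ahead is issued. */
        printf("%lu\n", align_ra_end(100, 700));
        return 0;
}

In the patch itself the file-size clamp (kms) is applied after this alignment, and ras_next_readahead is then advanced past the aligned end.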
@@ -1,27 +1,44 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Lustre Lite I/O Page Cache
+ * GPL HEADER START
  *
- *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/llite/rw.c
+ *
+ * Lustre Lite I/O page cache routines shared by different kernel revs
  */
 
-#include <linux/config.h>
+#include <linux/autoconf.h>
 #include <linux/kernel.h>
 #include <linux/mm.h>
 #include <linux/string.h>
 #include <asm/uaccess.h>
 
 #include <linux/fs.h>
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-#include <linux/buffer_head.h>
-#include <linux/mpage.h>
-#include <linux/writeback.h>
-#else
-#include <linux/iobuf.h>
-#endif
 #include <linux/stat.h>
 #include <asm/uaccess.h>
-#include <asm/segment.h>
 #include <linux/mm.h>
 #include <linux/pagemap.h>
 #include <linux/smp_lock.h>
+/* current_is_kswapd() */
+#include <linux/swap.h>
 
 #define DEBUG_SUBSYSTEM S_LLITE
 
-#include <linux/lustre_mds.h>
-#include <linux/lustre_lite.h>
+//#include <lustre_mdc.h>
+#include <lustre_lite.h>
+#include <obd_cksum.h>
 #include "llite_internal.h"
 #include <linux/lustre_compat25.h>
 
-/*
- * Remove page from dirty list
- */
-static void __set_page_clean(struct page *page)
+/* this isn't where truncate starts.   roughly:
+ * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs
+ * DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to
+ * avoid races.
+ *
+ * must be called under ->lli_size_sem */
+void ll_truncate(struct inode *inode)
 {
-        struct address_space *mapping = page->mapping;
-        struct inode *inode;
-
-        if (!mapping)
-                return;
-
-        PGCACHE_WRLOCK(mapping);
+        struct ll_inode_info *lli = ll_i2info(inode);
+        ENTRY;
 
-        list_del(&page->list);
-        list_add(&page->list, &mapping->clean_pages);
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu\n",inode->i_ino,
+               inode->i_generation, inode, i_size_read(inode));
 
-        /* XXX doesn't inode_lock protect i_state ? */
-        inode = mapping->host;
-        if (list_empty(&mapping->dirty_pages)) {
-                CDEBUG(D_INODE, "inode clean\n");
-                inode->i_state &= ~I_DIRTY_PAGES;
+        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_TRUNC, 1);
+        if (lli->lli_size_sem_owner == cfs_current()) {
+                LASSERT_SEM_LOCKED(&lli->lli_size_sem);
+                ll_inode_size_unlock(inode, 0);
         }
 
-        PGCACHE_WRUNLOCK(mapping);
         EXIT;
-}
+        return;
+} /* ll_truncate */
 
-void set_page_clean(struct page *page)
+/**
+ * Finalizes cl-data before exiting typical address_space operation. Dual to
+ * ll_cl_init().
+ */
+static void ll_cl_fini(struct ll_cl_context *lcc)
 {
-        if (PageDirty(page)) {
-                ClearPageDirty(page);
-                __set_page_clean(page);
+        struct lu_env  *env  = lcc->lcc_env;
+        struct cl_io   *io   = lcc->lcc_io;
+        struct cl_page *page = lcc->lcc_page;
+
+        LASSERT(lcc->lcc_cookie == current);
+        LASSERT(env != NULL);
+
+        if (page != NULL) {
+                lu_ref_del(&page->cp_reference, "cl_io", io);
+                cl_page_put(env, page);
+        }
+
+        if (io && lcc->lcc_created) {
+                cl_io_end(env, io);
+                cl_io_unlock(env, io);
+                cl_io_iter_fini(env, io);
+                cl_io_fini(env, io);
         }
+        cl_env_put(env, &lcc->lcc_refcheck);
 }
 
-/* SYNCHRONOUS I/O to object storage for an inode */
-static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
-                  struct page *page, int flags)
+/**
+ * Initializes common cl-data at the typical address_space operation entry
+ * point.
+ */
+static struct ll_cl_context *ll_cl_init(struct file *file,
+                                        struct page *vmpage, int create)
 {
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_stripe_md *lsm = lli->lli_smd;
-        struct brw_page pg;
-        int rc;
-        ENTRY;
+        struct ll_cl_context *lcc;
+        struct lu_env    *env;
+        struct cl_io     *io;
+        struct cl_object *clob;
+        struct ccc_io    *cio;
+
+        int refcheck;
+        int result = 0;
+
+        clob = ll_i2info(vmpage->mapping->host)->lli_clob;
+        LASSERT(clob != NULL);
+
+        env = cl_env_get(&refcheck);
+        if (IS_ERR(env))
+                return ERR_PTR(PTR_ERR(env));
+
+        lcc = &vvp_env_info(env)->vti_io_ctx;
+        memset(lcc, 0, sizeof(*lcc));
+        lcc->lcc_env = env;
+        lcc->lcc_refcheck = refcheck;
+        lcc->lcc_cookie = current;
+
+        cio = ccc_env_io(env);
+        io = cio->cui_cl.cis_io;
+        if (io == NULL && create) {
+                struct vvp_io *vio;
+                loff_t pos;
+
+                /*
+                 * The loop-back driver calls ->prepare_write() and
+                 * ->sendfile() directly, bypassing the file system's
+                 * ->write() operation, so the cl_io has to be created here.
+                 */
 
-        pg.pg = page;
-        pg.off = ((obd_off)page->index) << PAGE_SHIFT;
+                io = &ccc_env_info(env)->cti_io;
+                vio = vvp_env_io(env);
+                ll_io_init(io, file, 1);
 
-        if (cmd == OBD_BRW_WRITE && (pg.off + PAGE_SIZE > inode->i_size))
-                pg.count = inode->i_size % PAGE_SIZE;
-        else
-                pg.count = PAGE_SIZE;
-
-        CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
-               cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
-               pg.off, pg.off);
-        if (pg.count == 0) {
-                CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
-                       LPU64"\n",
-                       inode->i_ino, inode, inode->i_size, page->mapping->host,
-                       page->mapping->host->i_size, page->index, pg.off);
+                /* No lock at all for this kind of IO - we cannot take the lock
+                 * here because we already hold the page lock, which would
+                 * cause a deadlock.
+                 * XXX: This gives the loop device poor performance - one page
+                 *      per RPC.  For better performance, users should use the
+                 *      lloop driver instead.
+                 */
+                io->ci_lockreq = CILR_NEVER;
+
+                pos = (vmpage->index << CFS_PAGE_SHIFT);
+
+                /* Create a temp IO to serve write. */
+                result = cl_io_rw_init(env, io, CIT_WRITE, pos, CFS_PAGE_SIZE);
+                if (result == 0) {
+                        cio->cui_fd = LUSTRE_FPRIVATE(file);
+                        cio->cui_iov = NULL;
+                        cio->cui_nrsegs = 0;
+                        result = cl_io_iter_init(env, io);
+                        if (result == 0) {
+                                result = cl_io_lock(env, io);
+                                if (result == 0)
+                                        result = cl_io_start(env, io);
+                        }
+                } else
+                        result = io->ci_result;
+                lcc->lcc_created = 1;
         }
 
-        pg.flag = flags;
+        lcc->lcc_io = io;
+        if (io == NULL)
+                result = -EIO;
+        if (result == 0) {
+                struct cl_page   *page;
+
+                LASSERT(io != NULL);
+                LASSERT(io->ci_state == CIS_IO_GOING);
+                LASSERT(cio->cui_fd == LUSTRE_FPRIVATE(file));
+                page = cl_page_find(env, clob, vmpage->index, vmpage,
+                                    CPT_CACHEABLE);
+                if (!IS_ERR(page)) {
+                        lcc->lcc_page = page;
+                        lu_ref_add(&page->cp_reference, "cl_io", io);
+                        result = 0;
+                } else
+                        result = PTR_ERR(page);
+        }
+        if (result) {
+                ll_cl_fini(lcc);
+                lcc = ERR_PTR(result);
+        }
 
-        if (cmd == OBD_BRW_WRITE)
-                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
-                                    LPROC_LL_BRW_WRITE, pg.count);
-        else
-                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
-                                    LPROC_LL_BRW_READ, pg.count);
-        rc = obd_brw(cmd, ll_i2obdconn(inode), oa, lsm, 1, &pg, NULL);
-        if (rc != 0 && rc != -EIO)
-                CERROR("error from obd_brw: rc = %d\n", rc);
+        CDEBUG(D_VFSTRACE, "%lu@"DFID" -> %i %p %p\n",
+               vmpage->index, PFID(lu_object_fid(&clob->co_lu)), result,
+               env, io);
+        return lcc;
+}
 
-        RETURN(rc);
+static struct ll_cl_context *ll_cl_get(void)
+{
+        struct ll_cl_context *lcc;
+        struct lu_env *env;
+        int refcheck;
+
+        env = cl_env_get(&refcheck);
+        LASSERT(!IS_ERR(env));
+        lcc = &vvp_env_info(env)->vti_io_ctx;
+        LASSERT(env == lcc->lcc_env);
+        LASSERT(current == lcc->lcc_cookie);
+        cl_env_put(env, &refcheck);
+
+        /* the env was obtained in ll_cl_init, so it is still usable. */
+        return lcc;
 }
 
-/*
- * we were asked to read a single page but we're going to try and read a batch
- * of pages all at once.  this vaguely simulates 2.5's readpages.
+/**
+ * ->prepare_write() address space operation called by generic_file_write()
+ * for every page during write.
  */
-static int ll_readpage(struct file *file, struct page *first_page)
+int ll_prepare_write(struct file *file, struct page *vmpage, unsigned from,
+                     unsigned to)
 {
-        struct inode *inode = first_page->mapping->host;
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct page *page = first_page;
-        struct list_head *pos;
-        struct brw_page *pgs;
-        struct obdo *oa;
-        unsigned long end_index, extent_end = 0;
-        struct ptlrpc_request_set *set;
-        int npgs = 0, rc = 0, max_pages;
+        struct ll_cl_context *lcc;
+        int result;
         ENTRY;
 
-        LASSERT(PageLocked(page));
-        LASSERT(!PageUptodate(page));
-        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset="LPX64"\n",
-               inode->i_ino, inode->i_generation, inode,
-               (((obd_off)page->index) << PAGE_SHIFT));
-        LASSERT(atomic_read(&file->f_dentry->d_inode->i_count) > 0);
-
-        if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
-                CERROR("reading beyond EOF\n");
-                memset(kmap(page), 0, PAGE_SIZE);
-                kunmap(page);
-                SetPageUptodate(page);
-                unlock_page(page);
-                RETURN(rc);
+        lcc = ll_cl_init(file, vmpage, 1);
+        if (!IS_ERR(lcc)) {
+                struct lu_env  *env = lcc->lcc_env;
+                struct cl_io   *io  = lcc->lcc_io;
+                struct cl_page *page = lcc->lcc_page;
+
+                cl_page_assume(env, io, page);
+                if (cl_io_is_append(io)) {
+                        struct cl_object   *obj   = io->ci_obj;
+                        struct inode       *inode = ccc_object_inode(obj);
+                        /**
+                         * In VFS file->page write loop, for appending, the
+                         * write offset might be reset according to the new
+                         * file size before holding i_mutex. So crw_pos should
+                         * be reset here. BUG:17711.
+                         */
+                        io->u.ci_wr.wr.crw_pos = i_size_read(inode);
+                }
+                result = cl_io_prepare_write(env, io, page, from, to);
+                if (result == 0) {
+                        /*
+                         * Add a reference, so that page is not evicted from
+                         * the cache until ->commit_write() is called.
+                         */
+                        cl_page_get(page);
+                        lu_ref_add(&page->cp_reference, "prepare_write",
+                                   cfs_current());
+                } else {
+                        cl_page_unassume(env, io, page);
+                        ll_cl_fini(lcc);
+                }
+                /* returning 0 in prepare assumes commit must be called
+                 * afterwards */
+        } else {
+                result = PTR_ERR(lcc);
         }
+        RETURN(result);
+}
 
-        /* try to read the file's preferred block size in a one-er */
-        end_index = first_page->index +
-                (inode->i_blksize >> PAGE_CACHE_SHIFT);
-        if (end_index > (inode->i_size >> PAGE_CACHE_SHIFT))
-                end_index = inode->i_size >> PAGE_CACHE_SHIFT;
+int ll_commit_write(struct file *file, struct page *vmpage, unsigned from,
+                    unsigned to)
+{
+        struct ll_cl_context *lcc;
+        struct lu_env    *env;
+        struct cl_io     *io;
+        struct cl_page   *page;
+        int result;
+        ENTRY;
 
-        max_pages = ((end_index - first_page->index) << PAGE_CACHE_SHIFT) >>
-                PAGE_SHIFT;
-        OBD_ALLOC_GFP(pgs, max_pages * sizeof(*pgs), GFP_USER);
-        if (pgs == NULL)
-                RETURN(-ENOMEM);
+        lcc  = ll_cl_get();
+        env  = lcc->lcc_env;
+        page = lcc->lcc_page;
+        io   = lcc->lcc_io;
 
+        LASSERT(cl_page_is_owned(page, io));
+        result = cl_io_commit_write(env, io, page, from, to);
+        if (cl_page_is_owned(page, io))
+                cl_page_unassume(env, io, page);
         /*
-         * find how far we're allowed to read under the extent ll_file_read
-         * is passing us..
+         * Release reference acquired by cl_io_prepare_write().
          */
-        spin_lock(&lli->lli_read_extent_lock);
-        list_for_each(pos, &lli->lli_read_extents) {
-                struct ll_read_extent *rextent;
-                rextent = list_entry(pos, struct ll_read_extent, re_lli_item);
-                if (rextent->re_task != current)
-                        continue;
-
-                if (rextent->re_extent.end + PAGE_SIZE < rextent->re_extent.end)
-                        /* extent wrapping */
-                        extent_end = ~0;
-                else {
-                        extent_end = (rextent->re_extent.end + PAGE_SIZE)
-                                                        << PAGE_CACHE_SHIFT;
-                        /* 32bit indexes, 64bit extents.. */
-                        if (((u64)extent_end >> PAGE_CACHE_SHIFT) <
-                                        rextent->re_extent.end)
-                                extent_end = ~0;
-                }
-                break;
-        }
-        spin_unlock(&lli->lli_read_extent_lock);
-
-        if (extent_end == 0) {
-                static long next_print;
-                if (time_after(jiffies, next_print)) {
-                        next_print = jiffies + 30 * HZ;
-                        CDEBUG(D_INODE, "mmap readpage - check locks\n");
-                }
-                end_index = page->index + 1;
-        } else if (extent_end < end_index)
-                end_index = extent_end;
-
-        CDEBUG(D_INFO, "max_pages: %d, extent_end: %lu, end_index: %lu, "
-               "i_size: %llu\n",
-               max_pages, extent_end, end_index, inode->i_size);
-
-        /* to balance the find_get_page ref the other pages get that is
-         * decrefed on teardown.. */
-        page_cache_get(page);
-        do {
-                unsigned long index ;
-
-                pgs[npgs].pg = page;
-                pgs[npgs].off = ((obd_off)page->index) << PAGE_CACHE_SHIFT;
-                pgs[npgs].flag = 0;
-                pgs[npgs].count = PAGE_SIZE;
-                /* XXX Workaround for BA OSTs returning short reads at EOF.
-                 * The linux OST will return the full page, zero-filled at the
-                 * end, which will just overwrite the data we set here.  Bug
-                 * 593 relates to fixing this properly.
-                 */
-                if (inode->i_size < pgs[npgs].off + PAGE_SIZE) {
-                        int count = inode->i_size - pgs[npgs].off;
-                        void *addr = kmap(page);
-                        pgs[npgs].count = count;
-                        //POISON(addr, 0x7c, count);
-                        memset(addr + count, 0, PAGE_SIZE - count);
-                        kunmap(page);
-                }
+        lu_ref_del(&page->cp_reference, "prepare_write", cfs_current());
+        cl_page_put(env, page);
+        ll_cl_fini(lcc);
+        RETURN(result);
+}
 
-                npgs++;
-                if (npgs == max_pages)
-                        break;
+struct obd_capa *cl_capa_lookup(struct inode *inode, enum cl_req_type crt)
+{
+        __u64 opc;
 
-                /*
-                 * find pages ahead of us that we can read in.
-                 * grab_cache_page waits on pages that are locked so
-                 * we first try find_get_page, which doesn't.  this stops
-                 * the worst case behaviour of racing threads waiting on
-                 * each other, but doesn't remove it entirely.
-                 */
-                for (index = page->index + 1, page = NULL;
-                     page == NULL && index < end_index; index++) {
-
-                        /* see if the page already exists and needs updating */
-                        page = find_get_page(inode->i_mapping, index);
-                        if (page) {
-                                if (Page_Uptodate(page) || TryLockPage(page))
-                                        goto out_release;
-                                if (!page->mapping || Page_Uptodate(page))
-                                        goto out_unlock;
-                        } else {
-                                /* ok, we have to create it.. */
-                                page = grab_cache_page(inode->i_mapping, index);
-                                if (page == NULL)
-                                        continue;
-                                if (Page_Uptodate(page))
-                                        goto out_unlock;
-                        }
+        opc = crt == CRT_WRITE ? CAPA_OPC_OSS_WRITE : CAPA_OPC_OSS_RW;
+        return ll_osscapa_get(inode, opc);
+}
 
-                        break;
+static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);
 
-                out_unlock:
-                        unlock_page(page);
-                out_release:
-                        page_cache_release(page);
-                        page = NULL;
-                }
+/* WARNING: This algorithm is used to reduce the contention on
+ * sbi->ll_lock. It should work well if the ra_max_pages is much
+ * greater than the single file's read-ahead window.
+ *
+ * TODO: There may be a `global sync problem' in this implementation.
+ * Suppose the global ra window is 100M and each file's ra window is 10M:
+ * if more than 10 files try to get their ra budget and reach
+ * ll_ra_count_get at exactly the same time, all of them will get a zero ra
+ * window, even though the global window is 100M. -jay
+ */
+static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
+{
+        struct ll_ra_info *ra = &sbi->ll_ra_info;
+        unsigned long ret;
+        ENTRY;
 
-        } while (page);
+        /**
+         * If fewer than 1M of read-ahead pages are left, do not read ahead,
+         * otherwise small read RPCs (< 1M) will be formed, which hurt server
+         * performance a lot.
+         */
+        ret = min(ra->ra_max_pages - cfs_atomic_read(&ra->ra_cur_pages), len);
+        if ((int)ret < 0 || ret < min((unsigned long)PTLRPC_MAX_BRW_PAGES, len))
+                GOTO(out, ret = 0);
 
-        if ((oa = obdo_alloc()) == NULL) {
-                CERROR("ENOMEM allocing obdo\n");
-                rc = -ENOMEM;
-        } else if ((set = ptlrpc_prep_set()) == NULL) {
-                CERROR("ENOMEM allocing request set\n");
-                obdo_free(oa);
-                rc = -ENOMEM;
-        } else {
-                struct ll_file_data *fd = file->private_data;
-
-                oa->o_id = lli->lli_smd->lsm_object_id;
-                memcpy(obdo_handle(oa), &fd->fd_ost_och.och_fh,
-                       sizeof(fd->fd_ost_och.och_fh));
-                oa->o_valid = OBD_MD_FLID | OBD_MD_FLHANDLE;
-                obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME);
-
-                rc = obd_brw_async(OBD_BRW_READ, ll_i2obdconn(inode), oa,
-                                   ll_i2info(inode)->lli_smd, npgs, pgs,
-                                   set, NULL);
-                if (rc == 0)
-                        rc = ptlrpc_set_wait(set);
-                ptlrpc_set_destroy(set);
-                if (rc == 0) {
-                        /* bug 1598: don't clobber blksize */
-                        oa->o_valid &= ~(OBD_MD_FLSIZE | OBD_MD_FLBLKSZ);
-                        obdo_refresh_inode(inode, oa, oa->o_valid);
-                }
-                if (rc && rc != -EIO)
-                        CERROR("error from obd_brw_async: rc = %d\n", rc);
-                obdo_free(oa);
+        if (cfs_atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
+                cfs_atomic_sub(ret, &ra->ra_cur_pages);
+                ret = 0;
         }
+out:
+        RETURN(ret);
+}
 
-        while (npgs-- > 0) {
-                page = pgs[npgs].pg;
+void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
+{
+        struct ll_ra_info *ra = &sbi->ll_ra_info;
+        cfs_atomic_sub(len, &ra->ra_cur_pages);
+}
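ll_ra_count_get() and ll_ra_count_put() above implement the lock-less budget described in the WARNING comment: reserve optimistically with an atomic add, back the addition out on overshoot, and refuse reservations smaller than one full RPC. A user-space sketch of the same pattern, with C11 atomics standing in for cfs_atomic_t and made-up budget numbers:

#include <stdatomic.h>
#include <stdio.h>

#define MAX_BRW_PAGES 256UL                       /* assumed: one 1M RPC at 4 KB pages */

static unsigned long ra_max_pages = 1024;         /* global RA budget (made up) */
static atomic_ulong  ra_cur_pages = 0;            /* pages currently reserved */

/* Reserve up to 'len' pages; return 0 if less than one full RPC is left. */
static unsigned long ra_count_get(unsigned long len)
{
        unsigned long cur = atomic_load(&ra_cur_pages);
        unsigned long ret = ra_max_pages > cur ? ra_max_pages - cur : 0;

        if (ret > len)
                ret = len;
        if (ret < MAX_BRW_PAGES && ret < len)
                return 0;                         /* would form a small read RPC */

        /* Optimistically add, then back the addition out if we overshot. */
        if (atomic_fetch_add(&ra_cur_pages, ret) + ret > ra_max_pages) {
                atomic_fetch_sub(&ra_cur_pages, ret);
                return 0;
        }
        return ret;
}

static void ra_count_put(unsigned long len)
{
        atomic_fetch_sub(&ra_cur_pages, len);
}

int main(void)
{
        unsigned long got = ra_count_get(512);
        printf("reserved %lu pages\n", got);      /* 512: fits in the budget */
        ra_count_put(got);
        return 0;
}

The early return when fewer than a full RPC's worth of pages remain is what keeps sub-1M read RPCs off the servers, at the cost of the `global sync problem' noted in the TODO.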
 
-                if (rc == 0)
-                        SetPageUptodate(page);
-                unlock_page(page);
-                page_cache_release(page);
-        }
+static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
+{
+        LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
+        lprocfs_counter_incr(sbi->ll_ra_stats, which);
+}
 
-        OBD_FREE(pgs, max_pages * sizeof(*pgs));
-        RETURN(rc);
-} /* ll_readpage */
+void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
+        ll_ra_stats_inc_sbi(sbi, which);
+}
 
-/* this isn't where truncate starts.   roughly:
- * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate
- * we grab the lock back in setattr_raw to avoid races. */
-void ll_truncate(struct inode *inode)
+#define RAS_CDEBUG(ras) \
+        CDEBUG(D_READA,                                                      \
+               "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu r %lu ri %lu"    \
+               " csr %lu sf %lu sp %lu sl %lu \n",                           \
+               ras->ras_last_readpage, ras->ras_consecutive_requests,        \
+               ras->ras_consecutive_pages, ras->ras_window_start,            \
+               ras->ras_window_len, ras->ras_next_readahead,                 \
+               ras->ras_requests, ras->ras_request_index,                    \
+               ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
+               ras->ras_stride_pages, ras->ras_stride_length)
+
+static int index_in_window(unsigned long index, unsigned long point,
+                           unsigned long before, unsigned long after)
 {
-        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-        struct obdo oa;
-        int err;
-        ENTRY;
-        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
-               inode->i_generation, inode);
+        unsigned long start = point - before, end = point + after;
 
-        /* object not yet allocated */
-        if (!lsm) {
-                CERROR("truncate on inode %lu with no objects\n", inode->i_ino);
-                EXIT;
-                return;
-        }
+        if (start > point)
+               start = 0;
+        if (end < point)
+               end = ~0;
 
-        /* vmtruncate will just throw away our dirty pages, make sure
-         * we don't think they're still dirty, being careful to round
-         * i_size to the first whole page that was tossed */
-        err = ll_clear_dirty_pages(ll_i2obdconn(inode), lsm,
-                        (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT,
-                        ~0);
-
-        oa.o_id = lsm->lsm_object_id;
-        oa.o_valid = OBD_MD_FLID;
-        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE|OBD_MD_FLMODE|OBD_MD_FLATIME|
-                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
-
-        CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after %Lu)\n",
-               oa.o_id, inode->i_size);
-
-        /* truncate == punch from new size to absolute end of file */
-        err = obd_punch(ll_i2obdconn(inode), &oa, lsm, inode->i_size,
-                        OBD_OBJECT_EOF, NULL);
-        if (err)
-                CERROR("obd_truncate fails (%d) ino %lu\n", err, inode->i_ino);
-        else
-                obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
-                                          OBD_MD_FLCTIME);
+        return start <= index && index <= end;
+}
 
-        EXIT;
-        return;
-} /* ll_truncate */
+static struct ll_readahead_state *ll_ras_get(struct file *f)
+{
+        struct ll_file_data       *fd;
 
-//#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+        fd = LUSTRE_FPRIVATE(f);
+        return &fd->fd_ras;
+}
 
-static int ll_prepare_write(struct file *file, struct page *page, unsigned from,
-                            unsigned to)
+void ll_ra_read_in(struct file *f, struct ll_ra_read *rar)
 {
-        struct inode *inode = page->mapping->host;
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct ll_file_data *fd = file->private_data;
-        struct lov_stripe_md *lsm = lli->lli_smd;
-        obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
-        struct brw_page pg;
-        struct obdo oa;
-        int rc = 0;
-        ENTRY;
+        struct ll_readahead_state *ras;
 
-        if (!PageLocked(page))
-                LBUG();
+        ras = ll_ras_get(f);
 
-        if (PageUptodate(page))
-                RETURN(0);
+        cfs_spin_lock(&ras->ras_lock);
+        ras->ras_requests++;
+        ras->ras_request_index = 0;
+        ras->ras_consecutive_requests++;
+        rar->lrr_reader = current;
 
-        //POISON(addr + from, 0xca, to - from);
+        cfs_list_add(&rar->lrr_linkage, &ras->ras_read_beads);
+        cfs_spin_unlock(&ras->ras_lock);
+}
 
-        /* Check to see if we should return -EIO right away */
-        pg.pg = page;
-        pg.off = offset;
-        pg.count = PAGE_SIZE;
-        pg.flag = 0;
-        rc = obd_brw(OBD_BRW_CHECK, ll_i2obdconn(inode), NULL, lsm, 1,&pg,NULL);
-        if (rc)
-                RETURN(rc);
+void ll_ra_read_ex(struct file *f, struct ll_ra_read *rar)
+{
+        struct ll_readahead_state *ras;
 
-        /* We're completely overwriting an existing page, so _don't_ set it up
-         * to date until commit_write */
-        if (from == 0 && to == PAGE_SIZE)
-                RETURN(0);
+        ras = ll_ras_get(f);
 
-        /* If are writing to a new page, no need to read old data.
-         * the extent locking and getattr procedures in ll_file_write have
-         * guaranteed that i_size is stable enough for our zeroing needs */
-        if (inode->i_size <= offset) {
-                memset(kmap(page), 0, PAGE_SIZE);
-                kunmap(page);
-                GOTO(prepare_done, rc = 0);
-        }
+        cfs_spin_lock(&ras->ras_lock);
+        cfs_list_del_init(&rar->lrr_linkage);
+        cfs_spin_unlock(&ras->ras_lock);
+}
+
+static struct ll_ra_read *ll_ra_read_get_locked(struct ll_readahead_state *ras)
+{
+        struct ll_ra_read *scan;
 
-        oa.o_id = lsm->lsm_object_id;
-        oa.o_mode = inode->i_mode;
-        memcpy(obdo_handle(&oa), &fd->fd_ost_och.och_fh,
-               sizeof(fd->fd_ost_och.och_fh));
-        oa.o_valid = OBD_MD_FLID |OBD_MD_FLMODE |OBD_MD_FLTYPE |OBD_MD_FLHANDLE;
-
-        rc = ll_brw(OBD_BRW_READ, inode, &oa, page, 0);
-        if (rc == 0) {
-                /* bug 1598: don't clobber blksize */
-                oa.o_valid &= ~(OBD_MD_FLSIZE | OBD_MD_FLBLKSZ);
-                obdo_refresh_inode(inode, &oa, oa.o_valid);
+        cfs_list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) {
+                if (scan->lrr_reader == current)
+                        return scan;
         }
+        return NULL;
+}
 
-        EXIT;
- prepare_done:
-        if (rc == 0)
-                SetPageUptodate(page);
+struct ll_ra_read *ll_ra_read_get(struct file *f)
+{
+        struct ll_readahead_state *ras;
+        struct ll_ra_read         *bead;
 
-        return rc;
+        ras = ll_ras_get(f);
+
+        cfs_spin_lock(&ras->ras_lock);
+        bead = ll_ra_read_get_locked(ras);
+        cfs_spin_unlock(&ras->ras_lock);
+        return bead;
 }
 
-/*
- * background file writeback.  This is called regularly from kupdated to write
- * dirty data, from kswapd when memory is low, and from filemap_fdatasync when
- * super blocks or inodes are synced..
+static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
+                              struct cl_page_list *queue, struct cl_page *page,
+                              struct page *vmpage)
+{
+        struct ccc_page *cp;
+        int              rc;
+
+        ENTRY;
+
+        rc = 0;
+        cl_page_assume(env, io, page);
+        lu_ref_add(&page->cp_reference, "ra", cfs_current());
+        cp = cl2ccc_page(cl_page_at(page, &vvp_device_type));
+        if (!cp->cpg_defer_uptodate && !Page_Uptodate(vmpage)) {
+                rc = cl_page_is_under_lock(env, io, page);
+                if (rc == -EBUSY) {
+                        cp->cpg_defer_uptodate = 1;
+                        cp->cpg_ra_used = 0;
+                        cl_page_list_add(queue, page);
+                        rc = 1;
+                } else {
+                        cl_page_delete(env, page);
+                        rc = -ENOLCK;
+                }
+        } else
+                /* skip completed pages */
+                cl_page_unassume(env, io, page);
+        lu_ref_del(&page->cp_reference, "ra", cfs_current());
+        cl_page_put(env, page);
+        RETURN(rc);
+}
+
+/**
+ * Initiates read-ahead of a page with given index.
  *
- * obd_brw errors down in _batch_writepage are ignored, so pages are always
- * unlocked.  Also, there is nobody to return an error code to from here - the
- * application may not even be running anymore.
+ * \retval     +ve: page was added to \a queue.
  *
- * this should be async so that things like kswapd can have a chance to
- * free some more pages that our allocating writeback may need, but it isn't
- * yet.
+ * \retval -ENOLCK: there is no extent lock for this part of a file, stop
+ *                  read-ahead.
+ *
+ * \retval  -ve, 0: page wasn't added to \a queue for other reason.
  */
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-static unsigned long ll_local_cache_dirty_pages;
-static unsigned long ll_max_dirty_pages = 20 * 1024 * 1024 / PAGE_SIZE;
+static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
+                              struct cl_page_list *queue,
+                              int index, struct address_space *mapping)
+{
+        struct page      *vmpage;
+        struct cl_object *clob  = ll_i2info(mapping->host)->lli_clob;
+        struct cl_page   *page;
+        enum ra_stat      which = _NR_RA_STAT; /* keep gcc happy */
+        unsigned int      gfp_mask;
+        int               rc    = 0;
+        const char       *msg   = NULL;
 
-static spinlock_t ll_local_cache_page_count_lock = SPIN_LOCK_UNLOCKED;
+        ENTRY;
 
-int ll_rd_dirty_pages(char *page, char **start, off_t off, int count, int *eof,
-                      void *data)
-{
-        unsigned long dirty_count;
-        spin_lock(&ll_local_cache_page_count_lock);
-        dirty_count = ll_local_cache_dirty_pages;
-        spin_unlock(&ll_local_cache_page_count_lock);
-        return snprintf(page, count, "%lu\n", dirty_count);
+        gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
+#ifdef __GFP_NOWARN
+        gfp_mask |= __GFP_NOWARN;
+#endif
+        vmpage = grab_cache_page_nowait_gfp(mapping, index, gfp_mask);
+        if (vmpage != NULL) {
+                /* Check if vmpage was truncated or reclaimed */
+                if (vmpage->mapping == mapping) {
+                        page = cl_page_find(env, clob, vmpage->index,
+                                            vmpage, CPT_CACHEABLE);
+                        if (!IS_ERR(page)) {
+                                rc = cl_read_ahead_page(env, io, queue,
+                                                        page, vmpage);
+                                if (rc == -ENOLCK) {
+                                        which = RA_STAT_FAILED_MATCH;
+                                        msg   = "lock match failed";
+                                }
+                        } else {
+                                which = RA_STAT_FAILED_GRAB_PAGE;
+                                msg   = "cl_page_find failed";
+                        }
+                } else {
+                        which = RA_STAT_WRONG_GRAB_PAGE;
+                        msg   = "g_c_p_n returned invalid page";
+                }
+                if (rc != 1)
+                        unlock_page(vmpage);
+                page_cache_release(vmpage);
+        } else {
+                which = RA_STAT_FAILED_GRAB_PAGE;
+                msg   = "g_c_p_n failed";
+        }
+        if (msg != NULL) {
+                ll_ra_stats_inc(mapping, which);
+                CDEBUG(D_READA, "%s\n", msg);
+        }
+        RETURN(rc);
 }
 
-int ll_rd_max_dirty_pages(char *page, char **start, off_t off, int count,
-                          int *eof, void *data)
+#define RIA_DEBUG(ria)                                                       \
+        CDEBUG(D_READA, "rs %lu re %lu ro %lu rl %lu rp %lu\n",       \
+        ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\
+        ria->ria_pages)
+
+#define RAS_INCREASE_STEP PTLRPC_MAX_BRW_PAGES
+
+static inline int stride_io_mode(struct ll_readahead_state *ras)
 {
-        unsigned long max_dirty;
-        spin_lock(&ll_local_cache_page_count_lock);
-        max_dirty = ll_max_dirty_pages;
-        spin_unlock(&ll_local_cache_page_count_lock);
-        return snprintf(page, count, "%lu\n", max_dirty);
+        return ras->ras_consecutive_stride_requests > 1;
 }
-
-int ll_wr_max_dirty_pages(struct file *file, const char *buffer,
-                          unsigned long count, void *data)
+/* The function calculates how many pages will be read in
+ * [off, off + length], in such a stride IO area,
+ * stride_offset = st_off, stride_length = st_len,
+ * stride_pages = st_pgs
+ *
+ *   |------------------|*****|------------------|*****|------------|*****|....
+ * st_off
+ *   |--- st_pgs     ---|
+ *   |-----     st_len   -----|
+ *
+ *              How many pages it should read in such pattern
+ *              |-------------------------------------------------------------|
+ *              off
+ *              |<------                  length                      ------->|
+ *
+ *          =   |<----->|  +  |-------------------------------------| +   |---|
+ *             start_left                 st_pgs * i                    end_left
+ */
+static unsigned long
+stride_pg_count(pgoff_t st_off, unsigned long st_len, unsigned long st_pgs,
+                unsigned long off, unsigned long length)
 {
-        unsigned long max_dirty;
-        signed long max_dirty_signed;
-        char kernbuf[20], *end;
-        
-        if (count > (sizeof(kernbuf) - 1))
-                return -EINVAL;
+        unsigned long start = off > st_off ? off - st_off : 0;
+        unsigned long end = off + length > st_off ? off + length - st_off : 0;
+        unsigned long start_left = 0;
+        unsigned long end_left = 0;
+        unsigned long pg_count;
+
+        if (st_len == 0 || length == 0 || end == 0)
+                return length;
+
+        start_left = do_div(start, st_len);
+        if (start_left < st_pgs)
+                start_left = st_pgs - start_left;
+        else
+                start_left = 0;
 
-        if (copy_from_user(kernbuf, buffer, count))
-                return -EFAULT;
+        end_left = do_div(end, st_len);
+        if (end_left > st_pgs)
+                end_left = st_pgs;
 
-        kernbuf[count] = '\0';
+        CDEBUG(D_READA, "start %lu, end %lu start_left %lu end_left %lu \n",
+               start, end, start_left, end_left);
 
-        max_dirty_signed = simple_strtol(kernbuf, &end, 0);
-        if (kernbuf == end)
-                return -EINVAL;
-        max_dirty = (unsigned long)max_dirty_signed;
+        if (start == end)
+                pg_count = end_left - (st_pgs - start_left);
+        else
+                pg_count = start_left + st_pgs * (end - start - 1) + end_left;
 
-#if 0
-        if (max_dirty < ll_local_cache_dirty_pages)
-                flush_to_new_max_dirty();
-#endif
+        CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %lu"
+               " pgcount %lu\n", st_off, st_len, st_pgs, off, length, pg_count);
 
-        spin_lock(&ll_local_cache_page_count_lock);
-        CDEBUG(D_CACHE, "changing max_dirty from %lu to %lu\n",
-               ll_max_dirty_pages, max_dirty);
-        ll_max_dirty_pages = max_dirty;
-        spin_unlock(&ll_local_cache_page_count_lock);
-        return count;
+        return pg_count;
 }
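stride_pg_count() above counts only the pages of the stride pattern that fall inside [off, off + length]. A user-space sketch with do_div() replaced by plain '/' and '%', plus one worked example; the stride values used are invented for illustration:

#include <stdio.h>

static unsigned long
stride_pg_count(unsigned long st_off, unsigned long st_len,
                unsigned long st_pgs, unsigned long off, unsigned long length)
{
        unsigned long start = off > st_off ? off - st_off : 0;
        unsigned long end = off + length > st_off ? off + length - st_off : 0;
        unsigned long start_left, end_left;

        if (st_len == 0 || length == 0 || end == 0)
                return length;

        start_left = start % st_len;              /* offset into the first stride */
        start = start / st_len;
        start_left = start_left < st_pgs ? st_pgs - start_left : 0;

        end_left = end % st_len;                  /* offset into the last stride */
        end = end / st_len;
        if (end_left > st_pgs)
                end_left = st_pgs;

        if (start == end)                         /* both ends in one stride chunk */
                return end_left - (st_pgs - start_left);
        return start_left + st_pgs * (end - start - 1) + end_left;
}

int main(void)
{
        /* Stride: read 25 pages every 100 pages, starting at page 0.
         * Of the 290 pages starting at page 10, only pages 10-24, 100-124
         * and 200-224 belong to the stride, i.e. 65 pages. */
        printf("%lu\n", stride_pg_count(0, 100, 25, 10, 290));
        return 0;
}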
 
-static int ll_local_cache_full(void)
+static int ria_page_count(struct ra_io_arg *ria)
 {
-        int full = 0;
-        spin_lock(&ll_local_cache_page_count_lock);
-        if (ll_max_dirty_pages &&
-            ll_local_cache_dirty_pages >= ll_max_dirty_pages) {
-                full = 1;
-        }
-        spin_unlock(&ll_local_cache_page_count_lock);
-        /* XXX instrument? */
-        /* XXX trigger async writeback when full, or 75% of full? */
-        return full;
+        __u64 length = ria->ria_end >= ria->ria_start ?
+                       ria->ria_end - ria->ria_start + 1 : 0;
+
+        return stride_pg_count(ria->ria_stoff, ria->ria_length,
+                               ria->ria_pages, ria->ria_start,
+                               length);
 }
 
-static void ll_local_cache_flushed_pages(unsigned long pgcount)
+/* Check whether the index is in the defined read-ahead window */
+static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
 {
-        unsigned long dirty_count;
-        spin_lock(&ll_local_cache_page_count_lock);
-        dirty_count = ll_local_cache_dirty_pages;
-        ll_local_cache_dirty_pages -= pgcount;
-        CDEBUG(D_CACHE, "dirty pages: %lu->%lu)\n",
-               dirty_count, ll_local_cache_dirty_pages);
-        spin_unlock(&ll_local_cache_page_count_lock);
-        LASSERT(dirty_count >= pgcount);
+        /* If ria_length == ria_pages, it means non-stride I/O mode and
+         * idx should always be inside the read-ahead window in this case.
+         * For stride I/O mode, just check whether the idx is inside
+         * the ria_pages. */
+        return ria->ria_length == 0 || ria->ria_length == ria->ria_pages ||
+               (idx - ria->ria_stoff) % ria->ria_length < ria->ria_pages;
 }
 
-static void ll_local_cache_dirtied_pages(unsigned long pgcount)
+static int ll_read_ahead_pages(const struct lu_env *env,
+                               struct cl_io *io, struct cl_page_list *queue,
+                               struct ra_io_arg *ria,
+                               unsigned long *reserved_pages,
+                               struct address_space *mapping,
+                               unsigned long *ra_end)
 {
-        unsigned long dirty_count;
-        spin_lock(&ll_local_cache_page_count_lock);
-        dirty_count = ll_local_cache_dirty_pages;
-        ll_local_cache_dirty_pages += pgcount;
-        CDEBUG(D_CACHE, "dirty pages: %lu->%lu\n",
-               dirty_count, ll_local_cache_dirty_pages);
-        spin_unlock(&ll_local_cache_page_count_lock);
-        /* XXX track maximum cached, report to lprocfs */
+        int rc, count = 0, stride_ria;
+        unsigned long page_idx;
+
+        LASSERT(ria != NULL);
+        RIA_DEBUG(ria);
+
+        stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
+        for (page_idx = ria->ria_start; page_idx <= ria->ria_end &&
+                        *reserved_pages > 0; page_idx++) {
+                if (ras_inside_ra_window(page_idx, ria)) {
+                        /* If the page is inside the read-ahead window */
+                        rc = ll_read_ahead_page(env, io, queue,
+                                                page_idx, mapping);
+                        if (rc == 1) {
+                                (*reserved_pages)--;
+                                count ++;
+                        } else if (rc == -ENOLCK)
+                                break;
+                } else if (stride_ria) {
+                        /* If it is not in the read-ahead window and this is
+                         * stride read-ahead mode, then check whether it should
+                         * skip the stride gap */
+                        pgoff_t offset;
+                        /* FIXME: This assertion only is valid when it is for
+                         * forward read-ahead, it will be fixed when backward
+                         * read-ahead is implemented */
+                        LASSERTF(page_idx > ria->ria_stoff, "Invalid page_idx %lu"
+                                " rs %lu re %lu ro %lu rl %lu rp %lu\n", page_idx,
+                                ria->ria_start, ria->ria_end, ria->ria_stoff,
+                                ria->ria_length, ria->ria_pages);
+                        offset = page_idx - ria->ria_stoff;
+                        offset = offset % (ria->ria_length);
+                        if (offset > ria->ria_pages) {
+                                page_idx += ria->ria_length - offset;
+                                CDEBUG(D_READA, "i %lu skip %lu \n", page_idx,
+                                       ria->ria_length - offset);
+                                continue;
+                        }
+                }
+        }
+        *ra_end = page_idx;
+        return count;
 }
 
-int ll_clear_dirty_pages(struct lustre_handle *conn, struct lov_stripe_md *lsm,
-                         unsigned long start, unsigned long end)
+int ll_readahead(const struct lu_env *env, struct cl_io *io,
+                 struct ll_readahead_state *ras, struct address_space *mapping,
+                 struct cl_page_list *queue, int flags)
 {
-        unsigned long cleared;
-        int rc;
-
+        struct vvp_io *vio = vvp_env_io(env);
+        struct vvp_thread_info *vti = vvp_env_info(env);
+        struct cl_attr *attr = ccc_env_thread_attr(env);
+        unsigned long start = 0, end = 0, reserved;
+        unsigned long ra_end, len;
+        struct inode *inode;
+        struct ll_ra_read *bead;
+        struct ra_io_arg *ria = &vti->vti_ria;
+        struct ll_inode_info *lli;
+        struct cl_object *clob;
+        int ret = 0;
+        __u64 kms;
         ENTRY;
-        rc = obd_clear_dirty_pages(conn, lsm, start, end, &cleared);
-        if (!rc)
-                ll_local_cache_flushed_pages(cleared);
-        RETURN(rc);
-}
 
-int ll_mark_dirty_page(struct lustre_handle *conn, struct lov_stripe_md *lsm,
-                       unsigned long index)
-{
-        int rc;
+        inode = mapping->host;
+        lli = ll_i2info(inode);
+        clob = lli->lli_clob;
 
-        ENTRY;
-        if (ll_local_cache_full())
-                RETURN(-EDQUOT);
+        memset(ria, 0, sizeof *ria);
 
-        rc = obd_mark_page_dirty(conn, lsm, index);
-        if (!rc)
-                ll_local_cache_dirtied_pages(1);
-        RETURN(rc);
+        cl_object_attr_lock(clob);
+        ret = cl_object_attr_get(env, clob, attr);
+        cl_object_attr_unlock(clob);
+
+        if (ret != 0)
+                RETURN(ret);
+        kms = attr->cat_kms;
+        if (kms == 0) {
+                ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
+                RETURN(0);
+        }
+
+        cfs_spin_lock(&ras->ras_lock);
+        if (vio->cui_ra_window_set)
+                bead = &vio->cui_bead;
+        else
+                bead = NULL;
+
+        /* Enlarge the RA window to encompass the full read */
+        if (bead != NULL && ras->ras_window_start + ras->ras_window_len <
+            bead->lrr_start + bead->lrr_count) {
+                ras->ras_window_len = bead->lrr_start + bead->lrr_count -
+                                      ras->ras_window_start;
+        }
+        /* Reserve a part of the read-ahead window that we'll be issuing */
+        if (ras->ras_window_len) {
+                start = ras->ras_next_readahead;
+                end = ras->ras_window_start + ras->ras_window_len - 1;
+        }
+        if (end != 0) {
+                unsigned long tmp_end;
+                /* Align RA window to optimal RPC boundary */
+                tmp_end = ((end + 1) & (~(PTLRPC_MAX_BRW_PAGES - 1))) - 1;
+                if (tmp_end > start)
+                        end = tmp_end;
+
+                /* Truncate RA window to end of file */
+                end = min(end, (unsigned long)((kms - 1) >> CFS_PAGE_SHIFT));
+
+                ras->ras_next_readahead = max(end, end + 1);
+                RAS_CDEBUG(ras);
+        }
+        ria->ria_start = start;
+        ria->ria_end = end;
+        /* If stride I/O mode is detected, get stride window*/
+        if (stride_io_mode(ras)) {
+                ria->ria_stoff = ras->ras_stride_offset;
+                ria->ria_length = ras->ras_stride_length;
+                ria->ria_pages = ras->ras_stride_pages;
+        }
+        cfs_spin_unlock(&ras->ras_lock);
+
+        if (end == 0) {
+                ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
+                RETURN(0);
+        }
+        len = ria_page_count(ria);
+        if (len == 0)
+                RETURN(0);
+
+        reserved = ll_ra_count_get(ll_i2sbi(inode), len);
+
+        if (reserved < len)
+                ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
+
+        CDEBUG(D_READA, "reserved page %lu \n", reserved);
+
+        ret = ll_read_ahead_pages(env, io, queue,
+                                  ria, &reserved, mapping, &ra_end);
+
+        LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
+        if (reserved != 0)
+                ll_ra_count_put(ll_i2sbi(inode), reserved);
+
+        if (ra_end == end + 1 && ra_end == (kms >> CFS_PAGE_SHIFT))
+                ll_ra_stats_inc(mapping, RA_STAT_EOF);
+
+        /* If we didn't get to the end of the region we reserved from
+         * the ras, we need to go back and update the ras so that the
+         * next read-ahead tries from where we left off.  We only do so
+         * if the region we failed to issue read-ahead on is still ahead
+         * of the app and behind the next index to start read-ahead from */
+        CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
+               ra_end, end, ria->ria_end);
+
+        if (ra_end != end + 1) {
+                cfs_spin_lock(&ras->ras_lock);
+                if (ra_end < ras->ras_next_readahead &&
+                    index_in_window(ra_end, ras->ras_window_start, 0,
+                                    ras->ras_window_len)) {
+                        ras->ras_next_readahead = ra_end;
+                        RAS_CDEBUG(ras);
+                }
+                cfs_spin_unlock(&ras->ras_lock);
+        }
+
+        RETURN(ret);
 }
 
-static int ll_writepage(struct page *page)
+static void ras_set_start(struct ll_readahead_state *ras, unsigned long index)
 {
-        struct inode *inode = page->mapping->host;
-        struct obdo oa;
-        ENTRY;
+        ras->ras_window_start = index & (~(RAS_INCREASE_STEP - 1));
+}
 
-        CDEBUG(D_CACHE, "page %p [lau %d] inode %p\n", page,
-               PageLaunder(page), inode);
-        LASSERT(PageLocked(page));
+/* called with the ras_lock held or from places where it doesn't matter */
+static void ras_reset(struct ll_readahead_state *ras, unsigned long index)
+{
+        ras->ras_last_readpage = index;
+        ras->ras_consecutive_requests = 0;
+        ras->ras_consecutive_pages = 0;
+        ras->ras_window_len = 0;
+        ras_set_start(ras, index);
+        ras->ras_next_readahead = max(ras->ras_window_start, index);
+
+        RAS_CDEBUG(ras);
+}
 
-        oa.o_id = ll_i2info(inode)->lli_smd->lsm_object_id;
-        oa.o_valid = OBD_MD_FLID;
-        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
-                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+/* called with the ras_lock held or from places where it doesn't matter */
+static void ras_stride_reset(struct ll_readahead_state *ras)
+{
+        ras->ras_consecutive_stride_requests = 0;
+        ras->ras_stride_length = 0;
+        ras->ras_stride_pages = 0;
+        RAS_CDEBUG(ras);
+}
 
-        RETURN(ll_batch_writepage(inode, &oa, page));
+void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
+{
+        cfs_spin_lock_init(&ras->ras_lock);
+        ras_reset(ras, 0);
+        ras->ras_requests = 0;
+        CFS_INIT_LIST_HEAD(&ras->ras_read_beads);
 }
 
 /*
- * we really don't want to start writeback here, we want to give callers some
- * time to further dirty the pages before we write them out.
+ * Check whether the read request is in the stride window.
+ * If it is in the stride window, return 1, otherwise return 0.
  */
-static int ll_commit_write(struct file *file, struct page *page,
-                           unsigned from, unsigned to)
+static int index_in_stride_window(unsigned long index,
+                                  struct ll_readahead_state *ras,
+                                  struct inode *inode)
 {
-        struct inode *inode = page->mapping->host;
-        loff_t size;
-        int rc = 0;
-        ENTRY;
+        unsigned long stride_gap = index - ras->ras_last_readpage - 1;
 
-        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
-        LASSERT(inode == file->f_dentry->d_inode);
-        LASSERT(PageLocked(page));
-
-        CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
-               inode, page, from, to, page->index);
-        if (!PageDirty(page)) {
-                lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
-                                     LPROC_LL_DIRTY_MISSES);
-                rc = ll_mark_dirty_page(ll_i2obdconn(inode),
-                                        ll_i2info(inode)->lli_smd,
-                                        page->index);
-                if (rc < 0 && rc != -EDQUOT)
-                        RETURN(rc); /* XXX lproc counter here? */
-        } else {
-                lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
-                                     LPROC_LL_DIRTY_HITS);
+        if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 ||
+            ras->ras_stride_pages == ras->ras_stride_length)
+                return 0;
+
+        /* If it is contiguous read */
+        if (stride_gap == 0)
+                return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
+
+        /*Otherwise check the stride by itself */
+        return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
+             ras->ras_consecutive_pages == ras->ras_stride_pages;
+}
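index_in_stride_window() above decides whether the next read still fits the detected stride: a zero gap must stay within the current chunk, and a jump must equal the hole between chunks and come right after a full chunk was read. A standalone sketch with invented state values:

#include <stdio.h>

struct ras_state {
        unsigned long last_readpage;       /* index of the previous read */
        unsigned long consecutive_pages;   /* pages read back-to-back so far */
        unsigned long stride_pages;        /* pages read per stride chunk */
        unsigned long stride_length;       /* distance between chunk starts */
};

static int index_in_stride_window(unsigned long index, struct ras_state *ras)
{
        unsigned long gap = index - ras->last_readpage - 1;

        if (ras->stride_length == 0 || ras->stride_pages == 0 ||
            ras->stride_pages == ras->stride_length)
                return 0;                          /* no stride detected yet */

        if (gap == 0)                              /* contiguous read */
                return ras->consecutive_pages + 1 <= ras->stride_pages;

        /* A jump matches the stride if it equals the hole between chunks
         * and a whole chunk was just read. */
        return ras->stride_length - ras->stride_pages == gap &&
               ras->consecutive_pages == ras->stride_pages;
}

int main(void)
{
        /* 25 pages read every 100 pages: after reading pages 0-24, a read of
         * page 100 (a gap of 75 pages) still fits the stride pattern. */
        struct ras_state ras = { 24, 25, 25, 100 };

        printf("%d\n", index_in_stride_window(100, &ras));  /* prints 1 */
        return 0;
}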
+
+static void ras_update_stride_detector(struct ll_readahead_state *ras,
+                                       unsigned long index)
+{
+        unsigned long stride_gap = index - ras->ras_last_readpage - 1;
+
+        if (!stride_io_mode(ras) && (stride_gap != 0 ||
+             ras->ras_consecutive_stride_requests == 0)) {
+                ras->ras_stride_pages = ras->ras_consecutive_pages;
+                ras->ras_stride_length = stride_gap +ras->ras_consecutive_pages;
         }
+        LASSERT(ras->ras_request_index == 0);
+        LASSERT(ras->ras_consecutive_stride_requests == 0);
 
-        size = (((obd_off)page->index) << PAGE_SHIFT) + to;
-        if (size > inode->i_size)
-                inode->i_size = size;
-
-        SetPageUptodate(page);
-        set_page_dirty(page);
-
-        /* This means that we've hit either the local cache limit or the limit
-         * of the OST's grant. */
-        if (rc == -EDQUOT) {
-                struct ll_file_data *fd = file->private_data;
-                struct obdo oa;
-                int rc;
-
-                oa.o_id = ll_i2info(inode)->lli_smd->lsm_object_id;
-                memcpy(obdo_handle(&oa), &fd->fd_ost_och.och_fh,
-                       sizeof(fd->fd_ost_och.och_fh));
-                oa.o_valid = OBD_MD_FLID | OBD_MD_FLHANDLE;
-                obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
-                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);
-
-                rc = ll_batch_writepage(inode, &oa, page);
-                lock_page(page); /* caller expects to unlock */
-                RETURN(rc);
+        if (index <= ras->ras_last_readpage) {
+                /* Reset stride window for forward read */
+                ras_stride_reset(ras);
+                return;
         }
 
-        RETURN(0);
-} /* ll_commit_write */
-#else
-static int ll_writepage(struct page *page,
-                        struct writeback_control *wbc)
+        ras->ras_stride_pages = ras->ras_consecutive_pages;
+        ras->ras_stride_length = stride_gap +ras->ras_consecutive_pages;
+
+        RAS_CDEBUG(ras);
+        return;
+}
+
+static unsigned long
+stride_page_count(struct ll_readahead_state *ras, unsigned long len)
 {
+        return stride_pg_count(ras->ras_stride_offset, ras->ras_stride_length,
+                               ras->ras_stride_pages, ras->ras_stride_offset,
+                               len);
+}
+
+/* The stride read-ahead window will be increased by inc_len according to
+ * the stride I/O pattern */
+static void ras_stride_increase_window(struct ll_readahead_state *ras,
+                                       struct ll_ra_info *ra,
+                                       unsigned long inc_len)
+{
+        unsigned long left, step, window_len;
+        unsigned long stride_len;
+
+        LASSERT(ras->ras_stride_length > 0);
+        LASSERTF(ras->ras_window_start + ras->ras_window_len
+                 >= ras->ras_stride_offset, "window_start %lu, window_len %lu,"
+                 " stride_offset %lu\n", ras->ras_window_start,
+                 ras->ras_window_len, ras->ras_stride_offset);
+
+        stride_len = ras->ras_window_start + ras->ras_window_len -
+                     ras->ras_stride_offset;
+
+        left = stride_len % ras->ras_stride_length;
+        window_len = ras->ras_window_len - left;
+
+        if (left < ras->ras_stride_pages)
+                left += inc_len;
+        else
+                left = ras->ras_stride_pages + inc_len;
 
-        return 0;
+        LASSERT(ras->ras_stride_pages != 0);
+
+        step = left / ras->ras_stride_pages;
+        left %= ras->ras_stride_pages;
+
+        window_len += step * ras->ras_stride_length + left;
+
+        if (stride_page_count(ras, window_len) <= ra->ra_max_pages_per_file)
+                ras->ras_window_len = window_len;
+
+        RAS_CDEBUG(ras);
 }
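+
+/*
+ * Worked example of the arithmetic above (hypothetical numbers): with
+ * ras_stride_length = 16, ras_stride_pages = 4, ras_stride_offset = 0,
+ * ras_window_start = 0, ras_window_len = 16 and inc_len = 32:
+ *   stride_len = 16, left = 16 % 16 = 0, window_len = 16 - 0 = 16,
+ *   left < stride_pages so left = 0 + 32 = 32, step = 32 / 4 = 8,
+ *   left = 0, window_len = 16 + 8 * 16 + 0 = 144.
+ * The window grows by eight whole strides so that it covers inc_len = 32
+ * additional pages that will actually be read; stride_page_count() then
+ * verifies those useful pages (36 here) fit within ra_max_pages_per_file.
+ */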
-static int ll_commit_write(struct file *file, struct page *page,
-                           unsigned from, unsigned to)
+
+static void ras_increase_window(struct ll_readahead_state *ras,
+                                struct ll_ra_info *ra, struct inode *inode)
 {
-        return 0;
+        /* The stretch of the ra-window should be aligned with the max
+         * rpc_size, but the current clio architecture does not support
+         * retrieving that information from the lower layers. FIXME later
+         */
+        if (stride_io_mode(ras))
+                ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP);
+        else
+                ras->ras_window_len = min(ras->ras_window_len +
+                                          RAS_INCREASE_STEP,
+                                          ra->ra_max_pages_per_file);
 }
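+
+/*
+ * For example (hypothetical values): if RAS_INCREASE_STEP corresponds to
+ * 256 pages and ra_max_pages_per_file is 1024, successive contiguous
+ * reads grow ras_window_len 256 -> 512 -> 768 -> 1024, after which the
+ * window stays capped at ra_max_pages_per_file.
+ */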
-#endif
 
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
-                        unsigned long blocknr, int blocksize)
+void ras_update(struct ll_sb_info *sbi, struct inode *inode,
+                struct ll_readahead_state *ras, unsigned long index,
+                unsigned hit)
 {
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_stripe_md *lsm = lli->lli_smd;
-        struct brw_page *pga;
-        struct ptlrpc_request_set *set;
-        struct obdo oa;
-        int length, i, flags, rc = 0;
-        loff_t offset;
+        struct ll_ra_info *ra = &sbi->ll_ra_info;
+        int zero = 0, stride_detect = 0, ra_miss = 0;
         ENTRY;
 
-        if (!lsm || !lsm->lsm_object_id)
-                RETURN(-EBADF);
-
-        /* FIXME: io smaller than PAGE_SIZE is broken on ia64 */
-        if ((iobuf->offset & (PAGE_SIZE - 1)) ||
-            (iobuf->length & (PAGE_SIZE - 1)))
-                RETURN(-EINVAL);
+        cfs_spin_lock(&sbi->ll_lock);
+        cfs_spin_lock(&ras->ras_lock);
+
+        ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);
+
+        /* Reset the read-ahead window in two cases.  First, when the app
+         * seeks or reads to some other part of the file.  Second, when we
+         * get a read-ahead miss on a page we think we have previously
+         * issued read-ahead for.  This can be a symptom of there being so
+         * many read-ahead pages that the VM is reclaiming them before we
+         * get to them. */
+        if (!index_in_window(index, ras->ras_last_readpage, 8, 8)) {
+                zero = 1;
+                ll_ra_stats_inc_sbi(sbi, RA_STAT_DISTANT_READPAGE);
+        } else if (!hit && ras->ras_window_len &&
+                   index < ras->ras_next_readahead &&
+                   index_in_window(index, ras->ras_window_start, 0,
+                                   ras->ras_window_len)) {
+                ra_miss = 1;
+                ll_ra_stats_inc_sbi(sbi, RA_STAT_MISS_IN_WINDOW);
+        }
 
-        set = ptlrpc_prep_set();
-        if (set == NULL)
-                RETURN(-ENOMEM);
+        /* On the second access to a file smaller than the tunable
+         * ra_max_read_ahead_whole_pages, trigger RA on all pages in the
+         * file, up to ra_max_pages_per_file.  This is simply a best effort
+         * and only occurs once per open file.  Normal RA behavior resumes
+         * for subsequent IO.  The mmap case does not increment
+         * ras_requests and thus can never trigger this behavior. */
+        if (ras->ras_requests == 2 && !ras->ras_request_index) {
+                __u64 kms_pages;
+
+                kms_pages = (i_size_read(inode) + CFS_PAGE_SIZE - 1) >>
+                            CFS_PAGE_SHIFT;
+
+                CDEBUG(D_READA, "kmsp "LPU64" mwp %lu mp %lu\n", kms_pages,
+                       ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages_per_file);
+
+                if (kms_pages &&
+                    kms_pages <= ra->ra_max_read_ahead_whole_pages) {
+                        ras->ras_window_start = 0;
+                        ras->ras_last_readpage = 0;
+                        ras->ras_next_readahead = 0;
+                        ras->ras_window_len = min(ra->ra_max_pages_per_file,
+                                ra->ra_max_read_ahead_whole_pages);
+                        GOTO(out_unlock, 0);
+                }
+        }
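+        /*
+         * Illustration of the whole-file case above (hypothetical numbers):
+         * for a 512-page file with ra_max_read_ahead_whole_pages = 2048,
+         * the second read request makes the entire file eligible for
+         * read-ahead in one go, regardless of where in the file the reads
+         * land, assuming ra_max_pages_per_file is at least 512.
+         */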
+        if (zero) {
+                /* check whether it is in stride I/O mode */
+                if (!index_in_stride_window(index, ras, inode)) {
+                        if (ras->ras_consecutive_stride_requests == 0 &&
+                            ras->ras_request_index == 0) {
+                                ras_update_stride_detector(ras, index);
+                                ras->ras_consecutive_stride_requests++;
+                        } else {
+                                ras_stride_reset(ras);
+                        }
+                        ras_reset(ras, index);
+                        ras->ras_consecutive_pages++;
+                        GOTO(out_unlock, 0);
+                } else {
+                        ras->ras_consecutive_pages = 0;
+                        ras->ras_consecutive_requests = 0;
+                        if (++ras->ras_consecutive_stride_requests > 1)
+                                stride_detect = 1;
+                        RAS_CDEBUG(ras);
+                }
+        } else {
+                if (ra_miss) {
+                        if (index_in_stride_window(index, ras, inode) &&
+                            stride_io_mode(ras)) {
+                                /* If stride-RA hit a cache miss, the stride
+                                 * detector is not reset, to avoid the
+                                 * overhead of redetecting read-ahead mode */
+                                if (index != ras->ras_last_readpage + 1)
+                                       ras->ras_consecutive_pages = 0;
+                                ras_reset(ras, index);
+                                RAS_CDEBUG(ras);
+                        } else {
+                                /* Reset both stride window and normal RA
+                                 * window */
+                                ras_reset(ras, index);
+                                ras->ras_consecutive_pages++;
+                                ras_stride_reset(ras);
+                                GOTO(out_unlock, 0);
+                        }
+                } else if (stride_io_mode(ras)) {
+                        /* If this is a contiguous read but we are currently
+                         * in stride I/O mode, check whether the stride step
+                         * is still valid; if not, reset the stride RA window */
+                        if (!index_in_stride_window(index, ras, inode)) {
+                                /* Shrink the stride read-ahead window to zero */
+                                ras_stride_reset(ras);
+                                ras->ras_window_len = 0;
+                                ras->ras_next_readahead = index;
+                        }
+                }
+        }
+        ras->ras_consecutive_pages++;
+        ras->ras_last_readpage = index;
+        ras_set_start(ras, index);
+
+        if (stride_io_mode(ras))
+                /* Stride read-ahead is sensitive to the read-ahead offset,
+                 * so use the original offset here instead of
+                 * ras_window_start, which is 1M aligned */
+                ras->ras_next_readahead = max(index,
+                                              ras->ras_next_readahead);
+        else
+                ras->ras_next_readahead = max(ras->ras_window_start,
+                                              ras->ras_next_readahead);
+        RAS_CDEBUG(ras);
+
+        /* Trigger RA in the mmap case where ras_consecutive_requests
+         * is not incremented and thus can't be used to trigger RA */
+        if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
+                ras->ras_window_len = RAS_INCREASE_STEP;
+                GOTO(out_unlock, 0);
+        }
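+        /*
+         * Illustration of the mmap trigger above (hypothetical scenario):
+         * a process mmap()s the file and faults in pages 100, 101, 102 and
+         * 103 one at a time; on the fourth consecutive page a window of
+         * RAS_INCREASE_STEP pages is opened even though no read() requests
+         * were counted in ras_requests.
+         */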
 
-        OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages);
-        if (!pga) {
-                ptlrpc_set_destroy(set);
-                RETURN(-ENOMEM);
+        /* Initialize the stride window offset once stride I/O is detected */
+        if (ras->ras_consecutive_stride_requests == 2 && stride_detect) {
+                /**
+                 * Once stride IO mode is detected, next_readahead should be
+                 * reset to make sure next_readahead > stride offset
+                 */
+                ras->ras_next_readahead = max(index, ras->ras_next_readahead);
+                ras->ras_stride_offset = index;
+                ras->ras_window_len = RAS_INCREASE_STEP;
         }
 
-        flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */;
-        offset = ((obd_off)blocknr << inode->i_blkbits);
-        length = iobuf->length;
-
-        for (i = 0, length = iobuf->length; length > 0;
-             length -= pga[i].count, offset += pga[i].count, i++) { /*i last!*/
-                pga[i].pg = iobuf->maplist[i];
-                pga[i].off = offset;
-                /* To the end of the page, or the length, whatever is less */
-                pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK),
-                                     length);
-                pga[i].flag = flags;
-                if (rw == READ) {
-                        //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE);
-                        //kunmap(iobuf->maplist[i]);
+        /* The initial ras_window_len is set to the request size.  To avoid
+         * uselessly reading and discarding pages for random IO, the window
+         * is only increased once per consecutive request received. */
+        if ((ras->ras_consecutive_requests > 1 || stride_detect) &&
+            !ras->ras_request_index)
+                ras_increase_window(ras, ra, inode);
+        EXIT;
+out_unlock:
+        RAS_CDEBUG(ras);
+        ras->ras_request_index++;
+        cfs_spin_unlock(&ras->ras_lock);
+        cfs_spin_unlock(&sbi->ll_lock);
+        return;
+}
+
+int ll_writepage(struct page *vmpage, struct writeback_control *unused)
+{
+        struct inode           *inode = vmpage->mapping->host;
+        struct lu_env          *env;
+        struct cl_io           *io;
+        struct cl_page         *page;
+        struct cl_object       *clob;
+        struct cl_2queue       *queue;
+        struct cl_env_nest      nest;
+        int result;
+        ENTRY;
+
+        LASSERT(PageLocked(vmpage));
+        LASSERT(!PageWriteback(vmpage));
+
+        if (ll_i2dtexp(inode) == NULL)
+                RETURN(-EINVAL);
+
+        env = cl_env_nested_get(&nest);
+        if (IS_ERR(env))
+                RETURN(PTR_ERR(env));
+
+        io    = &ccc_env_info(env)->cti_io;
+        queue = &vvp_env_info(env)->vti_queue;
+        clob  = ll_i2info(inode)->lli_clob;
+        LASSERT(clob != NULL);
+
+        io->ci_obj = clob;
+        result = cl_io_init(env, io, CIT_MISC, clob);
+        if (result == 0) {
+                page = cl_page_find(env, clob, vmpage->index,
+                                    vmpage, CPT_CACHEABLE);
+                if (!IS_ERR(page)) {
+                        lu_ref_add(&page->cp_reference, "writepage",
+                                   cfs_current());
+                        cl_page_assume(env, io, page);
+                        /*
+                         * Mark page dirty, because this is what
+                         * ->vio_submit()->cpo_prep_write() assumes.
+                         *
+                         * XXX better solution is to detect this from within
+                         * cl_io_submit_rw() somehow.
+                         */
+                        set_page_dirty(vmpage);
+                        cl_2queue_init_page(queue, page);
+                        result = cl_io_submit_rw(env, io, CRT_WRITE,
+                                                 queue, CRP_NORMAL);
+                        cl_page_list_disown(env, io, &queue->c2_qin);
+                        if (result != 0) {
+                                /*
+                                 * There is no need to clear PG_writeback, as
+                                 * cl_io_submit_rw() calls completion callback
+                                 * on failure.
+                                 */
+                                /*
+                                 * Re-dirty the page on error so the write is
+                                 * retried, but not when IO has actually been
+                                 * issued and completed with an error.
+                                 */
+                                if (!PageError(vmpage))
+                                        set_page_dirty(vmpage);
+                        }
+                        LASSERT(!cl_page_is_owned(page, io));
+                        lu_ref_del(&page->cp_reference,
+                                   "writepage", cfs_current());
+                        cl_page_put(env, page);
+                        cl_2queue_fini(env, queue);
                 }
         }
+        cl_io_fini(env, io);
+        cl_env_nested_put(&nest, env);
+        RETURN(result);
+}
 
-        oa.o_id = lsm->lsm_object_id;
-        oa.o_valid = OBD_MD_FLID;
-        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
-                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+int ll_readpage(struct file *file, struct page *vmpage)
+{
+        struct ll_cl_context *lcc;
+        int result;
+        ENTRY;
 
-        if (rw == WRITE)
-                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
-                                    LPROC_LL_DIRECT_WRITE, iobuf->length);
-        else
-                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
-                                    LPROC_LL_DIRECT_READ, iobuf->length);
-        rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
-                           ll_i2obdconn(inode), &oa, lsm, iobuf->nr_pages, pga,
-                           set, NULL);
-        if (rc) {
-                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
-                       "error from obd_brw_async: rc = %d\n", rc);
+        lcc = ll_cl_init(file, vmpage, 0);
+        if (!IS_ERR(lcc)) {
+                struct lu_env  *env  = lcc->lcc_env;
+                struct cl_io   *io   = lcc->lcc_io;
+                struct cl_page *page = lcc->lcc_page;
+
+                LASSERT(page->cp_type == CPT_CACHEABLE);
+                if (likely(!PageUptodate(vmpage))) {
+                        cl_page_assume(env, io, page);
+                        result = cl_io_read_page(env, io, page);
+                } else {
+                        /* Page from a non-object file. */
+                        LASSERT(!ll_i2info(vmpage->mapping->host)->lli_smd);
+                        unlock_page(vmpage);
+                        result = 0;
+                }
+                ll_cl_fini(lcc);
         } else {
-                rc = ptlrpc_set_wait(set);
-                if (rc)
-                        CERROR("error from callback: rc = %d\n", rc);
+                result = PTR_ERR(lcc);
         }
-        ptlrpc_set_destroy(set);
-        if (rc == 0)
-                rc = iobuf->length;
-
-        OBD_FREE(pga, sizeof(*pga) * iobuf->nr_pages);
-        RETURN(rc);
+        RETURN(result);
 }
-#endif
-
-//#endif
 
-struct address_space_operations ll_aops = {
-        readpage: ll_readpage,
-#if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
-        direct_IO: ll_direct_IO,
-#endif
-        writepage: ll_writepage,
-        sync_page: block_sync_page,
-        prepare_write: ll_prepare_write,
-        commit_write: ll_commit_write,
-        bmap: NULL
-//#endif
-};