-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre Lite I/O page cache routines shared by different kernel revs
*/
-#include <linux/autoconf.h>
#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/string.h>
#include <linux/stat.h>
#include <linux/errno.h>
-#include <linux/smp_lock.h>
#include <linux/unistd.h>
-#include <linux/version.h>
-#include <asm/system.h>
+#include <linux/writeback.h>
#include <asm/uaccess.h>
#include <linux/fs.h>
#include <asm/uaccess.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
-#include <linux/smp_lock.h>
/* current_is_kswapd() */
#include <linux/swap.h>
#define DEBUG_SUBSYSTEM S_LLITE
-//#include <lustre_mdc.h>
#include <lustre_lite.h>
#include <obd_cksum.h>
#include "llite_internal.h"
#include <linux/lustre_compat25.h>
-/* this isn't where truncate starts. roughly:
- * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs
- * DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to
- * avoid races.
- *
- * must be called under ->lli_size_sem */
-void ll_truncate(struct inode *inode)
-{
- struct ll_inode_info *lli = ll_i2info(inode);
- loff_t new_size;
- ENTRY;
- CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu=%#Lx\n",inode->i_ino,
- inode->i_generation, inode, i_size_read(inode),
- i_size_read(inode));
-
- ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_TRUNC, 1);
- if (lli->lli_size_sem_owner != cfs_current()) {
- EXIT;
- return;
- }
-
- if (!lli->lli_smd) {
- CDEBUG(D_INODE, "truncate on inode %lu with no objects\n",
- inode->i_ino);
- GOTO(out_unlock, 0);
- }
- LASSERT_SEM_LOCKED(&lli->lli_size_sem);
-
- if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_CHECKSUM) &&
- (i_size_read(inode) & ~CFS_PAGE_MASK))) {
- /* If the truncate leaves a partial page, update its checksum */
- struct page *page = find_get_page(inode->i_mapping,
- i_size_read(inode) >>
- CFS_PAGE_SHIFT);
- if (page != NULL) {
-#if 0 /* XXX */
- struct ll_async_page *llap = llap_cast_private(page);
- if (llap != NULL) {
- char *kaddr = kmap_atomic(page, KM_USER0);
- llap->llap_checksum =
- init_checksum(OSC_DEFAULT_CKSUM);
- llap->llap_checksum =
- compute_checksum(llap->llap_checksum,
- kaddr, CFS_PAGE_SIZE,
- OSC_DEFAULT_CKSUM);
- kunmap_atomic(kaddr, KM_USER0);
- }
- page_cache_release(page);
-#endif
- }
- }
-
- new_size = i_size_read(inode);
- ll_inode_size_unlock(inode, 0);
-
- EXIT;
- return;
-
- out_unlock:
- ll_inode_size_unlock(inode, 0);
-} /* ll_truncate */
-
-/**
- * Initializes common cl-data at the typical address_space operation entry
- * point.
- */
-static int ll_cl_init(struct file *file, struct page *vmpage,
- struct lu_env **env,
- struct cl_io **io, struct cl_page **page, int *refcheck)
-{
- struct lu_env *_env;
- struct cl_io *_io;
- struct cl_page *_page;
- struct cl_object *clob;
-
- int result;
-
- *env = NULL;
- *io = NULL;
- *page = NULL;
-
- clob = ll_i2info(vmpage->mapping->host)->lli_clob;
- LASSERT(clob != NULL);
-
- _env = cl_env_get(refcheck);
- if (!IS_ERR(env)) {
- struct ccc_io *cio = ccc_env_io(_env);
-
- *env = _env;
- *io = _io = cio->cui_cl.cis_io;
- if (_io != NULL) {
- LASSERT(_io->ci_state == CIS_IO_GOING);
- LASSERT(cio->cui_fd == LUSTRE_FPRIVATE(file));
- _page = cl_page_find(_env, clob, vmpage->index, vmpage,
- CPT_CACHEABLE);
- if (!IS_ERR(_page)) {
- *page = _page;
- lu_ref_add(&_page->cp_reference, "cl_io", _io);
- result = 0;
- } else
- result = PTR_ERR(_page);
- } else
- /*
- * This is for a case where operation can be called
- * either with or without cl_io created by the upper
- * layer (e.g., ->prepare_write() called directly from
- * loop-back driver).
- */
- result = -EALREADY;
- } else
- result = PTR_ERR(_env);
- CDEBUG(D_VFSTRACE, "%lu@"DFID" -> %i %p %p %p\n",
- vmpage->index, PFID(lu_object_fid(&clob->co_lu)), result,
- *env, *io, *page);
- return result;
-}
-
/**
* Finalizes cl-data before exiting typical address_space operation. Dual to
* ll_cl_init().
*/
-static void ll_cl_fini(struct lu_env *env,
- struct cl_io *io, struct cl_page *page, int *refcheck)
+void ll_cl_fini(struct ll_cl_context *lcc)
{
+ struct lu_env *env = lcc->lcc_env;
+ struct cl_io *io = lcc->lcc_io;
+ struct cl_page *page = lcc->lcc_page;
+
+ LASSERT(lcc->lcc_cookie == current);
+ LASSERT(env != NULL);
+
if (page != NULL) {
lu_ref_del(&page->cp_reference, "cl_io", io);
cl_page_put(env, page);
}
- if (env != NULL) {
- struct vvp_io *vio;
-
- vio = vvp_env_io(env);
- LASSERT(vio->cui_oneshot >= 0);
- if (vio->cui_oneshot > 0) {
- if (--vio->cui_oneshot == 0) {
- cl_io_end(env, io);
- cl_io_unlock(env, io);
- cl_io_iter_fini(env, io);
- cl_io_fini(env, io);
- /* to trigger assertion above, if ll_cl_fini()
- * is called against freed io. */
- vio->cui_oneshot = -1;
- }
- /* additional reference on env was acquired by io,
- * disable refcheck */
- refcheck = NULL;
- }
- cl_env_put(env, refcheck);
- } else
- LASSERT(io == NULL);
-}
-/**
- * Initializes one-shot cl_io for the case when loop driver calls
- * ->{prepare,commit}_write() methods directly.
- */
-static int ll_prepare_loop(struct lu_env *env, struct cl_io *io,
- struct file *file, struct page *vmpage,
- unsigned from, unsigned to)
-{
- struct vvp_io *vio;
- struct ccc_io *cio;
- int result;
- loff_t pos;
-
- vio = vvp_env_io(env);
- cio = ccc_env_io(env);
- ll_io_init(io, file, 1);
- pos = (vmpage->index << CFS_PAGE_SHIFT) + from;
- /*
- * Create IO and quickly drive it through CIS_{INIT,IT_STARTED,LOCKED}
- * states. DLM locks are not taken for vio->cui_oneshot IO---we cannot
- * take DLM locks here, because page is already locked. With new
- * ->write_{being,end}() address_space operations lustre might be
- * luckier.
- */
- result = cl_io_rw_init(env, io, CIT_WRITE, pos, from - to);
- if (result == 0) {
- cio->cui_fd = LUSTRE_FPRIVATE(file);
- vio->cui_oneshot = 1;
- result = cl_io_iter_init(env, io);
- if (result == 0) {
- result = cl_io_lock(env, io);
- if (result == 0)
- result = cl_io_start(env, io);
- }
- } else
- result = io->ci_result;
- return result;
+ cl_env_put(env, &lcc->lcc_refcheck);
}
/**
- * ->prepare_write() address space operation called by generic_file_write()
- * for every page during write.
+ * Initializes common cl-data at the typical address_space operation entry
+ * point.
*/
-int ll_prepare_write(struct file *file, struct page *vmpage, unsigned from,
- unsigned to)
+struct ll_cl_context *ll_cl_init(struct file *file, struct page *vmpage)
{
+ struct ll_cl_context *lcc;
struct lu_env *env;
struct cl_io *io;
- struct cl_page *page;
- int result;
+ struct cl_object *clob;
+ struct ccc_io *cio;
+
int refcheck;
- ENTRY;
+ int result = 0;
- result = ll_cl_init(file, vmpage, &env, &io, &page, &refcheck);
- /*
- * Loop-back driver calls ->prepare_write() and ->sendfile() methods
- * directly, bypassing file system ->write() operation, so cl_io has
- * to be created here.
- */
- if (result == -EALREADY) {
- io = &ccc_env_info(env)->cti_io;
- result = ll_prepare_loop(env, io, file, vmpage, from, to);
- if (result == 0) {
- result = ll_cl_init(file, vmpage,
- &env, &io, &page, &refcheck);
- cl_env_put(env, NULL);
- }
- }
- if (result == 0) {
- cl_page_assume(env, io, page);
- if (cl_io_is_append(io)) {
- struct cl_object *obj = io->ci_obj;
- struct inode *inode = ccc_object_inode(obj);
- /**
- * In VFS file->page write loop, for appending, the
- * write offset might be reset according to the new
- * file size before holding i_mutex. So crw_pos should
- * be reset here. BUG:17711.
- */
- io->u.ci_wr.wr.crw_pos = i_size_read(inode);
- }
- result = cl_io_prepare_write(env, io, page, from, to);
- if (result == 0) {
- struct vvp_io *vio;
-
- /*
- * Add a reference, so that page is not evicted from
- * the cache until ->commit_write() is called.
- */
- cl_page_get(page);
- lu_ref_add(&page->cp_reference, "prepare_write",
- cfs_current());
- vio = vvp_env_io(env);
- if (vio->cui_oneshot > 0)
- vio->cui_oneshot++;
- } else
- cl_page_unassume(env, io, page);
- }
- ll_cl_fini(env, io, page, &refcheck);
- RETURN(result);
-}
+ clob = ll_i2info(file->f_dentry->d_inode)->lli_clob;
+ LASSERT(clob != NULL);
-int ll_commit_write(struct file *file, struct page *vmpage, unsigned from,
- unsigned to)
-{
- struct lu_env *env;
- struct cl_io *io;
- struct cl_page *page;
- int result;
- int refcheck;
- ENTRY;
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ return ERR_PTR(PTR_ERR(env));
- result = ll_cl_init(file, vmpage, &env, &io, &page, &refcheck);
- LASSERT(result != -EALREADY);
- if (result == 0) {
- LASSERT(cl_page_is_owned(page, io));
- result = cl_io_commit_write(env, io, page, from, to);
- if (cl_page_is_owned(page, io))
- cl_page_unassume(env, io, page);
- /*
- * Release reference acquired by cl_io_prepare_write().
- */
- lu_ref_del(&page->cp_reference, "prepare_write", cfs_current());
- cl_page_put(env, page);
+ lcc = &vvp_env_info(env)->vti_io_ctx;
+ memset(lcc, 0, sizeof(*lcc));
+ lcc->lcc_env = env;
+ lcc->lcc_refcheck = refcheck;
+ lcc->lcc_cookie = current;
+
+ cio = ccc_env_io(env);
+ io = cio->cui_cl.cis_io;
+ lcc->lcc_io = io;
+ if (io == NULL)
+ result = -EIO;
+ if (result == 0 && vmpage != NULL) {
+ struct cl_page *page;
+
+ LASSERT(io != NULL);
+ LASSERT(io->ci_state == CIS_IO_GOING);
+ LASSERT(cio->cui_fd == LUSTRE_FPRIVATE(file));
+ page = cl_page_find(env, clob, vmpage->index, vmpage,
+ CPT_CACHEABLE);
+ if (!IS_ERR(page)) {
+ lcc->lcc_page = page;
+ lu_ref_add(&page->cp_reference, "cl_io", io);
+ result = 0;
+ } else
+ result = PTR_ERR(page);
}
- ll_cl_fini(env, io, page, &refcheck);
- RETURN(result);
+ if (result) {
+ ll_cl_fini(lcc);
+ lcc = ERR_PTR(result);
+ }
+
+ return lcc;
}
struct obd_capa *cl_capa_lookup(struct inode *inode, enum cl_req_type crt)
static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);
-/* WARNING: This algorithm is used to reduce the contention on
- * sbi->ll_lock. It should work well if the ra_max_pages is much
- * greater than the single file's read-ahead window.
+/**
+ * Get readahead pages from the filesystem readahead pool of the client for a
+ * thread.
*
- * TODO: There may exist a `global sync problem' in this implementation.
- * Considering the global ra window is 100M, and each file's ra window is 10M,
- * there are over 10 files trying to get its ra budget and reach
- * ll_ra_count_get at the exactly same time. All of them will get a zero ra
- * window, although the global window is 100M. -jay
- */
-static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
+ * /param sbi superblock for filesystem readahead state ll_ra_info
+ * /param ria per-thread readahead state
+ * /param pages number of pages requested for readahead for the thread.
+ *
+ * WARNING: This algorithm is used to reduce contention on sbi->ll_lock.
+ * It should work well if the ra_max_pages is much greater than the single
+ * file's read-ahead window, and not too many threads contending for
+ * these readahead pages.
+ *
+ * TODO: There may be a 'global sync problem' if many threads are trying
+ * to get an ra budget that is larger than the remaining readahead pages
+ * and reach here at exactly the same time. They will compute /a ret to
+ * consume the remaining pages, but will fail at atomic_add_return() and
+ * get a zero ra window, although there is still ra space remaining. - Jay */
+
+static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
+ struct ra_io_arg *ria,
+ unsigned long pages, unsigned long min)
{
struct ll_ra_info *ra = &sbi->ll_ra_info;
- unsigned long ret;
+ long ret;
ENTRY;
- ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages), len);
- if ((int)ret < 0)
+ /* If read-ahead pages left are less than 1M, do not do read-ahead,
+ * otherwise it will form small read RPC(< 1M), which hurt server
+ * performance a lot. */
+ ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages), pages);
+ if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages))
GOTO(out, ret = 0);
- if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
- atomic_sub(ret, &ra->ra_cur_pages);
- ret = 0;
+ /* If the non-strided (ria_pages == 0) readahead window
+ * (ria_start + ret) has grown across an RPC boundary, then trim
+ * readahead size by the amount beyond the RPC so it ends on an
+ * RPC boundary. If the readahead window is already ending on
+ * an RPC boundary (beyond_rpc == 0), or smaller than a full
+ * RPC (beyond_rpc < ret) the readahead size is unchanged.
+ * The (beyond_rpc != 0) check is skipped since the conditional
+ * branch is more expensive than subtracting zero from the result.
+ *
+ * Strided read is left unaligned to avoid small fragments beyond
+ * the RPC boundary from needing an extra read RPC. */
+ if (ria->ria_pages == 0) {
+ long beyond_rpc = (ria->ria_start + ret) % PTLRPC_MAX_BRW_PAGES;
+ if (/* beyond_rpc != 0 && */ beyond_rpc < ret)
+ ret -= beyond_rpc;
}
+
+ if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
+ atomic_sub(ret, &ra->ra_cur_pages);
+ ret = 0;
+ }
+
out:
- RETURN(ret);
+ if (ret < min) {
+ /* override ra limit for maximum performance */
+ atomic_add(min - ret, &ra->ra_cur_pages);
+ ret = min;
+ }
+ RETURN(ret);
}
void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
{
- struct ll_ra_info *ra = &sbi->ll_ra_info;
- atomic_sub(len, &ra->ra_cur_pages);
+ struct ll_ra_info *ra = &sbi->ll_ra_info;
+ atomic_sub(len, &ra->ra_cur_pages);
}
static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
lprocfs_counter_incr(sbi->ll_ra_stats, which);
}
-void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
+void ll_ra_stats_inc(struct inode *inode, enum ra_stat which)
{
- struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
- ll_ra_stats_inc_sbi(sbi, which);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ ll_ra_stats_inc_sbi(sbi, which);
}
#define RAS_CDEBUG(ras) \
CDEBUG(D_READA, \
"lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu r %lu ri %lu" \
- "csr %lu sf %lu sp %lu sl %lu \n", \
+ "csr %lu sf %lu sp %lu sl %lu \n", \
ras->ras_last_readpage, ras->ras_consecutive_requests, \
ras->ras_consecutive_pages, ras->ras_window_start, \
ras->ras_window_len, ras->ras_next_readahead, \
- ras->ras_requests, ras->ras_request_index, \
+ ras->ras_requests, ras->ras_request_index, \
ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
ras->ras_stride_pages, ras->ras_stride_length)
void ll_ra_read_in(struct file *f, struct ll_ra_read *rar)
{
- struct ll_readahead_state *ras;
+ struct ll_readahead_state *ras;
- ras = ll_ras_get(f);
+ ras = ll_ras_get(f);
- spin_lock(&ras->ras_lock);
- ras->ras_requests++;
- ras->ras_request_index = 0;
- ras->ras_consecutive_requests++;
- rar->lrr_reader = current;
+ spin_lock(&ras->ras_lock);
+ ras->ras_requests++;
+ ras->ras_request_index = 0;
+ ras->ras_consecutive_requests++;
+ rar->lrr_reader = current;
- list_add(&rar->lrr_linkage, &ras->ras_read_beads);
- spin_unlock(&ras->ras_lock);
+ list_add(&rar->lrr_linkage, &ras->ras_read_beads);
+ spin_unlock(&ras->ras_lock);
}
void ll_ra_read_ex(struct file *f, struct ll_ra_read *rar)
{
- struct ll_readahead_state *ras;
+ struct ll_readahead_state *ras;
- ras = ll_ras_get(f);
+ ras = ll_ras_get(f);
- spin_lock(&ras->ras_lock);
- list_del_init(&rar->lrr_linkage);
- spin_unlock(&ras->ras_lock);
+ spin_lock(&ras->ras_lock);
+ list_del_init(&rar->lrr_linkage);
+ spin_unlock(&ras->ras_lock);
}
static struct ll_ra_read *ll_ra_read_get_locked(struct ll_readahead_state *ras)
{
struct ll_ra_read *scan;
- list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) {
+ list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) {
if (scan->lrr_reader == current)
return scan;
}
struct ll_ra_read *ll_ra_read_get(struct file *f)
{
- struct ll_readahead_state *ras;
- struct ll_ra_read *bead;
+ struct ll_readahead_state *ras;
+ struct ll_ra_read *bead;
- ras = ll_ras_get(f);
+ ras = ll_ras_get(f);
- spin_lock(&ras->ras_lock);
- bead = ll_ra_read_get_locked(ras);
- spin_unlock(&ras->ras_lock);
- return bead;
+ spin_lock(&ras->ras_lock);
+ bead = ll_ra_read_get_locked(ras);
+ spin_unlock(&ras->ras_lock);
+ return bead;
}
static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue, struct cl_page *page,
- struct page *vmpage)
+ struct cl_page_list *queue, struct cl_page *page,
+ struct cl_object *clob, pgoff_t *max_index)
{
- struct ccc_page *cp;
- int rc;
-
- ENTRY;
-
- rc = 0;
- cl_page_assume(env, io, page);
- lu_ref_add(&page->cp_reference, "ra", cfs_current());
- cp = cl2ccc_page(cl_page_at(page, &vvp_device_type));
- if (!cp->cpg_defer_uptodate && !Page_Uptodate(vmpage)) {
- rc = cl_page_is_under_lock(env, io, page);
- if (rc == -EBUSY) {
- cp->cpg_defer_uptodate = 1;
- cp->cpg_ra_used = 0;
- cl_page_list_add(queue, page);
- rc = 1;
- } else {
- cl_page_delete(env, page);
- rc = -ENOLCK;
- }
- } else
- /* skip completed pages */
- cl_page_unassume(env, io, page);
- lu_ref_del(&page->cp_reference, "ra", cfs_current());
- cl_page_put(env, page);
- RETURN(rc);
+ struct page *vmpage = page->cp_vmpage;
+ struct ccc_page *cp;
+ int rc;
+
+ ENTRY;
+
+ rc = 0;
+ cl_page_assume(env, io, page);
+ lu_ref_add(&page->cp_reference, "ra", current);
+ cp = cl2ccc_page(cl_object_page_slice(clob, page));
+ if (!cp->cpg_defer_uptodate && !PageUptodate(vmpage)) {
+ CDEBUG(D_READA, "page index %lu, max_index: %lu\n",
+ ccc_index(cp), *max_index);
+ /* Disable the optimization on prefetching maximum readahead
+ * index because there is a race with lock cancellation. This
+ * optimization will be revived later.
+ * if (*max_index == 0 || ccc_index(cp) > *max_index) */
+ rc = cl_page_is_under_lock(env, io, page, max_index);
+ if (rc == 0) {
+ cp->cpg_defer_uptodate = 1;
+ cp->cpg_ra_used = 0;
+ cl_page_list_add(queue, page);
+ rc = 1;
+ } else {
+ cl_page_discard(env, io, page);
+ rc = -ENOLCK;
+ }
+ } else {
+ /* skip completed pages */
+ cl_page_unassume(env, io, page);
+ }
+ lu_ref_del(&page->cp_reference, "ra", current);
+ cl_page_put(env, page);
+ RETURN(rc);
}
/**
* \retval -ve, 0: page wasn't added to \a queue for other reason.
*/
static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue,
- int index, struct address_space *mapping)
+ struct cl_page_list *queue,
+ pgoff_t index, pgoff_t *max_index)
{
+ struct cl_object *clob = io->ci_obj;
+ struct inode *inode = ccc_object_inode(clob);
struct page *vmpage;
- struct cl_object *clob = ll_i2info(mapping->host)->lli_clob;
struct cl_page *page;
enum ra_stat which = _NR_RA_STAT; /* keep gcc happy */
unsigned int gfp_mask;
#ifdef __GFP_NOWARN
gfp_mask |= __GFP_NOWARN;
#endif
- vmpage = grab_cache_page_nowait_gfp(mapping, index, gfp_mask);
- if (vmpage != NULL) {
- /* Check if vmpage was truncated or reclaimed */
- if (vmpage->mapping == mapping) {
- page = cl_page_find(env, clob, vmpage->index,
- vmpage, CPT_CACHEABLE);
- if (!IS_ERR(page)) {
- rc = cl_read_ahead_page(env, io, queue,
- page, vmpage);
+ vmpage = grab_cache_page_nowait(inode->i_mapping, index);
+ if (vmpage != NULL) {
+ /* Check if vmpage was truncated or reclaimed */
+ if (vmpage->mapping == inode->i_mapping) {
+ page = cl_page_find(env, clob, vmpage->index,
+ vmpage, CPT_CACHEABLE);
+ if (!IS_ERR(page)) {
+ rc = cl_read_ahead_page(env, io, queue,
+ page, clob, max_index);
if (rc == -ENOLCK) {
which = RA_STAT_FAILED_MATCH;
msg = "lock match failed";
which = RA_STAT_FAILED_GRAB_PAGE;
msg = "g_c_p_n failed";
}
- if (msg != NULL) {
- ll_ra_stats_inc(mapping, which);
- CDEBUG(D_READA, "%s\n", msg);
- }
- RETURN(rc);
+ if (msg != NULL) {
+ ll_ra_stats_inc(inode, which);
+ CDEBUG(D_READA, "%s\n", msg);
+ }
+ RETURN(rc);
}
#define RIA_DEBUG(ria) \
ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\
ria->ria_pages)
-#define RAS_INCREASE_STEP (1024 * 1024 >> CFS_PAGE_SHIFT)
+/* Limit this to the blocksize instead of PTLRPC_BRW_MAX_SIZE, since we don't
+ * know what the actual RPC size is. If this needs to change, it makes more
+ * sense to tune the i_blkbits value for the file based on the OSTs it is
+ * striped over, rather than having a constant value for all files here. */
+
+/* RAS_INCREASE_STEP should be (1UL << (inode->i_blkbits - PAGE_CACHE_SHIFT)).
+ * Temprarily set RAS_INCREASE_STEP to 1MB. After 4MB RPC is enabled
+ * by default, this should be adjusted corresponding with max_read_ahead_mb
+ * and max_read_ahead_per_file_mb otherwise the readahead budget can be used
+ * up quickly which will affect read performance siginificantly. See LU-2816 */
+#define RAS_INCREASE_STEP(inode) (ONE_MB_BRW_SIZE >> PAGE_CACHE_SHIFT)
static inline int stride_io_mode(struct ll_readahead_state *ras)
{
return ras->ras_consecutive_stride_requests > 1;
}
-
/* The function calculates how much pages will be read in
- * [off, off + length], which will be read by stride I/O mode,
+ * [off, off + length], in such stride IO area,
* stride_offset = st_off, stride_lengh = st_len,
* stride_pages = st_pgs
+ *
+ * |------------------|*****|------------------|*****|------------|*****|....
+ * st_off
+ * |--- st_pgs ---|
+ * |----- st_len -----|
+ *
+ * How many pages it should read in such pattern
+ * |-------------------------------------------------------------|
+ * off
+ * |<------ length ------->|
+ *
+ * = |<----->| + |-------------------------------------| + |---|
+ * start_left st_pgs * i end_left
*/
static unsigned long
stride_pg_count(pgoff_t st_off, unsigned long st_len, unsigned long st_pgs,
- unsigned long off, unsigned length)
+ unsigned long off, unsigned long length)
{
- unsigned long cont_len = st_off > off ? st_off - off : 0;
- __u64 stride_len = length + off > st_off ?
- length + off + 1 - st_off : 0;
- unsigned long left, pg_count;
+ __u64 start = off > st_off ? off - st_off : 0;
+ __u64 end = off + length > st_off ? off + length - st_off : 0;
+ unsigned long start_left = 0;
+ unsigned long end_left = 0;
+ unsigned long pg_count;
- if (st_len == 0 || length == 0)
+ if (st_len == 0 || length == 0 || end == 0)
return length;
- left = do_div(stride_len, st_len);
- left = min(left, st_pgs);
+ start_left = do_div(start, st_len);
+ if (start_left < st_pgs)
+ start_left = st_pgs - start_left;
+ else
+ start_left = 0;
+
+ end_left = do_div(end, st_len);
+ if (end_left > st_pgs)
+ end_left = st_pgs;
- pg_count = left + stride_len * st_pgs + cont_len;
+ CDEBUG(D_READA, "start "LPU64", end "LPU64" start_left %lu end_left %lu \n",
+ start, end, start_left, end_left);
- LASSERT(pg_count >= left);
+ if (start == end)
+ pg_count = end_left - (st_pgs - start_left);
+ else
+ pg_count = start_left + st_pgs * (end - start - 1) + end_left;
- CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %u"
+ CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %lu"
"pgcount %lu\n", st_off, st_len, st_pgs, off, length, pg_count);
return pg_count;
* For stride I/O mode, just check whether the idx is inside
* the ria_pages. */
return ria->ria_length == 0 || ria->ria_length == ria->ria_pages ||
- (idx - ria->ria_stoff) % ria->ria_length < ria->ria_pages;
+ (idx >= ria->ria_stoff && (idx - ria->ria_stoff) %
+ ria->ria_length < ria->ria_pages);
}
static int ll_read_ahead_pages(const struct lu_env *env,
struct cl_io *io, struct cl_page_list *queue,
struct ra_io_arg *ria,
unsigned long *reserved_pages,
- struct address_space *mapping,
unsigned long *ra_end)
{
- int rc, count = 0, stride_ria;
- unsigned long page_idx;
-
- LASSERT(ria != NULL);
- RIA_DEBUG(ria);
-
- stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
- for (page_idx = ria->ria_start; page_idx <= ria->ria_end &&
- *reserved_pages > 0; page_idx++) {
- if (ras_inside_ra_window(page_idx, ria)) {
- /* If the page is inside the read-ahead window*/
- rc = ll_read_ahead_page(env, io, queue,
- page_idx, mapping);
+ int rc, count = 0;
+ bool stride_ria;
+ pgoff_t page_idx;
+ pgoff_t max_index = 0;
+
+ LASSERT(ria != NULL);
+ RIA_DEBUG(ria);
+
+ stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
+ for (page_idx = ria->ria_start;
+ page_idx <= ria->ria_end && *reserved_pages > 0; page_idx++) {
+ if (ras_inside_ra_window(page_idx, ria)) {
+ /* If the page is inside the read-ahead window*/
+ rc = ll_read_ahead_page(env, io, queue,
+ page_idx, &max_index);
if (rc == 1) {
(*reserved_pages)--;
- count ++;
+ count++;
} else if (rc == -ENOLCK)
break;
} else if (stride_ria) {
/* FIXME: This assertion only is valid when it is for
* forward read-ahead, it will be fixed when backward
* read-ahead is implemented */
- LASSERTF(page_idx > ria->ria_stoff, "since %lu in the"
- " gap of ra window,it should bigger than stride"
- " offset %lu \n", page_idx, ria->ria_stoff);
-
+ LASSERTF(page_idx > ria->ria_stoff, "Invalid page_idx %lu"
+ "rs %lu re %lu ro %lu rl %lu rp %lu\n", page_idx,
+ ria->ria_start, ria->ria_end, ria->ria_stoff,
+ ria->ria_length, ria->ria_pages);
offset = page_idx - ria->ria_stoff;
offset = offset % (ria->ria_length);
if (offset > ria->ria_pages) {
}
int ll_readahead(const struct lu_env *env, struct cl_io *io,
- struct ll_readahead_state *ras, struct address_space *mapping,
- struct cl_page_list *queue, int flags)
+ struct cl_page_list *queue, struct ll_readahead_state *ras,
+ bool hit)
{
- struct vvp_io *vio = vvp_env_io(env);
- struct vvp_thread_info *vti = vvp_env_info(env);
- struct ccc_thread_info *cti = ccc_env_info(env);
- unsigned long start = 0, end = 0, reserved;
- unsigned long ra_end, len;
- struct inode *inode;
- struct ll_ra_read *bead;
- struct ra_io_arg *ria = &vti->vti_ria;
- struct ll_inode_info *lli;
- struct cl_object *clob;
- struct cl_attr *attr = &cti->cti_attr;
- int ret = 0;
- __u64 kms;
- ENTRY;
-
- inode = mapping->host;
- lli = ll_i2info(inode);
- clob = lli->lli_clob;
-
- memset(ria, 0, sizeof *ria);
-
- cl_object_attr_lock(clob);
- ret = cl_object_attr_get(env, clob, attr);
- cl_object_attr_unlock(clob);
-
- if (ret != 0)
- RETURN(ret);
- kms = attr->cat_kms;
- if (kms == 0) {
- ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
- RETURN(0);
- }
-
- spin_lock(&ras->ras_lock);
+ struct vvp_io *vio = vvp_env_io(env);
+ struct vvp_thread_info *vti = vvp_env_info(env);
+ struct cl_attr *attr = ccc_env_thread_attr(env);
+ unsigned long start = 0, end = 0, reserved;
+ unsigned long ra_end, len, mlen = 0;
+ struct inode *inode;
+ struct ll_ra_read *bead;
+ struct ra_io_arg *ria = &vti->vti_ria;
+ struct cl_object *clob;
+ int ret = 0;
+ __u64 kms;
+ ENTRY;
+
+ clob = io->ci_obj;
+ inode = ccc_object_inode(clob);
+
+ memset(ria, 0, sizeof *ria);
+
+ cl_object_attr_lock(clob);
+ ret = cl_object_attr_get(env, clob, attr);
+ cl_object_attr_unlock(clob);
+
+ if (ret != 0)
+ RETURN(ret);
+ kms = attr->cat_kms;
+ if (kms == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
+ RETURN(0);
+ }
+
+ spin_lock(&ras->ras_lock);
if (vio->cui_ra_window_set)
bead = &vio->cui_bead;
else
end = ras->ras_window_start + ras->ras_window_len - 1;
}
if (end != 0) {
+ unsigned long rpc_boundary;
+ /*
+ * Align RA window to an optimal boundary.
+ *
+ * XXX This would be better to align to cl_max_pages_per_rpc
+ * instead of PTLRPC_MAX_BRW_PAGES, because the RPC size may
+ * be aligned to the RAID stripe size in the future and that
+ * is more important than the RPC size.
+ */
+ /* Note: we only trim the RPC, instead of extending the RPC
+ * to the boundary, so to avoid reading too much pages during
+ * random reading. */
+ rpc_boundary = ((end + 1) & (~(PTLRPC_MAX_BRW_PAGES - 1)));
+ if (rpc_boundary > 0)
+ rpc_boundary--;
+
+ if (rpc_boundary > start)
+ end = rpc_boundary;
+
/* Truncate RA window to end of file */
- end = min(end, (unsigned long)((kms - 1) >> CFS_PAGE_SHIFT));
+ end = min(end, (unsigned long)((kms - 1) >> PAGE_CACHE_SHIFT));
+
ras->ras_next_readahead = max(end, end + 1);
RAS_CDEBUG(ras);
}
ria->ria_length = ras->ras_stride_length;
ria->ria_pages = ras->ras_stride_pages;
}
- spin_unlock(&ras->ras_lock);
-
- if (end == 0) {
- ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
- RETURN(0);
- }
- len = ria_page_count(ria);
- if (len == 0)
- RETURN(0);
-
- reserved = ll_ra_count_get(ll_i2sbi(inode), len);
-
- if (reserved < len)
- ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
-
- CDEBUG(D_READA, "reserved page %lu \n", reserved);
-
- ret = ll_read_ahead_pages(env, io, queue,
- ria, &reserved, mapping, &ra_end);
-
- LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
- if (reserved != 0)
- ll_ra_count_put(ll_i2sbi(inode), reserved);
-
- if (ra_end == end + 1 && ra_end == (kms >> CFS_PAGE_SHIFT))
- ll_ra_stats_inc(mapping, RA_STAT_EOF);
-
- /* if we didn't get to the end of the region we reserved from
- * the ras we need to go back and update the ras so that the
- * next read-ahead tries from where we left off. we only do so
- * if the region we failed to issue read-ahead on is still ahead
- * of the app and behind the next index to start read-ahead from */
- CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
- ra_end, end, ria->ria_end);
-
- if (ra_end != end + 1) {
- spin_lock(&ras->ras_lock);
- if (ra_end < ras->ras_next_readahead &&
- index_in_window(ra_end, ras->ras_window_start, 0,
- ras->ras_window_len)) {
- ras->ras_next_readahead = ra_end;
- RAS_CDEBUG(ras);
- }
- spin_unlock(&ras->ras_lock);
- }
-
- RETURN(ret);
+ spin_unlock(&ras->ras_lock);
+
+ if (end == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
+ RETURN(0);
+ }
+ len = ria_page_count(ria);
+ if (len == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
+ RETURN(0);
+ }
+
+ CDEBUG(D_READA, DFID": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n",
+ PFID(lu_object_fid(&clob->co_lu)),
+ ria->ria_start, ria->ria_end,
+ bead == NULL ? 0 : bead->lrr_start,
+ bead == NULL ? 0 : bead->lrr_count,
+ hit);
+
+ /* at least to extend the readahead window to cover current read */
+ if (!hit && bead != NULL &&
+ bead->lrr_start + bead->lrr_count > ria->ria_start) {
+ /* to the end of current read window. */
+ mlen = bead->lrr_start + bead->lrr_count - ria->ria_start;
+ /* trim to RPC boundary */
+ start = ria->ria_start & (PTLRPC_MAX_BRW_PAGES - 1);
+ mlen = min(mlen, PTLRPC_MAX_BRW_PAGES - start);
+ }
+
+ reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len, mlen);
+ if (reserved < len)
+ ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
+
+ CDEBUG(D_READA, "reserved pages: %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
+ reserved, len, mlen,
+ atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
+ ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
+
+ ret = ll_read_ahead_pages(env, io, queue, ria, &reserved, &ra_end);
+
+ if (reserved != 0)
+ ll_ra_count_put(ll_i2sbi(inode), reserved);
+
+ if (ra_end == end + 1 && ra_end == (kms >> PAGE_CACHE_SHIFT))
+ ll_ra_stats_inc(inode, RA_STAT_EOF);
+
+ /* if we didn't get to the end of the region we reserved from
+ * the ras we need to go back and update the ras so that the
+ * next read-ahead tries from where we left off. we only do so
+ * if the region we failed to issue read-ahead on is still ahead
+ * of the app and behind the next index to start read-ahead from */
+ CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
+ ra_end, end, ria->ria_end);
+
+ if (ra_end != end + 1) {
+ ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
+ spin_lock(&ras->ras_lock);
+ if (ra_end < ras->ras_next_readahead &&
+ index_in_window(ra_end, ras->ras_window_start, 0,
+ ras->ras_window_len)) {
+ ras->ras_next_readahead = ra_end;
+ RAS_CDEBUG(ras);
+ }
+ spin_unlock(&ras->ras_lock);
+ }
+
+ RETURN(ret);
}
-static void ras_set_start(struct ll_readahead_state *ras, unsigned long index)
+static void ras_set_start(struct inode *inode, struct ll_readahead_state *ras,
+ unsigned long index)
{
- ras->ras_window_start = index & (~(RAS_INCREASE_STEP - 1));
+ ras->ras_window_start = index & (~(RAS_INCREASE_STEP(inode) - 1));
}
/* called with the ras_lock held or from places where it doesn't matter */
-static void ras_reset(struct ll_readahead_state *ras, unsigned long index)
+static void ras_reset(struct inode *inode, struct ll_readahead_state *ras,
+ unsigned long index)
{
- ras->ras_last_readpage = index;
- ras->ras_consecutive_requests = 0;
- ras->ras_consecutive_pages = 0;
- ras->ras_window_len = 0;
- ras_set_start(ras, index);
- ras->ras_next_readahead = max(ras->ras_window_start, index);
-
- RAS_CDEBUG(ras);
+ ras->ras_last_readpage = index;
+ ras->ras_consecutive_requests = 0;
+ ras->ras_consecutive_pages = 0;
+ ras->ras_window_len = 0;
+ ras_set_start(inode, ras, index);
+ ras->ras_next_readahead = max(ras->ras_window_start, index);
+
+ RAS_CDEBUG(ras);
}
/* called with the ras_lock held or from places where it doesn't matter */
void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
{
- spin_lock_init(&ras->ras_lock);
- ras_reset(ras, 0);
- ras->ras_requests = 0;
- INIT_LIST_HEAD(&ras->ras_read_beads);
+ spin_lock_init(&ras->ras_lock);
+ ras_reset(inode, ras, 0);
+ ras->ras_requests = 0;
+ INIT_LIST_HEAD(&ras->ras_read_beads);
}
/*
* Check whether the read request is in the stride window.
* If it is in the stride window, return 1, otherwise return 0.
*/
-static int index_in_stride_window(unsigned long index,
- struct ll_readahead_state *ras,
- struct inode *inode)
+static int index_in_stride_window(struct ll_readahead_state *ras,
+ unsigned long index)
{
- unsigned long stride_gap = index - ras->ras_last_readpage - 1;
+ unsigned long stride_gap;
+
+ if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 ||
+ ras->ras_stride_pages == ras->ras_stride_length)
+ return 0;
- if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0)
- return 0;
+ stride_gap = index - ras->ras_last_readpage - 1;
- /* If it is contiguous read */
- if (stride_gap == 0)
- return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
+ /* If it is contiguous read */
+ if (stride_gap == 0)
+ return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
- /* Otherwise check the stride by itself */
- return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
- ras->ras_consecutive_pages == ras->ras_stride_pages;
+ /* Otherwise check the stride by itself */
+ return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
+ ras->ras_consecutive_pages == ras->ras_stride_pages;
}
static void ras_update_stride_detector(struct ll_readahead_state *ras,
ras->ras_stride_pages = ras->ras_consecutive_pages;
ras->ras_stride_length = stride_gap +ras->ras_consecutive_pages;
}
+ LASSERT(ras->ras_request_index == 0);
+ LASSERT(ras->ras_consecutive_stride_requests == 0);
+
+ if (index <= ras->ras_last_readpage) {
+ /*Reset stride window for forward read*/
+ ras_stride_reset(ras);
+ return;
+ }
+
+ ras->ras_stride_pages = ras->ras_consecutive_pages;
+ ras->ras_stride_length = stride_gap +ras->ras_consecutive_pages;
+
RAS_CDEBUG(ras);
+ return;
}
static unsigned long
unsigned long stride_len;
LASSERT(ras->ras_stride_length > 0);
- LASSERTF(ras->ras_window_start + ras->ras_window_len
+ LASSERTF(ras->ras_window_start + ras->ras_window_len
>= ras->ras_stride_offset, "window_start %lu, window_len %lu"
" stride_offset %lu\n", ras->ras_window_start,
ras->ras_window_len, ras->ras_stride_offset);
window_len += step * ras->ras_stride_length + left;
- if (stride_page_count(ras, window_len) <= ra->ra_max_pages)
+ if (stride_page_count(ras, window_len) <= ra->ra_max_pages_per_file)
ras->ras_window_len = window_len;
RAS_CDEBUG(ras);
}
-/* Set stride I/O read-ahead window start offset */
-static void ras_set_stride_offset(struct ll_readahead_state *ras)
+static void ras_increase_window(struct inode *inode,
+ struct ll_readahead_state *ras,
+ struct ll_ra_info *ra)
{
- unsigned long window_len = ras->ras_next_readahead -
- ras->ras_window_start;
- unsigned long left;
-
- LASSERT(ras->ras_stride_length != 0);
-
- left = window_len % ras->ras_stride_length;
-
- ras->ras_stride_offset = ras->ras_next_readahead - left;
-
- RAS_CDEBUG(ras);
+ /* The stretch of ra-window should be aligned with max rpc_size
+ * but current clio architecture does not support retrieve such
+ * information from lower layer. FIXME later
+ */
+ if (stride_io_mode(ras))
+ ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP(inode));
+ else
+ ras->ras_window_len = min(ras->ras_window_len +
+ RAS_INCREASE_STEP(inode),
+ ra->ra_max_pages_per_file);
}
void ras_update(struct ll_sb_info *sbi, struct inode *inode,
- struct ll_readahead_state *ras, unsigned long index,
- unsigned hit)
+ struct ll_readahead_state *ras, unsigned long index,
+ unsigned hit)
{
- struct ll_ra_info *ra = &sbi->ll_ra_info;
- int zero = 0, stride_detect = 0, ra_miss = 0;
- ENTRY;
+ struct ll_ra_info *ra = &sbi->ll_ra_info;
+ int zero = 0, stride_detect = 0, ra_miss = 0;
+ ENTRY;
- spin_lock(&sbi->ll_lock);
- spin_lock(&ras->ras_lock);
+ spin_lock(&ras->ras_lock);
ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);
index < ras->ras_next_readahead &&
index_in_window(index, ras->ras_window_start, 0,
ras->ras_window_len)) {
- ra_miss = 1;
+ ra_miss = 1;
ll_ra_stats_inc_sbi(sbi, RA_STAT_MISS_IN_WINDOW);
}
/* On the second access to a file smaller than the tunable
* ra_max_read_ahead_whole_pages trigger RA on all pages in the
- * file up to ra_max_pages. This is simply a best effort and
- * only occurs once per open file. Normal RA behavior is reverted
+ * file up to ra_max_pages_per_file. This is simply a best effort
+ * and only occurs once per open file. Normal RA behavior is reverted
* to for subsequent IO. The mmap case does not increment
* ras_requests and thus can never trigger this behavior. */
if (ras->ras_requests == 2 && !ras->ras_request_index) {
__u64 kms_pages;
- kms_pages = (i_size_read(inode) + CFS_PAGE_SIZE - 1) >>
- CFS_PAGE_SHIFT;
+ kms_pages = (i_size_read(inode) + PAGE_CACHE_SIZE - 1) >>
+ PAGE_CACHE_SHIFT;
CDEBUG(D_READA, "kmsp "LPU64" mwp %lu mp %lu\n", kms_pages,
- ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages);
+ ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages_per_file);
if (kms_pages &&
kms_pages <= ra->ra_max_read_ahead_whole_pages) {
ras->ras_window_start = 0;
ras->ras_last_readpage = 0;
ras->ras_next_readahead = 0;
- ras->ras_window_len = min(ra->ra_max_pages,
+ ras->ras_window_len = min(ra->ra_max_pages_per_file,
ra->ra_max_read_ahead_whole_pages);
GOTO(out_unlock, 0);
}
}
- if (zero) {
+ if (zero) {
/* check whether it is in stride I/O mode*/
- if (!index_in_stride_window(index, ras, inode)) {
- ras_reset(ras, index);
- ras->ras_consecutive_pages++;
- ras_stride_reset(ras);
- GOTO(out_unlock, 0);
- } else {
- ras->ras_consecutive_requests = 0;
- if (++ras->ras_consecutive_stride_requests > 1)
- stride_detect = 1;
- RAS_CDEBUG(ras);
- }
- } else {
- if (ra_miss) {
- if (index_in_stride_window(index, ras, inode) &&
- stride_io_mode(ras)) {
- /*If stride-RA hit cache miss, the stride dector
- *will not be reset to avoid the overhead of
- *redetecting read-ahead mode */
- if (index != ras->ras_last_readpage + 1)
- ras->ras_consecutive_pages = 0;
- RAS_CDEBUG(ras);
- } else {
- /* Reset both stride window and normal RA window */
- ras_reset(ras, index);
- ras->ras_consecutive_pages++;
- ras_stride_reset(ras);
- GOTO(out_unlock, 0);
- }
- } else if (stride_io_mode(ras)) {
- /* If this is contiguous read but in stride I/O mode
- * currently, check whether stride step still is valid,
- * if invalid, it will reset the stride ra window*/
- if (!index_in_stride_window(index, ras, inode)) {
- /* Shrink stride read-ahead window to be zero */
- ras_stride_reset(ras);
- ras->ras_window_len = 0;
- ras->ras_next_readahead = index;
- }
- }
- }
- ras->ras_consecutive_pages++;
- ras_update_stride_detector(ras, index);
- ras->ras_last_readpage = index;
- ras_set_start(ras, index);
- ras->ras_next_readahead = max(ras->ras_window_start,
- ras->ras_next_readahead);
- RAS_CDEBUG(ras);
-
- /* Trigger RA in the mmap case where ras_consecutive_requests
- * is not incremented and thus can't be used to trigger RA */
- if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
- ras->ras_window_len = RAS_INCREASE_STEP;
- GOTO(out_unlock, 0);
- }
-
- /* Initially reset the stride window offset to next_readahead*/
- if (ras->ras_consecutive_stride_requests == 2 && stride_detect)
- ras_set_stride_offset(ras);
-
- /* The initial ras_window_len is set to the request size. To avoid
- * uselessly reading and discarding pages for random IO the window is
- * only increased once per consecutive request received. */
- if ((ras->ras_consecutive_requests > 1 &&
- !ras->ras_request_index) || stride_detect) {
- if (stride_io_mode(ras))
- ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP);
- else
- ras->ras_window_len = min(ras->ras_window_len +
- RAS_INCREASE_STEP,
- ra->ra_max_pages);
- }
- EXIT;
+ if (!index_in_stride_window(ras, index)) {
+ if (ras->ras_consecutive_stride_requests == 0 &&
+ ras->ras_request_index == 0) {
+ ras_update_stride_detector(ras, index);
+ ras->ras_consecutive_stride_requests++;
+ } else {
+ ras_stride_reset(ras);
+ }
+ ras_reset(inode, ras, index);
+ ras->ras_consecutive_pages++;
+ GOTO(out_unlock, 0);
+ } else {
+ ras->ras_consecutive_pages = 0;
+ ras->ras_consecutive_requests = 0;
+ if (++ras->ras_consecutive_stride_requests > 1)
+ stride_detect = 1;
+ RAS_CDEBUG(ras);
+ }
+ } else {
+ if (ra_miss) {
+ if (index_in_stride_window(ras, index) &&
+ stride_io_mode(ras)) {
+ /*If stride-RA hit cache miss, the stride dector
+ *will not be reset to avoid the overhead of
+ *redetecting read-ahead mode */
+ if (index != ras->ras_last_readpage + 1)
+ ras->ras_consecutive_pages = 0;
+ ras_reset(inode, ras, index);
+ RAS_CDEBUG(ras);
+ } else {
+ /* Reset both stride window and normal RA
+ * window */
+ ras_reset(inode, ras, index);
+ ras->ras_consecutive_pages++;
+ ras_stride_reset(ras);
+ GOTO(out_unlock, 0);
+ }
+ } else if (stride_io_mode(ras)) {
+ /* If this is contiguous read but in stride I/O mode
+ * currently, check whether stride step still is valid,
+ * if invalid, it will reset the stride ra window*/
+ if (!index_in_stride_window(ras, index)) {
+ /* Shrink stride read-ahead window to be zero */
+ ras_stride_reset(ras);
+ ras->ras_window_len = 0;
+ ras->ras_next_readahead = index;
+ }
+ }
+ }
+ ras->ras_consecutive_pages++;
+ ras->ras_last_readpage = index;
+ ras_set_start(inode, ras, index);
+
+ if (stride_io_mode(ras)) {
+ /* Since stride readahead is sentivite to the offset
+ * of read-ahead, so we use original offset here,
+ * instead of ras_window_start, which is RPC aligned */
+ ras->ras_next_readahead = max(index, ras->ras_next_readahead);
+ } else {
+ if (ras->ras_next_readahead < ras->ras_window_start)
+ ras->ras_next_readahead = ras->ras_window_start;
+ if (!hit)
+ ras->ras_next_readahead = index + 1;
+ }
+ RAS_CDEBUG(ras);
+
+ /* Trigger RA in the mmap case where ras_consecutive_requests
+ * is not incremented and thus can't be used to trigger RA */
+ if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
+ ras->ras_window_len = RAS_INCREASE_STEP(inode);
+ GOTO(out_unlock, 0);
+ }
+
+ /* Initially reset the stride window offset to next_readahead*/
+ if (ras->ras_consecutive_stride_requests == 2 && stride_detect) {
+ /**
+ * Once stride IO mode is detected, next_readahead should be
+ * reset to make sure next_readahead > stride offset
+ */
+ ras->ras_next_readahead = max(index, ras->ras_next_readahead);
+ ras->ras_stride_offset = index;
+ ras->ras_window_len = RAS_INCREASE_STEP(inode);
+ }
+
+ /* The initial ras_window_len is set to the request size. To avoid
+ * uselessly reading and discarding pages for random IO the window is
+ * only increased once per consecutive request received. */
+ if ((ras->ras_consecutive_requests > 1 || stride_detect) &&
+ !ras->ras_request_index)
+ ras_increase_window(inode, ras, ra);
+ EXIT;
out_unlock:
- RAS_CDEBUG(ras);
- ras->ras_request_index++;
- spin_unlock(&ras->ras_lock);
- spin_unlock(&sbi->ll_lock);
- return;
+ RAS_CDEBUG(ras);
+ ras->ras_request_index++;
+ spin_unlock(&ras->ras_lock);
+ return;
}
-int ll_writepage(struct page *vmpage, struct writeback_control *_)
+int ll_writepage(struct page *vmpage, struct writeback_control *wbc)
{
- struct inode *inode = vmpage->mapping->host;
+ struct inode *inode = vmpage->mapping->host;
+ struct ll_inode_info *lli = ll_i2info(inode);
struct lu_env *env;
struct cl_io *io;
struct cl_page *page;
struct cl_object *clob;
- struct cl_2queue *queue;
struct cl_env_nest nest;
+ bool redirtied = false;
+ bool unlocked = false;
int result;
ENTRY;
LASSERT(PageLocked(vmpage));
LASSERT(!PageWriteback(vmpage));
- if (ll_i2dtexp(inode) == NULL)
- RETURN(-EINVAL);
+ LASSERT(ll_i2dtexp(inode) != NULL);
- env = cl_env_nested_get(&nest);
- if (IS_ERR(env))
- RETURN(PTR_ERR(env));
+ env = cl_env_nested_get(&nest);
+ if (IS_ERR(env))
+ GOTO(out, result = PTR_ERR(env));
- io = &ccc_env_info(env)->cti_io;
- queue = &vvp_env_info(env)->vti_queue;
clob = ll_i2info(inode)->lli_clob;
LASSERT(clob != NULL);
+ io = ccc_env_thread_io(env);
io->ci_obj = clob;
+ io->ci_ignore_layout = 1;
result = cl_io_init(env, io, CIT_MISC, clob);
if (result == 0) {
page = cl_page_find(env, clob, vmpage->index,
vmpage, CPT_CACHEABLE);
- if (!IS_ERR(page)) {
- lu_ref_add(&page->cp_reference, "writepage",
- cfs_current());
- cl_page_assume(env, io, page);
- /*
- * Mark page dirty, because this is what
- * ->vio_submit()->cpo_prep_write() assumes.
- *
- * XXX better solution is to detect this from within
- * cl_io_submit_rw() somehow.
- */
- set_page_dirty(vmpage);
- cl_2queue_init_page(queue, page);
- result = cl_io_submit_rw(env, io, CRT_WRITE,
- queue, CRP_NORMAL);
- cl_page_list_disown(env, io, &queue->c2_qin);
- if (result != 0) {
- /*
- * There is no need to clear PG_writeback, as
- * cl_io_submit_rw() calls completion callback
- * on failure.
- */
- /*
- * Re-dirty page on error so it retries write,
- * but not in case when IO has actually
- * occurred and completed with an error.
- */
- if (!PageError(vmpage))
- set_page_dirty(vmpage);
- }
- LASSERT(!cl_page_is_owned(page, io));
- lu_ref_del(&page->cp_reference,
- "writepage", cfs_current());
- cl_page_put(env, page);
- cl_2queue_fini(env, queue);
- }
+ if (!IS_ERR(page)) {
+ lu_ref_add(&page->cp_reference, "writepage",
+ current);
+ cl_page_assume(env, io, page);
+ result = cl_page_flush(env, io, page);
+ if (result != 0) {
+ /*
+ * Re-dirty page on error so it retries write,
+ * but not in case when IO has actually
+ * occurred and completed with an error.
+ */
+ if (!PageError(vmpage)) {
+ redirty_page_for_writepage(wbc, vmpage);
+ result = 0;
+ redirtied = true;
+ }
+ }
+ cl_page_disown(env, io, page);
+ unlocked = true;
+ lu_ref_del(&page->cp_reference,
+ "writepage", current);
+ cl_page_put(env, page);
+ } else {
+ result = PTR_ERR(page);
+ }
}
cl_io_fini(env, io);
+
+ if (redirtied && wbc->sync_mode == WB_SYNC_ALL) {
+ loff_t offset = cl_offset(clob, vmpage->index);
+
+ /* Flush page failed because the extent is being written out.
+ * Wait for the write of extent to be finished to avoid
+ * breaking kernel which assumes ->writepage should mark
+ * PageWriteback or clean the page. */
+ result = cl_sync_file_range(inode, offset,
+ offset + PAGE_CACHE_SIZE - 1,
+ CL_FSYNC_LOCAL, 1);
+ if (result > 0) {
+ /* actually we may have written more than one page.
+ * decreasing this page because the caller will count
+ * it. */
+ wbc->nr_to_write -= result - 1;
+ result = 0;
+ }
+ }
+
cl_env_nested_put(&nest, env);
- RETURN(result);
+ GOTO(out, result);
+
+out:
+ if (result < 0) {
+ if (!lli->lli_async_rc)
+ lli->lli_async_rc = result;
+ SetPageError(vmpage);
+ if (!unlocked)
+ unlock_page(vmpage);
+ }
+ return result;
+}
+
+int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
+{
+ struct inode *inode = mapping->host;
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ loff_t start;
+ loff_t end;
+ enum cl_fsync_mode mode;
+ int range_whole = 0;
+ int result;
+ int ignore_layout = 0;
+ ENTRY;
+
+ if (wbc->range_cyclic) {
+ start = mapping->writeback_index << PAGE_CACHE_SHIFT;
+ end = OBD_OBJECT_EOF;
+ } else {
+ start = wbc->range_start;
+ end = wbc->range_end;
+ if (end == LLONG_MAX) {
+ end = OBD_OBJECT_EOF;
+ range_whole = start == 0;
+ }
+ }
+
+ mode = CL_FSYNC_NONE;
+ if (wbc->sync_mode == WB_SYNC_ALL)
+ mode = CL_FSYNC_LOCAL;
+
+ if (sbi->ll_umounting)
+ /* if the mountpoint is being umounted, all pages have to be
+ * evicted to avoid hitting LBUG when truncate_inode_pages()
+ * is called later on. */
+ ignore_layout = 1;
+
+ if (cl_i2info(inode)->lli_clob == NULL)
+ RETURN(0);
+
+ result = cl_sync_file_range(inode, start, end, mode, ignore_layout);
+ if (result > 0) {
+ wbc->nr_to_write -= result;
+ result = 0;
+ }
+
+ if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) {
+ if (end == OBD_OBJECT_EOF)
+ mapping->writeback_index = 0;
+ else
+ mapping->writeback_index = (end >> PAGE_CACHE_SHIFT) +1;
+ }
+ RETURN(result);
}
int ll_readpage(struct file *file, struct page *vmpage)
{
- struct lu_env *env;
- struct cl_io *io;
- struct cl_page *page;
+ struct ll_cl_context *lcc;
int result;
- int refcheck;
ENTRY;
- result = ll_cl_init(file, vmpage, &env, &io, &page, &refcheck);
- if (result == 0) {
+ lcc = ll_cl_init(file, vmpage);
+ if (!IS_ERR(lcc)) {
+ struct lu_env *env = lcc->lcc_env;
+ struct cl_io *io = lcc->lcc_io;
+ struct cl_page *page = lcc->lcc_page;
+
LASSERT(page->cp_type == CPT_CACHEABLE);
if (likely(!PageUptodate(vmpage))) {
cl_page_assume(env, io, page);
result = cl_io_read_page(env, io, page);
} else {
/* Page from a non-object file. */
- LASSERT(!ll_i2info(vmpage->mapping->host)->lli_smd);
unlock_page(vmpage);
result = 0;
}
+ ll_cl_fini(lcc);
+ } else {
+ unlock_page(vmpage);
+ result = PTR_ERR(lcc);
}
- LASSERT(!cl_page_is_owned(page, io));
- ll_cl_fini(env, io, page, &refcheck);
RETURN(result);
}
+int ll_page_sync_io(const struct lu_env *env, struct cl_io *io,
+ struct cl_page *page, enum cl_req_type crt)
+{
+ struct cl_2queue *queue;
+ int result;
+
+ LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
+
+ queue = &io->ci_queue;
+ cl_2queue_init_page(queue, page);
+
+ result = cl_io_submit_sync(env, io, crt, queue, 0);
+ LASSERT(cl_page_is_owned(page, io));
+
+ if (crt == CRT_READ)
+ /*
+ * in CRT_WRITE case page is left locked even in case of
+ * error.
+ */
+ cl_page_list_disown(env, io, &queue->c2_qin);
+ cl_2queue_fini(env, queue);
+
+ return result;
+}