-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <linux/string.h>
#include <linux/stat.h>
#include <linux/errno.h>
-#include <linux/smp_lock.h>
#include <linux/unistd.h>
-#include <linux/version.h>
-#include <asm/system.h>
+#include <linux/writeback.h>
#include <asm/uaccess.h>
#include <linux/fs.h>
#include <asm/uaccess.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
-#include <linux/smp_lock.h>
/* current_is_kswapd() */
#include <linux/swap.h>
#define DEBUG_SUBSYSTEM S_LLITE
-#include <lustre_lite.h>
#include <obd_cksum.h>
#include "llite_internal.h"
-#include <linux/lustre_compat25.h>
-
-/* this isn't where truncate starts. roughly:
- * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs
- * DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to
- * avoid races.
- *
- * must be called under ->lli_size_sem */
-void ll_truncate(struct inode *inode)
-{
- ENTRY;
-
- CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %llu\n", inode->i_ino,
- inode->i_generation, inode, i_size_read(inode));
-
- ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_TRUNC, 1);
-
- EXIT;
- return;
-} /* ll_truncate */
-
-/**
- * Finalizes cl-data before exiting typical address_space operation. Dual to
- * ll_cl_init().
- */
-static void ll_cl_fini(struct ll_cl_context *lcc)
-{
- struct lu_env *env = lcc->lcc_env;
- struct cl_io *io = lcc->lcc_io;
- struct cl_page *page = lcc->lcc_page;
-
- LASSERT(lcc->lcc_cookie == current);
- LASSERT(env != NULL);
-
- if (page != NULL) {
- lu_ref_del(&page->cp_reference, "cl_io", io);
- cl_page_put(env, page);
- }
-
- if (io && lcc->lcc_created) {
- cl_io_end(env, io);
- cl_io_unlock(env, io);
- cl_io_iter_fini(env, io);
- cl_io_fini(env, io);
- }
- cl_env_put(env, &lcc->lcc_refcheck);
-}
-
-/**
- * Initializes common cl-data at the typical address_space operation entry
- * point.
- */
-static struct ll_cl_context *ll_cl_init(struct file *file,
- struct page *vmpage, int create)
-{
- struct ll_cl_context *lcc;
- struct lu_env *env;
- struct cl_io *io;
- struct cl_object *clob;
- struct ccc_io *cio;
-
- int refcheck;
- int result = 0;
-
- clob = ll_i2info(vmpage->mapping->host)->lli_clob;
- LASSERT(clob != NULL);
-
- env = cl_env_get(&refcheck);
- if (IS_ERR(env))
- return ERR_PTR(PTR_ERR(env));
-
- lcc = &vvp_env_info(env)->vti_io_ctx;
- memset(lcc, 0, sizeof(*lcc));
- lcc->lcc_env = env;
- lcc->lcc_refcheck = refcheck;
- lcc->lcc_cookie = current;
-
- cio = ccc_env_io(env);
- io = cio->cui_cl.cis_io;
- if (io == NULL && create) {
- loff_t pos;
-
- /*
- * Loop-back driver calls ->prepare_write() and ->sendfile()
- * methods directly, bypassing file system ->write() operation,
- * so cl_io has to be created here.
- */
- io = ccc_env_thread_io(env);
- ll_io_init(io, file, 1);
-
- /* No lock at all for this kind of IO - we can't do it because
- * we have held page lock, it would cause deadlock.
- * XXX: This causes poor performance to loop device - One page
- * per RPC.
- * In order to get better performance, users should use
- * lloop driver instead.
- */
- io->ci_lockreq = CILR_NEVER;
-
- pos = (vmpage->index << CFS_PAGE_SHIFT);
-
- /* Create a temp IO to serve write. */
- result = cl_io_rw_init(env, io, CIT_WRITE, pos, CFS_PAGE_SIZE);
- if (result == 0) {
- cio->cui_fd = LUSTRE_FPRIVATE(file);
- cio->cui_iov = NULL;
- cio->cui_nrsegs = 0;
- result = cl_io_iter_init(env, io);
- if (result == 0) {
- result = cl_io_lock(env, io);
- if (result == 0)
- result = cl_io_start(env, io);
- }
- } else
- result = io->ci_result;
- lcc->lcc_created = 1;
- }
-
- lcc->lcc_io = io;
- if (io == NULL)
- result = -EIO;
- if (result == 0) {
- struct cl_page *page;
-
- LASSERT(io != NULL);
- LASSERT(io->ci_state == CIS_IO_GOING);
- LASSERT(cio->cui_fd == LUSTRE_FPRIVATE(file));
- page = cl_page_find(env, clob, vmpage->index, vmpage,
- CPT_CACHEABLE);
- if (!IS_ERR(page)) {
- lcc->lcc_page = page;
- lu_ref_add(&page->cp_reference, "cl_io", io);
- result = 0;
- } else
- result = PTR_ERR(page);
- }
- if (result) {
- ll_cl_fini(lcc);
- lcc = ERR_PTR(result);
- }
-
- CDEBUG(D_VFSTRACE, "%lu@"DFID" -> %d %p %p\n",
- vmpage->index, PFID(lu_object_fid(&clob->co_lu)), result,
- env, io);
- return lcc;
-}
-
-static struct ll_cl_context *ll_cl_get(void)
-{
- struct ll_cl_context *lcc;
- struct lu_env *env;
- int refcheck;
-
- env = cl_env_get(&refcheck);
- LASSERT(!IS_ERR(env));
- lcc = &vvp_env_info(env)->vti_io_ctx;
- LASSERT(env == lcc->lcc_env);
- LASSERT(current == lcc->lcc_cookie);
- cl_env_put(env, &refcheck);
-
- /* env has got in ll_cl_init, so it is still usable. */
- return lcc;
-}
-
-/**
- * ->prepare_write() address space operation called by generic_file_write()
- * for every page during write.
- */
-int ll_prepare_write(struct file *file, struct page *vmpage, unsigned from,
- unsigned to)
-{
- struct ll_cl_context *lcc;
- int result;
- ENTRY;
-
- lcc = ll_cl_init(file, vmpage, 1);
- if (!IS_ERR(lcc)) {
- struct lu_env *env = lcc->lcc_env;
- struct cl_io *io = lcc->lcc_io;
- struct cl_page *page = lcc->lcc_page;
-
- cl_page_assume(env, io, page);
- if (cl_io_is_append(io)) {
- struct cl_object *obj = io->ci_obj;
- struct inode *inode = ccc_object_inode(obj);
- /**
- * In VFS file->page write loop, for appending, the
- * write offset might be reset according to the new
- * file size before holding i_mutex. So crw_pos should
- * be reset here. BUG:17711.
- */
- io->u.ci_wr.wr.crw_pos = i_size_read(inode);
- }
- result = cl_io_prepare_write(env, io, page, from, to);
- if (result == 0) {
- /*
- * Add a reference, so that page is not evicted from
- * the cache until ->commit_write() is called.
- */
- cl_page_get(page);
- lu_ref_add(&page->cp_reference, "prepare_write",
- cfs_current());
- } else {
- cl_page_unassume(env, io, page);
- ll_cl_fini(lcc);
- }
- /* returning 0 in prepare assumes commit must be called
- * afterwards */
- } else {
- result = PTR_ERR(lcc);
- }
- RETURN(result);
-}
-
-int ll_commit_write(struct file *file, struct page *vmpage, unsigned from,
- unsigned to)
-{
- struct ll_cl_context *lcc;
- struct lu_env *env;
- struct cl_io *io;
- struct cl_page *page;
- int result = 0;
- ENTRY;
-
- lcc = ll_cl_get();
- env = lcc->lcc_env;
- page = lcc->lcc_page;
- io = lcc->lcc_io;
-
- LASSERT(cl_page_is_owned(page, io));
- LASSERT(from <= to);
- if (from != to) /* handle short write case. */
- result = cl_io_commit_write(env, io, page, from, to);
- if (cl_page_is_owned(page, io))
- cl_page_unassume(env, io, page);
-
- /*
- * Release reference acquired by ll_prepare_write().
- */
- lu_ref_del(&page->cp_reference, "prepare_write", cfs_current());
- cl_page_put(env, page);
- ll_cl_fini(lcc);
- RETURN(result);
-}
-
-struct obd_capa *cl_capa_lookup(struct inode *inode, enum cl_req_type crt)
-{
- __u64 opc;
-
- opc = crt == CRT_WRITE ? CAPA_OPC_OSS_WRITE : CAPA_OPC_OSS_RW;
- return ll_osscapa_get(inode, opc);
-}
+#include <lustre_compat.h>
static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);
* get a zero ra window, although there is still ra space remaining. - Jay */
static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
- struct ra_io_arg *ria,
- unsigned long pages)
+ struct ra_io_arg *ria,
+ unsigned long pages, unsigned long min)
{
struct ll_ra_info *ra = &sbi->ll_ra_info;
long ret;
/* If read-ahead pages left are less than 1M, do not do read-ahead,
* otherwise it will form small read RPC(< 1M), which hurt server
* performance a lot. */
- ret = min(ra->ra_max_pages - cfs_atomic_read(&ra->ra_cur_pages), pages);
+ ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages),
+ pages);
if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages))
GOTO(out, ret = 0);
- /* If the non-strided (ria_pages == 0) readahead window
- * (ria_start + ret) has grown across an RPC boundary, then trim
- * readahead size by the amount beyond the RPC so it ends on an
- * RPC boundary. If the readahead window is already ending on
- * an RPC boundary (beyond_rpc == 0), or smaller than a full
- * RPC (beyond_rpc < ret) the readahead size is unchanged.
- * The (beyond_rpc != 0) check is skipped since the conditional
- * branch is more expensive than subtracting zero from the result.
- *
- * Strided read is left unaligned to avoid small fragments beyond
- * the RPC boundary from needing an extra read RPC. */
- if (ria->ria_pages == 0) {
- long beyond_rpc = (ria->ria_start + ret) % PTLRPC_MAX_BRW_PAGES;
- if (/* beyond_rpc != 0 && */ beyond_rpc < ret)
- ret -= beyond_rpc;
- }
-
- if (cfs_atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
- cfs_atomic_sub(ret, &ra->ra_cur_pages);
- ret = 0;
- }
+ if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
+ atomic_sub(ret, &ra->ra_cur_pages);
+ ret = 0;
+ }
out:
- RETURN(ret);
+ if (ret < min) {
+ /* override ra limit for maximum performance */
+ atomic_add(min - ret, &ra->ra_cur_pages);
+ ret = min;
+ }
+ RETURN(ret);
}
void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
{
- struct ll_ra_info *ra = &sbi->ll_ra_info;
- cfs_atomic_sub(len, &ra->ra_cur_pages);
+ struct ll_ra_info *ra = &sbi->ll_ra_info;
+ atomic_sub(len, &ra->ra_cur_pages);
}
static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
{
- LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
- lprocfs_counter_incr(sbi->ll_ra_stats, which);
+ LASSERTF(which < _NR_RA_STAT, "which: %u\n", which);
+ lprocfs_counter_incr(sbi->ll_ra_stats, which);
}
-void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
+void ll_ra_stats_inc(struct inode *inode, enum ra_stat which)
{
- struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
- ll_ra_stats_inc_sbi(sbi, which);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ ll_ra_stats_inc_sbi(sbi, which);
}
#define RAS_CDEBUG(ras) \
- CDEBUG(D_READA, \
- "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu r %lu ri %lu" \
- "csr %lu sf %lu sp %lu sl %lu \n", \
- ras->ras_last_readpage, ras->ras_consecutive_requests, \
- ras->ras_consecutive_pages, ras->ras_window_start, \
- ras->ras_window_len, ras->ras_next_readahead, \
- ras->ras_requests, ras->ras_request_index, \
- ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
- ras->ras_stride_pages, ras->ras_stride_length)
+ CDEBUG(D_READA, \
+ "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu rpc %lu " \
+ "r %lu ri %lu csr %lu sf %lu sp %lu sl %lu\n", \
+ ras->ras_last_readpage, ras->ras_consecutive_requests, \
+ ras->ras_consecutive_pages, ras->ras_window_start, \
+ ras->ras_window_len, ras->ras_next_readahead, \
+ ras->ras_rpc_size, \
+ ras->ras_requests, ras->ras_request_index, \
+ ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
+ ras->ras_stride_pages, ras->ras_stride_length)
static int index_in_window(unsigned long index, unsigned long point,
unsigned long before, unsigned long after)
return start <= index && index <= end;
}
-static struct ll_readahead_state *ll_ras_get(struct file *f)
-{
- struct ll_file_data *fd;
-
- fd = LUSTRE_FPRIVATE(f);
- return &fd->fd_ras;
-}
-
-void ll_ra_read_in(struct file *f, struct ll_ra_read *rar)
+void ll_ras_enter(struct file *f)
{
- struct ll_readahead_state *ras;
-
- ras = ll_ras_get(f);
-
- cfs_spin_lock(&ras->ras_lock);
- ras->ras_requests++;
- ras->ras_request_index = 0;
- ras->ras_consecutive_requests++;
- rar->lrr_reader = current;
-
- cfs_list_add(&rar->lrr_linkage, &ras->ras_read_beads);
- cfs_spin_unlock(&ras->ras_lock);
-}
-
-void ll_ra_read_ex(struct file *f, struct ll_ra_read *rar)
-{
- struct ll_readahead_state *ras;
-
- ras = ll_ras_get(f);
-
- cfs_spin_lock(&ras->ras_lock);
- cfs_list_del_init(&rar->lrr_linkage);
- cfs_spin_unlock(&ras->ras_lock);
-}
-
-static struct ll_ra_read *ll_ra_read_get_locked(struct ll_readahead_state *ras)
-{
- struct ll_ra_read *scan;
-
- cfs_list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) {
- if (scan->lrr_reader == current)
- return scan;
- }
- return NULL;
-}
-
-struct ll_ra_read *ll_ra_read_get(struct file *f)
-{
- struct ll_readahead_state *ras;
- struct ll_ra_read *bead;
-
- ras = ll_ras_get(f);
-
- cfs_spin_lock(&ras->ras_lock);
- bead = ll_ra_read_get_locked(ras);
- cfs_spin_unlock(&ras->ras_lock);
- return bead;
-}
-
-static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue, struct cl_page *page,
- struct page *vmpage)
-{
- struct ccc_page *cp;
- int rc;
-
- ENTRY;
-
- rc = 0;
- cl_page_assume(env, io, page);
- lu_ref_add(&page->cp_reference, "ra", cfs_current());
- cp = cl2ccc_page(cl_page_at(page, &vvp_device_type));
- if (!cp->cpg_defer_uptodate && !Page_Uptodate(vmpage)) {
- rc = cl_page_is_under_lock(env, io, page);
- if (rc == -EBUSY) {
- cp->cpg_defer_uptodate = 1;
- cp->cpg_ra_used = 0;
- cl_page_list_add(queue, page);
- rc = 1;
- } else {
- cl_page_delete(env, page);
- rc = -ENOLCK;
- }
- } else
- /* skip completed pages */
- cl_page_unassume(env, io, page);
- lu_ref_del(&page->cp_reference, "ra", cfs_current());
- cl_page_put(env, page);
- RETURN(rc);
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(f);
+ struct ll_readahead_state *ras = &fd->fd_ras;
+
+ spin_lock(&ras->ras_lock);
+ ras->ras_requests++;
+ ras->ras_request_index = 0;
+ ras->ras_consecutive_requests++;
+ spin_unlock(&ras->ras_lock);
}
/**
* Initiates read-ahead of a page with given index.
*
- * \retval +ve: page was added to \a queue.
- *
- * \retval -ENOLCK: there is no extent lock for this part of a file, stop
- * read-ahead.
- *
- * \retval -ve, 0: page wasn't added to \a queue for other reason.
+ * \retval +ve: page was already uptodate so it will be skipped
+ * from being added;
+ * \retval -ve: page wasn't added to \a queue for error;
+ * \retval 0: page was added into \a queue for read ahead.
*/
static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue,
- pgoff_t index, struct address_space *mapping)
+ struct cl_page_list *queue, pgoff_t index)
{
- struct page *vmpage;
- struct cl_object *clob = ll_i2info(mapping->host)->lli_clob;
- struct cl_page *page;
- enum ra_stat which = _NR_RA_STAT; /* keep gcc happy */
- unsigned int gfp_mask;
- int rc = 0;
- const char *msg = NULL;
-
- ENTRY;
+ struct cl_object *clob = io->ci_obj;
+ struct inode *inode = vvp_object_inode(clob);
+ struct page *vmpage;
+ struct cl_page *page;
+ struct vvp_page *vpg;
+ enum ra_stat which = _NR_RA_STAT; /* keep gcc happy */
+ int rc = 0;
+ const char *msg = NULL;
+ ENTRY;
+
+ vmpage = grab_cache_page_nowait(inode->i_mapping, index);
+ if (vmpage == NULL) {
+ which = RA_STAT_FAILED_GRAB_PAGE;
+ msg = "g_c_p_n failed";
+ GOTO(out, rc = -EBUSY);
+ }
+
+ /* Check if vmpage was truncated or reclaimed */
+ if (vmpage->mapping != inode->i_mapping) {
+ which = RA_STAT_WRONG_GRAB_PAGE;
+ msg = "g_c_p_n returned invalid page";
+ GOTO(out, rc = -EBUSY);
+ }
+
+ page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
+ if (IS_ERR(page)) {
+ which = RA_STAT_FAILED_GRAB_PAGE;
+ msg = "cl_page_find failed";
+ GOTO(out, rc = PTR_ERR(page));
+ }
+
+ lu_ref_add(&page->cp_reference, "ra", current);
+ cl_page_assume(env, io, page);
+ vpg = cl2vvp_page(cl_object_page_slice(clob, page));
+ if (!vpg->vpg_defer_uptodate && !PageUptodate(vmpage)) {
+ vpg->vpg_defer_uptodate = 1;
+ vpg->vpg_ra_used = 0;
+ cl_page_list_add(queue, page);
+ } else {
+ /* skip completed pages */
+ cl_page_unassume(env, io, page);
+ /* This page is already uptodate, returning a positive number
+ * to tell the callers about this */
+ rc = 1;
+ }
+
+ lu_ref_del(&page->cp_reference, "ra", current);
+ cl_page_put(env, page);
- gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
-#ifdef __GFP_NOWARN
- gfp_mask |= __GFP_NOWARN;
-#endif
- vmpage = grab_cache_page_nowait_gfp(mapping, index, gfp_mask);
- if (vmpage != NULL) {
- /* Check if vmpage was truncated or reclaimed */
- if (vmpage->mapping == mapping) {
- page = cl_page_find(env, clob, vmpage->index,
- vmpage, CPT_CACHEABLE);
- if (!IS_ERR(page)) {
- rc = cl_read_ahead_page(env, io, queue,
- page, vmpage);
- if (rc == -ENOLCK) {
- which = RA_STAT_FAILED_MATCH;
- msg = "lock match failed";
- }
- } else {
- which = RA_STAT_FAILED_GRAB_PAGE;
- msg = "cl_page_find failed";
- }
- } else {
- which = RA_STAT_WRONG_GRAB_PAGE;
- msg = "g_c_p_n returned invalid page";
- }
- if (rc != 1)
- unlock_page(vmpage);
- page_cache_release(vmpage);
- } else {
- which = RA_STAT_FAILED_GRAB_PAGE;
- msg = "g_c_p_n failed";
- }
- if (msg != NULL) {
- ll_ra_stats_inc(mapping, which);
- CDEBUG(D_READA, "%s\n", msg);
- }
- RETURN(rc);
+out:
+ if (vmpage != NULL) {
+ if (rc != 0)
+ unlock_page(vmpage);
+ put_page(vmpage);
+ }
+ if (msg != NULL) {
+ ll_ra_stats_inc(inode, which);
+ CDEBUG(D_READA, "%s\n", msg);
+
+ }
+
+ RETURN(rc);
}
#define RIA_DEBUG(ria) \
ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\
ria->ria_pages)
-#define RAS_INCREASE_STEP PTLRPC_MAX_BRW_PAGES
-
static inline int stride_io_mode(struct ll_readahead_state *ras)
{
return ras->ras_consecutive_stride_requests > 1;
}
+
/* The function calculates how much pages will be read in
* [off, off + length], in such stride IO area,
* stride_offset = st_off, stride_lengh = st_len,
if (end_left > st_pgs)
end_left = st_pgs;
- CDEBUG(D_READA, "start "LPU64", end "LPU64" start_left %lu end_left %lu \n",
+ CDEBUG(D_READA, "start %llu, end %llu start_left %lu end_left %lu\n",
start, end, start_left, end_left);
if (start == end)
length);
}
+static unsigned long ras_align(struct ll_readahead_state *ras,
+ unsigned long index,
+ unsigned long *remainder)
+{
+ unsigned long rem = index % ras->ras_rpc_size;
+ if (remainder != NULL)
+ *remainder = rem;
+ return index - rem;
+}
+
/*Check whether the index is in the defined ra-window */
static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
{
ria->ria_length < ria->ria_pages);
}
-static int ll_read_ahead_pages(const struct lu_env *env,
- struct cl_io *io, struct cl_page_list *queue,
- struct ra_io_arg *ria,
- unsigned long *reserved_pages,
- struct address_space *mapping,
- unsigned long *ra_end)
+static unsigned long
+ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io,
+ struct cl_page_list *queue, struct ll_readahead_state *ras,
+ struct ra_io_arg *ria)
{
- int rc, count = 0, stride_ria;
- unsigned long page_idx;
-
- LASSERT(ria != NULL);
- RIA_DEBUG(ria);
-
- stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
- for (page_idx = ria->ria_start; page_idx <= ria->ria_end &&
- *reserved_pages > 0; page_idx++) {
- if (ras_inside_ra_window(page_idx, ria)) {
- /* If the page is inside the read-ahead window*/
- rc = ll_read_ahead_page(env, io, queue,
- page_idx, mapping);
- if (rc == 1) {
- (*reserved_pages)--;
- count ++;
- } else if (rc == -ENOLCK)
- break;
+ struct cl_read_ahead ra = { 0 };
+ int rc = 0;
+ bool stride_ria;
+ unsigned long ra_end = 0;
+ pgoff_t page_idx;
+
+ LASSERT(ria != NULL);
+ RIA_DEBUG(ria);
+
+ stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
+ for (page_idx = ria->ria_start;
+ page_idx <= ria->ria_end && ria->ria_reserved > 0; page_idx++) {
+ if (ras_inside_ra_window(page_idx, ria)) {
+ if (ra.cra_end == 0 || ra.cra_end < page_idx) {
+ unsigned long end;
+
+ cl_read_ahead_release(env, &ra);
+
+ rc = cl_io_read_ahead(env, io, page_idx, &ra);
+ if (rc < 0)
+ break;
+
+ CDEBUG(D_READA, "idx: %lu, ra: %lu, rpc: %lu\n",
+ page_idx, ra.cra_end, ra.cra_rpc_size);
+ LASSERTF(ra.cra_end >= page_idx,
+ "object: %p, indcies %lu / %lu\n",
+ io->ci_obj, ra.cra_end, page_idx);
+ /* update read ahead RPC size.
+ * NB: it's racy but doesn't matter */
+ if (ras->ras_rpc_size > ra.cra_rpc_size &&
+ ra.cra_rpc_size > 0)
+ ras->ras_rpc_size = ra.cra_rpc_size;
+ /* trim it to align with optimal RPC size */
+ end = ras_align(ras, ria->ria_end + 1, NULL);
+ if (end > 0 && !ria->ria_eof)
+ ria->ria_end = end - 1;
+ if (ria->ria_end < ria->ria_end_min)
+ ria->ria_end = ria->ria_end_min;
+ if (ria->ria_end > ra.cra_end)
+ ria->ria_end = ra.cra_end;
+ }
+ if (page_idx > ria->ria_end)
+ break;
+
+ /* If the page is inside the read-ahead window */
+ rc = ll_read_ahead_page(env, io, queue, page_idx);
+ if (rc < 0)
+ break;
+
+ ra_end = page_idx;
+ if (rc == 0)
+ ria->ria_reserved--;
} else if (stride_ria) {
/* If it is not in the read-ahead window, and it is
* read-ahead mode, then check whether it should skip
/* FIXME: This assertion only is valid when it is for
* forward read-ahead, it will be fixed when backward
* read-ahead is implemented */
- LASSERTF(page_idx > ria->ria_stoff, "Invalid page_idx %lu"
- "rs %lu re %lu ro %lu rl %lu rp %lu\n", page_idx,
- ria->ria_start, ria->ria_end, ria->ria_stoff,
- ria->ria_length, ria->ria_pages);
+ LASSERTF(page_idx >= ria->ria_stoff,
+ "Invalid page_idx %lu rs %lu re %lu ro %lu "
+ "rl %lu rp %lu\n", page_idx,
+ ria->ria_start, ria->ria_end, ria->ria_stoff,
+ ria->ria_length, ria->ria_pages);
offset = page_idx - ria->ria_stoff;
offset = offset % (ria->ria_length);
if (offset > ria->ria_pages) {
}
}
}
- *ra_end = page_idx;
- return count;
-}
-int ll_readahead(const struct lu_env *env, struct cl_io *io,
- struct ll_readahead_state *ras, struct address_space *mapping,
- struct cl_page_list *queue, int flags)
-{
- struct vvp_io *vio = vvp_env_io(env);
- struct vvp_thread_info *vti = vvp_env_info(env);
- struct cl_attr *attr = ccc_env_thread_attr(env);
- unsigned long start = 0, end = 0, reserved;
- unsigned long ra_end, len;
- struct inode *inode;
- struct ll_ra_read *bead;
- struct ra_io_arg *ria = &vti->vti_ria;
- struct ll_inode_info *lli;
- struct cl_object *clob;
- int ret = 0;
- __u64 kms;
- ENTRY;
+ cl_read_ahead_release(env, &ra);
- inode = mapping->host;
- lli = ll_i2info(inode);
- clob = lli->lli_clob;
-
- memset(ria, 0, sizeof *ria);
+ return ra_end;
+}
- cl_object_attr_lock(clob);
- ret = cl_object_attr_get(env, clob, attr);
- cl_object_attr_unlock(clob);
+static int ll_readahead(const struct lu_env *env, struct cl_io *io,
+ struct cl_page_list *queue,
+ struct ll_readahead_state *ras, bool hit)
+{
+ struct vvp_io *vio = vvp_env_io(env);
+ struct ll_thread_info *lti = ll_env_info(env);
+ struct cl_attr *attr = vvp_env_thread_attr(env);
+ unsigned long len, mlen = 0;
+ pgoff_t ra_end, start = 0, end = 0;
+ struct inode *inode;
+ struct ra_io_arg *ria = <i->lti_ria;
+ struct cl_object *clob;
+ int ret = 0;
+ __u64 kms;
+ ENTRY;
+
+ clob = io->ci_obj;
+ inode = vvp_object_inode(clob);
+
+ memset(ria, 0, sizeof *ria);
+
+ cl_object_attr_lock(clob);
+ ret = cl_object_attr_get(env, clob, attr);
+ cl_object_attr_unlock(clob);
+
+ if (ret != 0)
+ RETURN(ret);
+ kms = attr->cat_kms;
+ if (kms == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
+ RETURN(0);
+ }
+
+ spin_lock(&ras->ras_lock);
+
+ /**
+ * Note: other thread might rollback the ras_next_readahead,
+ * if it can not get the full size of prepared pages, see the
+ * end of this function. For stride read ahead, it needs to
+ * make sure the offset is no less than ras_stride_offset,
+ * so that stride read ahead can work correctly.
+ */
+ if (stride_io_mode(ras))
+ start = max(ras->ras_next_readahead, ras->ras_stride_offset);
+ else
+ start = ras->ras_next_readahead;
+
+ if (ras->ras_window_len > 0)
+ end = ras->ras_window_start + ras->ras_window_len - 1;
+
+ /* Enlarge the RA window to encompass the full read */
+ if (vio->vui_ra_valid &&
+ end < vio->vui_ra_start + vio->vui_ra_count - 1)
+ end = vio->vui_ra_start + vio->vui_ra_count - 1;
- if (ret != 0)
- RETURN(ret);
- kms = attr->cat_kms;
- if (kms == 0) {
- ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
- RETURN(0);
- }
+ if (end != 0) {
+ unsigned long end_index;
- cfs_spin_lock(&ras->ras_lock);
- if (vio->cui_ra_window_set)
- bead = &vio->cui_bead;
- else
- bead = NULL;
+ /* Truncate RA window to end of file */
+ end_index = (unsigned long)((kms - 1) >> PAGE_SHIFT);
+ if (end_index <= end) {
+ end = end_index;
+ ria->ria_eof = true;
+ }
- /* Enlarge the RA window to encompass the full read */
- if (bead != NULL && ras->ras_window_start + ras->ras_window_len <
- bead->lrr_start + bead->lrr_count) {
- ras->ras_window_len = bead->lrr_start + bead->lrr_count -
- ras->ras_window_start;
- }
- /* Reserve a part of the read-ahead window that we'll be issuing */
- if (ras->ras_window_len) {
- start = ras->ras_next_readahead;
- end = ras->ras_window_start + ras->ras_window_len - 1;
- }
- if (end != 0) {
- unsigned long rpc_boundary;
- /*
- * Align RA window to an optimal boundary.
- *
- * XXX This would be better to align to cl_max_pages_per_rpc
- * instead of PTLRPC_MAX_BRW_PAGES, because the RPC size may
- * be aligned to the RAID stripe size in the future and that
- * is more important than the RPC size.
- */
- /* Note: we only trim the RPC, instead of extending the RPC
- * to the boundary, so to avoid reading too much pages during
- * random reading. */
- rpc_boundary = ((end + 1) & (~(PTLRPC_MAX_BRW_PAGES - 1)));
- if (rpc_boundary > 0)
- rpc_boundary--;
-
- if (rpc_boundary > start)
- end = rpc_boundary;
-
- /* Truncate RA window to end of file */
- end = min(end, (unsigned long)((kms - 1) >> CFS_PAGE_SHIFT));
-
- ras->ras_next_readahead = max(end, end + 1);
- RAS_CDEBUG(ras);
+ ras->ras_next_readahead = max(end, end + 1);
+ RAS_CDEBUG(ras);
}
ria->ria_start = start;
ria->ria_end = end;
ria->ria_length = ras->ras_stride_length;
ria->ria_pages = ras->ras_stride_pages;
}
- cfs_spin_unlock(&ras->ras_lock);
-
- if (end == 0) {
- ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
- RETURN(0);
- }
- len = ria_page_count(ria);
- if (len == 0)
- RETURN(0);
-
- reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len);
- if (reserved < len)
- ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
-
- CDEBUG(D_READA, "reserved page %lu ra_cur %d ra_max %lu\n", reserved,
- cfs_atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
- ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
-
- ret = ll_read_ahead_pages(env, io, queue,
- ria, &reserved, mapping, &ra_end);
-
- LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
- if (reserved != 0)
- ll_ra_count_put(ll_i2sbi(inode), reserved);
-
- if (ra_end == end + 1 && ra_end == (kms >> CFS_PAGE_SHIFT))
- ll_ra_stats_inc(mapping, RA_STAT_EOF);
-
- /* if we didn't get to the end of the region we reserved from
- * the ras we need to go back and update the ras so that the
- * next read-ahead tries from where we left off. we only do so
- * if the region we failed to issue read-ahead on is still ahead
- * of the app and behind the next index to start read-ahead from */
- CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
- ra_end, end, ria->ria_end);
-
- if (ra_end != end + 1) {
- cfs_spin_lock(&ras->ras_lock);
- if (ra_end < ras->ras_next_readahead &&
- index_in_window(ra_end, ras->ras_window_start, 0,
- ras->ras_window_len)) {
- ras->ras_next_readahead = ra_end;
- RAS_CDEBUG(ras);
- }
- cfs_spin_unlock(&ras->ras_lock);
- }
-
- RETURN(ret);
+ spin_unlock(&ras->ras_lock);
+
+ if (end == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
+ RETURN(0);
+ }
+ len = ria_page_count(ria);
+ if (len == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
+ RETURN(0);
+ }
+
+ CDEBUG(D_READA, DFID": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n",
+ PFID(lu_object_fid(&clob->co_lu)),
+ ria->ria_start, ria->ria_end,
+ vio->vui_ra_valid ? vio->vui_ra_start : 0,
+ vio->vui_ra_valid ? vio->vui_ra_count : 0,
+ hit);
+
+ /* at least to extend the readahead window to cover current read */
+ if (!hit && vio->vui_ra_valid &&
+ vio->vui_ra_start + vio->vui_ra_count > ria->ria_start) {
+ unsigned long remainder;
+
+ /* to the end of current read window. */
+ mlen = vio->vui_ra_start + vio->vui_ra_count - ria->ria_start;
+ /* trim to RPC boundary */
+ ras_align(ras, ria->ria_start, &remainder);
+ mlen = min(mlen, ras->ras_rpc_size - remainder);
+ ria->ria_end_min = ria->ria_start + mlen;
+ }
+
+ ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len, mlen);
+ if (ria->ria_reserved < len)
+ ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
+
+ CDEBUG(D_READA, "reserved pages: %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
+ ria->ria_reserved, len, mlen,
+ atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
+ ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
+
+ ra_end = ll_read_ahead_pages(env, io, queue, ras, ria);
+
+ if (ria->ria_reserved != 0)
+ ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
+
+ if (ra_end == end && ra_end == (kms >> PAGE_SHIFT))
+ ll_ra_stats_inc(inode, RA_STAT_EOF);
+
+ /* if we didn't get to the end of the region we reserved from
+ * the ras we need to go back and update the ras so that the
+ * next read-ahead tries from where we left off. we only do so
+ * if the region we failed to issue read-ahead on is still ahead
+ * of the app and behind the next index to start read-ahead from */
+ CDEBUG(D_READA, "ra_end = %lu end = %lu stride end = %lu pages = %d\n",
+ ra_end, end, ria->ria_end, ret);
+
+ if (ra_end > 0 && ra_end != end) {
+ ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
+ spin_lock(&ras->ras_lock);
+ if (ra_end <= ras->ras_next_readahead &&
+ index_in_window(ra_end, ras->ras_window_start, 0,
+ ras->ras_window_len)) {
+ ras->ras_next_readahead = ra_end + 1;
+ RAS_CDEBUG(ras);
+ }
+ spin_unlock(&ras->ras_lock);
+ }
+
+ RETURN(ret);
}
-static void ras_set_start(struct ll_readahead_state *ras, unsigned long index)
+static void ras_set_start(struct inode *inode, struct ll_readahead_state *ras,
+ unsigned long index)
{
- ras->ras_window_start = index & (~(RAS_INCREASE_STEP - 1));
+ ras->ras_window_start = ras_align(ras, index, NULL);
}
/* called with the ras_lock held or from places where it doesn't matter */
-static void ras_reset(struct ll_readahead_state *ras, unsigned long index)
+static void ras_reset(struct inode *inode, struct ll_readahead_state *ras,
+ unsigned long index)
{
- ras->ras_last_readpage = index;
- ras->ras_consecutive_requests = 0;
- ras->ras_consecutive_pages = 0;
- ras->ras_window_len = 0;
- ras_set_start(ras, index);
- ras->ras_next_readahead = max(ras->ras_window_start, index);
-
- RAS_CDEBUG(ras);
+ ras->ras_last_readpage = index;
+ ras->ras_consecutive_requests = 0;
+ ras->ras_consecutive_pages = 0;
+ ras->ras_window_len = 0;
+ ras_set_start(inode, ras, index);
+ ras->ras_next_readahead = max(ras->ras_window_start, index + 1);
+
+ RAS_CDEBUG(ras);
}
/* called with the ras_lock held or from places where it doesn't matter */
void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
{
- cfs_spin_lock_init(&ras->ras_lock);
- ras_reset(ras, 0);
- ras->ras_requests = 0;
- CFS_INIT_LIST_HEAD(&ras->ras_read_beads);
+ spin_lock_init(&ras->ras_lock);
+ ras->ras_rpc_size = PTLRPC_MAX_BRW_PAGES;
+ ras_reset(inode, ras, 0);
+ ras->ras_requests = 0;
}
/*
* Check whether the read request is in the stride window.
* If it is in the stride window, return 1, otherwise return 0.
*/
-static int index_in_stride_window(unsigned long index,
- struct ll_readahead_state *ras,
- struct inode *inode)
+static int index_in_stride_window(struct ll_readahead_state *ras,
+ unsigned long index)
{
- unsigned long stride_gap = index - ras->ras_last_readpage - 1;
+ unsigned long stride_gap;
+
+ if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 ||
+ ras->ras_stride_pages == ras->ras_stride_length)
+ return 0;
- if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 ||
- ras->ras_stride_pages == ras->ras_stride_length)
- return 0;
+ stride_gap = index - ras->ras_last_readpage - 1;
- /* If it is contiguous read */
- if (stride_gap == 0)
- return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
+ /* If it is contiguous read */
+ if (stride_gap == 0)
+ return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
- /*Otherwise check the stride by itself */
- return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
- ras->ras_consecutive_pages == ras->ras_stride_pages;
+ /* Otherwise check the stride by itself */
+ return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
+ ras->ras_consecutive_pages == ras->ras_stride_pages;
}
static void ras_update_stride_detector(struct ll_readahead_state *ras,
RAS_CDEBUG(ras);
}
-static void ras_increase_window(struct ll_readahead_state *ras,
- struct ll_ra_info *ra, struct inode *inode)
+static void ras_increase_window(struct inode *inode,
+ struct ll_readahead_state *ras,
+ struct ll_ra_info *ra)
{
- /* The stretch of ra-window should be aligned with max rpc_size
- * but current clio architecture does not support retrieve such
- * information from lower layer. FIXME later
- */
- if (stride_io_mode(ras))
- ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP);
- else
- ras->ras_window_len = min(ras->ras_window_len +
- RAS_INCREASE_STEP,
- ra->ra_max_pages_per_file);
+ /* The stretch of ra-window should be aligned with max rpc_size
+ * but current clio architecture does not support retrieve such
+ * information from lower layer. FIXME later
+ */
+ if (stride_io_mode(ras)) {
+ ras_stride_increase_window(ras, ra, ras->ras_rpc_size);
+ } else {
+ unsigned long wlen;
+
+ wlen = min(ras->ras_window_len + ras->ras_rpc_size,
+ ra->ra_max_pages_per_file);
+ ras->ras_window_len = ras_align(ras, wlen, NULL);
+ }
}
-void ras_update(struct ll_sb_info *sbi, struct inode *inode,
- struct ll_readahead_state *ras, unsigned long index,
- unsigned hit)
+static void ras_update(struct ll_sb_info *sbi, struct inode *inode,
+ struct ll_readahead_state *ras, unsigned long index,
+ enum ras_update_flags flags)
{
- struct ll_ra_info *ra = &sbi->ll_ra_info;
- int zero = 0, stride_detect = 0, ra_miss = 0;
- ENTRY;
+ struct ll_ra_info *ra = &sbi->ll_ra_info;
+ bool hit = flags & LL_RAS_HIT;
+ int zero = 0, stride_detect = 0, ra_miss = 0;
+ ENTRY;
- cfs_spin_lock(&ras->ras_lock);
+ spin_lock(&ras->ras_lock);
+ if (!hit)
+ CDEBUG(D_READA, DFID " pages at %lu miss.\n",
+ PFID(ll_inode2fid(inode)), index);
ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);
/* reset the read-ahead window in two cases. First when the app seeks
* and only occurs once per open file. Normal RA behavior is reverted
* to for subsequent IO. The mmap case does not increment
* ras_requests and thus can never trigger this behavior. */
- if (ras->ras_requests == 2 && !ras->ras_request_index) {
- __u64 kms_pages;
+ if (ras->ras_requests >= 2 && !ras->ras_request_index) {
+ __u64 kms_pages;
- kms_pages = (i_size_read(inode) + CFS_PAGE_SIZE - 1) >>
- CFS_PAGE_SHIFT;
+ kms_pages = (i_size_read(inode) + PAGE_SIZE - 1) >>
+ PAGE_SHIFT;
- CDEBUG(D_READA, "kmsp "LPU64" mwp %lu mp %lu\n", kms_pages,
+ CDEBUG(D_READA, "kmsp %llu mwp %lu mp %lu\n", kms_pages,
ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages_per_file);
if (kms_pages &&
kms_pages <= ra->ra_max_read_ahead_whole_pages) {
ras->ras_window_start = 0;
- ras->ras_last_readpage = 0;
- ras->ras_next_readahead = 0;
+ ras->ras_next_readahead = index + 1;
ras->ras_window_len = min(ra->ra_max_pages_per_file,
ra->ra_max_read_ahead_whole_pages);
GOTO(out_unlock, 0);
}
}
- if (zero) {
- /* check whether it is in stride I/O mode*/
- if (!index_in_stride_window(index, ras, inode)) {
- if (ras->ras_consecutive_stride_requests == 0 &&
- ras->ras_request_index == 0) {
- ras_update_stride_detector(ras, index);
- ras->ras_consecutive_stride_requests ++;
- } else {
- ras_stride_reset(ras);
- }
- ras_reset(ras, index);
- ras->ras_consecutive_pages++;
- GOTO(out_unlock, 0);
- } else {
- ras->ras_consecutive_pages = 0;
- ras->ras_consecutive_requests = 0;
- if (++ras->ras_consecutive_stride_requests > 1)
- stride_detect = 1;
- RAS_CDEBUG(ras);
- }
- } else {
- if (ra_miss) {
- if (index_in_stride_window(index, ras, inode) &&
- stride_io_mode(ras)) {
- /*If stride-RA hit cache miss, the stride dector
- *will not be reset to avoid the overhead of
- *redetecting read-ahead mode */
- if (index != ras->ras_last_readpage + 1)
- ras->ras_consecutive_pages = 0;
- ras_reset(ras, index);
- RAS_CDEBUG(ras);
- } else {
- /* Reset both stride window and normal RA
- * window */
- ras_reset(ras, index);
- ras->ras_consecutive_pages++;
- ras_stride_reset(ras);
- GOTO(out_unlock, 0);
- }
- } else if (stride_io_mode(ras)) {
- /* If this is contiguous read but in stride I/O mode
- * currently, check whether stride step still is valid,
- * if invalid, it will reset the stride ra window*/
- if (!index_in_stride_window(index, ras, inode)) {
- /* Shrink stride read-ahead window to be zero */
- ras_stride_reset(ras);
- ras->ras_window_len = 0;
- ras->ras_next_readahead = index;
- }
- }
- }
- ras->ras_consecutive_pages++;
- ras->ras_last_readpage = index;
- ras_set_start(ras, index);
-
- if (stride_io_mode(ras))
- /* Since stride readahead is sentivite to the offset
- * of read-ahead, so we use original offset here,
- * instead of ras_window_start, which is 1M aligned*/
- ras->ras_next_readahead = max(index,
- ras->ras_next_readahead);
- else
- ras->ras_next_readahead = max(ras->ras_window_start,
- ras->ras_next_readahead);
- RAS_CDEBUG(ras);
-
- /* Trigger RA in the mmap case where ras_consecutive_requests
- * is not incremented and thus can't be used to trigger RA */
- if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
- ras->ras_window_len = RAS_INCREASE_STEP;
- GOTO(out_unlock, 0);
- }
-
- /* Initially reset the stride window offset to next_readahead*/
- if (ras->ras_consecutive_stride_requests == 2 && stride_detect) {
- /**
- * Once stride IO mode is detected, next_readahead should be
- * reset to make sure next_readahead > stride offset
- */
- ras->ras_next_readahead = max(index, ras->ras_next_readahead);
- ras->ras_stride_offset = index;
- ras->ras_window_len = RAS_INCREASE_STEP;
- }
-
- /* The initial ras_window_len is set to the request size. To avoid
- * uselessly reading and discarding pages for random IO the window is
- * only increased once per consecutive request received. */
- if ((ras->ras_consecutive_requests > 1 || stride_detect) &&
- !ras->ras_request_index)
- ras_increase_window(ras, ra, inode);
- EXIT;
+ if (zero) {
+ /* check whether it is in stride I/O mode*/
+ if (!index_in_stride_window(ras, index)) {
+ if (ras->ras_consecutive_stride_requests == 0 &&
+ ras->ras_request_index == 0) {
+ ras_update_stride_detector(ras, index);
+ ras->ras_consecutive_stride_requests++;
+ } else {
+ ras_stride_reset(ras);
+ }
+ ras_reset(inode, ras, index);
+ ras->ras_consecutive_pages++;
+ GOTO(out_unlock, 0);
+ } else {
+ ras->ras_consecutive_pages = 0;
+ ras->ras_consecutive_requests = 0;
+ if (++ras->ras_consecutive_stride_requests > 1)
+ stride_detect = 1;
+ RAS_CDEBUG(ras);
+ }
+ } else {
+ if (ra_miss) {
+ if (index_in_stride_window(ras, index) &&
+ stride_io_mode(ras)) {
+ if (index != ras->ras_last_readpage + 1)
+ ras->ras_consecutive_pages = 0;
+ ras_reset(inode, ras, index);
+
+ /* If stride-RA hit cache miss, the stride
+ * detector will not be reset to avoid the
+ * overhead of redetecting read-ahead mode,
+ * but on the condition that the stride window
+ * is still intersect with normal sequential
+ * read-ahead window. */
+ if (ras->ras_window_start <
+ ras->ras_stride_offset)
+ ras_stride_reset(ras);
+ RAS_CDEBUG(ras);
+ } else {
+ /* Reset both stride window and normal RA
+ * window */
+ ras_reset(inode, ras, index);
+ ras->ras_consecutive_pages++;
+ ras_stride_reset(ras);
+ GOTO(out_unlock, 0);
+ }
+ } else if (stride_io_mode(ras)) {
+ /* If this is contiguous read but in stride I/O mode
+ * currently, check whether stride step still is valid,
+ * if invalid, it will reset the stride ra window*/
+ if (!index_in_stride_window(ras, index)) {
+ /* Shrink stride read-ahead window to be zero */
+ ras_stride_reset(ras);
+ ras->ras_window_len = 0;
+ ras->ras_next_readahead = index;
+ }
+ }
+ }
+ ras->ras_consecutive_pages++;
+ ras->ras_last_readpage = index;
+ ras_set_start(inode, ras, index);
+
+ if (stride_io_mode(ras)) {
+ /* Since stride readahead is sentivite to the offset
+ * of read-ahead, so we use original offset here,
+ * instead of ras_window_start, which is RPC aligned */
+ ras->ras_next_readahead = max(index, ras->ras_next_readahead);
+ ras->ras_window_start = max(ras->ras_stride_offset,
+ ras->ras_window_start);
+ } else {
+ if (ras->ras_next_readahead < ras->ras_window_start)
+ ras->ras_next_readahead = ras->ras_window_start;
+ if (!hit)
+ ras->ras_next_readahead = index + 1;
+ }
+ RAS_CDEBUG(ras);
+
+ /* Trigger RA in the mmap case where ras_consecutive_requests
+ * is not incremented and thus can't be used to trigger RA */
+ if (ras->ras_consecutive_pages >= 4 && flags & LL_RAS_MMAP) {
+ ras_increase_window(inode, ras, ra);
+ /* reset consecutive pages so that the readahead window can
+ * grow gradually. */
+ ras->ras_consecutive_pages = 0;
+ GOTO(out_unlock, 0);
+ }
+
+ /* Initially reset the stride window offset to next_readahead*/
+ if (ras->ras_consecutive_stride_requests == 2 && stride_detect) {
+ /**
+ * Once stride IO mode is detected, next_readahead should be
+ * reset to make sure next_readahead > stride offset
+ */
+ ras->ras_next_readahead = max(index, ras->ras_next_readahead);
+ ras->ras_stride_offset = index;
+ ras->ras_window_start = max(index, ras->ras_window_start);
+ }
+
+ /* The initial ras_window_len is set to the request size. To avoid
+ * uselessly reading and discarding pages for random IO the window is
+ * only increased once per consecutive request received. */
+ if ((ras->ras_consecutive_requests > 1 || stride_detect) &&
+ !ras->ras_request_index)
+ ras_increase_window(inode, ras, ra);
+ EXIT;
out_unlock:
- RAS_CDEBUG(ras);
- ras->ras_request_index++;
- cfs_spin_unlock(&ras->ras_lock);
- return;
+ RAS_CDEBUG(ras);
+ ras->ras_request_index++;
+ spin_unlock(&ras->ras_lock);
+ return;
}
int ll_writepage(struct page *vmpage, struct writeback_control *wbc)
{
- struct inode *inode = vmpage->mapping->host;
+ struct inode *inode = vmpage->mapping->host;
+ struct ll_inode_info *lli = ll_i2info(inode);
struct lu_env *env;
struct cl_io *io;
struct cl_page *page;
struct cl_object *clob;
- struct cl_2queue *queue;
- struct cl_env_nest nest;
+ bool redirtied = false;
+ bool unlocked = false;
int result;
+ __u16 refcheck;
ENTRY;
LASSERT(PageLocked(vmpage));
LASSERT(!PageWriteback(vmpage));
- if (ll_i2dtexp(inode) == NULL)
- RETURN(-EINVAL);
+ LASSERT(ll_i2dtexp(inode) != NULL);
- env = cl_env_nested_get(&nest);
- if (IS_ERR(env))
- RETURN(PTR_ERR(env));
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ GOTO(out, result = PTR_ERR(env));
- queue = &vvp_env_info(env)->vti_queue;
clob = ll_i2info(inode)->lli_clob;
LASSERT(clob != NULL);
- io = ccc_env_thread_io(env);
+ io = vvp_env_thread_io(env);
io->ci_obj = clob;
+ io->ci_ignore_layout = 1;
result = cl_io_init(env, io, CIT_MISC, clob);
if (result == 0) {
page = cl_page_find(env, clob, vmpage->index,
vmpage, CPT_CACHEABLE);
- if (!IS_ERR(page)) {
- lu_ref_add(&page->cp_reference, "writepage",
- cfs_current());
- cl_page_assume(env, io, page);
- /*
- * Mark page dirty, because this is what
- * ->vio_submit()->cpo_prep_write() assumes.
- *
- * XXX better solution is to detect this from within
- * cl_io_submit_rw() somehow.
- */
- set_page_dirty(vmpage);
- cl_2queue_init_page(queue, page);
- result = cl_io_submit_rw(env, io, CRT_WRITE,
- queue, CRP_NORMAL);
- if (result != 0) {
- /*
- * Re-dirty page on error so it retries write,
- * but not in case when IO has actually
- * occurred and completed with an error.
- */
- if (!PageError(vmpage)) {
- redirty_page_for_writepage(wbc, vmpage);
- result = 0;
- }
- }
- cl_page_list_disown(env, io, &queue->c2_qin);
- LASSERT(!cl_page_is_owned(page, io));
- lu_ref_del(&page->cp_reference,
- "writepage", cfs_current());
- cl_page_put(env, page);
- cl_2queue_fini(env, queue);
- }
+ if (!IS_ERR(page)) {
+ lu_ref_add(&page->cp_reference, "writepage",
+ current);
+ cl_page_assume(env, io, page);
+ result = cl_page_flush(env, io, page);
+ if (result != 0) {
+ /*
+ * Re-dirty page on error so it retries write,
+ * but not in case when IO has actually
+ * occurred and completed with an error.
+ */
+ if (!PageError(vmpage)) {
+ redirty_page_for_writepage(wbc, vmpage);
+ result = 0;
+ redirtied = true;
+ }
+ }
+ cl_page_disown(env, io, page);
+ unlocked = true;
+ lu_ref_del(&page->cp_reference,
+ "writepage", current);
+ cl_page_put(env, page);
+ } else {
+ result = PTR_ERR(page);
+ }
}
cl_io_fini(env, io);
- cl_env_nested_put(&nest, env);
- RETURN(result);
+
+ if (redirtied && wbc->sync_mode == WB_SYNC_ALL) {
+ loff_t offset = cl_offset(clob, vmpage->index);
+
+ /* Flush page failed because the extent is being written out.
+ * Wait for the write of extent to be finished to avoid
+ * breaking kernel which assumes ->writepage should mark
+ * PageWriteback or clean the page. */
+ result = cl_sync_file_range(inode, offset,
+ offset + PAGE_SIZE - 1,
+ CL_FSYNC_LOCAL, 1);
+ if (result > 0) {
+ /* actually we may have written more than one page.
+ * decreasing this page because the caller will count
+ * it. */
+ wbc->nr_to_write -= result - 1;
+ result = 0;
+ }
+ }
+
+ cl_env_put(env, &refcheck);
+ GOTO(out, result);
+
+out:
+ if (result < 0) {
+ if (!lli->lli_async_rc)
+ lli->lli_async_rc = result;
+ SetPageError(vmpage);
+ if (!unlocked)
+ unlock_page(vmpage);
+ }
+ return result;
}
-int ll_readpage(struct file *file, struct page *vmpage)
+int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
{
- struct ll_cl_context *lcc;
- int result;
- ENTRY;
+ struct inode *inode = mapping->host;
+ loff_t start;
+ loff_t end;
+ enum cl_fsync_mode mode;
+ int range_whole = 0;
+ int result;
+ ENTRY;
+
+ if (wbc->range_cyclic) {
+ start = mapping->writeback_index << PAGE_SHIFT;
+ end = OBD_OBJECT_EOF;
+ } else {
+ start = wbc->range_start;
+ end = wbc->range_end;
+ if (end == LLONG_MAX) {
+ end = OBD_OBJECT_EOF;
+ range_whole = start == 0;
+ }
+ }
+
+ mode = CL_FSYNC_NONE;
+ if (wbc->sync_mode == WB_SYNC_ALL)
+ mode = CL_FSYNC_LOCAL;
+
+ if (ll_i2info(inode)->lli_clob == NULL)
+ RETURN(0);
+
+ /* for directio, it would call writepages() to evict cached pages
+ * inside the IO context of write, which will cause deadlock at
+ * layout_conf since it waits for active IOs to complete. */
+ result = cl_sync_file_range(inode, start, end, mode, 1);
+ if (result > 0) {
+ wbc->nr_to_write -= result;
+ result = 0;
+ }
+
+ if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) {
+ if (end == OBD_OBJECT_EOF)
+ mapping->writeback_index = 0;
+ else
+ mapping->writeback_index = (end >> PAGE_SHIFT) + 1;
+ }
+ RETURN(result);
+}
- lcc = ll_cl_init(file, vmpage, 0);
- if (!IS_ERR(lcc)) {
- struct lu_env *env = lcc->lcc_env;
- struct cl_io *io = lcc->lcc_io;
- struct cl_page *page = lcc->lcc_page;
-
- LASSERT(page->cp_type == CPT_CACHEABLE);
- if (likely(!PageUptodate(vmpage))) {
- cl_page_assume(env, io, page);
- result = cl_io_read_page(env, io, page);
- } else {
- /* Page from a non-object file. */
- LASSERT(!ll_i2info(vmpage->mapping->host)->lli_smd);
- unlock_page(vmpage);
- result = 0;
- }
- ll_cl_fini(lcc);
- } else {
- unlock_page(vmpage);
- result = PTR_ERR(lcc);
+struct ll_cl_context *ll_cl_find(struct file *file)
+{
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_cl_context *lcc;
+ struct ll_cl_context *found = NULL;
+
+ read_lock(&fd->fd_lock);
+ list_for_each_entry(lcc, &fd->fd_lccs, lcc_list) {
+ if (lcc->lcc_cookie == current) {
+ found = lcc;
+ break;
+ }
+ }
+ read_unlock(&fd->fd_lock);
+
+ return found;
+}
+
+void ll_cl_add(struct file *file, const struct lu_env *env, struct cl_io *io,
+ enum lcc_type type)
+{
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
+
+ memset(lcc, 0, sizeof(*lcc));
+ INIT_LIST_HEAD(&lcc->lcc_list);
+ lcc->lcc_cookie = current;
+ lcc->lcc_env = env;
+ lcc->lcc_io = io;
+ lcc->lcc_type = type;
+
+ write_lock(&fd->fd_lock);
+ list_add(&lcc->lcc_list, &fd->fd_lccs);
+ write_unlock(&fd->fd_lock);
+}
+
+void ll_cl_remove(struct file *file, const struct lu_env *env)
+{
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
+
+ write_lock(&fd->fd_lock);
+ list_del_init(&lcc->lcc_list);
+ write_unlock(&fd->fd_lock);
+}
+
+static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
+ struct cl_page *page, struct file *file)
+{
+ struct inode *inode = vvp_object_inode(page->cp_obj);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_readahead_state *ras = &fd->fd_ras;
+ struct cl_2queue *queue = &io->ci_queue;
+ struct vvp_page *vpg;
+ int rc = 0;
+ bool uptodate;
+ ENTRY;
+
+ vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
+ uptodate = vpg->vpg_defer_uptodate;
+
+ if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
+ sbi->ll_ra_info.ra_max_pages > 0 &&
+ !vpg->vpg_ra_updated) {
+ struct vvp_io *vio = vvp_env_io(env);
+ enum ras_update_flags flags = 0;
+
+ if (uptodate)
+ flags |= LL_RAS_HIT;
+ if (!vio->vui_ra_valid)
+ flags |= LL_RAS_MMAP;
+ ras_update(sbi, inode, ras, vvp_index(vpg), flags);
+ }
+
+ cl_2queue_init(queue);
+ if (uptodate) {
+ vpg->vpg_ra_used = 1;
+ cl_page_export(env, page, 1);
+ cl_page_disown(env, io, page);
+ } else {
+ cl_2queue_add(queue, page);
+ }
+
+ if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
+ sbi->ll_ra_info.ra_max_pages > 0) {
+ int rc2;
+
+ rc2 = ll_readahead(env, io, &queue->c2_qin, ras,
+ uptodate);
+ CDEBUG(D_READA, DFID "%d pages read ahead at %lu\n",
+ PFID(ll_inode2fid(inode)), rc2, vvp_index(vpg));
+ }
+
+ if (queue->c2_qin.pl_nr > 0)
+ rc = cl_io_submit_rw(env, io, CRT_READ, queue);
+
+ /*
+ * Unlock unsent pages in case of error.
+ */
+ cl_page_list_disown(env, io, &queue->c2_qin);
+ cl_2queue_fini(env, queue);
+
+ RETURN(rc);
+}
+
+int ll_readpage(struct file *file, struct page *vmpage)
+{
+ struct inode *inode = file_inode(file);
+ struct cl_object *clob = ll_i2info(inode)->lli_clob;
+ struct ll_cl_context *lcc;
+ const struct lu_env *env = NULL;
+ struct cl_io *io = NULL;
+ struct cl_page *page;
+ int result;
+ ENTRY;
+
+ lcc = ll_cl_find(file);
+ if (lcc != NULL) {
+ env = lcc->lcc_env;
+ io = lcc->lcc_io;
+ }
+
+ if (io == NULL) { /* fast read */
+ struct inode *inode = file_inode(file);
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_readahead_state *ras = &fd->fd_ras;
+ struct lu_env *local_env = NULL;
+ struct vvp_page *vpg;
+
+ result = -ENODATA;
+
+ /* TODO: need to verify the layout version to make sure
+ * the page is not invalid due to layout change. */
+ page = cl_vmpage_page(vmpage, clob);
+ if (page == NULL) {
+ unlock_page(vmpage);
+ RETURN(result);
+ }
+
+ if (!env) {
+ local_env = cl_env_percpu_get();
+ env = local_env;
+ }
+
+ vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
+ if (vpg->vpg_defer_uptodate) {
+ enum ras_update_flags flags = LL_RAS_HIT;
+
+ if (lcc && lcc->lcc_type == LCC_MMAP)
+ flags |= LL_RAS_MMAP;
+
+ /* For fast read, it updates read ahead state only
+ * if the page is hit in cache because non cache page
+ * case will be handled by slow read later. */
+ ras_update(ll_i2sbi(inode), inode, ras, vvp_index(vpg),
+ flags);
+ /* avoid duplicate ras_update() call */
+ vpg->vpg_ra_updated = 1;
+
+ /* Check if we can issue a readahead RPC, if that is
+ * the case, we can't do fast IO because we will need
+ * a cl_io to issue the RPC. */
+ if (ras->ras_window_start + ras->ras_window_len <
+ ras->ras_next_readahead + PTLRPC_MAX_BRW_PAGES) {
+ /* export the page and skip io stack */
+ vpg->vpg_ra_used = 1;
+ cl_page_export(env, page, 1);
+ result = 0;
+ }
+ }
+
+ /* release page refcount before unlocking the page to ensure
+ * the object won't be destroyed in the calling path of
+ * cl_page_put(). Please see comment in ll_releasepage(). */
+ cl_page_put(env, page);
+ unlock_page(vmpage);
+ if (local_env)
+ cl_env_percpu_put(local_env);
+
+ RETURN(result);
+ }
+
+ LASSERT(io->ci_state == CIS_IO_GOING);
+ page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
+ if (!IS_ERR(page)) {
+ LASSERT(page->cp_type == CPT_CACHEABLE);
+ if (likely(!PageUptodate(vmpage))) {
+ cl_page_assume(env, io, page);
+ result = ll_io_read_page(env, io, page, file);
+ } else {
+ /* Page from a non-object file. */
+ unlock_page(vmpage);
+ result = 0;
+ }
+ cl_page_put(env, page);
+ } else {
+ unlock_page(vmpage);
+ result = PTR_ERR(page);
}
- RETURN(result);
+ RETURN(result);
}
+int ll_page_sync_io(const struct lu_env *env, struct cl_io *io,
+ struct cl_page *page, enum cl_req_type crt)
+{
+ struct cl_2queue *queue;
+ int result;
+
+ LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
+
+ queue = &io->ci_queue;
+ cl_2queue_init_page(queue, page);
+
+ result = cl_io_submit_sync(env, io, crt, queue, 0);
+ LASSERT(cl_page_is_owned(page, io));
+
+ if (crt == CRT_READ)
+ /*
+ * in CRT_WRITE case page is left locked even in case of
+ * error.
+ */
+ cl_page_list_disown(env, io, &queue->c2_qin);
+ cl_2queue_fini(env, queue);
+
+ return result;
+}