-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre Lite I/O page cache routines shared by different kernel revs
*/
-#include <linux/autoconf.h>
#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/string.h>
#include <linux/stat.h>
#include <linux/errno.h>
-#include <linux/smp_lock.h>
#include <linux/unistd.h>
-#include <linux/version.h>
-#include <asm/system.h>
+#include <linux/writeback.h>
#include <asm/uaccess.h>
#include <linux/fs.h>
#include <asm/uaccess.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
-#include <linux/smp_lock.h>
/* current_is_kswapd() */
#include <linux/swap.h>
#define DEBUG_SUBSYSTEM S_LLITE
-//#include <lustre_mdc.h>
#include <lustre_lite.h>
#include <obd_cksum.h>
#include "llite_internal.h"
#include <linux/lustre_compat25.h>
-/* this isn't where truncate starts. roughly:
- * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs
- * DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to
- * avoid races.
- *
- * must be called under ->lli_size_sem */
-void ll_truncate(struct inode *inode)
+/**
+ * Finalizes cl-data before exiting typical address_space operation. Dual to
+ * ll_cl_init().
+ */
+static void ll_cl_fini(struct ll_cl_context *lcc)
{
- struct ll_inode_info *lli = ll_i2info(inode);
- loff_t new_size;
- ENTRY;
- CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu=%#Lx\n",inode->i_ino,
- inode->i_generation, inode, i_size_read(inode),
- i_size_read(inode));
+ struct lu_env *env = lcc->lcc_env;
+ struct cl_io *io = lcc->lcc_io;
+ struct cl_page *page = lcc->lcc_page;
- ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_TRUNC, 1);
- if (lli->lli_size_sem_owner != cfs_current()) {
- EXIT;
- return;
- }
+ LASSERT(lcc->lcc_cookie == current);
+ LASSERT(env != NULL);
- if (!lli->lli_smd) {
- CDEBUG(D_INODE, "truncate on inode %lu with no objects\n",
- inode->i_ino);
- GOTO(out_unlock, 0);
- }
- LASSERT_SEM_LOCKED(&lli->lli_size_sem);
-
- if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_CHECKSUM) &&
- (i_size_read(inode) & ~CFS_PAGE_MASK))) {
- /* If the truncate leaves a partial page, update its checksum */
- struct page *page = find_get_page(inode->i_mapping,
- i_size_read(inode) >>
- CFS_PAGE_SHIFT);
- if (page != NULL) {
-#if 0 /* XXX */
- struct ll_async_page *llap = llap_cast_private(page);
- if (llap != NULL) {
- char *kaddr = kmap_atomic(page, KM_USER0);
- llap->llap_checksum =
- init_checksum(OSC_DEFAULT_CKSUM);
- llap->llap_checksum =
- compute_checksum(llap->llap_checksum,
- kaddr, CFS_PAGE_SIZE,
- OSC_DEFAULT_CKSUM);
- kunmap_atomic(kaddr, KM_USER0);
- }
- page_cache_release(page);
-#endif
- }
+ if (page != NULL) {
+ lu_ref_del(&page->cp_reference, "cl_io", io);
+ cl_page_put(env, page);
}
- new_size = i_size_read(inode);
- ll_inode_size_unlock(inode, 0);
-
- EXIT;
- return;
-
- out_unlock:
- ll_inode_size_unlock(inode, 0);
-} /* ll_truncate */
+ if (io && lcc->lcc_created) {
+ cl_io_end(env, io);
+ cl_io_unlock(env, io);
+ cl_io_iter_fini(env, io);
+ cl_io_fini(env, io);
+ }
+ cl_env_put(env, &lcc->lcc_refcheck);
+}
/**
* Initializes common cl-data at the typical address_space operation entry
* point.
*/
-static int ll_cl_init(struct file *file, struct page *vmpage,
- struct lu_env **env,
- struct cl_io **io, struct cl_page **page, int *refcheck)
+static struct ll_cl_context *ll_cl_init(struct file *file,
+ struct page *vmpage, int create)
{
- struct lu_env *_env;
- struct cl_io *_io;
- struct cl_page *_page;
+ struct ll_cl_context *lcc;
+ struct lu_env *env;
+ struct cl_io *io;
struct cl_object *clob;
+ struct ccc_io *cio;
- int result;
-
- *env = NULL;
- *io = NULL;
- *page = NULL;
+ int refcheck;
+ int result = 0;
clob = ll_i2info(vmpage->mapping->host)->lli_clob;
LASSERT(clob != NULL);
- _env = cl_env_get(refcheck);
- if (!IS_ERR(env)) {
- struct ccc_io *cio = ccc_env_io(_env);
-
- *env = _env;
- *io = _io = cio->cui_cl.cis_io;
- if (_io != NULL) {
- LASSERT(_io->ci_state == CIS_IO_GOING);
- LASSERT(cio->cui_fd == LUSTRE_FPRIVATE(file));
- _page = cl_page_find(_env, clob, vmpage->index, vmpage,
- CPT_CACHEABLE);
- if (!IS_ERR(_page)) {
- *page = _page;
- lu_ref_add(&_page->cp_reference, "cl_io", _io);
- result = 0;
- } else
- result = PTR_ERR(_page);
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ return ERR_PTR(PTR_ERR(env));
+
+ lcc = &vvp_env_info(env)->vti_io_ctx;
+ memset(lcc, 0, sizeof(*lcc));
+ lcc->lcc_env = env;
+ lcc->lcc_refcheck = refcheck;
+ lcc->lcc_cookie = current;
+
+ cio = ccc_env_io(env);
+ io = cio->cui_cl.cis_io;
+ if (io == NULL && create) {
+ struct inode *inode = vmpage->mapping->host;
+ loff_t pos;
+
+ if (mutex_trylock(&inode->i_mutex)) {
+ mutex_unlock(&(inode)->i_mutex);
+
+ /* this is too bad. Someone is trying to write the
+ * page w/o holding inode mutex. This means we can
+ * add dirty pages into cache during truncate */
+ CERROR("Proc %s is dirting page w/o inode lock, this"
+ "will break truncate.\n", cfs_current()->comm);
+ libcfs_debug_dumpstack(NULL);
+ LBUG();
+ return ERR_PTR(-EIO);
+ }
+
+ /*
+ * Loop-back driver calls ->prepare_write() and ->sendfile()
+ * methods directly, bypassing file system ->write() operation,
+ * so cl_io has to be created here.
+ */
+ io = ccc_env_thread_io(env);
+ ll_io_init(io, file, 1);
+
+ /* No lock at all for this kind of IO - we can't do it because
+ * we have held page lock, it would cause deadlock.
+ * XXX: This causes poor performance to loop device - One page
+ * per RPC.
+ * In order to get better performance, users should use
+ * lloop driver instead.
+ */
+ io->ci_lockreq = CILR_NEVER;
+
+ pos = (vmpage->index << CFS_PAGE_SHIFT);
+
+ /* Create a temp IO to serve write. */
+ result = cl_io_rw_init(env, io, CIT_WRITE, pos, CFS_PAGE_SIZE);
+ if (result == 0) {
+ cio->cui_fd = LUSTRE_FPRIVATE(file);
+ cio->cui_iov = NULL;
+ cio->cui_nrsegs = 0;
+ result = cl_io_iter_init(env, io);
+ if (result == 0) {
+ result = cl_io_lock(env, io);
+ if (result == 0)
+ result = cl_io_start(env, io);
+ }
} else
- /*
- * This is for a case where operation can be called
- * either with or without cl_io created by the upper
- * layer (e.g., ->prepare_write() called directly from
- * loop-back driver).
- */
- result = -EALREADY;
- } else
- result = PTR_ERR(_env);
- CDEBUG(D_VFSTRACE, "%lu@"DFID" -> %i %p %p %p\n",
- vmpage->index, PFID(lu_object_fid(&clob->co_lu)), result,
- *env, *io, *page);
- return result;
-}
+ result = io->ci_result;
+ lcc->lcc_created = 1;
+ }
-/**
- * Finalizes cl-data before exiting typical address_space operation. Dual to
- * ll_cl_init().
- */
-static void ll_cl_fini(struct lu_env *env,
- struct cl_io *io, struct cl_page *page, int *refcheck)
-{
- if (page != NULL) {
- lu_ref_del(&page->cp_reference, "cl_io", io);
- cl_page_put(env, page);
+ lcc->lcc_io = io;
+ if (io == NULL)
+ result = -EIO;
+ if (result == 0) {
+ struct cl_page *page;
+
+ LASSERT(io != NULL);
+ LASSERT(io->ci_state == CIS_IO_GOING);
+ LASSERT(cio->cui_fd == LUSTRE_FPRIVATE(file));
+ page = cl_page_find(env, clob, vmpage->index, vmpage,
+ CPT_CACHEABLE);
+ if (!IS_ERR(page)) {
+ lcc->lcc_page = page;
+ lu_ref_add(&page->cp_reference, "cl_io", io);
+ result = 0;
+ } else
+ result = PTR_ERR(page);
}
- if (env != NULL) {
- struct vvp_io *vio;
-
- vio = vvp_env_io(env);
- LASSERT(vio->cui_oneshot >= 0);
- if (vio->cui_oneshot > 0) {
- if (--vio->cui_oneshot == 0) {
- cl_io_end(env, io);
- cl_io_unlock(env, io);
- cl_io_iter_fini(env, io);
- cl_io_fini(env, io);
- /* to trigger assertion above, if ll_cl_fini()
- * is called against freed io. */
- vio->cui_oneshot = -1;
- }
- /* additional reference on env was acquired by io,
- * disable refcheck */
- refcheck = NULL;
- }
- cl_env_put(env, refcheck);
- } else
- LASSERT(io == NULL);
+ if (result) {
+ ll_cl_fini(lcc);
+ lcc = ERR_PTR(result);
+ }
+
+ CDEBUG(D_VFSTRACE, "%lu@"DFID" -> %d %p %p\n",
+ vmpage->index, PFID(lu_object_fid(&clob->co_lu)), result,
+ env, io);
+ return lcc;
}
-/**
- * Initializes one-shot cl_io for the case when loop driver calls
- * ->{prepare,commit}_write() methods directly.
- */
-static int ll_prepare_loop(struct lu_env *env, struct cl_io *io,
- struct file *file, struct page *vmpage,
- unsigned from, unsigned to)
+static struct ll_cl_context *ll_cl_get(void)
{
- struct vvp_io *vio;
- struct ccc_io *cio;
- int result;
- loff_t pos;
+ struct ll_cl_context *lcc;
+ struct lu_env *env;
+ int refcheck;
- vio = vvp_env_io(env);
- cio = ccc_env_io(env);
- ll_io_init(io, file, 1);
- pos = (vmpage->index << CFS_PAGE_SHIFT) + from;
- /*
- * Create IO and quickly drive it through CIS_{INIT,IT_STARTED,LOCKED}
- * states. DLM locks are not taken for vio->cui_oneshot IO---we cannot
- * take DLM locks here, because page is already locked. With new
- * ->write_{being,end}() address_space operations lustre might be
- * luckier.
- */
- result = cl_io_rw_init(env, io, CIT_WRITE, pos, from - to);
- if (result == 0) {
- cio->cui_fd = LUSTRE_FPRIVATE(file);
- vio->cui_oneshot = 1;
- result = cl_io_iter_init(env, io);
- if (result == 0) {
- result = cl_io_lock(env, io);
- if (result == 0)
- result = cl_io_start(env, io);
- }
- } else
- result = io->ci_result;
- return result;
+ env = cl_env_get(&refcheck);
+ LASSERT(!IS_ERR(env));
+ lcc = &vvp_env_info(env)->vti_io_ctx;
+ LASSERT(env == lcc->lcc_env);
+ LASSERT(current == lcc->lcc_cookie);
+ cl_env_put(env, &refcheck);
+
+ /* env has got in ll_cl_init, so it is still usable. */
+ return lcc;
}
/**
int ll_prepare_write(struct file *file, struct page *vmpage, unsigned from,
unsigned to)
{
- struct lu_env *env;
- struct cl_io *io;
- struct cl_page *page;
+ struct ll_cl_context *lcc;
int result;
- int refcheck;
ENTRY;
- result = ll_cl_init(file, vmpage, &env, &io, &page, &refcheck);
- /*
- * Loop-back driver calls ->prepare_write() and ->sendfile() methods
- * directly, bypassing file system ->write() operation, so cl_io has
- * to be created here.
- */
- if (result == -EALREADY) {
- io = &ccc_env_info(env)->cti_io;
- result = ll_prepare_loop(env, io, file, vmpage, from, to);
- if (result == 0) {
- result = ll_cl_init(file, vmpage,
- &env, &io, &page, &refcheck);
- cl_env_put(env, NULL);
- }
- }
- if (result == 0) {
+ lcc = ll_cl_init(file, vmpage, 1);
+ if (!IS_ERR(lcc)) {
+ struct lu_env *env = lcc->lcc_env;
+ struct cl_io *io = lcc->lcc_io;
+ struct cl_page *page = lcc->lcc_page;
+
cl_page_assume(env, io, page);
- if (cl_io_is_append(io)) {
- struct cl_object *obj = io->ci_obj;
- struct inode *inode = ccc_object_inode(obj);
- /**
- * In VFS file->page write loop, for appending, the
- * write offset might be reset according to the new
- * file size before holding i_mutex. So crw_pos should
- * be reset here. BUG:17711.
- */
- io->u.ci_wr.wr.crw_pos = i_size_read(inode);
- }
- result = cl_io_prepare_write(env, io, page, from, to);
+
+ result = cl_io_prepare_write(env, io, page, from, to);
if (result == 0) {
- struct vvp_io *vio;
-
/*
* Add a reference, so that page is not evicted from
* the cache until ->commit_write() is called.
cl_page_get(page);
lu_ref_add(&page->cp_reference, "prepare_write",
cfs_current());
- vio = vvp_env_io(env);
- if (vio->cui_oneshot > 0)
- vio->cui_oneshot++;
- } else
+ } else {
cl_page_unassume(env, io, page);
+ ll_cl_fini(lcc);
+ }
+ /* returning 0 in prepare assumes commit must be called
+ * afterwards */
+ } else {
+ result = PTR_ERR(lcc);
}
- ll_cl_fini(env, io, page, &refcheck);
RETURN(result);
}
int ll_commit_write(struct file *file, struct page *vmpage, unsigned from,
unsigned to)
{
+ struct ll_cl_context *lcc;
struct lu_env *env;
struct cl_io *io;
struct cl_page *page;
- int result;
- int refcheck;
+ int result = 0;
ENTRY;
- result = ll_cl_init(file, vmpage, &env, &io, &page, &refcheck);
- LASSERT(result != -EALREADY);
- if (result == 0) {
- LASSERT(cl_page_is_owned(page, io));
+ lcc = ll_cl_get();
+ env = lcc->lcc_env;
+ page = lcc->lcc_page;
+ io = lcc->lcc_io;
+
+ LASSERT(cl_page_is_owned(page, io));
+ LASSERT(from <= to);
+ if (from != to) /* handle short write case. */
result = cl_io_commit_write(env, io, page, from, to);
- if (cl_page_is_owned(page, io))
- cl_page_unassume(env, io, page);
- /*
- * Release reference acquired by cl_io_prepare_write().
- */
- lu_ref_del(&page->cp_reference, "prepare_write", cfs_current());
- cl_page_put(env, page);
- }
- ll_cl_fini(env, io, page, &refcheck);
+ if (cl_page_is_owned(page, io))
+ cl_page_unassume(env, io, page);
+
+ /*
+ * Release reference acquired by ll_prepare_write().
+ */
+ lu_ref_del(&page->cp_reference, "prepare_write", cfs_current());
+ cl_page_put(env, page);
+ ll_cl_fini(lcc);
RETURN(result);
}
static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);
-/* WARNING: This algorithm is used to reduce the contention on
- * sbi->ll_lock. It should work well if the ra_max_pages is much
- * greater than the single file's read-ahead window.
+/**
+ * Get readahead pages from the filesystem readahead pool of the client for a
+ * thread.
*
- * TODO: There may exist a `global sync problem' in this implementation.
- * Considering the global ra window is 100M, and each file's ra window is 10M,
- * there are over 10 files trying to get its ra budget and reach
- * ll_ra_count_get at the exactly same time. All of them will get a zero ra
- * window, although the global window is 100M. -jay
- */
-static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
+ * /param sbi superblock for filesystem readahead state ll_ra_info
+ * /param ria per-thread readahead state
+ * /param pages number of pages requested for readahead for the thread.
+ *
+ * WARNING: This algorithm is used to reduce contention on sbi->ll_lock.
+ * It should work well if the ra_max_pages is much greater than the single
+ * file's read-ahead window, and not too many threads contending for
+ * these readahead pages.
+ *
+ * TODO: There may be a 'global sync problem' if many threads are trying
+ * to get an ra budget that is larger than the remaining readahead pages
+ * and reach here at exactly the same time. They will compute /a ret to
+ * consume the remaining pages, but will fail at atomic_add_return() and
+ * get a zero ra window, although there is still ra space remaining. - Jay */
+
+static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
+ struct ra_io_arg *ria,
+ unsigned long pages)
{
struct ll_ra_info *ra = &sbi->ll_ra_info;
- unsigned long ret;
+ long ret;
ENTRY;
- /**
- * If read-ahead pages left are less than 1M, do not do read-ahead,
+ /* If read-ahead pages left are less than 1M, do not do read-ahead,
* otherwise it will form small read RPC(< 1M), which hurt server
- * performance a lot.
- */
- ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages), len);
- if ((int)ret < 0 || ret < min((unsigned long)PTLRPC_MAX_BRW_PAGES, len))
+ * performance a lot. */
+ ret = min(ra->ra_max_pages - cfs_atomic_read(&ra->ra_cur_pages), pages);
+ if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages))
GOTO(out, ret = 0);
- if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
- atomic_sub(ret, &ra->ra_cur_pages);
+ /* If the non-strided (ria_pages == 0) readahead window
+ * (ria_start + ret) has grown across an RPC boundary, then trim
+ * readahead size by the amount beyond the RPC so it ends on an
+ * RPC boundary. If the readahead window is already ending on
+ * an RPC boundary (beyond_rpc == 0), or smaller than a full
+ * RPC (beyond_rpc < ret) the readahead size is unchanged.
+ * The (beyond_rpc != 0) check is skipped since the conditional
+ * branch is more expensive than subtracting zero from the result.
+ *
+ * Strided read is left unaligned to avoid small fragments beyond
+ * the RPC boundary from needing an extra read RPC. */
+ if (ria->ria_pages == 0) {
+ long beyond_rpc = (ria->ria_start + ret) % PTLRPC_MAX_BRW_PAGES;
+ if (/* beyond_rpc != 0 && */ beyond_rpc < ret)
+ ret -= beyond_rpc;
+ }
+
+ if (cfs_atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
+ cfs_atomic_sub(ret, &ra->ra_cur_pages);
ret = 0;
}
+
out:
RETURN(ret);
}
void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
{
struct ll_ra_info *ra = &sbi->ll_ra_info;
- atomic_sub(len, &ra->ra_cur_pages);
+ cfs_atomic_sub(len, &ra->ra_cur_pages);
}
static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
void ll_ra_read_in(struct file *f, struct ll_ra_read *rar)
{
- struct ll_readahead_state *ras;
+ struct ll_readahead_state *ras;
- ras = ll_ras_get(f);
+ ras = ll_ras_get(f);
- spin_lock(&ras->ras_lock);
- ras->ras_requests++;
- ras->ras_request_index = 0;
- ras->ras_consecutive_requests++;
- rar->lrr_reader = current;
+ spin_lock(&ras->ras_lock);
+ ras->ras_requests++;
+ ras->ras_request_index = 0;
+ ras->ras_consecutive_requests++;
+ rar->lrr_reader = current;
- list_add(&rar->lrr_linkage, &ras->ras_read_beads);
- spin_unlock(&ras->ras_lock);
+ cfs_list_add(&rar->lrr_linkage, &ras->ras_read_beads);
+ spin_unlock(&ras->ras_lock);
}
void ll_ra_read_ex(struct file *f, struct ll_ra_read *rar)
{
- struct ll_readahead_state *ras;
+ struct ll_readahead_state *ras;
- ras = ll_ras_get(f);
+ ras = ll_ras_get(f);
- spin_lock(&ras->ras_lock);
- list_del_init(&rar->lrr_linkage);
- spin_unlock(&ras->ras_lock);
+ spin_lock(&ras->ras_lock);
+ cfs_list_del_init(&rar->lrr_linkage);
+ spin_unlock(&ras->ras_lock);
}
static struct ll_ra_read *ll_ra_read_get_locked(struct ll_readahead_state *ras)
{
struct ll_ra_read *scan;
- list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) {
+ cfs_list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) {
if (scan->lrr_reader == current)
return scan;
}
struct ll_ra_read *ll_ra_read_get(struct file *f)
{
- struct ll_readahead_state *ras;
- struct ll_ra_read *bead;
+ struct ll_readahead_state *ras;
+ struct ll_ra_read *bead;
- ras = ll_ras_get(f);
+ ras = ll_ras_get(f);
- spin_lock(&ras->ras_lock);
- bead = ll_ra_read_get_locked(ras);
- spin_unlock(&ras->ras_lock);
- return bead;
+ spin_lock(&ras->ras_lock);
+ bead = ll_ra_read_get_locked(ras);
+ spin_unlock(&ras->ras_lock);
+ return bead;
}
static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
rc = 0;
cl_page_assume(env, io, page);
- lu_ref_add(&page->cp_reference, "ra", cfs_current());
- cp = cl2ccc_page(cl_page_at(page, &vvp_device_type));
- if (!cp->cpg_defer_uptodate && !Page_Uptodate(vmpage)) {
- rc = cl_page_is_under_lock(env, io, page);
+ lu_ref_add(&page->cp_reference, "ra", cfs_current());
+ cp = cl2ccc_page(cl_page_at(page, &vvp_device_type));
+ if (!cp->cpg_defer_uptodate && !PageUptodate(vmpage)) {
+ rc = cl_page_is_under_lock(env, io, page);
if (rc == -EBUSY) {
cp->cpg_defer_uptodate = 1;
cp->cpg_ra_used = 0;
cl_page_delete(env, page);
rc = -ENOLCK;
}
- } else
- /* skip completed pages */
- cl_page_unassume(env, io, page);
+ } else {
+ /* skip completed pages */
+ cl_page_unassume(env, io, page);
+ }
lu_ref_del(&page->cp_reference, "ra", cfs_current());
cl_page_put(env, page);
RETURN(rc);
*/
static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
struct cl_page_list *queue,
- int index, struct address_space *mapping)
+ pgoff_t index, struct address_space *mapping)
{
struct page *vmpage;
struct cl_object *clob = ll_i2info(mapping->host)->lli_clob;
#ifdef __GFP_NOWARN
gfp_mask |= __GFP_NOWARN;
#endif
- vmpage = grab_cache_page_nowait_gfp(mapping, index, gfp_mask);
+ vmpage = grab_cache_page_nowait(mapping, index);
if (vmpage != NULL) {
/* Check if vmpage was truncated or reclaimed */
if (vmpage->mapping == mapping) {
ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\
ria->ria_pages)
-#define RAS_INCREASE_STEP (1024 * 1024 >> CFS_PAGE_SHIFT)
+/* Limit this to the blocksize instead of PTLRPC_BRW_MAX_SIZE, since we don't
+ * know what the actual RPC size is. If this needs to change, it makes more
+ * sense to tune the i_blkbits value for the file based on the OSTs it is
+ * striped over, rather than having a constant value for all files here. */
+
+/* RAS_INCREASE_STEP should be (1UL << (inode->i_blkbits - CFS_PAGE_SHIFT)).
+ * Temprarily set RAS_INCREASE_STEP to 1MB. After 4MB RPC is enabled
+ * by default, this should be adjusted corresponding with max_read_ahead_mb
+ * and max_read_ahead_per_file_mb otherwise the readahead budget can be used
+ * up quickly which will affect read performance siginificantly. See LU-2816 */
+#define RAS_INCREASE_STEP(inode) (ONE_MB_BRW_SIZE >> CFS_PAGE_SHIFT)
static inline int stride_io_mode(struct ll_readahead_state *ras)
{
return ras->ras_consecutive_stride_requests > 1;
}
-
/* The function calculates how much pages will be read in
- * [off, off + length], which will be read by stride I/O mode,
+ * [off, off + length], in such stride IO area,
* stride_offset = st_off, stride_lengh = st_len,
* stride_pages = st_pgs
+ *
+ * |------------------|*****|------------------|*****|------------|*****|....
+ * st_off
+ * |--- st_pgs ---|
+ * |----- st_len -----|
+ *
+ * How many pages it should read in such pattern
+ * |-------------------------------------------------------------|
+ * off
+ * |<------ length ------->|
+ *
+ * = |<----->| + |-------------------------------------| + |---|
+ * start_left st_pgs * i end_left
*/
static unsigned long
stride_pg_count(pgoff_t st_off, unsigned long st_len, unsigned long st_pgs,
- unsigned long off, unsigned length)
+ unsigned long off, unsigned long length)
{
- unsigned long cont_len = st_off > off ? st_off - off : 0;
- __u64 stride_len = length + off > st_off ?
- length + off + 1 - st_off : 0;
- unsigned long left, pg_count;
+ __u64 start = off > st_off ? off - st_off : 0;
+ __u64 end = off + length > st_off ? off + length - st_off : 0;
+ unsigned long start_left = 0;
+ unsigned long end_left = 0;
+ unsigned long pg_count;
- if (st_len == 0 || length == 0)
+ if (st_len == 0 || length == 0 || end == 0)
return length;
- left = do_div(stride_len, st_len);
- left = min(left, st_pgs);
+ start_left = do_div(start, st_len);
+ if (start_left < st_pgs)
+ start_left = st_pgs - start_left;
+ else
+ start_left = 0;
+
+ end_left = do_div(end, st_len);
+ if (end_left > st_pgs)
+ end_left = st_pgs;
- pg_count = left + stride_len * st_pgs + cont_len;
+ CDEBUG(D_READA, "start "LPU64", end "LPU64" start_left %lu end_left %lu \n",
+ start, end, start_left, end_left);
- LASSERT(pg_count >= left);
+ if (start == end)
+ pg_count = end_left - (st_pgs - start_left);
+ else
+ pg_count = start_left + st_pgs * (end - start - 1) + end_left;
- CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %u"
+ CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %lu"
"pgcount %lu\n", st_off, st_len, st_pgs, off, length, pg_count);
return pg_count;
* For stride I/O mode, just check whether the idx is inside
* the ria_pages. */
return ria->ria_length == 0 || ria->ria_length == ria->ria_pages ||
- (idx - ria->ria_stoff) % ria->ria_length < ria->ria_pages;
+ (idx >= ria->ria_stoff && (idx - ria->ria_stoff) %
+ ria->ria_length < ria->ria_pages);
}
static int ll_read_ahead_pages(const struct lu_env *env,
/* FIXME: This assertion only is valid when it is for
* forward read-ahead, it will be fixed when backward
* read-ahead is implemented */
- LASSERTF(page_idx > ria->ria_stoff, "since %lu in the"
- " gap of ra window,it should bigger than stride"
- " offset %lu \n", page_idx, ria->ria_stoff);
-
+ LASSERTF(page_idx > ria->ria_stoff, "Invalid page_idx %lu"
+ "rs %lu re %lu ro %lu rl %lu rp %lu\n", page_idx,
+ ria->ria_start, ria->ria_end, ria->ria_stoff,
+ ria->ria_length, ria->ria_pages);
offset = page_idx - ria->ria_stoff;
offset = offset % (ria->ria_length);
if (offset > ria->ria_pages) {
{
struct vvp_io *vio = vvp_env_io(env);
struct vvp_thread_info *vti = vvp_env_info(env);
- struct ccc_thread_info *cti = ccc_env_info(env);
+ struct cl_attr *attr = ccc_env_thread_attr(env);
unsigned long start = 0, end = 0, reserved;
unsigned long ra_end, len;
struct inode *inode;
struct ra_io_arg *ria = &vti->vti_ria;
struct ll_inode_info *lli;
struct cl_object *clob;
- struct cl_attr *attr = &cti->cti_attr;
int ret = 0;
__u64 kms;
ENTRY;
RETURN(0);
}
- spin_lock(&ras->ras_lock);
+ spin_lock(&ras->ras_lock);
if (vio->cui_ra_window_set)
bead = &vio->cui_bead;
else
end = ras->ras_window_start + ras->ras_window_len - 1;
}
if (end != 0) {
+ unsigned long rpc_boundary;
+ /*
+ * Align RA window to an optimal boundary.
+ *
+ * XXX This would be better to align to cl_max_pages_per_rpc
+ * instead of PTLRPC_MAX_BRW_PAGES, because the RPC size may
+ * be aligned to the RAID stripe size in the future and that
+ * is more important than the RPC size.
+ */
+ /* Note: we only trim the RPC, instead of extending the RPC
+ * to the boundary, so to avoid reading too much pages during
+ * random reading. */
+ rpc_boundary = ((end + 1) & (~(PTLRPC_MAX_BRW_PAGES - 1)));
+ if (rpc_boundary > 0)
+ rpc_boundary--;
+
+ if (rpc_boundary > start)
+ end = rpc_boundary;
+
/* Truncate RA window to end of file */
end = min(end, (unsigned long)((kms - 1) >> CFS_PAGE_SHIFT));
+
ras->ras_next_readahead = max(end, end + 1);
RAS_CDEBUG(ras);
}
ria->ria_length = ras->ras_stride_length;
ria->ria_pages = ras->ras_stride_pages;
}
- spin_unlock(&ras->ras_lock);
+ spin_unlock(&ras->ras_lock);
if (end == 0) {
ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
if (len == 0)
RETURN(0);
- reserved = ll_ra_count_get(ll_i2sbi(inode), len);
-
+ reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len);
if (reserved < len)
ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
- CDEBUG(D_READA, "reserved page %lu \n", reserved);
+ CDEBUG(D_READA, "reserved page %lu ra_cur %d ra_max %lu\n", reserved,
+ cfs_atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
+ ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
ret = ll_read_ahead_pages(env, io, queue,
ria, &reserved, mapping, &ra_end);
CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
ra_end, end, ria->ria_end);
- if (ra_end != end + 1) {
- spin_lock(&ras->ras_lock);
- if (ra_end < ras->ras_next_readahead &&
- index_in_window(ra_end, ras->ras_window_start, 0,
- ras->ras_window_len)) {
- ras->ras_next_readahead = ra_end;
- RAS_CDEBUG(ras);
- }
- spin_unlock(&ras->ras_lock);
- }
-
- RETURN(ret);
+ if (ra_end != end + 1) {
+ spin_lock(&ras->ras_lock);
+ if (ra_end < ras->ras_next_readahead &&
+ index_in_window(ra_end, ras->ras_window_start, 0,
+ ras->ras_window_len)) {
+ ras->ras_next_readahead = ra_end;
+ RAS_CDEBUG(ras);
+ }
+ spin_unlock(&ras->ras_lock);
+ }
+
+ RETURN(ret);
}
-static void ras_set_start(struct ll_readahead_state *ras, unsigned long index)
+static void ras_set_start(struct inode *inode, struct ll_readahead_state *ras,
+ unsigned long index)
{
- ras->ras_window_start = index & (~(RAS_INCREASE_STEP - 1));
+ ras->ras_window_start = index & (~(RAS_INCREASE_STEP(inode) - 1));
}
/* called with the ras_lock held or from places where it doesn't matter */
-static void ras_reset(struct ll_readahead_state *ras, unsigned long index)
+static void ras_reset(struct inode *inode, struct ll_readahead_state *ras,
+ unsigned long index)
{
- ras->ras_last_readpage = index;
- ras->ras_consecutive_requests = 0;
- ras->ras_consecutive_pages = 0;
- ras->ras_window_len = 0;
- ras_set_start(ras, index);
- ras->ras_next_readahead = max(ras->ras_window_start, index);
-
- RAS_CDEBUG(ras);
+ ras->ras_last_readpage = index;
+ ras->ras_consecutive_requests = 0;
+ ras->ras_consecutive_pages = 0;
+ ras->ras_window_len = 0;
+ ras_set_start(inode, ras, index);
+ ras->ras_next_readahead = max(ras->ras_window_start, index);
+
+ RAS_CDEBUG(ras);
}
/* called with the ras_lock held or from places where it doesn't matter */
void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
{
- spin_lock_init(&ras->ras_lock);
- ras_reset(ras, 0);
- ras->ras_requests = 0;
- INIT_LIST_HEAD(&ras->ras_read_beads);
+ spin_lock_init(&ras->ras_lock);
+ ras_reset(inode, ras, 0);
+ ras->ras_requests = 0;
+ CFS_INIT_LIST_HEAD(&ras->ras_read_beads);
}
/*
* Check whether the read request is in the stride window.
* If it is in the stride window, return 1, otherwise return 0.
*/
-static int index_in_stride_window(unsigned long index,
- struct ll_readahead_state *ras,
- struct inode *inode)
+static int index_in_stride_window(struct ll_readahead_state *ras,
+ unsigned long index)
{
- unsigned long stride_gap = index - ras->ras_last_readpage - 1;
+ unsigned long stride_gap;
+
+ if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 ||
+ ras->ras_stride_pages == ras->ras_stride_length)
+ return 0;
- if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0)
- return 0;
+ stride_gap = index - ras->ras_last_readpage - 1;
- /* If it is contiguous read */
- if (stride_gap == 0)
- return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
+ /* If it is contiguous read */
+ if (stride_gap == 0)
+ return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;
- /* Otherwise check the stride by itself */
- return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
- ras->ras_consecutive_pages == ras->ras_stride_pages;
+ /* Otherwise check the stride by itself */
+ return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
+ ras->ras_consecutive_pages == ras->ras_stride_pages;
}
static void ras_update_stride_detector(struct ll_readahead_state *ras,
ras->ras_stride_pages = ras->ras_consecutive_pages;
ras->ras_stride_length = stride_gap +ras->ras_consecutive_pages;
}
+ LASSERT(ras->ras_request_index == 0);
+ LASSERT(ras->ras_consecutive_stride_requests == 0);
+
+ if (index <= ras->ras_last_readpage) {
+ /*Reset stride window for forward read*/
+ ras_stride_reset(ras);
+ return;
+ }
+
+ ras->ras_stride_pages = ras->ras_consecutive_pages;
+ ras->ras_stride_length = stride_gap +ras->ras_consecutive_pages;
+
RAS_CDEBUG(ras);
+ return;
}
static unsigned long
RAS_CDEBUG(ras);
}
-/* Set stride I/O read-ahead window start offset */
-static void ras_set_stride_offset(struct ll_readahead_state *ras)
+static void ras_increase_window(struct inode *inode,
+ struct ll_readahead_state *ras,
+ struct ll_ra_info *ra)
{
- unsigned long window_len = ras->ras_next_readahead -
- ras->ras_window_start;
- unsigned long left;
-
- LASSERT(ras->ras_stride_length != 0);
-
- left = window_len % ras->ras_stride_length;
-
- ras->ras_stride_offset = ras->ras_next_readahead - left;
-
- RAS_CDEBUG(ras);
+ /* The stretch of ra-window should be aligned with max rpc_size
+ * but current clio architecture does not support retrieve such
+ * information from lower layer. FIXME later
+ */
+ if (stride_io_mode(ras))
+ ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP(inode));
+ else
+ ras->ras_window_len = min(ras->ras_window_len +
+ RAS_INCREASE_STEP(inode),
+ ra->ra_max_pages_per_file);
}
void ras_update(struct ll_sb_info *sbi, struct inode *inode,
- struct ll_readahead_state *ras, unsigned long index,
- unsigned hit)
+ struct ll_readahead_state *ras, unsigned long index,
+ unsigned hit)
{
- struct ll_ra_info *ra = &sbi->ll_ra_info;
- int zero = 0, stride_detect = 0, ra_miss = 0;
- ENTRY;
+ struct ll_ra_info *ra = &sbi->ll_ra_info;
+ int zero = 0, stride_detect = 0, ra_miss = 0;
+ ENTRY;
- spin_lock(&sbi->ll_lock);
- spin_lock(&ras->ras_lock);
+ spin_lock(&ras->ras_lock);
ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);
GOTO(out_unlock, 0);
}
}
- if (zero) {
- /* check whether it is in stride I/O mode*/
- if (!index_in_stride_window(index, ras, inode)) {
- ras_reset(ras, index);
- ras->ras_consecutive_pages++;
- ras_stride_reset(ras);
- GOTO(out_unlock, 0);
- } else {
- ras->ras_consecutive_requests = 0;
- if (++ras->ras_consecutive_stride_requests > 1)
- stride_detect = 1;
- RAS_CDEBUG(ras);
- }
- } else {
- if (ra_miss) {
- if (index_in_stride_window(index, ras, inode) &&
- stride_io_mode(ras)) {
- /*If stride-RA hit cache miss, the stride dector
- *will not be reset to avoid the overhead of
- *redetecting read-ahead mode */
- if (index != ras->ras_last_readpage + 1)
- ras->ras_consecutive_pages = 0;
- RAS_CDEBUG(ras);
- } else {
- /* Reset both stride window and normal RA window */
- ras_reset(ras, index);
- ras->ras_consecutive_pages++;
- ras_stride_reset(ras);
- GOTO(out_unlock, 0);
- }
- } else if (stride_io_mode(ras)) {
- /* If this is contiguous read but in stride I/O mode
- * currently, check whether stride step still is valid,
- * if invalid, it will reset the stride ra window*/
- if (!index_in_stride_window(index, ras, inode)) {
- /* Shrink stride read-ahead window to be zero */
- ras_stride_reset(ras);
- ras->ras_window_len = 0;
- ras->ras_next_readahead = index;
- }
- }
- }
- ras->ras_consecutive_pages++;
- ras_update_stride_detector(ras, index);
- ras->ras_last_readpage = index;
- ras_set_start(ras, index);
- ras->ras_next_readahead = max(ras->ras_window_start,
- ras->ras_next_readahead);
- RAS_CDEBUG(ras);
-
- /* Trigger RA in the mmap case where ras_consecutive_requests
- * is not incremented and thus can't be used to trigger RA */
- if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
- ras->ras_window_len = RAS_INCREASE_STEP;
- GOTO(out_unlock, 0);
- }
-
- /* Initially reset the stride window offset to next_readahead*/
- if (ras->ras_consecutive_stride_requests == 2 && stride_detect)
- ras_set_stride_offset(ras);
-
- /* The initial ras_window_len is set to the request size. To avoid
- * uselessly reading and discarding pages for random IO the window is
- * only increased once per consecutive request received. */
- if ((ras->ras_consecutive_requests > 1 &&
- !ras->ras_request_index) || stride_detect) {
- if (stride_io_mode(ras))
- ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP);
- else
- ras->ras_window_len = min(ras->ras_window_len +
- RAS_INCREASE_STEP,
- ra->ra_max_pages_per_file);
- }
- EXIT;
+ if (zero) {
+ /* check whether it is in stride I/O mode*/
+ if (!index_in_stride_window(ras, index)) {
+ if (ras->ras_consecutive_stride_requests == 0 &&
+ ras->ras_request_index == 0) {
+ ras_update_stride_detector(ras, index);
+ ras->ras_consecutive_stride_requests++;
+ } else {
+ ras_stride_reset(ras);
+ }
+ ras_reset(inode, ras, index);
+ ras->ras_consecutive_pages++;
+ GOTO(out_unlock, 0);
+ } else {
+ ras->ras_consecutive_pages = 0;
+ ras->ras_consecutive_requests = 0;
+ if (++ras->ras_consecutive_stride_requests > 1)
+ stride_detect = 1;
+ RAS_CDEBUG(ras);
+ }
+ } else {
+ if (ra_miss) {
+ if (index_in_stride_window(ras, index) &&
+ stride_io_mode(ras)) {
+ /*If stride-RA hit cache miss, the stride dector
+ *will not be reset to avoid the overhead of
+ *redetecting read-ahead mode */
+ if (index != ras->ras_last_readpage + 1)
+ ras->ras_consecutive_pages = 0;
+ ras_reset(inode, ras, index);
+ RAS_CDEBUG(ras);
+ } else {
+ /* Reset both stride window and normal RA
+ * window */
+ ras_reset(inode, ras, index);
+ ras->ras_consecutive_pages++;
+ ras_stride_reset(ras);
+ GOTO(out_unlock, 0);
+ }
+ } else if (stride_io_mode(ras)) {
+ /* If this is contiguous read but in stride I/O mode
+ * currently, check whether stride step still is valid,
+ * if invalid, it will reset the stride ra window*/
+ if (!index_in_stride_window(ras, index)) {
+ /* Shrink stride read-ahead window to be zero */
+ ras_stride_reset(ras);
+ ras->ras_window_len = 0;
+ ras->ras_next_readahead = index;
+ }
+ }
+ }
+ ras->ras_consecutive_pages++;
+ ras->ras_last_readpage = index;
+ ras_set_start(inode, ras, index);
+
+ if (stride_io_mode(ras))
+ /* Since stride readahead is sentivite to the offset
+ * of read-ahead, so we use original offset here,
+ * instead of ras_window_start, which is RPC aligned */
+ ras->ras_next_readahead = max(index, ras->ras_next_readahead);
+ else
+ ras->ras_next_readahead = max(ras->ras_window_start,
+ ras->ras_next_readahead);
+ RAS_CDEBUG(ras);
+
+ /* Trigger RA in the mmap case where ras_consecutive_requests
+ * is not incremented and thus can't be used to trigger RA */
+ if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
+ ras->ras_window_len = RAS_INCREASE_STEP(inode);
+ GOTO(out_unlock, 0);
+ }
+
+ /* Initially reset the stride window offset to next_readahead*/
+ if (ras->ras_consecutive_stride_requests == 2 && stride_detect) {
+ /**
+ * Once stride IO mode is detected, next_readahead should be
+ * reset to make sure next_readahead > stride offset
+ */
+ ras->ras_next_readahead = max(index, ras->ras_next_readahead);
+ ras->ras_stride_offset = index;
+ ras->ras_window_len = RAS_INCREASE_STEP(inode);
+ }
+
+ /* The initial ras_window_len is set to the request size. To avoid
+ * uselessly reading and discarding pages for random IO the window is
+ * only increased once per consecutive request received. */
+ if ((ras->ras_consecutive_requests > 1 || stride_detect) &&
+ !ras->ras_request_index)
+ ras_increase_window(inode, ras, ra);
+ EXIT;
out_unlock:
- RAS_CDEBUG(ras);
- ras->ras_request_index++;
- spin_unlock(&ras->ras_lock);
- spin_unlock(&sbi->ll_lock);
- return;
+ RAS_CDEBUG(ras);
+ ras->ras_request_index++;
+ spin_unlock(&ras->ras_lock);
+ return;
}
-int ll_writepage(struct page *vmpage, struct writeback_control *_)
+int ll_writepage(struct page *vmpage, struct writeback_control *wbc)
{
- struct inode *inode = vmpage->mapping->host;
+ struct inode *inode = vmpage->mapping->host;
+ struct ll_inode_info *lli = ll_i2info(inode);
struct lu_env *env;
struct cl_io *io;
struct cl_page *page;
struct cl_object *clob;
- struct cl_2queue *queue;
struct cl_env_nest nest;
+ bool redirtied = false;
+ bool unlocked = false;
int result;
ENTRY;
LASSERT(PageLocked(vmpage));
LASSERT(!PageWriteback(vmpage));
- if (ll_i2dtexp(inode) == NULL)
- RETURN(-EINVAL);
+ LASSERT(ll_i2dtexp(inode) != NULL);
- env = cl_env_nested_get(&nest);
- if (IS_ERR(env))
- RETURN(PTR_ERR(env));
+ env = cl_env_nested_get(&nest);
+ if (IS_ERR(env))
+ GOTO(out, result = PTR_ERR(env));
- io = &ccc_env_info(env)->cti_io;
- queue = &vvp_env_info(env)->vti_queue;
clob = ll_i2info(inode)->lli_clob;
LASSERT(clob != NULL);
+ io = ccc_env_thread_io(env);
io->ci_obj = clob;
+ io->ci_ignore_layout = 1;
result = cl_io_init(env, io, CIT_MISC, clob);
if (result == 0) {
page = cl_page_find(env, clob, vmpage->index,
lu_ref_add(&page->cp_reference, "writepage",
cfs_current());
cl_page_assume(env, io, page);
- /*
- * Mark page dirty, because this is what
- * ->vio_submit()->cpo_prep_write() assumes.
- *
- * XXX better solution is to detect this from within
- * cl_io_submit_rw() somehow.
- */
- set_page_dirty(vmpage);
- cl_2queue_init_page(queue, page);
- result = cl_io_submit_rw(env, io, CRT_WRITE,
- queue, CRP_NORMAL);
- cl_page_list_disown(env, io, &queue->c2_qin);
- if (result != 0) {
- /*
- * There is no need to clear PG_writeback, as
- * cl_io_submit_rw() calls completion callback
- * on failure.
- */
- /*
- * Re-dirty page on error so it retries write,
- * but not in case when IO has actually
- * occurred and completed with an error.
- */
- if (!PageError(vmpage))
- set_page_dirty(vmpage);
- }
- LASSERT(!cl_page_is_owned(page, io));
+ result = cl_page_flush(env, io, page);
+ if (result != 0) {
+ /*
+ * Re-dirty page on error so it retries write,
+ * but not in case when IO has actually
+ * occurred and completed with an error.
+ */
+ if (!PageError(vmpage)) {
+ redirty_page_for_writepage(wbc, vmpage);
+ result = 0;
+ redirtied = true;
+ }
+ }
+ cl_page_disown(env, io, page);
+ unlocked = true;
lu_ref_del(&page->cp_reference,
"writepage", cfs_current());
cl_page_put(env, page);
- cl_2queue_fini(env, queue);
- }
+ } else {
+ result = PTR_ERR(page);
+ }
}
cl_io_fini(env, io);
+
+ if (redirtied && wbc->sync_mode == WB_SYNC_ALL) {
+ loff_t offset = cl_offset(clob, vmpage->index);
+
+ /* Flush page failed because the extent is being written out.
+ * Wait for the write of extent to be finished to avoid
+ * breaking kernel which assumes ->writepage should mark
+ * PageWriteback or clean the page. */
+ result = cl_sync_file_range(inode, offset,
+ offset + CFS_PAGE_SIZE - 1,
+ CL_FSYNC_LOCAL);
+ if (result > 0) {
+ /* actually we may have written more than one page.
+ * decreasing this page because the caller will count
+ * it. */
+ wbc->nr_to_write -= result - 1;
+ result = 0;
+ }
+ }
+
cl_env_nested_put(&nest, env);
- RETURN(result);
+ GOTO(out, result);
+
+out:
+ if (result < 0) {
+ if (!lli->lli_async_rc)
+ lli->lli_async_rc = result;
+ SetPageError(vmpage);
+ if (!unlocked)
+ unlock_page(vmpage);
+ }
+ return result;
+}
+
+int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
+{
+ struct inode *inode = mapping->host;
+ loff_t start;
+ loff_t end;
+ enum cl_fsync_mode mode;
+ int range_whole = 0;
+ int result;
+ ENTRY;
+
+ if (wbc->range_cyclic) {
+ start = mapping->writeback_index << CFS_PAGE_SHIFT;
+ end = OBD_OBJECT_EOF;
+ } else {
+ start = wbc->range_start;
+ end = wbc->range_end;
+ if (end == LLONG_MAX) {
+ end = OBD_OBJECT_EOF;
+ range_whole = start == 0;
+ }
+ }
+
+ mode = CL_FSYNC_NONE;
+ if (wbc->sync_mode == WB_SYNC_ALL)
+ mode = CL_FSYNC_LOCAL;
+
+ result = cl_sync_file_range(inode, start, end, mode);
+ if (result > 0) {
+ wbc->nr_to_write -= result;
+ result = 0;
+ }
+
+ if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) {
+ if (end == OBD_OBJECT_EOF)
+ end = i_size_read(inode);
+ mapping->writeback_index = (end >> CFS_PAGE_SHIFT) + 1;
+ }
+ RETURN(result);
}
int ll_readpage(struct file *file, struct page *vmpage)
{
- struct lu_env *env;
- struct cl_io *io;
- struct cl_page *page;
+ struct ll_cl_context *lcc;
int result;
- int refcheck;
ENTRY;
- result = ll_cl_init(file, vmpage, &env, &io, &page, &refcheck);
- if (result == 0) {
+ lcc = ll_cl_init(file, vmpage, 0);
+ if (!IS_ERR(lcc)) {
+ struct lu_env *env = lcc->lcc_env;
+ struct cl_io *io = lcc->lcc_io;
+ struct cl_page *page = lcc->lcc_page;
+
LASSERT(page->cp_type == CPT_CACHEABLE);
if (likely(!PageUptodate(vmpage))) {
cl_page_assume(env, io, page);
result = cl_io_read_page(env, io, page);
} else {
/* Page from a non-object file. */
- LASSERT(!ll_i2info(vmpage->mapping->host)->lli_smd);
unlock_page(vmpage);
result = 0;
}
+ ll_cl_fini(lcc);
+ } else {
+ unlock_page(vmpage);
+ result = PTR_ERR(lcc);
}
- LASSERT(!cl_page_is_owned(page, io));
- ll_cl_fini(env, io, page, &refcheck);
RETURN(result);
}