*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <linux/pagemap.h>
/* current_is_kswapd() */
#include <linux/swap.h>
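+/* task_io_account_read() */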
+#include <linux/task_io_accounting_ops.h>
#define DEBUG_SUBSYSTEM S_LLITE
static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
{
- LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
- lprocfs_counter_incr(sbi->ll_ra_stats, which);
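+ /* enum ra_stat values are never negative, so only the upper
+ * bound needs checking */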
+ LASSERTF(which < _NR_RA_STAT, "which: %u\n", which);
+ lprocfs_counter_incr(sbi->ll_ra_stats, which);
}
void ll_ra_stats_inc(struct inode *inode, enum ra_stat which)
if (vmpage != NULL) {
if (rc != 0)
unlock_page(vmpage);
- page_cache_release(vmpage);
+ put_page(vmpage);
}
if (msg != NULL) {
ll_ra_stats_inc(inode, which);
if (end_left > st_pgs)
end_left = st_pgs;
- CDEBUG(D_READA, "start "LPU64", end "LPU64" start_left %lu end_left %lu \n",
+ CDEBUG(D_READA, "start %llu, end %llu start_left %lu end_left %lu\n",
start, end, start_left, end_left);
if (start == end)
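+/**
+ * Submit read-ahead for the window described by \a ria.
+ *
+ * Returns the number of pages actually submitted for read-ahead;
+ * \a ra_end is set to the index of the last page tried.
+ */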
static unsigned long
ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io,
struct cl_page_list *queue, struct ll_readahead_state *ras,
- struct ra_io_arg *ria)
+ struct ra_io_arg *ria, pgoff_t *ra_end)
{
struct cl_read_ahead ra = { 0 };
- int rc = 0;
+ int rc = 0, count = 0;
bool stride_ria;
- unsigned long ra_end = 0;
pgoff_t page_idx;
LASSERT(ria != NULL);
io->ci_obj, ra.cra_end, page_idx);
/* update read ahead RPC size.
* NB: it's racy but doesn't matter */
- if (ras->ras_rpc_size > ra.cra_rpc_size &&
+ if (ras->ras_rpc_size != ra.cra_rpc_size &&
ra.cra_rpc_size > 0)
ras->ras_rpc_size = ra.cra_rpc_size;
/* trim it to align with optimal RPC size */
if (rc < 0)
break;
- ra_end = page_idx;
- if (rc == 0)
+ *ra_end = page_idx;
+ /* Only subtract from reserve & count the page if we
+ * really did readahead on that page. */
+ if (rc == 0) {
ria->ria_reserved--;
+ count++;
+ }
} else if (stride_ria) {
/* If it is not in the read-ahead window, and it is
* read-ahead mode, then check whether it should skip
cl_read_ahead_release(env, &ra);
- return ra_end;
+ return count;
}
static int ll_readahead(const struct lu_env *env, struct cl_io *io,
struct ll_thread_info *lti = ll_env_info(env);
struct cl_attr *attr = vvp_env_thread_attr(env);
unsigned long len, mlen = 0;
- pgoff_t ra_end, start = 0, end = 0;
+ pgoff_t ra_end = 0, start = 0, end = 0;
struct inode *inode;
struct ra_io_arg *ria = &lti->lti_ria;
struct cl_object *clob;
unsigned long end_index;
/* Truncate RA window to end of file */
- end_index = (unsigned long)((kms - 1) >> PAGE_CACHE_SHIFT);
+ end_index = (unsigned long)((kms - 1) >> PAGE_SHIFT);
if (end_index <= end) {
end = end_index;
ria->ria_eof = true;
}
-
- ras->ras_next_readahead = max(end, end + 1);
- RAS_CDEBUG(ras);
}
ria->ria_start = start;
ria->ria_end = end;
RETURN(0);
}
+ RAS_CDEBUG(ras);
CDEBUG(D_READA, DFID": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n",
PFID(lu_object_fid(&clob->co_lu)),
ria->ria_start, ria->ria_end,
atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
- ra_end = ll_read_ahead_pages(env, io, queue, ras, ria);
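+ /* ret is the number of pages actually read ahead; ra_end is the
+ * last page index tried, used to update the ras below */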
+ ret = ll_read_ahead_pages(env, io, queue, ras, ria, &ra_end);
if (ria->ria_reserved != 0)
ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
- if (ra_end == end && ra_end == (kms >> PAGE_CACHE_SHIFT))
+ if (ra_end == end && ra_end == (kms >> PAGE_SHIFT))
ll_ra_stats_inc(inode, RA_STAT_EOF);
- /* if we didn't get to the end of the region we reserved from
- * the ras we need to go back and update the ras so that the
- * next read-ahead tries from where we left off. we only do so
- * if the region we failed to issue read-ahead on is still ahead
- * of the app and behind the next index to start read-ahead from */
CDEBUG(D_READA, "ra_end = %lu end = %lu stride end = %lu pages = %d\n",
ra_end, end, ria->ria_end, ret);
- if (ra_end > 0 && ra_end != end) {
+ if (ra_end != end)
ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
+ if (ra_end > 0) {
+ /* update the ras so that the next read-ahead tries from
+ * where we left off. */
spin_lock(&ras->ras_lock);
- if (ra_end <= ras->ras_next_readahead &&
- index_in_window(ra_end, ras->ras_window_start, 0,
- ras->ras_window_len)) {
- ras->ras_next_readahead = ra_end + 1;
- RAS_CDEBUG(ras);
- }
+ ras->ras_next_readahead = ra_end + 1;
spin_unlock(&ras->ras_lock);
+ RAS_CDEBUG(ras);
}
RETURN(ret);
wlen = min(ras->ras_window_len + ras->ras_rpc_size,
ra->ra_max_pages_per_file);
- ras->ras_window_len = ras_align(ras, wlen, NULL);
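+ /* don't align a window shorter than one RPC: ras_align() rounds
+ * down to an RPC boundary, which would leave an empty window */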
+ if (wlen < ras->ras_rpc_size)
+ ras->ras_window_len = wlen;
+ else
+ ras->ras_window_len = ras_align(ras, wlen, NULL);
}
}
spin_lock(&ras->ras_lock);
+ if (!hit)
+ CDEBUG(D_READA, DFID " pages at %lu miss.\n",
+ PFID(ll_inode2fid(inode)), index);
ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);
/* reset the read-ahead window in two cases. First when the app seeks
if (ras->ras_requests >= 2 && !ras->ras_request_index) {
__u64 kms_pages;
- kms_pages = (i_size_read(inode) + PAGE_CACHE_SIZE - 1) >>
- PAGE_CACHE_SHIFT;
+ kms_pages = (i_size_read(inode) + PAGE_SIZE - 1) >>
+ PAGE_SHIFT;
- CDEBUG(D_READA, "kmsp "LPU64" mwp %lu mp %lu\n", kms_pages,
+ CDEBUG(D_READA, "kmsp %llu mwp %lu mp %lu\n", kms_pages,
ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages_per_file);
if (kms_pages &&
if (ra_miss) {
if (index_in_stride_window(ras, index) &&
stride_io_mode(ras)) {
- /*If stride-RA hit cache miss, the stride dector
- *will not be reset to avoid the overhead of
- *redetecting read-ahead mode */
if (index != ras->ras_last_readpage + 1)
ras->ras_consecutive_pages = 0;
ras_reset(inode, ras, index);
+
+ /* If stride-RA hit cache miss, the stride
+ * detector will not be reset to avoid the
+ * overhead of redetecting read-ahead mode,
+ * provided that the stride window still
+ * intersects the normal sequential
+ * read-ahead window. */
+ if (ras->ras_window_start <
+ ras->ras_stride_offset)
+ ras_stride_reset(ras);
RAS_CDEBUG(ras);
} else {
/* Reset both stride window and normal RA
/* Since stride readahead is sensitive to the offset
* of read-ahead, we use the original offset here,
* instead of ras_window_start, which is RPC aligned */
- ras->ras_next_readahead = max(index, ras->ras_next_readahead);
+ ras->ras_next_readahead = max(index + 1,
+ ras->ras_next_readahead);
ras->ras_window_start = max(ras->ras_stride_offset,
ras->ras_window_start);
} else {
* breaking kernel which assumes ->writepage should mark
* PageWriteback or clean the page. */
result = cl_sync_file_range(inode, offset,
- offset + PAGE_CACHE_SIZE - 1,
+ offset + PAGE_SIZE - 1,
CL_FSYNC_LOCAL, 1);
if (result > 0) {
/* actually we may have written more than one page.
int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
{
struct inode *inode = mapping->host;
- struct ll_sb_info *sbi = ll_i2sbi(inode);
loff_t start;
loff_t end;
enum cl_fsync_mode mode;
int range_whole = 0;
int result;
- int ignore_layout = 0;
ENTRY;
if (wbc->range_cyclic) {
- start = mapping->writeback_index << PAGE_CACHE_SHIFT;
+ /* cast before shifting so a 32-bit pgoff_t can't overflow */
+ start = (loff_t)mapping->writeback_index << PAGE_SHIFT;
end = OBD_OBJECT_EOF;
} else {
start = wbc->range_start;
if (wbc->sync_mode == WB_SYNC_ALL)
mode = CL_FSYNC_LOCAL;
- if (sbi->ll_umounting)
- /* if the mountpoint is being umounted, all pages have to be
- * evicted to avoid hitting LBUG when truncate_inode_pages()
- * is called later on. */
- ignore_layout = 1;
-
if (ll_i2info(inode)->lli_clob == NULL)
RETURN(0);
- result = cl_sync_file_range(inode, start, end, mode, ignore_layout);
+ /* For direct I/O, writepages() is called to evict cached pages
+ * from inside the IO context of the write, which would deadlock
+ * at layout_conf since it waits for active IOs to complete. */
+ result = cl_sync_file_range(inode, start, end, mode, 1);
if (result > 0) {
wbc->nr_to_write -= result;
result = 0;
if (end == OBD_OBJECT_EOF)
mapping->writeback_index = 0;
else
- mapping->writeback_index = (end >> PAGE_CACHE_SHIFT) +1;
+ mapping->writeback_index = (end >> PAGE_SHIFT) + 1;
}
RETURN(result);
}
write_unlock(&fd->fd_lock);
}
-static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
+int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
struct cl_page *page, struct file *file)
{
struct inode *inode = vvp_object_inode(page->cp_obj);
struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
struct ll_readahead_state *ras = &fd->fd_ras;
struct cl_2queue *queue = &io->ci_queue;
+ struct cl_sync_io *anchor = NULL;
struct vvp_page *vpg;
int rc = 0;
bool uptodate;
cl_page_export(env, page, 1);
cl_page_disown(env, io, page);
} else {
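+ /* the page is not up to date: set up a one-page sync anchor so
+ * the read can be waited on after submission below */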
+ anchor = &vvp_env_info(env)->vti_anchor;
+ cl_sync_io_init(anchor, 1, &cl_sync_io_end);
+ page->cp_sync_io = anchor;
+
cl_2queue_add(queue, page);
}
PFID(ll_inode2fid(inode)), rc2, vvp_index(vpg));
}
- if (queue->c2_qin.pl_nr > 0)
+ if (queue->c2_qin.pl_nr > 0) {
+ int count = queue->c2_qin.pl_nr;
rc = cl_io_submit_rw(env, io, CRT_READ, queue);
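+ /* these reads bypass the generic VFS path, so charge them to
+ * the task's I/O accounting (visible in /proc/<pid>/io) */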
+ if (rc == 0)
+ task_io_account_read(PAGE_SIZE * count);
+ }
- /*
- * Unlock unsent pages in case of error.
- */
+
+ if (anchor != NULL && !cl_page_is_owned(page, io)) { /* have sent */
+ rc = cl_sync_io_wait(env, anchor, 0);
+
+ cl_page_assume(env, io, page);
+ cl_page_list_del(env, &queue->c2_qout, page);
+
+ if (!PageUptodate(cl_page_vmpage(page))) {
+ /* Failed to read from a mirror; discard this page so
+ * that a new page can be created with a new mirror.
+ *
+ * TODO: this is not needed once the page reinit
+ * route is implemented */
+ cl_page_discard(env, io, page);
+ }
+ cl_page_disown(env, io, page);
+ }
+
+ /* TODO: discard all pages until the page reinit route is implemented */
+ cl_page_list_discard(env, io, &queue->c2_qin);
+
+ /* Unlock unsent read pages in case of error. */
cl_page_list_disown(env, io, &queue->c2_qin);
+
cl_2queue_fini(env, queue);
RETURN(rc);
struct inode *inode = file_inode(file);
struct cl_object *clob = ll_i2info(inode)->lli_clob;
struct ll_cl_context *lcc;
- const struct lu_env *env;
- struct cl_io *io;
+ const struct lu_env *env = NULL;
+ struct cl_io *io = NULL;
struct cl_page *page;
int result;
ENTRY;
lcc = ll_cl_find(file);
- if (lcc == NULL) {
- unlock_page(vmpage);
- RETURN(-EIO);
+ if (lcc != NULL) {
+ env = lcc->lcc_env;
+ io = lcc->lcc_io;
}
- env = lcc->lcc_env;
- io = lcc->lcc_io;
if (io == NULL) { /* fast read */
struct inode *inode = file_inode(file);
struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
struct ll_readahead_state *ras = &fd->fd_ras;
+ struct lu_env *local_env = NULL;
+ unsigned long fast_read_pages =
+ max(RA_REMAIN_WINDOW_MIN, ras->ras_rpc_size);
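+ /* if at least this many read-ahead pages remain in the window,
+ * a new read-ahead RPC is due and fast read must give way to a
+ * full cl_io (see the window check below) */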
struct vvp_page *vpg;
result = -ENODATA;
RETURN(result);
}
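+ /* fast read can arrive without an lcc and hence without an
+ * env; borrow a per-CPU env for the cl_page calls below */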
+ if (!env) {
+ local_env = cl_env_percpu_get();
+ env = local_env;
+ }
+
vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
if (vpg->vpg_defer_uptodate) {
enum ras_update_flags flags = LL_RAS_HIT;
- if (lcc->lcc_type == LCC_MMAP)
+ if (lcc && lcc->lcc_type == LCC_MMAP)
flags |= LL_RAS_MMAP;
/* For fast read, it updates read ahead state only
* the case, we can't do fast IO because we will need
* a cl_io to issue the RPC. */
if (ras->ras_window_start + ras->ras_window_len <
- ras->ras_next_readahead + PTLRPC_MAX_BRW_PAGES) {
+ ras->ras_next_readahead + fast_read_pages) {
/* export the page and skip io stack */
vpg->vpg_ra_used = 1;
cl_page_export(env, page, 1);
}
}
- unlock_page(vmpage);
+ /* release page refcount before unlocking the page to ensure
+ * the object won't be destroyed in the calling path of
+ * cl_page_put(). Please see comment in ll_releasepage(). */
cl_page_put(env, page);
+ unlock_page(vmpage);
+ if (local_env)
+ cl_env_percpu_put(local_env);
+
RETURN(result);
}
LASSERT(page->cp_type == CPT_CACHEABLE);
if (likely(!PageUptodate(vmpage))) {
cl_page_assume(env, io, page);
+
result = ll_io_read_page(env, io, page, file);
} else {
/* Page from a non-object file. */
}
RETURN(result);
}
-
-int ll_page_sync_io(const struct lu_env *env, struct cl_io *io,
- struct cl_page *page, enum cl_req_type crt)
-{
- struct cl_2queue *queue;
- int result;
-
- LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
-
- queue = &io->ci_queue;
- cl_2queue_init_page(queue, page);
-
- result = cl_io_submit_sync(env, io, crt, queue, 0);
- LASSERT(cl_page_is_owned(page, io));
-
- if (crt == CRT_READ)
- /*
- * in CRT_WRITE case page is left locked even in case of
- * error.
- */
- cl_page_list_disown(env, io, &queue->c2_qin);
- cl_2queue_fini(env, queue);
-
- return result;
-}