* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <asm/uaccess.h>
#include <linux/fs.h>
+#include <linux/file.h>
#include <linux/stat.h>
#include <asm/uaccess.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
/* current_is_kswapd() */
#include <linux/swap.h>
+#include <linux/task_io_accounting_ops.h>
#define DEBUG_SUBSYSTEM S_LLITE
static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
{
- LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
- lprocfs_counter_incr(sbi->ll_ra_stats, which);
+ LASSERTF(which < _NR_RA_STAT, "which: %u\n", which);
+ lprocfs_counter_incr(sbi->ll_ra_stats, which);
}
void ll_ra_stats_inc(struct inode *inode, enum ra_stat which)
#define RAS_CDEBUG(ras) \
CDEBUG(D_READA, \
"lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu rpc %lu " \
- "r %lu ri %lu csr %lu sf %lu sp %lu sl %lu\n", \
+ "r %lu ri %lu csr %lu sf %lu sp %lu sl %lu lr %lu\n", \
ras->ras_last_readpage, ras->ras_consecutive_requests, \
ras->ras_consecutive_pages, ras->ras_window_start, \
ras->ras_window_len, ras->ras_next_readahead, \
ras->ras_rpc_size, \
ras->ras_requests, ras->ras_request_index, \
ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
- ras->ras_stride_pages, ras->ras_stride_length)
+ ras->ras_stride_pages, ras->ras_stride_length, \
+ ras->ras_async_last_readpage)
static int index_in_window(unsigned long index, unsigned long point,
unsigned long before, unsigned long after)
static unsigned long
ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io,
struct cl_page_list *queue, struct ll_readahead_state *ras,
- struct ra_io_arg *ria)
+ struct ra_io_arg *ria, pgoff_t *ra_end)
{
struct cl_read_ahead ra = { 0 };
- int rc = 0;
+ int rc = 0, count = 0;
bool stride_ria;
- unsigned long ra_end = 0;
pgoff_t page_idx;
LASSERT(ria != NULL);
io->ci_obj, ra.cra_end, page_idx);
/* update read ahead RPC size.
* NB: it's racy but doesn't matter */
- if (ras->ras_rpc_size > ra.cra_rpc_size &&
+ if (ras->ras_rpc_size != ra.cra_rpc_size &&
ra.cra_rpc_size > 0)
ras->ras_rpc_size = ra.cra_rpc_size;
/* trim it to align with optimal RPC size */
if (rc < 0)
break;
- ra_end = page_idx;
- if (rc == 0)
+ *ra_end = page_idx;
+ /* Only subtract from reserve & count the page if we
+ * really did readahead on that page. */
+ if (rc == 0) {
ria->ria_reserved--;
+ count++;
+ }
} else if (stride_ria) {
/* If it is not in the read-ahead window, and it is
* read-ahead mode, then check whether it should skip
cl_read_ahead_release(env, &ra);
- return ra_end;
+ return count;
+}
+
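+/* Drop the file reference taken when the work was queued and free it. */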
+static void ll_readahead_work_free(struct ll_readahead_work *work)
+{
+ fput(work->lrw_file);
+ OBD_FREE_PTR(work);
+}
+
+static void ll_readahead_handle_work(struct work_struct *wq);
+static void ll_readahead_work_add(struct inode *inode,
+ struct ll_readahead_work *work)
+{
+ INIT_WORK(&work->lrw_readahead_work, ll_readahead_handle_work);
+ queue_work(ll_i2sbi(inode)->ll_ra_info.ll_readahead_wq,
+ &work->lrw_readahead_work);
+}
+
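+/* Read out the file's known-minimum size (kms) so callers can clip the
+ * readahead window at EOF. */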
+static int ll_readahead_file_kms(const struct lu_env *env,
+ struct cl_io *io, __u64 *kms)
+{
+ struct cl_object *clob;
+ struct inode *inode;
+ struct cl_attr *attr = vvp_env_thread_attr(env);
+ int ret;
+
+ clob = io->ci_obj;
+ inode = vvp_object_inode(clob);
+
+ cl_object_attr_lock(clob);
+ ret = cl_object_attr_get(env, clob, attr);
+ cl_object_attr_unlock(clob);
+
+ if (ret != 0)
+ return ret;
+
+ *kms = attr->cat_kms;
+ return 0;
+}
+
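+/* Worker callback: runs the readahead in workqueue context so that the
+ * thread that kicked it off can continue with fast reads. */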
+static void ll_readahead_handle_work(struct work_struct *wq)
+{
+ struct ll_readahead_work *work;
+ struct lu_env *env;
+ __u16 refcheck;
+ struct ra_io_arg *ria;
+ struct inode *inode;
+ struct ll_file_data *fd;
+ struct ll_readahead_state *ras;
+ struct cl_io *io;
+ struct cl_2queue *queue;
+ pgoff_t ra_end = 0;
+ unsigned long len, mlen = 0;
+ struct file *file;
+ __u64 kms;
+ int rc;
+ unsigned long end_index;
+
+ work = container_of(wq, struct ll_readahead_work,
+ lrw_readahead_work);
+ fd = LUSTRE_FPRIVATE(work->lrw_file);
+ ras = &fd->fd_ras;
+ file = work->lrw_file;
+ inode = file_inode(file);
+
+ env = cl_env_alloc(&refcheck, LCT_NOREF);
+ if (IS_ERR(env))
+ GOTO(out_free_work, rc = PTR_ERR(env));
+
+ io = vvp_env_thread_io(env);
+ ll_io_init(io, file, CIT_READ);
+
+ rc = ll_readahead_file_kms(env, io, &kms);
+ if (rc != 0)
+ GOTO(out_put_env, rc);
+
+ if (kms == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
+ GOTO(out_put_env, rc = 0);
+ }
+
+ ria = &ll_env_info(env)->lti_ria;
+ memset(ria, 0, sizeof(*ria));
+
+ ria->ria_start = work->lrw_start;
+ /* Truncate RA window to end of file */
+ end_index = (unsigned long)((kms - 1) >> PAGE_SHIFT);
+ if (end_index <= work->lrw_end) {
+ work->lrw_end = end_index;
+ ria->ria_eof = true;
+ }
+ if (work->lrw_end <= work->lrw_start)
+ GOTO(out_put_env, rc = 0);
+
+ ria->ria_end = work->lrw_end;
+ len = ria->ria_end - ria->ria_start + 1;
+ ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria,
+ ria_page_count(ria), mlen);
+
+ CDEBUG(D_READA,
+ "async reserved pages: %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
+ ria->ria_reserved, len, mlen,
+ atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
+ ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
+
+ if (ria->ria_reserved < len) {
+ ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
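+ /* We could not reserve the full window; if less than 1MiB
+ * worth of pages was reserved, give it back and bail out. */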
+ if (PAGES_TO_MiB(ria->ria_reserved) < 1) {
+ ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
+ GOTO(out_put_env, rc = 0);
+ }
+ }
+
+ rc = cl_io_rw_init(env, io, CIT_READ, ria->ria_start, len);
+ if (rc)
+ GOTO(out_put_env, rc);
+
+ vvp_env_io(env)->vui_io_subtype = IO_NORMAL;
+ vvp_env_io(env)->vui_fd = fd;
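+ /* No cl_io_lock() is performed on this path, so flag the io as
+ * async readahead and start it directly from CIS_LOCKED. */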
+ io->ci_state = CIS_LOCKED;
+ io->ci_async_readahead = true;
+ rc = cl_io_start(env, io);
+ if (rc)
+ GOTO(out_io_fini, rc);
+
+ queue = &io->ci_queue;
+ cl_2queue_init(queue);
+
+ rc = ll_read_ahead_pages(env, io, &queue->c2_qin, ras, ria, &ra_end);
+ if (ria->ria_reserved != 0)
+ ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
+ if (queue->c2_qin.pl_nr > 0) {
+ int count = queue->c2_qin.pl_nr;
+
+ rc = cl_io_submit_rw(env, io, CRT_READ, queue);
+ if (rc == 0)
+ task_io_account_read(PAGE_SIZE * count);
+ }
+ if (ria->ria_end == ra_end && ra_end == (kms >> PAGE_SHIFT))
+ ll_ra_stats_inc(inode, RA_STAT_EOF);
+
+ if (ra_end != ria->ria_end)
+ ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
+
+ /* TODO: discard all pages until the page reinit path is implemented */
+ cl_page_list_discard(env, io, &queue->c2_qin);
+
+ /* Unlock unsent read pages in case of error. */
+ cl_page_list_disown(env, io, &queue->c2_qin);
+
+ cl_2queue_fini(env, queue);
+out_io_fini:
+ cl_io_end(env, io);
+ cl_io_fini(env, io);
+out_put_env:
+ cl_env_put(env, &refcheck);
+out_free_work:
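+ /* Only count this as an async readahead if some page was
+ * actually read ahead. */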
+ if (ra_end > 0)
+ ll_ra_stats_inc_sbi(ll_i2sbi(inode), RA_STAT_ASYNC);
+ ll_readahead_work_free(work);
}
static int ll_readahead(const struct lu_env *env, struct cl_io *io,
struct cl_page_list *queue,
- struct ll_readahead_state *ras, bool hit)
+ struct ll_readahead_state *ras, bool hit,
+ struct file *file)
{
struct vvp_io *vio = vvp_env_io(env);
struct ll_thread_info *lti = ll_env_info(env);
- struct cl_attr *attr = vvp_env_thread_attr(env);
unsigned long len, mlen = 0;
- pgoff_t ra_end, start = 0, end = 0;
+ pgoff_t ra_end = 0, start = 0, end = 0;
struct inode *inode;
struct ra_io_arg *ria = &lti->lti_ria;
struct cl_object *clob;
inode = vvp_object_inode(clob);
memset(ria, 0, sizeof *ria);
-
- cl_object_attr_lock(clob);
- ret = cl_object_attr_get(env, clob, attr);
- cl_object_attr_unlock(clob);
-
+ ret = ll_readahead_file_kms(env, io, &kms);
if (ret != 0)
RETURN(ret);
- kms = attr->cat_kms;
+
if (kms == 0) {
ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
RETURN(0);
end = end_index;
ria->ria_eof = true;
}
-
- ras->ras_next_readahead = max(end, end + 1);
- RAS_CDEBUG(ras);
}
ria->ria_start = start;
ria->ria_end = end;
RETURN(0);
}
+ RAS_CDEBUG(ras);
CDEBUG(D_READA, DFID": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n",
PFID(lu_object_fid(&clob->co_lu)),
ria->ria_start, ria->ria_end,
/* at least to extend the readahead window to cover current read */
if (!hit && vio->vui_ra_valid &&
- vio->vui_ra_start + vio->vui_ra_count > ria->ria_start) {
- unsigned long remainder;
-
- /* to the end of current read window. */
- mlen = vio->vui_ra_start + vio->vui_ra_count - ria->ria_start;
- /* trim to RPC boundary */
- ras_align(ras, ria->ria_start, &remainder);
- mlen = min(mlen, ras->ras_rpc_size - remainder);
- ria->ria_end_min = ria->ria_start + mlen;
- }
+ vio->vui_ra_start + vio->vui_ra_count > ria->ria_start)
+ ria->ria_end_min = vio->vui_ra_start + vio->vui_ra_count - 1;
ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len, mlen);
if (ria->ria_reserved < len)
atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
- ra_end = ll_read_ahead_pages(env, io, queue, ras, ria);
+ ret = ll_read_ahead_pages(env, io, queue, ras, ria, &ra_end);
if (ria->ria_reserved != 0)
ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
if (ra_end == end && ra_end == (kms >> PAGE_SHIFT))
ll_ra_stats_inc(inode, RA_STAT_EOF);
- /* if we didn't get to the end of the region we reserved from
- * the ras we need to go back and update the ras so that the
- * next read-ahead tries from where we left off. we only do so
- * if the region we failed to issue read-ahead on is still ahead
- * of the app and behind the next index to start read-ahead from */
CDEBUG(D_READA, "ra_end = %lu end = %lu stride end = %lu pages = %d\n",
ra_end, end, ria->ria_end, ret);
- if (ra_end > 0 && ra_end != end) {
+ if (ra_end != end)
ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
+ if (ra_end > 0) {
+ /* update the ras so that the next read-ahead tries from
+ * where we left off. */
spin_lock(&ras->ras_lock);
- if (ra_end <= ras->ras_next_readahead &&
- index_in_window(ra_end, ras->ras_window_start, 0,
- ras->ras_window_len)) {
- ras->ras_next_readahead = ra_end + 1;
- RAS_CDEBUG(ras);
- }
+ ras->ras_next_readahead = ra_end + 1;
spin_unlock(&ras->ras_lock);
+ RAS_CDEBUG(ras);
}
RETURN(ret);
wlen = min(ras->ras_window_len + ras->ras_rpc_size,
ra->ra_max_pages_per_file);
- ras->ras_window_len = ras_align(ras, wlen, NULL);
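+ /* Keep a window smaller than one RPC unaligned; aligning it
+ * down to an RPC boundary would shrink it to nothing. */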
+ if (wlen < ras->ras_rpc_size)
+ ras->ras_window_len = wlen;
+ else
+ ras->ras_window_len = ras_align(ras, wlen, NULL);
}
}
if (ra_miss) {
if (index_in_stride_window(ras, index) &&
stride_io_mode(ras)) {
- /*If stride-RA hit cache miss, the stride dector
- *will not be reset to avoid the overhead of
- *redetecting read-ahead mode */
if (index != ras->ras_last_readpage + 1)
ras->ras_consecutive_pages = 0;
ras_reset(inode, ras, index);
+
+ /* If stride-RA hit cache miss, the stride
+ * detector will not be reset to avoid the
+ * overhead of redetecting read-ahead mode,
+ * but only on the condition that the stride
+ * window still intersects the normal
+ * sequential read-ahead window. */
+ if (ras->ras_window_start <
+ ras->ras_stride_offset)
+ ras_stride_reset(ras);
RAS_CDEBUG(ras);
} else {
/* Reset both stride window and normal RA
/* Since stride readahead is sensitive to the offset
* of read-ahead, we use the original offset here,
* instead of ras_window_start, which is RPC aligned */
- ras->ras_next_readahead = max(index, ras->ras_next_readahead);
+ ras->ras_next_readahead = max(index + 1,
+ ras->ras_next_readahead);
ras->ras_window_start = max(ras->ras_stride_offset,
ras->ras_window_start);
} else {
int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
{
struct inode *inode = mapping->host;
- struct ll_sb_info *sbi = ll_i2sbi(inode);
loff_t start;
loff_t end;
enum cl_fsync_mode mode;
int range_whole = 0;
int result;
- int ignore_layout = 0;
ENTRY;
if (wbc->range_cyclic) {
if (wbc->sync_mode == WB_SYNC_ALL)
mode = CL_FSYNC_LOCAL;
- if (sbi->ll_umounting)
- /* if the mountpoint is being umounted, all pages have to be
- * evicted to avoid hitting LBUG when truncate_inode_pages()
- * is called later on. */
- ignore_layout = 1;
-
if (ll_i2info(inode)->lli_clob == NULL)
RETURN(0);
- result = cl_sync_file_range(inode, start, end, mode, ignore_layout);
+ /* For direct IO, writepages() can be called to evict cached pages
+ * from inside the IO context of the write, which would deadlock at
+ * layout_conf since that waits for active IOs to complete, so the
+ * layout is always ignored here. */
+ result = cl_sync_file_range(inode, start, end, mode, 1);
if (result > 0) {
wbc->nr_to_write -= result;
result = 0;
write_unlock(&fd->fd_lock);
}
-static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
+int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
struct cl_page *page, struct file *file)
{
struct inode *inode = vvp_object_inode(page->cp_obj);
struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
struct ll_readahead_state *ras = &fd->fd_ras;
struct cl_2queue *queue = &io->ci_queue;
+ struct cl_sync_io *anchor = NULL;
struct vvp_page *vpg;
int rc = 0;
bool uptodate;
cl_page_export(env, page, 1);
cl_page_disown(env, io, page);
} else {
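+ /* Cache miss: set up a sync anchor so that this thread can
+ * wait below for the page read to complete. */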
+ anchor = &vvp_env_info(env)->vti_anchor;
+ cl_sync_io_init(anchor, 1, &cl_sync_io_end);
+ page->cp_sync_io = anchor;
+
cl_2queue_add(queue, page);
}
int rc2;
rc2 = ll_readahead(env, io, &queue->c2_qin, ras,
- uptodate);
+ uptodate, file);
CDEBUG(D_READA, DFID "%d pages read ahead at %lu\n",
PFID(ll_inode2fid(inode)), rc2, vvp_index(vpg));
}
- if (queue->c2_qin.pl_nr > 0)
+ if (queue->c2_qin.pl_nr > 0) {
+ int count = queue->c2_qin.pl_nr;
+
rc = cl_io_submit_rw(env, io, CRT_READ, queue);
+ if (rc == 0)
+ task_io_account_read(PAGE_SIZE * count);
+ }
- /*
- * Unlock unsent pages in case of error.
- */
+
+ if (anchor != NULL && !cl_page_is_owned(page, io)) { /* have sent */
+ rc = cl_sync_io_wait(env, anchor, 0);
+
+ cl_page_assume(env, io, page);
+ cl_page_list_del(env, &queue->c2_qout, page);
+
+ if (!PageUptodate(cl_page_vmpage(page))) {
+ /* Failed to read a mirror; discard this page so that
+ * a new page can be created with a new mirror.
+ *
+ * TODO: this is not needed after the page reinit
+ * path is implemented */
+ cl_page_discard(env, io, page);
+ }
+ cl_page_disown(env, io, page);
+ }
+
+ /* TODO: discard all pages until the page reinit path is implemented */
+ cl_page_list_discard(env, io, &queue->c2_qin);
+
+ /* Unlock unsent read pages in case of error. */
cl_page_list_disown(env, io, &queue->c2_qin);
+
cl_2queue_fini(env, queue);
RETURN(rc);
}
+/*
+ * Possible return values:
+ * 0 no async readahead triggered; fast read could not be used.
+ * 1 no async readahead triggered, but fast read could be used.
+ * 2 async readahead triggered and fast read could be used too.
+ * < 0 on error.
+ */
+static int kickoff_async_readahead(struct file *file, unsigned long pages)
+{
+ struct ll_readahead_work *lrw;
+ struct inode *inode = file_inode(file);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_readahead_state *ras = &fd->fd_ras;
+ struct ll_ra_info *ra = &sbi->ll_ra_info;
+ unsigned long throttle;
+ unsigned long start = ras_align(ras, ras->ras_next_readahead, NULL);
+ unsigned long end = start + pages - 1;
+
+ throttle = min(ra->ra_async_pages_per_file_threshold,
+ ra->ra_max_pages_per_file);
+ /*
+ * If this is strided i/o or the window is smaller than the
+ * throttle limit, we do not do async readahead. Otherwise,
+ * we do async readahead, allowing the user thread to do fast i/o.
+ */
+ if (stride_io_mode(ras) || !throttle ||
+ ras->ras_window_len < throttle)
+ return 0;
+
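+ /* Bail out if this would overrun the global readahead page limit. */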
+ if ((atomic_read(&ra->ra_cur_pages) + pages) > ra->ra_max_pages)
+ return 0;
+
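+ /* An async readahead starting at this page was already queued;
+ * no need to queue another, fast read can be used. */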
+ if (ras->ras_async_last_readpage == start)
+ return 1;
+
+ /* freed later by ll_readahead_work_free() */
+ OBD_ALLOC_PTR(lrw);
+ if (lrw) {
+ lrw->lrw_file = get_file(file);
+ lrw->lrw_start = start;
+ lrw->lrw_end = end;
+ spin_lock(&ras->ras_lock);
+ ras->ras_next_readahead = end + 1;
+ ras->ras_async_last_readpage = start;
+ spin_unlock(&ras->ras_lock);
+ ll_readahead_work_add(inode, lrw);
+ } else {
+ return -ENOMEM;
+ }
+
+ return 2;
+}
+
int ll_readpage(struct file *file, struct page *vmpage)
{
struct inode *inode = file_inode(file);
struct cl_object *clob = ll_i2info(inode)->lli_clob;
struct ll_cl_context *lcc;
- const struct lu_env *env;
- struct cl_io *io;
+ const struct lu_env *env = NULL;
+ struct cl_io *io = NULL;
struct cl_page *page;
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
int result;
ENTRY;
lcc = ll_cl_find(file);
- if (lcc == NULL) {
- unlock_page(vmpage);
- RETURN(-EIO);
+ if (lcc != NULL) {
+ env = lcc->lcc_env;
+ io = lcc->lcc_io;
}
- env = lcc->lcc_env;
- io = lcc->lcc_io;
if (io == NULL) { /* fast read */
struct inode *inode = file_inode(file);
struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
struct ll_readahead_state *ras = &fd->fd_ras;
+ struct lu_env *local_env = NULL;
+ unsigned long fast_read_pages =
+ max(RA_REMAIN_WINDOW_MIN, ras->ras_rpc_size);
struct vvp_page *vpg;
result = -ENODATA;
page = cl_vmpage_page(vmpage, clob);
if (page == NULL) {
unlock_page(vmpage);
+ ll_ra_stats_inc_sbi(sbi, RA_STAT_FAILED_FAST_READ);
RETURN(result);
}
if (vpg->vpg_defer_uptodate) {
enum ras_update_flags flags = LL_RAS_HIT;
- if (lcc->lcc_type == LCC_MMAP)
+ if (lcc && lcc->lcc_type == LCC_MMAP)
flags |= LL_RAS_MMAP;
/* For fast read, it updates read ahead state only
* if the page is hit in cache because the non-cached
* page case will be handled by slow read later. */
- ras_update(ll_i2sbi(inode), inode, ras, vvp_index(vpg),
- flags);
+ ras_update(sbi, inode, ras, vvp_index(vpg), flags);
/* avoid duplicate ras_update() call */
vpg->vpg_ra_updated = 1;
* the case, we can't do fast IO because we will need
* a cl_io to issue the RPC. */
if (ras->ras_window_start + ras->ras_window_len <
- ras->ras_next_readahead + PTLRPC_MAX_BRW_PAGES) {
- /* export the page and skip io stack */
- vpg->vpg_ra_used = 1;
- cl_page_export(env, page, 1);
+ ras->ras_next_readahead + fast_read_pages ||
+ kickoff_async_readahead(file, fast_read_pages) > 0)
result = 0;
- }
}
- unlock_page(vmpage);
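+ /* No lcc means no env either; borrow a per-CPU env just for
+ * the page export/put below. */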
+ if (!env) {
+ local_env = cl_env_percpu_get();
+ env = local_env;
+ }
+
+ /* export the page and skip io stack */
+ if (result == 0) {
+ vpg->vpg_ra_used = 1;
+ cl_page_export(env, page, 1);
+ } else {
+ ll_ra_stats_inc_sbi(sbi, RA_STAT_FAILED_FAST_READ);
+ }
+ /* release page refcount before unlocking the page to ensure
+ * the object won't be destroyed in the calling path of
+ * cl_page_put(). Please see comment in ll_releasepage(). */
cl_page_put(env, page);
+ unlock_page(vmpage);
+ if (local_env)
+ cl_env_percpu_put(local_env);
+
RETURN(result);
}
LASSERT(page->cp_type == CPT_CACHEABLE);
if (likely(!PageUptodate(vmpage))) {
cl_page_assume(env, io, page);
+
result = ll_io_read_page(env, io, page, file);
} else {
/* Page from a non-object file. */
}
RETURN(result);
}
-
-int ll_page_sync_io(const struct lu_env *env, struct cl_io *io,
- struct cl_page *page, enum cl_req_type crt)
-{
- struct cl_2queue *queue;
- int result;
-
- LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
-
- queue = &io->ci_queue;
- cl_2queue_init_page(queue, page);
-
- result = cl_io_submit_sync(env, io, crt, queue, 0);
- LASSERT(cl_page_is_owned(page, io));
-
- if (crt == CRT_READ)
- /*
- * in CRT_WRITE case page is left locked even in case of
- * error.
- */
- cl_page_list_disown(env, io, &queue->c2_qin);
- cl_2queue_fini(env, queue);
-
- return result;
-}