X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fllite%2Frw.c;h=d8f4075747825acdc5848fd8803f29570293f753;hb=8fbef5ee761920faa3c84f142f56934d6266a409;hp=7ab5a39584b8d1e79aec0953486b0676cf360c59;hpb=72057a3af19ee02d9a686bd7e7d074917e381310;p=fs%2Flustre-release.git diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 7ab5a39..d8f4075 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -23,7 +23,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2015, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -44,12 +44,14 @@ #include #include +#include #include #include #include #include /* current_is_kswapd() */ #include +#include #define DEBUG_SUBSYSTEM S_LLITE @@ -116,8 +118,8 @@ void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len) static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which) { - LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which); - lprocfs_counter_incr(sbi->ll_ra_stats, which); + LASSERTF(which < _NR_RA_STAT, "which: %u\n", which); + lprocfs_counter_incr(sbi->ll_ra_stats, which); } void ll_ra_stats_inc(struct inode *inode, enum ra_stat which) @@ -129,14 +131,15 @@ void ll_ra_stats_inc(struct inode *inode, enum ra_stat which) #define RAS_CDEBUG(ras) \ CDEBUG(D_READA, \ "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu rpc %lu " \ - "r %lu ri %lu csr %lu sf %lu sp %lu sl %lu\n", \ + "r %lu ri %lu csr %lu sf %lu sp %lu sl %lu lr %lu\n", \ ras->ras_last_readpage, ras->ras_consecutive_requests, \ ras->ras_consecutive_pages, ras->ras_window_start, \ ras->ras_window_len, ras->ras_next_readahead, \ ras->ras_rpc_size, \ ras->ras_requests, ras->ras_request_index, \ ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \ - ras->ras_stride_pages, ras->ras_stride_length) + ras->ras_stride_pages, ras->ras_stride_length, \ + ras->ras_async_last_readpage) static int index_in_window(unsigned long index, unsigned long point, unsigned long before, unsigned long after) @@ -338,12 +341,11 @@ static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria) static unsigned long ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io, struct cl_page_list *queue, struct ll_readahead_state *ras, - struct ra_io_arg *ria) + struct ra_io_arg *ria, pgoff_t *ra_end) { struct cl_read_ahead ra = { 0 }; - int rc = 0; + int rc = 0, count = 0; bool stride_ria; - unsigned long ra_end = 0; pgoff_t page_idx; LASSERT(ria != NULL); @@ -369,7 +371,7 @@ ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io, io->ci_obj, ra.cra_end, page_idx); /* update read ahead RPC size. * NB: it's racy but doesn't matter */ - if (ras->ras_rpc_size > ra.cra_rpc_size && + if (ras->ras_rpc_size != ra.cra_rpc_size && ra.cra_rpc_size > 0) ras->ras_rpc_size = ra.cra_rpc_size; /* trim it to align with optimal RPC size */ @@ -389,9 +391,13 @@ ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io, if (rc < 0) break; - ra_end = page_idx; - if (rc == 0) + *ra_end = page_idx; + /* Only subtract from reserve & count the page if we + * really did readahead on that page. 
*/ + if (rc == 0) { ria->ria_reserved--; + count++; + } } else if (stride_ria) { /* If it is not in the read-ahead window, and it is * read-ahead mode, then check whether it should skip @@ -418,18 +424,177 @@ ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io, cl_read_ahead_release(env, &ra); - return ra_end; + return count; +} + +static void ll_readahead_work_free(struct ll_readahead_work *work) +{ + fput(work->lrw_file); + OBD_FREE_PTR(work); +} + +static void ll_readahead_handle_work(struct work_struct *wq); +static void ll_readahead_work_add(struct inode *inode, + struct ll_readahead_work *work) +{ + INIT_WORK(&work->lrw_readahead_work, ll_readahead_handle_work); + queue_work(ll_i2sbi(inode)->ll_ra_info.ll_readahead_wq, + &work->lrw_readahead_work); +} + +static int ll_readahead_file_kms(const struct lu_env *env, + struct cl_io *io, __u64 *kms) +{ + struct cl_object *clob; + struct inode *inode; + struct cl_attr *attr = vvp_env_thread_attr(env); + int ret; + + clob = io->ci_obj; + inode = vvp_object_inode(clob); + + cl_object_attr_lock(clob); + ret = cl_object_attr_get(env, clob, attr); + cl_object_attr_unlock(clob); + + if (ret != 0) + RETURN(ret); + + *kms = attr->cat_kms; + return 0; +} + +static void ll_readahead_handle_work(struct work_struct *wq) +{ + struct ll_readahead_work *work; + struct lu_env *env; + __u16 refcheck; + struct ra_io_arg *ria; + struct inode *inode; + struct ll_file_data *fd; + struct ll_readahead_state *ras; + struct cl_io *io; + struct cl_2queue *queue; + pgoff_t ra_end = 0; + unsigned long len, mlen = 0; + struct file *file; + __u64 kms; + int rc; + unsigned long end_index; + + work = container_of(wq, struct ll_readahead_work, + lrw_readahead_work); + fd = LUSTRE_FPRIVATE(work->lrw_file); + ras = &fd->fd_ras; + file = work->lrw_file; + inode = file_inode(file); + + env = cl_env_alloc(&refcheck, LCT_NOREF); + if (IS_ERR(env)) + GOTO(out_free_work, rc = PTR_ERR(env)); + + io = vvp_env_thread_io(env); + ll_io_init(io, file, CIT_READ); + + rc = ll_readahead_file_kms(env, io, &kms); + if (rc != 0) + GOTO(out_put_env, rc); + + if (kms == 0) { + ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN); + GOTO(out_put_env, rc = 0); + } + + ria = &ll_env_info(env)->lti_ria; + memset(ria, 0, sizeof(*ria)); + + ria->ria_start = work->lrw_start; + /* Truncate RA window to end of file */ + end_index = (unsigned long)((kms - 1) >> PAGE_SHIFT); + if (end_index <= work->lrw_end) { + work->lrw_end = end_index; + ria->ria_eof = true; + } + if (work->lrw_end <= work->lrw_start) + GOTO(out_put_env, rc = 0); + + ria->ria_end = work->lrw_end; + len = ria->ria_end - ria->ria_start + 1; + ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria, + ria_page_count(ria), mlen); + + CDEBUG(D_READA, + "async reserved pages: %lu/%lu/%lu, ra_cur %d, ra_max %lu\n", + ria->ria_reserved, len, mlen, + atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages), + ll_i2sbi(inode)->ll_ra_info.ra_max_pages); + + if (ria->ria_reserved < len) { + ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT); + if (PAGES_TO_MiB(ria->ria_reserved) < 1) { + ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved); + GOTO(out_put_env, rc = 0); + } + } + + rc = cl_io_rw_init(env, io, CIT_READ, ria->ria_start, len); + if (rc) + GOTO(out_put_env, rc); + + vvp_env_io(env)->vui_io_subtype = IO_NORMAL; + vvp_env_io(env)->vui_fd = fd; + io->ci_state = CIS_LOCKED; + io->ci_async_readahead = true; + rc = cl_io_start(env, io); + if (rc) + GOTO(out_io_fini, rc); + + queue = &io->ci_queue; + cl_2queue_init(queue); + + rc = 
ll_read_ahead_pages(env, io, &queue->c2_qin, ras, ria, &ra_end); + if (ria->ria_reserved != 0) + ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved); + if (queue->c2_qin.pl_nr > 0) { + int count = queue->c2_qin.pl_nr; + + rc = cl_io_submit_rw(env, io, CRT_READ, queue); + if (rc == 0) + task_io_account_read(PAGE_SIZE * count); + } + if (ria->ria_end == ra_end && ra_end == (kms >> PAGE_SHIFT)) + ll_ra_stats_inc(inode, RA_STAT_EOF); + + if (ra_end != ria->ria_end) + ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END); + + /* TODO: discard all pages until page reinit route is implemented */ + cl_page_list_discard(env, io, &queue->c2_qin); + + /* Unlock unsent read pages in case of error. */ + cl_page_list_disown(env, io, &queue->c2_qin); + + cl_2queue_fini(env, queue); +out_io_fini: + cl_io_end(env, io); + cl_io_fini(env, io); +out_put_env: + cl_env_put(env, &refcheck); +out_free_work: + if (ra_end > 0) + ll_ra_stats_inc_sbi(ll_i2sbi(inode), RA_STAT_ASYNC); + ll_readahead_work_free(work); } static int ll_readahead(const struct lu_env *env, struct cl_io *io, struct cl_page_list *queue, - struct ll_readahead_state *ras, bool hit) + struct ll_readahead_state *ras, bool hit, + struct file *file) { struct vvp_io *vio = vvp_env_io(env); struct ll_thread_info *lti = ll_env_info(env); - struct cl_attr *attr = vvp_env_thread_attr(env); unsigned long len, mlen = 0; - pgoff_t ra_end, start = 0, end = 0; + pgoff_t ra_end = 0, start = 0, end = 0; struct inode *inode; struct ra_io_arg *ria = <i->lti_ria; struct cl_object *clob; @@ -441,14 +606,10 @@ static int ll_readahead(const struct lu_env *env, struct cl_io *io, inode = vvp_object_inode(clob); memset(ria, 0, sizeof *ria); - - cl_object_attr_lock(clob); - ret = cl_object_attr_get(env, clob, attr); - cl_object_attr_unlock(clob); - + ret = ll_readahead_file_kms(env, io, &kms); if (ret != 0) RETURN(ret); - kms = attr->cat_kms; + if (kms == 0) { ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN); RETURN(0); @@ -485,9 +646,6 @@ static int ll_readahead(const struct lu_env *env, struct cl_io *io, end = end_index; ria->ria_eof = true; } - - ras->ras_next_readahead = max(end, end + 1); - RAS_CDEBUG(ras); } ria->ria_start = start; ria->ria_end = end; @@ -509,6 +667,7 @@ static int ll_readahead(const struct lu_env *env, struct cl_io *io, RETURN(0); } + RAS_CDEBUG(ras); CDEBUG(D_READA, DFID": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n", PFID(lu_object_fid(&clob->co_lu)), ria->ria_start, ria->ria_end, @@ -518,16 +677,8 @@ static int ll_readahead(const struct lu_env *env, struct cl_io *io, /* at least to extend the readahead window to cover current read */ if (!hit && vio->vui_ra_valid && - vio->vui_ra_start + vio->vui_ra_count > ria->ria_start) { - unsigned long remainder; - - /* to the end of current read window. 
*/ - mlen = vio->vui_ra_start + vio->vui_ra_count - ria->ria_start; - /* trim to RPC boundary */ - ras_align(ras, ria->ria_start, &remainder); - mlen = min(mlen, ras->ras_rpc_size - remainder); - ria->ria_end_min = ria->ria_start + mlen; - } + vio->vui_ra_start + vio->vui_ra_count > ria->ria_start) + ria->ria_end_min = vio->vui_ra_start + vio->vui_ra_count - 1; ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len, mlen); if (ria->ria_reserved < len) @@ -538,7 +689,7 @@ static int ll_readahead(const struct lu_env *env, struct cl_io *io, atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages), ll_i2sbi(inode)->ll_ra_info.ra_max_pages); - ra_end = ll_read_ahead_pages(env, io, queue, ras, ria); + ret = ll_read_ahead_pages(env, io, queue, ras, ria, &ra_end); if (ria->ria_reserved != 0) ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved); @@ -546,24 +697,18 @@ static int ll_readahead(const struct lu_env *env, struct cl_io *io, if (ra_end == end && ra_end == (kms >> PAGE_SHIFT)) ll_ra_stats_inc(inode, RA_STAT_EOF); - /* if we didn't get to the end of the region we reserved from - * the ras we need to go back and update the ras so that the - * next read-ahead tries from where we left off. we only do so - * if the region we failed to issue read-ahead on is still ahead - * of the app and behind the next index to start read-ahead from */ CDEBUG(D_READA, "ra_end = %lu end = %lu stride end = %lu pages = %d\n", ra_end, end, ria->ria_end, ret); - if (ra_end > 0 && ra_end != end) { + if (ra_end != end) ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END); + if (ra_end > 0) { + /* update the ras so that the next read-ahead tries from + * where we left off. */ spin_lock(&ras->ras_lock); - if (ra_end <= ras->ras_next_readahead && - index_in_window(ra_end, ras->ras_window_start, 0, - ras->ras_window_len)) { - ras->ras_next_readahead = ra_end + 1; - RAS_CDEBUG(ras); - } + ras->ras_next_readahead = ra_end + 1; spin_unlock(&ras->ras_lock); + RAS_CDEBUG(ras); } RETURN(ret); @@ -718,7 +863,10 @@ static void ras_increase_window(struct inode *inode, wlen = min(ras->ras_window_len + ras->ras_rpc_size, ra->ra_max_pages_per_file); - ras->ras_window_len = ras_align(ras, wlen, NULL); + if (wlen < ras->ras_rpc_size) + ras->ras_window_len = wlen; + else + ras->ras_window_len = ras_align(ras, wlen, NULL); } } @@ -802,12 +950,19 @@ static void ras_update(struct ll_sb_info *sbi, struct inode *inode, if (ra_miss) { if (index_in_stride_window(ras, index) && stride_io_mode(ras)) { - /*If stride-RA hit cache miss, the stride dector - *will not be reset to avoid the overhead of - *redetecting read-ahead mode */ if (index != ras->ras_last_readpage + 1) ras->ras_consecutive_pages = 0; ras_reset(inode, ras, index); + + /* If stride-RA hit cache miss, the stride + * detector will not be reset to avoid the + * overhead of redetecting read-ahead mode, + * but on the condition that the stride window + * is still intersect with normal sequential + * read-ahead window. 
*/ + if (ras->ras_window_start < + ras->ras_stride_offset) + ras_stride_reset(ras); RAS_CDEBUG(ras); } else { /* Reset both stride window and normal RA @@ -837,7 +992,8 @@ static void ras_update(struct ll_sb_info *sbi, struct inode *inode, /* Since stride readahead is sentivite to the offset * of read-ahead, so we use original offset here, * instead of ras_window_start, which is RPC aligned */ - ras->ras_next_readahead = max(index, ras->ras_next_readahead); + ras->ras_next_readahead = max(index + 1, + ras->ras_next_readahead); ras->ras_window_start = max(ras->ras_stride_offset, ras->ras_window_start); } else { @@ -980,13 +1136,11 @@ out: int ll_writepages(struct address_space *mapping, struct writeback_control *wbc) { struct inode *inode = mapping->host; - struct ll_sb_info *sbi = ll_i2sbi(inode); loff_t start; loff_t end; enum cl_fsync_mode mode; int range_whole = 0; int result; - int ignore_layout = 0; ENTRY; if (wbc->range_cyclic) { @@ -1005,16 +1159,13 @@ int ll_writepages(struct address_space *mapping, struct writeback_control *wbc) if (wbc->sync_mode == WB_SYNC_ALL) mode = CL_FSYNC_LOCAL; - if (sbi->ll_umounting) - /* if the mountpoint is being umounted, all pages have to be - * evicted to avoid hitting LBUG when truncate_inode_pages() - * is called later on. */ - ignore_layout = 1; - if (ll_i2info(inode)->lli_clob == NULL) RETURN(0); - result = cl_sync_file_range(inode, start, end, mode, ignore_layout); + /* for directio, it would call writepages() to evict cached pages + * inside the IO context of write, which will cause deadlock at + * layout_conf since it waits for active IOs to complete. */ + result = cl_sync_file_range(inode, start, end, mode, 1); if (result > 0) { wbc->nr_to_write -= result; result = 0; @@ -1075,7 +1226,7 @@ void ll_cl_remove(struct file *file, const struct lu_env *env) write_unlock(&fd->fd_lock); } -static int ll_io_read_page(const struct lu_env *env, struct cl_io *io, +int ll_io_read_page(const struct lu_env *env, struct cl_io *io, struct cl_page *page, struct file *file) { struct inode *inode = vvp_object_inode(page->cp_obj); @@ -1083,6 +1234,7 @@ static int ll_io_read_page(const struct lu_env *env, struct cl_io *io, struct ll_file_data *fd = LUSTRE_FPRIVATE(file); struct ll_readahead_state *ras = &fd->fd_ras; struct cl_2queue *queue = &io->ci_queue; + struct cl_sync_io *anchor = NULL; struct vvp_page *vpg; int rc = 0; bool uptodate; @@ -1110,6 +1262,10 @@ static int ll_io_read_page(const struct lu_env *env, struct cl_io *io, cl_page_export(env, page, 1); cl_page_disown(env, io, page); } else { + anchor = &vvp_env_info(env)->vti_anchor; + cl_sync_io_init(anchor, 1, &cl_sync_io_end); + page->cp_sync_io = anchor; + cl_2queue_add(queue, page); } @@ -1118,46 +1274,126 @@ static int ll_io_read_page(const struct lu_env *env, struct cl_io *io, int rc2; rc2 = ll_readahead(env, io, &queue->c2_qin, ras, - uptodate); + uptodate, file); CDEBUG(D_READA, DFID "%d pages read ahead at %lu\n", PFID(ll_inode2fid(inode)), rc2, vvp_index(vpg)); } - if (queue->c2_qin.pl_nr > 0) + if (queue->c2_qin.pl_nr > 0) { + int count = queue->c2_qin.pl_nr; rc = cl_io_submit_rw(env, io, CRT_READ, queue); + if (rc == 0) + task_io_account_read(PAGE_SIZE * count); + } - /* - * Unlock unsent pages in case of error. 
- */ + + if (anchor != NULL && !cl_page_is_owned(page, io)) { /* have sent */ + rc = cl_sync_io_wait(env, anchor, 0); + + cl_page_assume(env, io, page); + cl_page_list_del(env, &queue->c2_qout, page); + + if (!PageUptodate(cl_page_vmpage(page))) { + /* Failed to read a mirror, discard this page so that + * new page can be created with new mirror. + * + * TODO: this is not needed after page reinit + * route is implemented */ + cl_page_discard(env, io, page); + } + cl_page_disown(env, io, page); + } + + /* TODO: discard all pages until page reinit route is implemented */ + cl_page_list_discard(env, io, &queue->c2_qin); + + /* Unlock unsent read pages in case of error. */ cl_page_list_disown(env, io, &queue->c2_qin); + cl_2queue_fini(env, queue); RETURN(rc); } +/* + * Possible return value: + * 0 no async readahead triggered and fast read could not be used. + * 1 no async readahead, but fast read could be used. + * 2 async readahead triggered and fast read could be used too. + * < 0 on error. + */ +static int kickoff_async_readahead(struct file *file, unsigned long pages) +{ + struct ll_readahead_work *lrw; + struct inode *inode = file_inode(file); + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct ll_readahead_state *ras = &fd->fd_ras; + struct ll_ra_info *ra = &sbi->ll_ra_info; + unsigned long throttle; + unsigned long start = ras_align(ras, ras->ras_next_readahead, NULL); + unsigned long end = start + pages - 1; + + throttle = min(ra->ra_async_pages_per_file_threshold, + ra->ra_max_pages_per_file); + /* + * If this is strided i/o or the window is smaller than the + * throttle limit, we do not do async readahead. Otherwise, + * we do async readahead, allowing the user thread to do fast i/o. + */ + if (stride_io_mode(ras) || !throttle || + ras->ras_window_len < throttle) + return 0; + + if ((atomic_read(&ra->ra_cur_pages) + pages) > ra->ra_max_pages) + return 0; + + if (ras->ras_async_last_readpage == start) + return 1; + + /* ll_readahead_work_free() free it */ + OBD_ALLOC_PTR(lrw); + if (lrw) { + lrw->lrw_file = get_file(file); + lrw->lrw_start = start; + lrw->lrw_end = end; + spin_lock(&ras->ras_lock); + ras->ras_next_readahead = end + 1; + ras->ras_async_last_readpage = start; + spin_unlock(&ras->ras_lock); + ll_readahead_work_add(inode, lrw); + } else { + return -ENOMEM; + } + + return 2; +} + int ll_readpage(struct file *file, struct page *vmpage) { struct inode *inode = file_inode(file); struct cl_object *clob = ll_i2info(inode)->lli_clob; struct ll_cl_context *lcc; - const struct lu_env *env; - struct cl_io *io; + const struct lu_env *env = NULL; + struct cl_io *io = NULL; struct cl_page *page; + struct ll_sb_info *sbi = ll_i2sbi(inode); int result; ENTRY; lcc = ll_cl_find(file); - if (lcc == NULL) { - unlock_page(vmpage); - RETURN(-EIO); + if (lcc != NULL) { + env = lcc->lcc_env; + io = lcc->lcc_io; } - env = lcc->lcc_env; - io = lcc->lcc_io; if (io == NULL) { /* fast read */ struct inode *inode = file_inode(file); struct ll_file_data *fd = LUSTRE_FPRIVATE(file); struct ll_readahead_state *ras = &fd->fd_ras; + struct lu_env *local_env = NULL; + unsigned long fast_read_pages = + max(RA_REMAIN_WINDOW_MIN, ras->ras_rpc_size); struct vvp_page *vpg; result = -ENODATA; @@ -1167,6 +1403,7 @@ int ll_readpage(struct file *file, struct page *vmpage) page = cl_vmpage_page(vmpage, clob); if (page == NULL) { unlock_page(vmpage); + ll_ra_stats_inc_sbi(sbi, RA_STAT_FAILED_FAST_READ); RETURN(result); } @@ -1174,14 +1411,13 @@ int 
ll_readpage(struct file *file, struct page *vmpage) if (vpg->vpg_defer_uptodate) { enum ras_update_flags flags = LL_RAS_HIT; - if (lcc->lcc_type == LCC_MMAP) + if (lcc && lcc->lcc_type == LCC_MMAP) flags |= LL_RAS_MMAP; /* For fast read, it updates read ahead state only * if the page is hit in cache because non cache page * case will be handled by slow read later. */ - ras_update(ll_i2sbi(inode), inode, ras, vvp_index(vpg), - flags); + ras_update(sbi, inode, ras, vvp_index(vpg), flags); /* avoid duplicate ras_update() call */ vpg->vpg_ra_updated = 1; @@ -1189,16 +1425,31 @@ int ll_readpage(struct file *file, struct page *vmpage) * the case, we can't do fast IO because we will need * a cl_io to issue the RPC. */ if (ras->ras_window_start + ras->ras_window_len < - ras->ras_next_readahead + PTLRPC_MAX_BRW_PAGES) { - /* export the page and skip io stack */ - vpg->vpg_ra_used = 1; - cl_page_export(env, page, 1); + ras->ras_next_readahead + fast_read_pages || + kickoff_async_readahead(file, fast_read_pages) > 0) result = 0; - } } - unlock_page(vmpage); + if (!env) { + local_env = cl_env_percpu_get(); + env = local_env; + } + + /* export the page and skip io stack */ + if (result == 0) { + vpg->vpg_ra_used = 1; + cl_page_export(env, page, 1); + } else { + ll_ra_stats_inc_sbi(sbi, RA_STAT_FAILED_FAST_READ); + } + /* release page refcount before unlocking the page to ensure + * the object won't be destroyed in the calling path of + * cl_page_put(). Please see comment in ll_releasepage(). */ cl_page_put(env, page); + unlock_page(vmpage); + if (local_env) + cl_env_percpu_put(local_env); + RETURN(result); } @@ -1208,6 +1459,7 @@ int ll_readpage(struct file *file, struct page *vmpage) LASSERT(page->cp_type == CPT_CACHEABLE); if (likely(!PageUptodate(vmpage))) { cl_page_assume(env, io, page); + result = ll_io_read_page(env, io, page, file); } else { /* Page from a non-object file. */ @@ -1221,28 +1473,3 @@ int ll_readpage(struct file *file, struct page *vmpage) } RETURN(result); } - -int ll_page_sync_io(const struct lu_env *env, struct cl_io *io, - struct cl_page *page, enum cl_req_type crt) -{ - struct cl_2queue *queue; - int result; - - LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE); - - queue = &io->ci_queue; - cl_2queue_init_page(queue, page); - - result = cl_io_submit_sync(env, io, crt, queue, 0); - LASSERT(cl_page_is_owned(page, io)); - - if (crt == CRT_READ) - /* - * in CRT_WRITE case page is left locked even in case of - * error. - */ - cl_page_list_disown(env, io, &queue->c2_qin); - cl_2queue_fini(env, queue); - - return result; -}
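
Reviewer note (not part of the patch): kickoff_async_readahead() hands the readahead range to a workqueue so the reading thread can return immediately; the work item pins the struct file with get_file() and the handler drops it again via ll_readahead_work_free()/fput(). The following is a minimal, self-contained sketch of that hand-off pattern using only the generic workqueue API — all demo_* names are hypothetical and the Lustre-specific readahead logic is deliberately elided.

#include <linux/module.h>
#include <linux/workqueue.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/slab.h>
#include <linux/pagemap.h>

struct demo_ra_work {
	struct work_struct	 drw_work;
	struct file		*drw_file;	/* pinned with get_file() */
	pgoff_t			 drw_start;
	pgoff_t			 drw_end;
};

static struct workqueue_struct *demo_ra_wq;

static void demo_ra_handler(struct work_struct *wq)
{
	struct demo_ra_work *work = container_of(wq, struct demo_ra_work,
						 drw_work);

	/* The real handler builds a cl_io and submits the pages; this
	 * stub only shows where that work would run. */
	pr_info("async readahead %lu..%lu on %pD\n",
		work->drw_start, work->drw_end, work->drw_file);

	fput(work->drw_file);		/* mirrors ll_readahead_work_free() */
	kfree(work);
}

/* Read-path side: queue the range and return without blocking. */
static int demo_ra_kickoff(struct file *file, pgoff_t start, pgoff_t pages)
{
	struct demo_ra_work *work;

	work = kzalloc(sizeof(*work), GFP_NOFS);
	if (!work)
		return -ENOMEM;

	work->drw_file = get_file(file);	/* keep the file alive */
	work->drw_start = start;
	work->drw_end = start + pages - 1;
	INIT_WORK(&work->drw_work, demo_ra_handler);
	queue_work(demo_ra_wq, &work->drw_work);
	return 0;
}

static int __init demo_ra_init(void)
{
	demo_ra_wq = alloc_workqueue("demo_ra", WQ_UNBOUND, 0);
	return demo_ra_wq ? 0 : -ENOMEM;
}

static void __exit demo_ra_exit(void)
{
	destroy_workqueue(demo_ra_wq);	/* flushes pending work items */
}

module_init(demo_ra_init);
module_exit(demo_ra_exit);
MODULE_LICENSE("GPL");

As in the patch, the only state handed across threads is the pinned file and the page range; the worker recomputes everything else (kms, reserved pages) from scratch, which is why ll_readahead_handle_work() can run long after the original read returned.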
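
Reviewer note (not part of the patch): the cl_sync_io anchor added to ll_io_read_page() behaves like a counted completion that the caller waits on before checking PageUptodate() and discarding a failed mirror read. A rough generic-kernel analogue of that wait-for-one-page pattern is sketched below, again with hypothetical demo_* names; cl_sync_io itself carries more state than this.

#include <linux/completion.h>
#include <linux/page-flags.h>
#include <linux/mm_types.h>
#include <linux/errno.h>

struct demo_page_read {
	struct completion	dpr_done;
	struct page		*dpr_page;
	int			dpr_error;
};

static void demo_read_init(struct demo_page_read *dpr, struct page *page)
{
	init_completion(&dpr->dpr_done);
	dpr->dpr_page = page;
	dpr->dpr_error = 0;
}

/* I/O completion side, e.g. called from an end_io handler. */
static void demo_read_done(struct demo_page_read *dpr, int error)
{
	dpr->dpr_error = error;
	complete(&dpr->dpr_done);
}

/* Read-path side: block until the one outstanding page completes. */
static int demo_read_wait(struct demo_page_read *dpr)
{
	wait_for_completion(&dpr->dpr_done);

	/* Mirrors the !PageUptodate() check after cl_sync_io_wait(): a
	 * page that did not become uptodate is not handed to userspace. */
	if (dpr->dpr_error)
		return dpr->dpr_error;
	return PageUptodate(dpr->dpr_page) ? 0 : -EIO;
}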