+/*
+ * Register this thread's (env, io) context on the file's private list so
+ * a nested ->readpage() can find it again via ll_cl_find(); undone by
+ * ll_cl_remove().
+ */
+void ll_cl_add(struct file *file, const struct lu_env *env, struct cl_io *io,
+	       enum lcc_type type)
+{
+	struct ll_cl_context *ctx = &ll_env_info(env)->lti_io_ctx;
+	struct ll_file_data *fdata = LUSTRE_FPRIVATE(file);
+
+	/* Start from a clean context, then fill the identifying fields. */
+	memset(ctx, 0, sizeof(*ctx));
+	INIT_LIST_HEAD(&ctx->lcc_list);
+	ctx->lcc_type = type;
+	ctx->lcc_io = io;
+	ctx->lcc_env = env;
+	ctx->lcc_cookie = current;
+
+	/* Publish the context under the file's lock. */
+	write_lock(&fdata->fd_lock);
+	list_add(&ctx->lcc_list, &fdata->fd_lccs);
+	write_unlock(&fdata->fd_lock);
+}
+
+/*
+ * Undo ll_cl_add(): unlink this thread's I/O context from the file's
+ * private list.
+ */
+void ll_cl_remove(struct file *file, const struct lu_env *env)
+{
+	struct ll_cl_context *ctx = &ll_env_info(env)->lti_io_ctx;
+	struct ll_file_data *fdata = LUSTRE_FPRIVATE(file);
+
+	write_lock(&fdata->fd_lock);
+	list_del_init(&ctx->lcc_list);
+	write_unlock(&fdata->fd_lock);
+}
+
+/*
+ * Read one cl_page through @io, triggering readahead around it.
+ *
+ * @page must be owned by @io on entry.  If readahead already brought the
+ * data in (vpg_defer_uptodate), the page is exported to the VM directly;
+ * otherwise it is queued and submitted together with any readahead pages
+ * and we wait for completion via a cl_sync_io anchor before returning.
+ *
+ * Returns 0 on success, negative errno on error.
+ */
+int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
+ struct cl_page *page, struct file *file)
+{
+ struct inode *inode = vvp_object_inode(page->cp_obj);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_readahead_state *ras = &fd->fd_ras;
+ struct cl_2queue *queue = &io->ci_queue;
+ struct cl_sync_io *anchor = NULL;
+ struct vvp_page *vpg;
+ int rc = 0;
+ bool uptodate;
+ ENTRY;
+
+ vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
+ /* Was this page's content already read in by an earlier readahead? */
+ uptodate = vpg->vpg_defer_uptodate;
+
+ /* Update the readahead state machine, unless the fast-read path in
+ * ll_readpage() already did so (it sets vpg_ra_updated). */
+ if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
+ sbi->ll_ra_info.ra_max_pages > 0 &&
+ !vpg->vpg_ra_updated) {
+ struct vvp_io *vio = vvp_env_io(env);
+ enum ras_update_flags flags = 0;
+
+ if (uptodate)
+ flags |= LL_RAS_HIT;
+ if (!vio->vui_ra_valid)
+ flags |= LL_RAS_MMAP;
+ ras_update(sbi, inode, ras, vvp_index(vpg), flags);
+ }
+
+ cl_2queue_init(queue);
+ if (uptodate) {
+ /* Data is already here: mark the readahead page as used and
+ * hand it back to the VM without issuing another RPC. */
+ vpg->vpg_ra_used = 1;
+ cl_page_export(env, page, 1);
+ cl_page_disown(env, io, page);
+ } else {
+ /* A real read is needed: set up an anchor to wait on below
+ * once the page has been submitted. */
+ anchor = &vvp_env_info(env)->vti_anchor;
+ cl_sync_io_init(anchor, 1);
+ page->cp_sync_io = anchor;
+
+ cl_2queue_add(queue, page);
+ }
+
+ if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
+ sbi->ll_ra_info.ra_max_pages > 0) {
+ int rc2;
+
+ /* Queue additional readahead pages around this one; per the
+ * CDEBUG below rc2 is the number of pages read ahead. */
+ rc2 = ll_readahead(env, io, &queue->c2_qin, ras,
+ uptodate, file);
+ CDEBUG(D_READA, DFID "%d pages read ahead at %lu\n",
+ PFID(ll_inode2fid(inode)), rc2, vvp_index(vpg));
+ }
+
+ if (queue->c2_qin.pl_nr > 0) {
+ int count = queue->c2_qin.pl_nr;
+ rc = cl_io_submit_rw(env, io, CRT_READ, queue);
+ if (rc == 0)
+ task_io_account_read(PAGE_SIZE * count);
+ }
+
+
+ if (anchor != NULL && !cl_page_is_owned(page, io)) { /* have sent */
+ /* The target page went out with the submission: wait for it
+ * to complete, then take ownership back. */
+ rc = cl_sync_io_wait(env, anchor, 0);
+
+ cl_page_assume(env, io, page);
+ cl_page_list_del(env, &queue->c2_qout, page);
+
+ if (!PageUptodate(cl_page_vmpage(page))) {
+ /* Failed to read a mirror, discard this page so that
+ * new page can be created with new mirror.
+ *
+ * TODO: this is not needed after page reinit
+ * route is implemented */
+ cl_page_discard(env, io, page);
+ }
+ cl_page_disown(env, io, page);
+ }
+
+ /* TODO: discard all pages until page reinit route is implemented */
+ cl_page_list_discard(env, io, &queue->c2_qin);
+
+ /* Unlock unsent read pages in case of error. */
+ cl_page_list_disown(env, io, &queue->c2_qin);
+
+ cl_2queue_fini(env, queue);
+
+ RETURN(rc);
+}
+
+/*
+ * Possible return value:
+ * 0 no async readahead triggered and fast read could not be used.
+ * 1 no async readahead, but fast read could be used.
+ * 2 async readahead triggered and fast read could be used too.
+ * < 0 on error.
+ */
+static int kickoff_async_readahead(struct file *file, unsigned long pages)
+{
+	struct inode *inode = file_inode(file);
+	struct ll_sb_info *sbi = ll_i2sbi(inode);
+	struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+	struct ll_readahead_state *ras = &fd->fd_ras;
+	struct ll_ra_info *ra = &sbi->ll_ra_info;
+	struct ll_readahead_work *work;
+	unsigned long limit;
+	pgoff_t start_idx = ras_align(ras, ras->ras_next_readahead_idx);
+	pgoff_t end_idx = start_idx + pages - 1;
+
+	limit = min(ra->ra_async_pages_per_file_threshold,
+		    ra->ra_max_pages_per_file);
+	/*
+	 * Async readahead only pays off for large sequential windows: skip
+	 * it for strided i/o, when throttling is disabled, or when the
+	 * window has not yet grown past the throttle limit.
+	 */
+	if (stride_io_mode(ras) || !limit || ras->ras_window_pages < limit)
+		return 0;
+
+	/* Stay within the global readahead page budget. */
+	if ((atomic_read(&ra->ra_cur_pages) + pages) > ra->ra_max_pages)
+		return 0;
+
+	/* This window was already kicked off asynchronously: fast read only. */
+	if (ras->ras_async_last_readpage_idx == start_idx)
+		return 1;
+
+	/* Freed by ll_readahead_work_free() once the work item is done. */
+	OBD_ALLOC_PTR(work);
+	if (work == NULL)
+		return -ENOMEM;
+
+	work->lrw_file = get_file(file);
+	work->lrw_start_idx = start_idx;
+	work->lrw_end_idx = end_idx;
+
+	spin_lock(&ras->ras_lock);
+	ras->ras_next_readahead_idx = end_idx + 1;
+	ras->ras_async_last_readpage_idx = start_idx;
+	spin_unlock(&ras->ras_lock);
+
+	ll_readahead_work_add(inode, work);
+	return 2;
+}
+
+/*
+ * Check if we can issue a readahead RPC, if that is
+ * the case, we can't do fast IO because we will need
+ * a cl_io to issue the RPC.
+ */
+static bool ll_use_fast_io(struct file *file,
+			   struct ll_readahead_state *ras, pgoff_t index)
+{
+	unsigned long fast_read_pages =
+		max(RA_REMAIN_WINDOW_MIN, ras->ras_rpc_pages);
+	loff_t skip_pages;
+
+	if (stride_io_mode(ras)) {
+		/* Round the stride length up to whole stride_bytes units,
+		 * then scale by the fast-read window. */
+		loff_t stride_units = (ras->ras_stride_length +
+				       ras->ras_stride_bytes - 1) /
+				      ras->ras_stride_bytes;
+
+		skip_pages = stride_units * fast_read_pages;
+	} else {
+		skip_pages = fast_read_pages;
+	}
+
+	/* Fast i/o is usable while the readahead window still has room;
+	 * otherwise only if async readahead could be kicked off (the
+	 * short-circuit keeps that side effect conditional). */
+	return ras->ras_window_start_idx + ras->ras_window_pages <
+	       ras->ras_next_readahead_idx + skip_pages ||
+	       kickoff_async_readahead(file, fast_read_pages) > 0;
+}
+
+/*
+ * ->readpage() entry for Lustre.
+ *
+ * If this thread registered a cl_io via ll_cl_add(), read the page through
+ * the full cl_io stack.  Otherwise attempt a "fast read": serve the page
+ * directly when readahead already brought it in, avoiding the cost of a
+ * cl_io; -ENODATA tells the caller to retry via the slow path.
+ *
+ * Returns 0 on success, negative errno otherwise.
+ */
+int ll_readpage(struct file *file, struct page *vmpage)
+{
+	struct inode *inode = file_inode(file);
+	struct cl_object *clob = ll_i2info(inode)->lli_clob;
+	struct ll_cl_context *lcc;
+	const struct lu_env *env = NULL;
+	struct cl_io *io = NULL;
+	struct cl_page *page;
+	struct ll_sb_info *sbi = ll_i2sbi(inode);
+	int result;
+	ENTRY;
+
+	lcc = ll_cl_find(file);
+	if (lcc != NULL) {
+		env = lcc->lcc_env;
+		io = lcc->lcc_io;
+	}
+
+	if (io == NULL) { /* fast read */
+		/* NOTE: the outer @inode is already file_inode(file); the
+		 * previous redeclaration here shadowed it and is dropped. */
+		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+		struct ll_readahead_state *ras = &fd->fd_ras;
+		struct lu_env *local_env = NULL;
+		struct vvp_page *vpg;
+
+		result = -ENODATA;
+
+		/* TODO: need to verify the layout version to make sure
+		 * the page is not invalid due to layout change. */
+		page = cl_vmpage_page(vmpage, clob);
+		if (page == NULL) {
+			unlock_page(vmpage);
+			ll_ra_stats_inc_sbi(sbi, RA_STAT_FAILED_FAST_READ);
+			RETURN(result);
+		}
+
+		vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
+		if (vpg->vpg_defer_uptodate) {
+			enum ras_update_flags flags = LL_RAS_HIT;
+
+			if (lcc && lcc->lcc_type == LCC_MMAP)
+				flags |= LL_RAS_MMAP;
+
+			/* For fast read, it updates read ahead state only
+			 * if the page is hit in cache because non cache page
+			 * case will be handled by slow read later. */
+			ras_update(sbi, inode, ras, vvp_index(vpg), flags);
+			/* avoid duplicate ras_update() call */
+			vpg->vpg_ra_updated = 1;
+
+			if (ll_use_fast_io(file, ras, vvp_index(vpg)))
+				result = 0;
+		}
+
+		/* No registered env in the fast path: borrow a percpu one. */
+		if (!env) {
+			local_env = cl_env_percpu_get();
+			env = local_env;
+		}
+
+		/* export the page and skip io stack */
+		if (result == 0) {
+			vpg->vpg_ra_used = 1;
+			cl_page_export(env, page, 1);
+		} else {
+			ll_ra_stats_inc_sbi(sbi, RA_STAT_FAILED_FAST_READ);
+		}
+		/* release page refcount before unlocking the page to ensure
+		 * the object won't be destroyed in the calling path of
+		 * cl_page_put(). Please see comment in ll_releasepage(). */
+		cl_page_put(env, page);
+		unlock_page(vmpage);
+		if (local_env)
+			cl_env_percpu_put(local_env);
+
+		RETURN(result);
+	}
+
+	/**
+	 * Direct read can fall back to buffered read, but DIO is done
+	 * with lockless i/o, and buffered requires LDLM locking, so in
+	 * this case we must restart without lockless.
+	 */
+	if (file->f_flags & O_DIRECT &&
+	    lcc && lcc->lcc_type == LCC_RW &&
+	    !io->ci_ignore_lockless) {
+		unlock_page(vmpage);
+		io->ci_ignore_lockless = 1;
+		io->ci_need_restart = 1;
+		RETURN(-ENOLCK);
+	}
+
+	LASSERT(io->ci_state == CIS_IO_GOING);
+	page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
+	if (!IS_ERR(page)) {
+		LASSERT(page->cp_type == CPT_CACHEABLE);
+		if (likely(!PageUptodate(vmpage))) {
+			cl_page_assume(env, io, page);
+
+			result = ll_io_read_page(env, io, page, file);
+		} else {
+			/* Page from a non-object file. */
+			unlock_page(vmpage);
+			result = 0;
+		}
+		cl_page_put(env, page);
+	} else {
+		unlock_page(vmpage);
+		result = PTR_ERR(page);
+	}
+	RETURN(result);
+}