+struct ll_cl_context *ll_cl_find(struct file *file)
+{
+ struct ll_file_data *fd = file->private_data;
+ struct ll_cl_context *lcc;
+ struct ll_cl_context *found = NULL;
+
+ read_lock(&fd->fd_lock);
+ list_for_each_entry(lcc, &fd->fd_lccs, lcc_list) {
+ if (lcc->lcc_cookie == current) {
+ found = lcc;
+ break;
+ }
+ }
+ read_unlock(&fd->fd_lock);
+
+ return found;
+}
+
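+/*
+ * Attach the caller's cl_io context to the file, keyed by the current
+ * task, so that ll_cl_find() can retrieve it from contexts that only
+ * receive the struct file (e.g. address_space operations).
+ *
+ * Illustrative pairing (caller names and the LCC_RW type are examples,
+ * not mandated by this code):
+ *
+ *	ll_cl_add(file, env, io, LCC_RW);
+ *	... page IO runs and looks the context up via ll_cl_find() ...
+ *	ll_cl_remove(file, env);
+ */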
+void ll_cl_add(struct file *file, const struct lu_env *env, struct cl_io *io,
+ enum lcc_type type)
+{
+ struct ll_file_data *fd = file->private_data;
+ struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
+
+ memset(lcc, 0, sizeof(*lcc));
+ INIT_LIST_HEAD(&lcc->lcc_list);
+ lcc->lcc_cookie = current;
+ lcc->lcc_env = env;
+ lcc->lcc_io = io;
+ lcc->lcc_type = type;
+
+ write_lock(&fd->fd_lock);
+ list_add(&lcc->lcc_list, &fd->fd_lccs);
+ write_unlock(&fd->fd_lock);
+}
+
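+/* Detach the context registered by ll_cl_add() once the IO is done. */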
+void ll_cl_remove(struct file *file, const struct lu_env *env)
+{
+ struct ll_file_data *fd = file->private_data;
+ struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
+
+ write_lock(&fd->fd_lock);
+ list_del_init(&lcc->lcc_list);
+ write_unlock(&fd->fd_lock);
+}
+
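+/*
+ * Read @page as part of @io:
+ * - update the per-file readahead state for this access,
+ * - if the page is already uptodate from earlier readahead, export it
+ *   to the VM and release it,
+ * - otherwise queue it together with any readahead pages, submit the
+ *   read and wait for this page to complete.
+ */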
+int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
+ struct cl_page *page, struct file *file)
+{
+ struct inode *inode = vvp_object_inode(page->cp_obj);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_file_data *fd = NULL;
+ struct ll_readahead_state *ras = NULL;
+ struct cl_2queue *queue = &io->ci_queue;
+ struct cl_sync_io *anchor = NULL;
+ struct vvp_page *vpg;
+ int rc = 0, rc2 = 0;
+ bool uptodate;
+ pgoff_t io_start_index;
+ pgoff_t io_end_index;
+ ENTRY;
+
+ if (file) {
+ fd = file->private_data;
+ ras = &fd->fd_ras;
+ }
+
+ vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
+ uptodate = vpg->vpg_defer_uptodate;
+
+ if (ll_readahead_enabled(sbi) && !vpg->vpg_ra_updated && ras) {
+ struct vvp_io *vio = vvp_env_io(env);
+ enum ras_update_flags flags = 0;
+
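+ /*
+ * An already-uptodate page was filled by earlier readahead, so count
+ * it as a readahead hit; a missing vui_ra_valid means this read did
+ * not come through the regular read path (e.g. an mmap fault).
+ */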
+ if (uptodate)
+ flags |= LL_RAS_HIT;
+ if (!vio->vui_ra_valid)
+ flags |= LL_RAS_MMAP;
+ ras_update(sbi, inode, ras, vvp_index(vpg), flags, io);
+ }
+
+ cl_2queue_init(queue);
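+ /*
+ * An uptodate page only needs to be exported to the VM and released;
+ * otherwise set up a sync anchor so the read of this page can be
+ * waited for below, and queue the page for submission.
+ */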
+ if (uptodate) {
+ vpg->vpg_ra_used = 1;
+ cl_page_export(env, page, 1);
+ cl_page_disown(env, io, page);
+ } else {
+ anchor = &vvp_env_info(env)->vti_anchor;
+ cl_sync_io_init(anchor, 1);
+ page->cp_sync_io = anchor;
+
+ cl_2queue_add(queue, page);
+ }
+
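+ /* First and last page index covered by the current read request. */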
+ io_start_index = cl_index(io->ci_obj, io->u.ci_rw.crw_pos);
+ io_end_index = cl_index(io->ci_obj, io->u.ci_rw.crw_pos +
+ io->u.ci_rw.crw_count - 1);
+ if (ll_readahead_enabled(sbi) && ras && !io->ci_rand_read) {
+ pgoff_t skip_index = 0;
+
+ if (ras->ras_next_readahead_idx < vvp_index(vpg))
+ skip_index = vvp_index(vpg);
+ rc2 = ll_readahead(env, io, &queue->c2_qin, ras,
+ uptodate, file, skip_index);
+ CDEBUG(D_READA, DFID " %d pages read ahead at %lu\n",
+ PFID(ll_inode2fid(inode)), rc2, vvp_index(vpg));
+ } else if (vvp_index(vpg) == io_start_index &&
+ io_end_index - io_start_index > 0) {
+ rc2 = ll_readpages(env, io, &queue->c2_qin, io_start_index + 1,
+ io_end_index);
+ CDEBUG(D_READA, DFID " %d pages read at %lu\n",
+ PFID(ll_inode2fid(inode)), rc2, vvp_index(vpg));
+ }
+
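+ /* Submit whatever was queued above: the target page and/or readahead pages. */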
+ if (queue->c2_qin.pl_nr > 0) {
+ int count = queue->c2_qin.pl_nr;
+
+ rc = cl_io_submit_rw(env, io, CRT_READ, queue);
+ if (rc == 0)
+ task_io_account_read(PAGE_SIZE * count);
+ }
+
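+ /*
+ * If the page was actually sent for read (this IO no longer owns it),
+ * wait for the transfer to complete before touching it.
+ */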
+ if (anchor != NULL && !cl_page_is_owned(page, io)) { /* have sent */
+ rc = cl_sync_io_wait(env, anchor, 0);
+
+ cl_page_assume(env, io, page);
+ cl_page_list_del(env, &queue->c2_qout, page);
+
+ if (!PageUptodate(cl_page_vmpage(page))) {
+ /* Failed to read a mirror, discard this page so that a
+ * new page can be created with a new mirror.
+ *
+ * TODO: this is not needed once the page reinit
+ * route is implemented. */
+ cl_page_discard(env, io, page);
+ }
+ cl_page_disown(env, io, page);
+ }
+
+ /* TODO: discard all pages until page reinit route is implemented */
+ cl_page_list_discard(env, io, &queue->c2_qin);
+
+ /* Unlock unsent read pages in case of error. */
+ cl_page_list_disown(env, io, &queue->c2_qin);
+
+ cl_2queue_fini(env, queue);
+
+ RETURN(rc);
+}
+
+/*
+ * Possible return values:
+ *  0    no async readahead triggered; fast read cannot be used.
+ *  1    no async readahead triggered, but fast read can be used.
+ *  2    async readahead triggered and fast read can be used too.
+ * < 0   on error.
+ */
+static int kickoff_async_readahead(struct file *file, unsigned long pages)
+{
+ struct ll_readahead_work *lrw;
+ struct inode *inode = file_inode(file);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_file_data *fd = file->private_data;
+ struct ll_readahead_state *ras = &fd->fd_ras;
+ struct ll_ra_info *ra = &sbi->ll_ra_info;
+ unsigned long throttle;
+ pgoff_t start_idx = ras_align(ras, ras->ras_next_readahead_idx);
+ pgoff_t end_idx = start_idx + pages - 1;
+
+ /*
+ * In case we have a limited max_cached_mb, readahead
+ * should be stopped if it has run out of all LRU slots.
+ */
+ if (atomic_read(&ra->ra_cur_pages) >= sbi->ll_cache->ccc_lru_max) {
+ ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
+ return 0;
+ }
+
+ throttle = min(ra->ra_async_pages_per_file_threshold,
+ ra->ra_max_pages_per_file);
+ /*
+ * If this is strided i/o, the throttle is disabled, the window is
+ * smaller than the throttle limit, or too much async readahead is
+ * already in flight, we do not do async readahead. Otherwise, we
+ * do async readahead, allowing the user thread to do fast i/o.
+ */
+ if (stride_io_mode(ras) || !throttle ||
+ ras->ras_window_pages < throttle ||
+ atomic_read(&ra->ra_async_inflight) > ra->ra_async_max_active)
+ return 0;
+
+ if ((atomic_read(&ra->ra_cur_pages) + pages) > ra->ra_max_pages)
+ return 0;
+
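+ /*
+ * Async readahead starting at this index has already been submitted
+ * by an earlier call; no new work is needed, but fast read is fine.
+ */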
+ if (ras->ras_async_last_readpage_idx == start_idx)
+ return 1;
+
+ /* freed by ll_readahead_work_free() */
+ OBD_ALLOC_PTR(lrw);
+ if (lrw) {
+ atomic_inc(&sbi->ll_ra_info.ra_async_inflight);
+ lrw->lrw_file = get_file(file);
+ lrw->lrw_start_idx = start_idx;
+ lrw->lrw_end_idx = end_idx;
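+ /*
+ * Advance the readahead window under ras_lock so concurrent readers
+ * do not queue overlapping async work for the same range.
+ */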
+ spin_lock(&ras->ras_lock);
+ ras->ras_next_readahead_idx = end_idx + 1;
+ ras->ras_async_last_readpage_idx = start_idx;
+ spin_unlock(&ras->ras_lock);
+ memcpy(lrw->lrw_jobid, ll_i2info(inode)->lli_jobid,
+ sizeof(lrw->lrw_jobid));
+ ll_readahead_work_add(inode, lrw);
+ } else {
+ return -ENOMEM;
+ }
+
+ return 2;
+}
+
+/*
+ * Check whether we can issue a readahead RPC. If that is
+ * the case, we cannot do fast IO because we will need
+ * a cl_io to issue the RPC.
+ */
+static bool ll_use_fast_io(struct file *file,
+ struct ll_readahead_state *ras, pgoff_t index)
+{
+ unsigned long fast_read_pages =
+ max(RA_REMAIN_WINDOW_MIN, ras->ras_rpc_pages);
+ loff_t skip_pages;
+ loff_t stride_bytes = ras->ras_stride_bytes;
+
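+ /*
+ * skip_pages is roughly one readahead RPC worth of pages; for strided
+ * IO, scale it by stride_length / stride_bytes so that the holes
+ * between stripe chunks are accounted for.
+ */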
+ if (stride_io_mode(ras) && stride_bytes) {
+ skip_pages = (ras->ras_stride_length +
+ stride_bytes - 1) / stride_bytes;
+ skip_pages *= fast_read_pages;
+ } else {
+ skip_pages = fast_read_pages;
+ }
+
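+ /*
+ * Fast read is fine if no readahead RPC would be needed within the
+ * next skip_pages pages (the readahead window is nearly consumed),
+ * or if async readahead was handed off to cover it.
+ */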
+ if (ras->ras_window_start_idx + ras->ras_window_pages <
+ ras->ras_next_readahead_idx + skip_pages ||
+ kickoff_async_readahead(file, fast_read_pages) > 0)
+ return true;
+
+ return false;
+}
+