+ ria->ria_start_idx = work->lrw_start_idx;
+ /* Truncate RA window to end of file */
+ eof_index = (pgoff_t)((kms - 1) >> PAGE_SHIFT);
+ if (eof_index <= work->lrw_end_idx) {
+ work->lrw_end_idx = eof_index;
+ ria->ria_eof = true;
+ }
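+ /* Nothing to read ahead if the window has collapsed */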
+ if (work->lrw_end_idx <= work->lrw_start_idx)
+ GOTO(out_put_env, rc = 0);
+
+ ria->ria_end_idx = work->lrw_end_idx;
+ pages = ria->ria_end_idx - ria->ria_start_idx + 1;
+ ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria,
+ ria_page_count(ria), pages_min);
+
+ CDEBUG(D_READA,
+ "async reserved pages: %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
+ ria->ria_reserved, pages, pages_min,
+ atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
+ ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
+
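+ /* If the full window could not be reserved and less than 1MiB of
+ * pages was granted, release the reservation and skip this work */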
+ if (ria->ria_reserved < pages) {
+ ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
+ if (PAGES_TO_MiB(ria->ria_reserved) < 1) {
+ ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
+ GOTO(out_put_env, rc = 0);
+ }
+ }
+
+ rc = cl_io_rw_init(env, io, CIT_READ, ria->ria_start_idx, pages);
+ if (rc)
+ GOTO(out_put_env, rc);
+
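+ /* Configure and start the read io over the reserved window,
+ * flagged as async readahead */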
+ vvp_env_io(env)->vui_io_subtype = IO_NORMAL;
+ vvp_env_io(env)->vui_fd = fd;
+ io->ci_state = CIS_LOCKED;
+ io->ci_async_readahead = true;
+ rc = cl_io_start(env, io);
+ if (rc)
+ GOTO(out_io_fini, rc);
+
+ queue = &io->ci_queue;
+ cl_2queue_init(queue);
+
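+ /* Queue the window's pages; ra_end_idx is set to the last page
+ * index actually handled */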
+ rc = ll_read_ahead_pages(env, io, &queue->c2_qin, ras, ria,
+ &ra_end_idx);
+ if (ria->ria_reserved != 0)
+ ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
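+ /* Submit the queued pages as one read and account the bytes
+ * read to this task */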
+ if (queue->c2_qin.pl_nr > 0) {
+ int count = queue->c2_qin.pl_nr;
+
+ rc = cl_io_submit_rw(env, io, CRT_READ, queue);
+ if (rc == 0)
+ task_io_account_read(PAGE_SIZE * count);
+ }
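+ /* Record whether readahead reached end of file or stopped short
+ * of the requested window */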
+ if (ria->ria_end_idx == ra_end_idx && ra_end_idx == (kms >> PAGE_SHIFT))
+ ll_ra_stats_inc(inode, RA_STAT_EOF);
+
+ if (ra_end_idx != ria->ria_end_idx)
+ ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
+
+ /* TODO: discard all pages until a page reinit routine is implemented */
+ cl_page_list_discard(env, io, &queue->c2_qin);
+
+ /* Unlock unsent read pages in case of error. */
+ cl_page_list_disown(env, io, &queue->c2_qin);
+
+ cl_2queue_fini(env, queue);
+out_io_fini:
+ cl_io_end(env, io);
+ cl_io_fini(env, io);
+out_put_env:
+ cl_env_put(env, &refcheck);
+out_free_work:
+ if (ra_end_idx > 0)
+ ll_ra_stats_inc_sbi(ll_i2sbi(inode), RA_STAT_ASYNC);
+ ll_readahead_work_free(work);
+}
+
+static int ll_readahead(const struct lu_env *env, struct cl_io *io,
+ struct cl_page_list *queue,
+ struct ll_readahead_state *ras, bool hit,
+ struct file *file)
+{
+ struct vvp_io *vio = vvp_env_io(env);
+ struct ll_thread_info *lti = ll_env_info(env);
+ unsigned long pages, pages_min = 0;
+ pgoff_t ra_end_idx = 0, start_idx = 0, end_idx = 0;
+ struct inode *inode;
+ struct ra_io_arg *ria = &lti->lti_ria;
+ struct cl_object *clob;
+ int ret = 0;
+ __u64 kms;
+ ENTRY;
+
+ clob = io->ci_obj;
+ inode = vvp_object_inode(clob);
+
+ memset(ria, 0, sizeof(*ria));
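+ /* kms is the known minimum file size; readahead must not run
+ * past it */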
+ ret = ll_readahead_file_kms(env, io, &kms);
+ if (ret != 0)
+ RETURN(ret);
+
+ if (kms == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
+ RETURN(0);
+ }
+
+ spin_lock(&ras->ras_lock);
+
+ /*
+ * Note: another thread might roll back ras_next_readahead_idx
+ * if it cannot reserve the full window of prepared pages; see
+ * the end of this function. For stride read-ahead, make sure
+ * the offset is not less than ras_stride_offset so that stride
+ * read-ahead works correctly.
+ */
+ if (stride_io_mode(ras))
+ start_idx = max_t(pgoff_t, ras->ras_next_readahead_idx,
+ ras->ras_stride_offset >> PAGE_SHIFT);
+ else
+ start_idx = ras->ras_next_readahead_idx;
+
+ if (ras->ras_window_pages > 0)
+ end_idx = ras->ras_window_start_idx + ras->ras_window_pages - 1;
+
+ /* Enlarge the RA window to encompass the full read */
+ if (vio->vui_ra_valid &&
+ end_idx < vio->vui_ra_start_idx + vio->vui_ra_pages - 1)
+ end_idx = vio->vui_ra_start_idx + vio->vui_ra_pages - 1;
+
+ if (end_idx != 0) {
+ pgoff_t eof_index;
+
+ /* Truncate RA window to end of file */
+ eof_index = (pgoff_t)((kms - 1) >> PAGE_SHIFT);
+ if (eof_index <= end_idx) {
+ end_idx = eof_index;
+ ria->ria_eof = true;
+ }
+ }
+ ria->ria_start_idx = start_idx;
+ ria->ria_end_idx = end_idx;
+ /* If stride I/O mode is detected, get the stride window */
+ if (stride_io_mode(ras)) {
+ ria->ria_stoff = ras->ras_stride_offset;
+ ria->ria_length = ras->ras_stride_length;
+ ria->ria_bytes = ras->ras_stride_bytes;
+ }
+ spin_unlock(&ras->ras_lock);
+
+ if (end_idx == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
+ RETURN(0);
+ }
+ pages = ria_page_count(ria);
+ if (pages == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
+ RETURN(0);
+ }
+
+ RAS_CDEBUG(ras);
+ CDEBUG(D_READA, DFID": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n",
+ PFID(lu_object_fid(&clob->co_lu)),
+ ria->ria_start_idx, ria->ria_end_idx,
+ vio->vui_ra_valid ? vio->vui_ra_start_idx : 0,
+ vio->vui_ra_valid ? vio->vui_ra_pages : 0,
+ hit);
+
+ /* At least extend the readahead window to cover the current read */
+ if (!hit && vio->vui_ra_valid &&
+ vio->vui_ra_start_idx + vio->vui_ra_pages > ria->ria_start_idx)
+ ria->ria_end_idx_min =
+ vio->vui_ra_start_idx + vio->vui_ra_pages - 1;
+
+ ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria, pages,
+ pages_min);
+ if (ria->ria_reserved < pages)
+ ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
+
+ CDEBUG(D_READA, "reserved pages: %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
+ ria->ria_reserved, pages, pages_min,
+ atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
+ ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
+
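+ /* Read in the window's pages; ra_end_idx reports how far
+ * readahead actually got */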
+ ret = ll_read_ahead_pages(env, io, queue, ras, ria, &ra_end_idx);
+
+ if (ria->ria_reserved != 0)
+ ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
+
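+ /* Update stats: note whether readahead hit end of file or
+ * failed to reach the window end */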
+ if (ra_end_idx == end_idx && ra_end_idx == (kms >> PAGE_SHIFT))
+ ll_ra_stats_inc(inode, RA_STAT_EOF);
+
+ CDEBUG(D_READA,
+ "ra_end_idx = %lu end_idx = %lu stride end = %lu pages = %d\n",
+ ra_end_idx, end_idx, ria->ria_end_idx, ret);
+
+ if (ra_end_idx != end_idx)
+ ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
+ if (ra_end_idx > 0) {
+ /* Update the ras so that the next read-ahead starts from
+ * where we left off. */
+ spin_lock(&ras->ras_lock);
+ ras->ras_next_readahead_idx = ra_end_idx + 1;
+ spin_unlock(&ras->ras_lock);
+ RAS_CDEBUG(ras);
+ }
+
+ RETURN(ret);