+ if (ra.cra_end_idx == 0 || ra.cra_end_idx < page_idx) {
+ pgoff_t end_idx;
+
+ cl_read_ahead_release(env, &ra);
+
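+ /* Ask the lower layers how far read-ahead may extend from
+ * page_idx under the matched DLM lock; the reply in ra
+ * carries cra_end_idx, cra_rpc_pages and cra_contention. */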
+ rc = cl_io_read_ahead(env, io, page_idx, &ra);
+ if (rc < 0)
+ break;
+
+ /* Do not shrink ria_end_idx in any case until the
+ * minimum end of the current read is covered, and
+ * only shrink ria_end_idx if the matched LDLM lock
+ * does not cover more. */
+ if (page_idx > ra.cra_end_idx ||
+ (ra.cra_contention &&
+ page_idx > ria->ria_end_idx_min)) {
+ ria->ria_end_idx = ra.cra_end_idx;
+ break;
+ }
+
+ CDEBUG(D_READA, "idx: %lu, ra: %lu, rpc: %lu\n",
+ page_idx, ra.cra_end_idx,
+ ra.cra_rpc_pages);
+ LASSERTF(ra.cra_end_idx >= page_idx,
+ "object: %p, indcies %lu / %lu\n",
+ io->ci_obj, ra.cra_end_idx, page_idx);
+ /* Update the read-ahead RPC size.
+ * NB: this is racy, but it does not matter. */
+ if (ras->ras_rpc_pages != ra.cra_rpc_pages &&
+ ra.cra_rpc_pages > 0)
+ ras->ras_rpc_pages = ra.cra_rpc_pages;
+ /* trim it to align with optimal RPC size */
+ end_idx = ras_align(ras, ria->ria_end_idx + 1);
+ if (end_idx > 0 && !ria->ria_eof)
+ ria->ria_end_idx = end_idx - 1;
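+ /* Never trim the window below the minimum end needed
+ * to satisfy the read currently in progress. */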
+ if (ria->ria_end_idx < ria->ria_end_idx_min)
+ ria->ria_end_idx = ria->ria_end_idx_min;
+ }
+ if (page_idx > ria->ria_end_idx)
+ break;
+
+ /* If the page is inside the read-ahead window */
+ rc = ll_read_ahead_page(env, io, queue, page_idx);
+ if (rc < 0 && rc != -EBUSY)
+ break;
+ if (rc == -EBUSY) {
+ busy_page_count++;
+ CDEBUG(D_READA,
+ "skip busy page: %lu\n", page_idx);
+ /* For page-unaligned readahead the first and last
+ * pages of each region can be read by another
+ * reader on the same node and so may be busy. Only
+ * stop for more than 2 busy pages. */
+ if (busy_page_count > 2)
+ break;
+ }
+
+ *ra_end = page_idx;
+ /* Only subtract from the reserve and count the page
+ * if we really did read ahead on that page. */
+ if (rc == 0) {
+ ria->ria_reserved--;
+ count++;
+ }
+ } else if (stride_io_mode(ras)) {
+ /* If the page is not in the read-ahead window and we
+ * are in stride I/O mode, check whether the stride
+ * gap should be skipped.
+ */
+ loff_t pos = (loff_t)page_idx << PAGE_SHIFT;
+ u64 offset;
+
+ div64_u64_rem(pos - ria->ria_stoff, ria->ria_length,
+ &offset);
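+ /* offset is the byte position within the current
+ * stride; anything at or past ria_bytes is in the gap. */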
+ if (offset >= ria->ria_bytes) {
+ pos += (ria->ria_length - offset);
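+ /* pos now points into the next stride; back page_idx
+ * up one so the loop increment lands on its first page. */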
+ if ((pos >> PAGE_SHIFT) >= page_idx + 1)
+ page_idx = (pos >> PAGE_SHIFT) - 1;
+ busy_page_count = 0;
+ CDEBUG(D_READA,
+ "Stride: jump %llu pages to %lu\n",
+ ria->ria_length - offset, page_idx);
+ continue;
+ }
+ }
+ }
+
+ cl_read_ahead_release(env, &ra);
+
+ return count;
+}
+
+static void ll_readahead_work_free(struct ll_readahead_work *work)
+{
+ fput(work->lrw_file);
+ OBD_FREE_PTR(work);
+}
+
+static void ll_readahead_handle_work(struct work_struct *wq);
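+/* Hand a prepared read-ahead work item to the per-superblock
+ * workqueue so it runs asynchronously to the reading thread. */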
+static void ll_readahead_work_add(struct inode *inode,
+ struct ll_readahead_work *work)
+{
+ INIT_WORK(&work->lrw_readahead_work, ll_readahead_handle_work);
+ queue_work(ll_i2sbi(inode)->ll_ra_info.ll_readahead_wq,
+ &work->lrw_readahead_work);
+}
+
+static int ll_readahead_file_kms(const struct lu_env *env,
+ struct cl_io *io, __u64 *kms)
+{
+ struct cl_object *clob;
+ struct inode *inode;
+ struct cl_attr *attr = vvp_env_thread_attr(env);
+ int ret;
+
+ clob = io->ci_obj;
+ inode = vvp_object_inode(clob);
+
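+ /* Fetch the known minimum size (KMS) under the attribute
+ * lock; the file is known to be at least this large. */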
+ cl_object_attr_lock(clob);
+ ret = cl_object_attr_get(env, clob, attr);
+ cl_object_attr_unlock(clob);
+
+ if (ret != 0)
+ RETURN(ret);
+
+ *kms = attr->cat_kms;
+ return 0;
+}
+
+static void ll_readahead_handle_work(struct work_struct *wq)
+{
+ struct ll_readahead_work *work;
+ struct lu_env *env;
+ __u16 refcheck;
+ struct ra_io_arg *ria;
+ struct inode *inode;
+ struct ll_file_data *fd;
+ struct ll_readahead_state *ras;
+ struct cl_io *io;
+ struct cl_2queue *queue;
+ pgoff_t ra_end_idx = 0;
+ unsigned long pages, pages_min = 0;
+ struct file *file;
+ __u64 kms;
+ int rc;
+ pgoff_t eof_index;
+
+ work = container_of(wq, struct ll_readahead_work,
+ lrw_readahead_work);
+ fd = work->lrw_file->private_data;
+ ras = &fd->fd_ras;
+ file = work->lrw_file;
+ inode = file_inode(file);
+
+ env = cl_env_alloc(&refcheck, LCT_NOREF);
+ if (IS_ERR(env))
+ GOTO(out_free_work, rc = PTR_ERR(env));
+
+ io = vvp_env_thread_io(env);
+ ll_io_init(io, file, CIT_READ, NULL);
+
+ rc = ll_readahead_file_kms(env, io, &kms);
+ if (rc != 0)
+ GOTO(out_put_env, rc);
+
+ if (kms == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
+ GOTO(out_put_env, rc = 0);
+ }
+
+ ria = &ll_env_info(env)->lti_ria;
+ memset(ria, 0, sizeof(*ria));
+
+ ria->ria_start_idx = work->lrw_start_idx;
+ /* Truncate RA window to end of file */
+ eof_index = (pgoff_t)(kms - 1) >> PAGE_SHIFT;
+ if (eof_index <= work->lrw_end_idx) {
+ work->lrw_end_idx = eof_index;
+ ria->ria_eof = true;
+ }
+ if (work->lrw_end_idx <= work->lrw_start_idx)
+ GOTO(out_put_env, rc = 0);
+
+ ria->ria_end_idx = work->lrw_end_idx;
+ pages = ria->ria_end_idx - ria->ria_start_idx + 1;
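+ /* Reserve pages against the global read-ahead budget; the
+ * grant may be smaller than requested. */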
+ ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria,
+ ria_page_count(ria), pages_min);
+
+ CDEBUG(D_READA,
+ "async reserved pages: %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
+ ria->ria_reserved, pages, pages_min,
+ atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
+ ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
+
+ if (ria->ria_reserved < pages) {
+ ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
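+ /* If less than 1 MiB was granted, async read-ahead is
+ * not worth the overhead; return the reservation. */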
+ if (PAGES_TO_MiB(ria->ria_reserved) < 1) {
+ ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
+ GOTO(out_put_env, rc = 0);
+ }
+ }
+
+ rc = cl_io_rw_init(env, io, CIT_READ, ria->ria_start_idx, pages);
+ if (rc)
+ GOTO(out_put_env, rc);
+
+ vvp_env_io(env)->vui_io_subtype = IO_NORMAL;
+ vvp_env_io(env)->vui_fd = fd;
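+ /* Mark the io as locked: read-ahead pages match DLM locks
+ * individually, so the cl_io locking step is skipped. */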
+ io->ci_state = CIS_LOCKED;
+ io->ci_async_readahead = true;
+ rc = cl_io_start(env, io);
+ if (rc)
+ GOTO(out_io_fini, rc);
+
+ queue = &io->ci_queue;
+ cl_2queue_init(queue);
+
+ rc = ll_read_ahead_pages(env, io, &queue->c2_qin, ras, ria,
+ &ra_end_idx);
+ if (ria->ria_reserved != 0)
+ ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
+ if (queue->c2_qin.pl_nr > 0) {
+ int count = queue->c2_qin.pl_nr;
+
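+ /* Submit all queued pages as a single read and account
+ * the submitted bytes to the current task. */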
+ rc = cl_io_submit_rw(env, io, CRT_READ, queue);
+ if (rc == 0)
+ task_io_account_read(PAGE_SIZE * count);
+ }
+ if (ria->ria_end_idx == ra_end_idx && ra_end_idx == (kms >> PAGE_SHIFT))
+ ll_ra_stats_inc(inode, RA_STAT_EOF);
+
+ if (ra_end_idx != ria->ria_end_idx)
+ ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
+
+ /* TODO: discard all pages until a page reinit routine is implemented */
+ cl_page_list_discard(env, io, &queue->c2_qin);
+
+ /* Unlock unsent read pages in case of error. */
+ cl_page_list_disown(env, io, &queue->c2_qin);
+
+ cl_2queue_fini(env, queue);
+out_io_fini:
+ cl_io_end(env, io);
+ cl_io_fini(env, io);
+out_put_env:
+ cl_env_put(env, &refcheck);
+out_free_work:
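+ /* Count an async read-ahead event only if at least one
+ * page was actually processed. */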
+ if (ra_end_idx > 0)
+ ll_ra_stats_inc_sbi(ll_i2sbi(inode), RA_STAT_ASYNC);
+ ll_readahead_work_free(work);