struct ra_io_arg *ria, pgoff_t *ra_end)
{
struct cl_read_ahead ra = { 0 };
- int rc = 0, count = 0;
+ /* busy page count is per stride */
+ int rc = 0, count = 0, busy_page_count = 0;
pgoff_t page_idx;
LASSERT(ria != NULL);
/* If the page is inside the read-ahead window */
rc = ll_read_ahead_page(env, io, queue, page_idx);
- if (rc < 0)
+ if (rc < 0 && rc != -EBUSY)
break;
+ if (rc == -EBUSY) {
+ busy_page_count++;
+ CDEBUG(D_READA,
+ "skip busy page: %lu\n", page_idx);
+					/* For page unaligned readahead the
+					 * first and last pages of each region
+					 * can be read by another reader on the
+					 * same node, and so may be busy. So
+					 * only stop for > 2 busy pages. */
+ if (busy_page_count > 2)
+ break;
+ }
*ra_end = page_idx;
/* Only subtract from reserve & count the page if we
pos += (ria->ria_length - offset);
if ((pos >> PAGE_SHIFT) >= page_idx + 1)
page_idx = (pos >> PAGE_SHIFT) - 1;
+ busy_page_count = 0;
CDEBUG(D_READA,
"Stride: jump %llu pages to %lu\n",
ria->ria_length - offset, page_idx);
work = container_of(wq, struct ll_readahead_work,
lrw_readahead_work);
- fd = LUSTRE_FPRIVATE(work->lrw_file);
+ fd = work->lrw_file->private_data;
ras = &fd->fd_ras;
file = work->lrw_file;
inode = file_inode(file);
loff_t bytes_count =
stride_byte_count(ras->ras_stride_offset,
ras->ras_stride_length, ras->ras_stride_bytes,
- ras->ras_stride_offset, len);
+ ras->ras_window_start_idx << PAGE_SHIFT, len);
return (bytes_count + PAGE_SIZE - 1) >> PAGE_SHIFT;
}
stride_bytes = end - ras->ras_stride_offset;
div64_u64_rem(stride_bytes, ras->ras_stride_length, &left_bytes);
- window_bytes = ((loff_t)ras->ras_window_pages << PAGE_SHIFT) -
- left_bytes;
-
- if (left_bytes < ras->ras_stride_bytes)
- left_bytes += inc_bytes;
- else
- left_bytes = ras->ras_stride_bytes + inc_bytes;
+ window_bytes = (ras->ras_window_pages << PAGE_SHIFT);
+ if (left_bytes < ras->ras_stride_bytes) {
+ if (ras->ras_stride_bytes - left_bytes >= inc_bytes) {
+ window_bytes += inc_bytes;
+ goto out;
+ } else {
+ window_bytes += (ras->ras_stride_bytes - left_bytes);
+ inc_bytes -= (ras->ras_stride_bytes - left_bytes);
+ }
+ } else {
+ window_bytes += (ras->ras_stride_length - left_bytes);
+ }
LASSERT(ras->ras_stride_bytes != 0);
- step = div64_u64_rem(left_bytes, ras->ras_stride_bytes, &left_bytes);
+ step = div64_u64_rem(inc_bytes, ras->ras_stride_bytes, &left_bytes);
window_bytes += step * ras->ras_stride_length + left_bytes;
+ LASSERT(window_bytes > 0);
- if (stride_page_count(ras, window_bytes) <= ra->ra_max_pages_per_file)
+out:
+ if (stride_page_count(ras, window_bytes) <=
+ ra->ra_max_pages_per_file || ras->ras_window_pages == 0)
ras->ras_window_pages = (window_bytes >> PAGE_SHIFT);
+ LASSERT(ras->ras_window_pages > 0);
+
RAS_CDEBUG(ras);
}
void ll_ras_enter(struct file *f, loff_t pos, size_t count)
{
- struct ll_file_data *fd = LUSTRE_FPRIVATE(f);
+ struct ll_file_data *fd = f->private_data;
struct ll_readahead_state *ras = &fd->fd_ras;
struct inode *inode = file_inode(f);
unsigned long index = pos >> PAGE_SHIFT;
struct ll_cl_context *ll_cl_find(struct file *file)
{
- struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_file_data *fd = file->private_data;
struct ll_cl_context *lcc;
struct ll_cl_context *found = NULL;
void ll_cl_add(struct file *file, const struct lu_env *env, struct cl_io *io,
enum lcc_type type)
{
- struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_file_data *fd = file->private_data;
struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
memset(lcc, 0, sizeof(*lcc));
void ll_cl_remove(struct file *file, const struct lu_env *env)
{
- struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_file_data *fd = file->private_data;
struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
write_lock(&fd->fd_lock);
{
struct inode *inode = vvp_object_inode(page->cp_obj);
struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_file_data *fd = file->private_data;
struct ll_readahead_state *ras = &fd->fd_ras;
struct cl_2queue *queue = &io->ci_queue;
struct cl_sync_io *anchor = NULL;
struct ll_readahead_work *lrw;
struct inode *inode = file_inode(file);
struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_file_data *fd = file->private_data;
struct ll_readahead_state *ras = &fd->fd_ras;
struct ll_ra_info *ra = &sbi->ll_ra_info;
unsigned long throttle;
return 2;
}
+/*
+ * Decide whether fast IO can be used for the page at @index.
+ *
+ * If a readahead RPC would need to be issued we cannot do fast IO,
+ * because a cl_io is required to issue the RPC.  Returns true when
+ * fast IO is usable: either the readahead window still covers the
+ * next @skip_pages worth of pages, or async readahead was kicked
+ * off to refill it.  @index is currently unused but kept for
+ * symmetry with the other readahead helpers.
+ */
+static bool ll_use_fast_io(struct file *file,
+			   struct ll_readahead_state *ras, pgoff_t index)
+{
+	unsigned long fast_read_pages =
+		max(RA_REMAIN_WINDOW_MIN, ras->ras_rpc_pages);
+	loff_t skip_pages;
+	loff_t stride_bytes = ras->ras_stride_bytes;
+
+	/* Guard the division below: ras_stride_bytes can be zero if the
+	 * stride-read state was reset concurrently, so only take the
+	 * stride path when stride_bytes is nonzero (compare the
+	 * LASSERT(ras->ras_stride_bytes != 0) before the similar
+	 * division in ras_stride_increase_window()). */
+	if (stride_io_mode(ras) && stride_bytes) {
+		skip_pages = (ras->ras_stride_length + stride_bytes - 1) /
+			     stride_bytes;
+		skip_pages *= fast_read_pages;
+	} else {
+		skip_pages = fast_read_pages;
+	}
+
+	if (ras->ras_window_start_idx + ras->ras_window_pages <
+	    ras->ras_next_readahead_idx + skip_pages ||
+	    kickoff_async_readahead(file, fast_read_pages) > 0)
+		return true;
+
+	return false;
+}
+
int ll_readpage(struct file *file, struct page *vmpage)
{
struct inode *inode = file_inode(file);
if (io == NULL) { /* fast read */
struct inode *inode = file_inode(file);
- struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_file_data *fd = file->private_data;
struct ll_readahead_state *ras = &fd->fd_ras;
struct lu_env *local_env = NULL;
- unsigned long fast_read_pages =
- max(RA_REMAIN_WINDOW_MIN, ras->ras_rpc_pages);
struct vvp_page *vpg;
result = -ENODATA;
/* avoid duplicate ras_update() call */
vpg->vpg_ra_updated = 1;
- /* Check if we can issue a readahead RPC, if that is
- * the case, we can't do fast IO because we will need
- * a cl_io to issue the RPC. */
- if (ras->ras_window_start_idx + ras->ras_window_pages <
- ras->ras_next_readahead_idx + fast_read_pages ||
- kickoff_async_readahead(file, fast_read_pages) > 0)
+ if (ll_use_fast_io(file, ras, vvp_index(vpg)))
result = 0;
}
RETURN(result);
}
+ /**
+ * Direct read can fall back to buffered read, but DIO is done
+ * with lockless i/o, and buffered requires LDLM locking, so in
+ * this case we must restart without lockless.
+ */
+ if (file->f_flags & O_DIRECT &&
+ lcc && lcc->lcc_type == LCC_RW &&
+ !io->ci_ignore_lockless) {
+ unlock_page(vmpage);
+ io->ci_ignore_lockless = 1;
+ io->ci_need_restart = 1;
+ RETURN(-ENOLCK);
+ }
+
LASSERT(io->ci_state == CIS_IO_GOING);
page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
if (!IS_ERR(page)) {