Whamcloud - gitweb
LU-9679 llite: Discard LUSTRE_FPRIVATE()
[fs/lustre-release.git] / lustre / llite / rw.c
index 61e8baf..a11a038 100644 (file)
@@ -359,7 +359,8 @@ ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io,
                    struct ra_io_arg *ria, pgoff_t *ra_end)
 {
        struct cl_read_ahead ra = { 0 };
-       int rc = 0, count = 0;
+       /* busy page count is per stride */
+       int rc = 0, count = 0, busy_page_count = 0;
        pgoff_t page_idx;
 
        LASSERT(ria != NULL);
@@ -412,8 +413,20 @@ ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io,
 
                        /* If the page is inside the read-ahead window */
                        rc = ll_read_ahead_page(env, io, queue, page_idx);
-                       if (rc < 0)
+                       if (rc < 0 && rc != -EBUSY)
                                break;
+                       if (rc == -EBUSY) {
+                               busy_page_count++;
+                               CDEBUG(D_READA,
+                                      "skip busy page: %lu\n", page_idx);
+                               /* For page-unaligned readahead the first and
+                                * last pages of each region can be read by
+                                * another reader on the same node, and so
+                                * may be busy. So only stop for > 2 busy
+                                * pages. */
+                               if (busy_page_count > 2)
+                                       break;
+                       }
 
                        *ra_end = page_idx;
                        /* Only subtract from reserve & count the page if we
@@ -436,6 +449,7 @@ ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io,
                                pos += (ria->ria_length - offset);
                                if ((pos >> PAGE_SHIFT) >= page_idx + 1)
                                        page_idx = (pos >> PAGE_SHIFT) - 1;
+                               busy_page_count = 0;
                                CDEBUG(D_READA,
                                       "Stride: jump %llu pages to %lu\n",
                                       ria->ria_length - offset, page_idx);
@@ -506,7 +520,7 @@ static void ll_readahead_handle_work(struct work_struct *wq)
 
        work = container_of(wq, struct ll_readahead_work,
                            lrw_readahead_work);
-       fd = LUSTRE_FPRIVATE(work->lrw_file);
+       fd = work->lrw_file->private_data;
        ras = &fd->fd_ras;
        file = work->lrw_file;
        inode = file_inode(file);
@@ -829,7 +843,7 @@ stride_page_count(struct ll_readahead_state *ras, loff_t len)
        loff_t bytes_count =
                stride_byte_count(ras->ras_stride_offset,
                                  ras->ras_stride_length, ras->ras_stride_bytes,
-                                 ras->ras_stride_offset, len);
+                                 ras->ras_window_start_idx << PAGE_SHIFT, len);
 
        return (bytes_count + PAGE_SIZE - 1) >> PAGE_SHIFT;
 }
@@ -860,23 +874,33 @@ static void ras_stride_increase_window(struct ll_readahead_state *ras,
                stride_bytes = end - ras->ras_stride_offset;
 
        div64_u64_rem(stride_bytes, ras->ras_stride_length, &left_bytes);
-       window_bytes = ((loff_t)ras->ras_window_pages << PAGE_SHIFT) -
-               left_bytes;
-
-       if (left_bytes < ras->ras_stride_bytes)
-               left_bytes += inc_bytes;
-       else
-               left_bytes = ras->ras_stride_bytes + inc_bytes;
+       window_bytes = (ras->ras_window_pages << PAGE_SHIFT);
+       if (left_bytes < ras->ras_stride_bytes) {
+               if (ras->ras_stride_bytes - left_bytes >= inc_bytes) {
+                       window_bytes += inc_bytes;
+                       goto out;
+               } else {
+                       window_bytes += (ras->ras_stride_bytes - left_bytes);
+                       inc_bytes -= (ras->ras_stride_bytes - left_bytes);
+               }
+       } else {
+               window_bytes += (ras->ras_stride_length - left_bytes);
+       }
 
        LASSERT(ras->ras_stride_bytes != 0);
 
-       step = div64_u64_rem(left_bytes, ras->ras_stride_bytes, &left_bytes);
+       step = div64_u64_rem(inc_bytes, ras->ras_stride_bytes, &left_bytes);
 
        window_bytes += step * ras->ras_stride_length + left_bytes;
+       LASSERT(window_bytes > 0);
 
-       if (stride_page_count(ras, window_bytes) <= ra->ra_max_pages_per_file)
+out:
+       if (stride_page_count(ras, window_bytes) <=
+           ra->ra_max_pages_per_file || ras->ras_window_pages == 0)
                ras->ras_window_pages = (window_bytes >> PAGE_SHIFT);
 
+       LASSERT(ras->ras_window_pages > 0);
+
        RAS_CDEBUG(ras);
 }
 
@@ -972,7 +996,7 @@ static void ras_detect_read_pattern(struct ll_readahead_state *ras,
 
 void ll_ras_enter(struct file *f, loff_t pos, size_t count)
 {
-       struct ll_file_data *fd = LUSTRE_FPRIVATE(f);
+       struct ll_file_data *fd = f->private_data;
        struct ll_readahead_state *ras = &fd->fd_ras;
        struct inode *inode = file_inode(f);
        unsigned long index = pos >> PAGE_SHIFT;
@@ -1284,7 +1308,7 @@ int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
 
 struct ll_cl_context *ll_cl_find(struct file *file)
 {
-       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct ll_file_data *fd = file->private_data;
        struct ll_cl_context *lcc;
        struct ll_cl_context *found = NULL;
 
@@ -1303,7 +1327,7 @@ struct ll_cl_context *ll_cl_find(struct file *file)
 void ll_cl_add(struct file *file, const struct lu_env *env, struct cl_io *io,
               enum lcc_type type)
 {
-       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct ll_file_data *fd = file->private_data;
        struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
 
        memset(lcc, 0, sizeof(*lcc));
@@ -1320,7 +1344,7 @@ void ll_cl_add(struct file *file, const struct lu_env *env, struct cl_io *io,
 
 void ll_cl_remove(struct file *file, const struct lu_env *env)
 {
-       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct ll_file_data *fd = file->private_data;
        struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
 
        write_lock(&fd->fd_lock);
@@ -1333,7 +1357,7 @@ int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
 {
        struct inode              *inode  = vvp_object_inode(page->cp_obj);
        struct ll_sb_info         *sbi    = ll_i2sbi(inode);
-       struct ll_file_data       *fd     = LUSTRE_FPRIVATE(file);
+       struct ll_file_data       *fd     = file->private_data;
        struct ll_readahead_state *ras    = &fd->fd_ras;
        struct cl_2queue          *queue  = &io->ci_queue;
        struct cl_sync_io         *anchor = NULL;
@@ -1429,7 +1453,7 @@ static int kickoff_async_readahead(struct file *file, unsigned long pages)
        struct ll_readahead_work *lrw;
        struct inode *inode = file_inode(file);
        struct ll_sb_info *sbi = ll_i2sbi(inode);
-       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct ll_file_data *fd = file->private_data;
        struct ll_readahead_state *ras = &fd->fd_ras;
        struct ll_ra_info *ra = &sbi->ll_ra_info;
        unsigned long throttle;
@@ -1471,6 +1495,34 @@ static int kickoff_async_readahead(struct file *file, unsigned long pages)
        return 2;
 }
 
+/*
+ * Return true if fast IO can be used: the window is too short to issue a
+ * readahead RPC (which would need a cl_io) or kickoff_async_readahead() > 0.
+ * ras_stride_bytes == 0 uses the non-stride estimate to avoid dividing by 0.
+ */
+static bool ll_use_fast_io(struct file *file,
+                          struct ll_readahead_state *ras, pgoff_t index)
+{
+       unsigned long fast_read_pages =
+               max(RA_REMAIN_WINDOW_MIN, ras->ras_rpc_pages);
+       loff_t skip_pages;
+
+       if (stride_io_mode(ras) && ras->ras_stride_bytes != 0) {
+               skip_pages = (ras->ras_stride_length +
+                       ras->ras_stride_bytes - 1) / ras->ras_stride_bytes;
+               skip_pages *= fast_read_pages;
+       } else {
+               skip_pages = fast_read_pages;
+       }
+
+       if (ras->ras_window_start_idx + ras->ras_window_pages <
+           ras->ras_next_readahead_idx + skip_pages ||
+           kickoff_async_readahead(file, fast_read_pages) > 0)
+               return true;
+
+       return false;
+}
+
+
 int ll_readpage(struct file *file, struct page *vmpage)
 {
        struct inode *inode = file_inode(file);
@@ -1491,11 +1543,9 @@ int ll_readpage(struct file *file, struct page *vmpage)
 
        if (io == NULL) { /* fast read */
                struct inode *inode = file_inode(file);
-               struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+               struct ll_file_data *fd = file->private_data;
                struct ll_readahead_state *ras = &fd->fd_ras;
                struct lu_env  *local_env = NULL;
-               unsigned long fast_read_pages =
-                       max(RA_REMAIN_WINDOW_MIN, ras->ras_rpc_pages);
                struct vvp_page *vpg;
 
                result = -ENODATA;
@@ -1523,12 +1573,7 @@ int ll_readpage(struct file *file, struct page *vmpage)
                        /* avoid duplicate ras_update() call */
                        vpg->vpg_ra_updated = 1;
 
-                       /* Check if we can issue a readahead RPC, if that is
-                        * the case, we can't do fast IO because we will need
-                        * a cl_io to issue the RPC. */
-                       if (ras->ras_window_start_idx + ras->ras_window_pages <
-                           ras->ras_next_readahead_idx + fast_read_pages ||
-                           kickoff_async_readahead(file, fast_read_pages) > 0)
+                       if (ll_use_fast_io(file, ras, vvp_index(vpg)))
                                result = 0;
                }
 
@@ -1555,6 +1600,20 @@ int ll_readpage(struct file *file, struct page *vmpage)
                RETURN(result);
        }
 
+       /**
+        * Direct read can fall back to buffered read, but DIO is done
+        * with lockless i/o, and buffered requires LDLM locking, so in
+        * this case we must restart without lockless.
+        */
+       if (file->f_flags & O_DIRECT &&
+           lcc && lcc->lcc_type == LCC_RW &&
+           !io->ci_ignore_lockless) {
+               unlock_page(vmpage);
+               io->ci_ignore_lockless = 1;
+               io->ci_need_restart = 1;
+               RETURN(-ENOLCK);
+       }
+
        LASSERT(io->ci_state == CIS_IO_GOING);
        page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
        if (!IS_ERR(page)) {