Whamcloud - gitweb
LU-3259 clio: Revise read ahead implementation
[fs/lustre-release.git] / lustre / llite / rw.c
index bf59ffe..17386e8 100644 (file)
@@ -191,107 +191,78 @@ void ll_ras_enter(struct file *f)
        spin_unlock(&ras->ras_lock);
 }
 
-static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
-                             struct cl_page_list *queue, struct cl_page *page,
-                             struct cl_object *clob, pgoff_t *max_index)
+/**
+ * Initiates read-ahead of a page with given index.
+ *
+ * \retval +ve: page was already uptodate so it will be skipped
+ *              from being added;
+ * \retval -ve: page wasn't added to \a queue due to an error;
+ * \retval   0: page was added into \a queue for read ahead.
+ */
+static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
+                             struct cl_page_list *queue, pgoff_t index)
 {
-       struct page *vmpage = page->cp_vmpage;
-       struct vvp_page *vpg;
-       int              rc;
-
+       struct cl_object *clob  = io->ci_obj;
+       struct inode     *inode = vvp_object_inode(clob);
+       struct page      *vmpage;
+       struct cl_page   *page;
+       struct vvp_page  *vpg;
+       enum ra_stat      which = _NR_RA_STAT; /* keep gcc happy */
+       int               rc    = 0;
+       const char       *msg   = NULL;
        ENTRY;
 
-       rc = 0;
-       cl_page_assume(env, io, page);
+       vmpage = grab_cache_page_nowait(inode->i_mapping, index);
+       if (vmpage == NULL) {
+               which = RA_STAT_FAILED_GRAB_PAGE;
+               msg   = "g_c_p_n failed";
+               GOTO(out, rc = -EBUSY);
+       }
+
+       /* Check if vmpage was truncated or reclaimed */
+       if (vmpage->mapping != inode->i_mapping) {
+               which = RA_STAT_WRONG_GRAB_PAGE;
+               msg   = "g_c_p_n returned invalid page";
+               GOTO(out, rc = -EBUSY);
+       }
+
+       page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
+       if (IS_ERR(page)) {
+               which = RA_STAT_FAILED_GRAB_PAGE;
+               msg   = "cl_page_find failed";
+               GOTO(out, rc = PTR_ERR(page));
+       }
+
        lu_ref_add(&page->cp_reference, "ra", current);
+       cl_page_assume(env, io, page);
        vpg = cl2vvp_page(cl_object_page_slice(clob, page));
        if (!vpg->vpg_defer_uptodate && !PageUptodate(vmpage)) {
-               CDEBUG(D_READA, "page index %lu, max_index: %lu\n",
-                      vvp_index(vpg), *max_index);
-               /* Disable the optimization on prefetching maximum readahead
-                * index because there is a race with lock cancellation. This
-                * optimization will be revived later.
-                * if (*max_index == 0 || vvp_index(vpg) > *max_index) */
-               rc = cl_page_is_under_lock(env, io, page, max_index);
-               if (rc == 0) {
-                       vpg->vpg_defer_uptodate = 1;
-                       vpg->vpg_ra_used = 0;
-                       cl_page_list_add(queue, page);
-                       rc = 1;
-               } else {
-                       cl_page_discard(env, io, page);
-                       rc = -ENOLCK;
-               }
+               vpg->vpg_defer_uptodate = 1;
+               vpg->vpg_ra_used = 0;
+               cl_page_list_add(queue, page);
        } else {
                /* skip completed pages */
                cl_page_unassume(env, io, page);
+               /* This page is already uptodate, returning a positive number
+                * to tell the callers about this */
+               rc = 1;
        }
+
        lu_ref_del(&page->cp_reference, "ra", current);
        cl_page_put(env, page);
-       RETURN(rc);
-}
-
-/**
- * Initiates read-ahead of a page with given index.
- *
- * \retval     +ve: page was added to \a queue.
- *
- * \retval -ENOLCK: there is no extent lock for this part of a file, stop
- *                  read-ahead.
- *
- * \retval  -ve, 0: page wasn't added to \a queue for other reason.
- */
-static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
-                             struct cl_page_list *queue,
-                             pgoff_t index, pgoff_t *max_index)
-{
-       struct cl_object *clob  = io->ci_obj;
-       struct inode     *inode = vvp_object_inode(clob);
-        struct page      *vmpage;
-        struct cl_page   *page;
-        enum ra_stat      which = _NR_RA_STAT; /* keep gcc happy */
-       gfp_t             gfp_mask;
-        int               rc    = 0;
-        const char       *msg   = NULL;
 
-        ENTRY;
-
-        gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
-#ifdef __GFP_NOWARN
-        gfp_mask |= __GFP_NOWARN;
-#endif
-       vmpage = grab_cache_page_nowait(inode->i_mapping, index);
+out:
        if (vmpage != NULL) {
-               /* Check if vmpage was truncated or reclaimed */
-               if (vmpage->mapping == inode->i_mapping) {
-                       page = cl_page_find(env, clob, vmpage->index,
-                                           vmpage, CPT_CACHEABLE);
-                       if (!IS_ERR(page)) {
-                               rc = cl_read_ahead_page(env, io, queue,
-                                                       page, clob, max_index);
-                                if (rc == -ENOLCK) {
-                                        which = RA_STAT_FAILED_MATCH;
-                                        msg   = "lock match failed";
-                                }
-                        } else {
-                                which = RA_STAT_FAILED_GRAB_PAGE;
-                                msg   = "cl_page_find failed";
-                        }
-                } else {
-                        which = RA_STAT_WRONG_GRAB_PAGE;
-                        msg   = "g_c_p_n returned invalid page";
-                }
-                if (rc != 1)
-                        unlock_page(vmpage);
-                page_cache_release(vmpage);
-        } else {
-                which = RA_STAT_FAILED_GRAB_PAGE;
-                msg   = "g_c_p_n failed";
-        }
+               if (rc != 0)
+                       unlock_page(vmpage);
+               page_cache_release(vmpage);
+       }
        if (msg != NULL) {
                ll_ra_stats_inc(inode, which);
                CDEBUG(D_READA, "%s\n", msg);
+
        }
+
        RETURN(rc);
 }
 
@@ -394,15 +365,15 @@ static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
 }
 
 static int ll_read_ahead_pages(const struct lu_env *env,
-                               struct cl_io *io, struct cl_page_list *queue,
-                               struct ra_io_arg *ria,
-                               unsigned long *reserved_pages,
-                               unsigned long *ra_end)
+                              struct cl_io *io, struct cl_page_list *queue,
+                              struct ra_io_arg *ria,
+                              unsigned long *reserved_pages,
+                              pgoff_t *ra_end)
 {
+       struct cl_read_ahead ra = { 0 };
        int rc, count = 0;
        bool stride_ria;
        pgoff_t page_idx;
-       pgoff_t max_index = 0;
 
        LASSERT(ria != NULL);
        RIA_DEBUG(ria);
@@ -411,14 +382,24 @@ static int ll_read_ahead_pages(const struct lu_env *env,
        for (page_idx = ria->ria_start;
             page_idx <= ria->ria_end && *reserved_pages > 0; page_idx++) {
                if (ras_inside_ra_window(page_idx, ria)) {
+                       if (ra.cra_end == 0 || ra.cra_end < page_idx) {
+                               cl_read_ahead_release(env, &ra);
+
+                               rc = cl_io_read_ahead(env, io, page_idx, &ra);
+                               if (rc < 0)
+                                       break;
+
+                               LASSERTF(ra.cra_end >= page_idx,
+                                        "object: %p, indices %lu / %lu\n",
+                                        io->ci_obj, ra.cra_end, page_idx);
+                       }
+
                        /* If the page is inside the read-ahead window*/
-                       rc = ll_read_ahead_page(env, io, queue,
-                                               page_idx, &max_index);
-                        if (rc == 1) {
-                                (*reserved_pages)--;
-                                count++;
-                        } else if (rc == -ENOLCK)
-                                break;
+                       rc = ll_read_ahead_page(env, io, queue, page_idx);
+                       if (rc == 0) {
+                               (*reserved_pages)--;
+                               count++;
+                       }
                 } else if (stride_ria) {
                         /* If it is not in the read-ahead window, and it is
                          * read-ahead mode, then check whether it should skip
@@ -442,19 +423,22 @@ static int ll_read_ahead_pages(const struct lu_env *env,
                         }
                 }
         }
-        *ra_end = page_idx;
-        return count;
+
+       cl_read_ahead_release(env, &ra);
+
+       *ra_end = page_idx;
+       return count;
 }
 
-int ll_readahead(const struct lu_env *env, struct cl_io *io,
-                struct cl_page_list *queue, struct ll_readahead_state *ras,
-                bool hit)
+static int ll_readahead(const struct lu_env *env, struct cl_io *io,
+                       struct cl_page_list *queue,
+                       struct ll_readahead_state *ras, bool hit)
 {
        struct vvp_io *vio = vvp_env_io(env);
        struct vvp_thread_info *vti = vvp_env_info(env);
        struct cl_attr *attr = ccc_env_thread_attr(env);
-       unsigned long start = 0, end = 0, reserved;
-       unsigned long ra_end, len, mlen = 0;
+       unsigned long len, mlen = 0, reserved;
+       pgoff_t ra_end, start = 0, end = 0;
        struct inode *inode;
        struct ra_io_arg *ria = &vti->vti_ria;
        struct cl_object *clob;
@@ -591,8 +575,8 @@ int ll_readahead(const struct lu_env *env, struct cl_io *io,
         * next read-ahead tries from where we left off.  we only do so
         * if the region we failed to issue read-ahead on is still ahead
         * of the app and behind the next index to start read-ahead from */
-       CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
-              ra_end, end, ria->ria_end);
+       CDEBUG(D_READA, "ra_end = %lu end = %lu stride end = %lu pages = %d\n",
+              ra_end, end, ria->ria_end, ret);
 
        if (ra_end != end + 1) {
                ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
@@ -758,9 +742,9 @@ static void ras_increase_window(struct inode *inode,
                                          ra->ra_max_pages_per_file);
 }
 
-void ras_update(struct ll_sb_info *sbi, struct inode *inode,
-               struct ll_readahead_state *ras, unsigned long index,
-               unsigned hit)
+static void ras_update(struct ll_sb_info *sbi, struct inode *inode,
+                      struct ll_readahead_state *ras, unsigned long index,
+                      unsigned hit)
 {
        struct ll_ra_info *ra = &sbi->ll_ra_info;
        int zero = 0, stride_detect = 0, ra_miss = 0;
@@ -1101,6 +1085,57 @@ void ll_cl_remove(struct file *file, const struct lu_env *env)
        write_unlock(&fd->fd_lock);
 }
 
+static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
+                          struct cl_page *page)
+{
+       struct inode              *inode  = vvp_object_inode(page->cp_obj);
+       struct ll_sb_info         *sbi    = ll_i2sbi(inode);
+       struct ll_file_data       *fd     = vvp_env_io(env)->vui_fd;
+       struct ll_readahead_state *ras    = &fd->fd_ras;
+       struct cl_2queue          *queue  = &io->ci_queue;
+       struct vvp_page           *vpg;
+       int                        rc = 0;
+       ENTRY;
+
+       vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
+       if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
+           sbi->ll_ra_info.ra_max_pages > 0)
+               ras_update(sbi, inode, ras, vvp_index(vpg),
+                          vpg->vpg_defer_uptodate);
+
+       if (vpg->vpg_defer_uptodate) {
+               vpg->vpg_ra_used = 1;
+               cl_page_export(env, page, 1);
+       }
+
+       cl_2queue_init(queue);
+       /*
+        * Add page into the queue even when it is marked uptodate above.
+        * this will unlock it automatically as part of cl_page_list_disown().
+        */
+       cl_2queue_add(queue, page);
+       if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
+           sbi->ll_ra_info.ra_max_pages > 0) {
+               int rc2;
+
+               rc2 = ll_readahead(env, io, &queue->c2_qin, ras,
+                                  vpg->vpg_defer_uptodate);
+               CDEBUG(D_READA, DFID "%d pages read ahead at %lu\n",
+                      PFID(ll_inode2fid(inode)), rc2, vvp_index(vpg));
+       }
+
+       if (queue->c2_qin.pl_nr > 0)
+               rc = cl_io_submit_rw(env, io, CRT_READ, queue);
+
+       /*
+        * Unlock unsent pages in case of error.
+        */
+       cl_page_list_disown(env, io, &queue->c2_qin);
+       cl_2queue_fini(env, queue);
+
+       RETURN(rc);
+}
+
 int ll_readpage(struct file *file, struct page *vmpage)
 {
        struct cl_object *clob = ll_i2info(file->f_dentry->d_inode)->lli_clob;
@@ -1126,7 +1161,7 @@ int ll_readpage(struct file *file, struct page *vmpage)
                LASSERT(page->cp_type == CPT_CACHEABLE);
                if (likely(!PageUptodate(vmpage))) {
                        cl_page_assume(env, io, page);
-                       result = cl_io_read_page(env, io, page);
+                       result = ll_io_read_page(env, io, page);
                } else {
                        /* Page from a non-object file. */
                        unlock_page(vmpage);