LU-3259 clio: Revise read ahead implementation 59/10859/17
author    Jinshan Xiong <jinshan.xiong@intel.com>
          Fri, 16 Jan 2015 19:23:38 +0000 (11:23 -0800)
committer Oleg Drokin <oleg.drokin@intel.com>
          Thu, 12 Mar 2015 03:07:07 +0000 (03:07 +0000)
In this implementation, read ahead holds the underlying DLM lock while
adding read-ahead pages. A new cl_io operation, cio_read_ahead(), is
added for this purpose. It takes a cl_read_ahead{} parameter so that
each layer can adjust it to its own requirements. For example, the OSC
layer makes sure the read-ahead region is covered by an LDLM lock, and
the LOV layer makes sure the region won't cross a stripe boundary.

Legacy callback cpo_is_under_lock() is removed.
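
In sketch form, read ahead now drives the layers the way the new
ll_read_ahead_pages() below does: fetch a covered extent once, reuse it
until the window is exhausted, and release whatever resource it pins
(for OSC, a reference on the covering DLM lock) before asking again. A
condensed illustration of that loop, with the window bounds, stride
handling and error paths elided (env, io and queue come from the
caller):

    struct cl_read_ahead ra = { 0 };
    pgoff_t page_idx;
    int rc;

    for (page_idx = start; page_idx <= end; page_idx++) {
            if (ra.cra_end == 0 || ra.cra_end < page_idx) {
                    /* Window exhausted: drop the resource held for
                     * the previous extent before asking again. */
                    cl_read_ahead_release(env, &ra);

                    /* Ask each layer how far read ahead may go. */
                    rc = cl_io_read_ahead(env, io, page_idx, &ra);
                    if (rc < 0)
                            break;
            }
            /* page_idx is covered up to and including ra.cra_end. */
            ll_read_ahead_page(env, io, queue, page_idx);
    }
    cl_read_ahead_release(env, &ra);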

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: Ic388e3a3f744ea5a8352cc8529e32a71073bddb3
Reviewed-on: http://review.whamcloud.com/10859
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
15 files changed:
lustre/doc/clio.txt
lustre/include/cl_object.h
lustre/llite/llite_internal.h
lustre/llite/rw.c
lustre/llite/vvp_io.c
lustre/llite/vvp_page.c
lustre/lov/lov_io.c
lustre/lov/lov_page.c
lustre/obdclass/cl_io.c
lustre/obdclass/cl_page.c
lustre/osc/osc_cache.c
lustre/osc/osc_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_lock.c
lustre/osc/osc_page.c

diff --git a/lustre/doc/clio.txt b/lustre/doc/clio.txt
index 31f9d42..3590c8c 100644
@@ -1264,10 +1264,16 @@ generic_file_{read,write}() function.
 
 In the case of read, generic_file_read() calls for every non-up-to-date page
 the a_ops->readpage() method that eventually (after obtaining cl_page
-corresponding to the VM page supplied to it) calls cl_io_read_page() which in
-turn calls cl_io_operations::cio_read_page().
-
-vvp_io_read_page() populates a queue by a target page and pages from read-ahead
+corresponding to the VM page supplied to it) calls ll_io_read_page(), which
+decides whether it is necessary to read ahead more pages by calling
+ll_readahead(). The number of pages to read ahead is determined by the read
+pattern; it also factors in the requirements of the different layers in the
+CLIO stack, for example stripe alignment at the LOV layer and DLM lock
+coverage at the OSC layer. The callback ->cio_read_ahead() is used to gather
+the requirements from each layer. Please refer to lov_io_read_ahead() and
+osc_io_read_ahead() for details.
+
+ll_readahead() populates a queue by a target page and pages from read-ahead
 window. The resulting queue is then submitted for immediate transfer by calling
 cl_io_submit_rw() which ends up calling osc_io_submit_page() for every
 not-up-to-date page in the queue.
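
To make the ->cio_read_ahead() contract above concrete, here is a hedged
sketch of a per-layer implementation (hypothetical layer "foo";
foo_covered_end() is an assumed helper, not real Lustre code). A slice
either tightens ra->cra_end to the extent it can cover from start,
returns a negative error to stop read ahead, or returns a positive value
to say that lower layers need not be consulted, as vvp_io_read_ahead()
does for group-locked files:

    static int foo_io_read_ahead(const struct lu_env *env,
                                 const struct cl_io_slice *ios,
                                 pgoff_t start, struct cl_read_ahead *ra)
    {
            /* How far past 'start' this layer can cover, e.g. up to
             * a lock or stripe boundary (assumed helper). */
            pgoff_t covered_end = foo_covered_end(ios, start);

            if (covered_end < start)
                    return -ENODATA;        /* nothing covers 'start' */

            /* Tighten the window; lower layers may tighten further. */
            if (ra->cra_end == 0 || ra->cra_end > covered_end)
                    ra->cra_end = covered_end;

            return 0;                       /* let lower layers refine */
    }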
diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h
index 09828e1..7c72c41 100644
@@ -894,26 +894,6 @@ struct cl_page_operations {
         /** Destructor. Frees resources and slice itself. */
         void (*cpo_fini)(const struct lu_env *env,
                          struct cl_page_slice *slice);
-
-        /**
-         * Checks whether the page is protected by a cl_lock. This is a
-         * per-layer method, because certain layers have ways to check for the
-         * lock much more efficiently than through the generic locks scan, or
-         * implement locking mechanisms separate from cl_lock, e.g.,
-         * LL_FILE_GROUP_LOCKED in vvp. If \a pending is true, check for locks
-         * being canceled, or scheduled for cancellation as soon as the last
-         * user goes away, too.
-         *
-         * \retval    -EBUSY: page is protected by a lock of a given mode;
-         * \retval  -ENODATA: page is not protected by a lock;
-         * \retval         0: this layer cannot decide.
-         *
-         * \see cl_page_is_under_lock()
-         */
-        int (*cpo_is_under_lock)(const struct lu_env *env,
-                                const struct cl_page_slice *slice,
-                                struct cl_io *io, pgoff_t *max);
-
         /**
          * Optional debugging helper. Prints given page slice.
          *
@@ -1371,7 +1351,6 @@ struct cl_2queue {
  *     (3) sort all locks to avoid dead-locks, and acquire them
  *
  *     (4) process the chunk: call per-page methods
- *         (cl_io_operations::cio_read_page() for read,
  *         cl_io_operations::cio_prepare_write(),
  *         cl_io_operations::cio_commit_write() for write)
  *
@@ -1472,7 +1451,28 @@ struct cl_io_slice {
 };
 
 typedef void (*cl_commit_cbt)(const struct lu_env *, struct cl_io *,
-                               struct cl_page *);
+                             struct cl_page *);
+
+struct cl_read_ahead {
+       /* Maximum page index where the readahead window ends.
+        * This is determined by DLM lock coverage, RPC and stripe
+        * boundaries. cra_end is inclusive. */
+       pgoff_t cra_end;
+       /* Release routine. If readahead holds resources underneath, this
+        * function should be called to release them. */
+       void    (*cra_release)(const struct lu_env *env, void *cbdata);
+       /* Callback data for cra_release routine */
+       void    *cra_cbdata;
+};
+
+static inline void cl_read_ahead_release(const struct lu_env *env,
+                                        struct cl_read_ahead *ra)
+{
+       if (ra->cra_release != NULL)
+               ra->cra_release(env, ra->cra_cbdata);
+       memset(ra, 0, sizeof(*ra));
+}
+
 
 /**
  * Per-layer io operations.
@@ -1579,17 +1579,14 @@ struct cl_io_operations {
                        const struct cl_io_slice *slice,
                        struct cl_page_list *queue, int from, int to,
                        cl_commit_cbt cb);
-        /**
-         * Read missing page.
-         *
-         * Called by a top-level cl_io_operations::op[CIT_READ]::cio_start()
-         * method, when it hits not-up-to-date page in the range. Optional.
-         *
-         * \pre io->ci_type == CIT_READ
-         */
-        int (*cio_read_page)(const struct lu_env *env,
-                             const struct cl_io_slice *slice,
-                             const struct cl_page_slice *page);
+       /**
+        * Decide the maximum read-ahead extent
+        *
+        * \pre io->ci_type == CIT_READ
+        */
+       int (*cio_read_ahead)(const struct lu_env *env,
+                             const struct cl_io_slice *slice,
+                             pgoff_t start, struct cl_read_ahead *ra);
         /**
          * Optional debugging helper. Print given io slice.
          */
@@ -2306,8 +2303,6 @@ int     cl_page_is_vmlocked(const struct lu_env *env,
                            const struct cl_page *pg);
 void    cl_page_export(const struct lu_env *env,
                       struct cl_page *pg, int uptodate);
-int     cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
-                             struct cl_page *page, pgoff_t *max_index);
 loff_t  cl_offset(const struct cl_object *obj, pgoff_t idx);
 pgoff_t cl_index(const struct cl_object *obj, loff_t offset);
 size_t  cl_page_size(const struct cl_object *obj);
@@ -2413,8 +2408,6 @@ int   cl_io_lock_add     (const struct lu_env *env, struct cl_io *io,
                           struct cl_io_lock_link *link);
 int   cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
                            struct cl_lock_descr *descr);
-int   cl_io_read_page    (const struct lu_env *env, struct cl_io *io,
-                          struct cl_page *page);
 int   cl_io_submit_rw    (const struct lu_env *env, struct cl_io *io,
                          enum cl_req_type iot, struct cl_2queue *queue);
 int   cl_io_submit_sync  (const struct lu_env *env, struct cl_io *io,
@@ -2423,6 +2416,8 @@ int   cl_io_submit_sync  (const struct lu_env *env, struct cl_io *io,
 int   cl_io_commit_async (const struct lu_env *env, struct cl_io *io,
                          struct cl_page_list *queue, int from, int to,
                          cl_commit_cbt cb);
+int   cl_io_read_ahead   (const struct lu_env *env, struct cl_io *io,
+                         pgoff_t start, struct cl_read_ahead *ra);
 void  cl_io_rw_advance   (const struct lu_env *env, struct cl_io *io,
                           size_t nob);
 int   cl_io_cancel       (const struct lu_env *env, struct cl_io *io,
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index 7bbcd5b..83b26e3 100644
@@ -779,9 +779,7 @@ int ll_writepage(struct page *page, struct writeback_control *wbc);
 int ll_writepages(struct address_space *, struct writeback_control *wbc);
 int ll_readpage(struct file *file, struct page *page);
 void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras);
-int ll_readahead(const struct lu_env *env, struct cl_io *io,
-                struct cl_page_list *queue, struct ll_readahead_state *ras,
-                bool hit);
+int vvp_io_write_commit(const struct lu_env *env, struct cl_io *io);
 struct ll_cl_context *ll_cl_find(struct file *file);
 void ll_cl_add(struct file *file, const struct lu_env *env, struct cl_io *io);
 void ll_cl_remove(struct file *file, const struct lu_env *env);
@@ -1126,9 +1124,6 @@ extern struct lu_device_type vvp_device_type;
 int cl_sb_init(struct super_block *sb);
 int cl_sb_fini(struct super_block *sb);
 
-void ras_update(struct ll_sb_info *sbi, struct inode *inode,
-                struct ll_readahead_state *ras, unsigned long index,
-                unsigned hit);
 void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len);
 void ll_ra_stats_inc(struct inode *inode, enum ra_stat which);
 
diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c
index bf59ffe..17386e8 100644
@@ -191,107 +191,78 @@ void ll_ras_enter(struct file *f)
        spin_unlock(&ras->ras_lock);
 }
 
-static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
-                             struct cl_page_list *queue, struct cl_page *page,
-                             struct cl_object *clob, pgoff_t *max_index)
+/**
+ * Initiates read-ahead of a page with given index.
+ *
+ * \retval +ve: page was already uptodate so it was skipped
+ *              and not added to \a queue;
+ * \retval -ve: page wasn't added to \a queue because of an error;
+ * \retval   0: page was added to \a queue for read ahead.
+ */
+static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
+                             struct cl_page_list *queue, pgoff_t index)
 {
-       struct page *vmpage = page->cp_vmpage;
-       struct vvp_page *vpg;
-       int              rc;
-
+       struct cl_object *clob  = io->ci_obj;
+       struct inode     *inode = vvp_object_inode(clob);
+       struct page      *vmpage;
+       struct cl_page   *page;
+       struct vvp_page  *vpg;
+       enum ra_stat      which = _NR_RA_STAT; /* keep gcc happy */
+       int               rc    = 0;
+       const char       *msg   = NULL;
        ENTRY;
 
-       rc = 0;
-       cl_page_assume(env, io, page);
+       vmpage = grab_cache_page_nowait(inode->i_mapping, index);
+       if (vmpage == NULL) {
+               which = RA_STAT_FAILED_GRAB_PAGE;
+               msg   = "g_c_p_n failed";
+               GOTO(out, rc = -EBUSY);
+       }
+
+       /* Check if vmpage was truncated or reclaimed */
+       if (vmpage->mapping != inode->i_mapping) {
+               which = RA_STAT_WRONG_GRAB_PAGE;
+               msg   = "g_c_p_n returned invalid page";
+               GOTO(out, rc = -EBUSY);
+       }
+
+       page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
+       if (IS_ERR(page)) {
+               which = RA_STAT_FAILED_GRAB_PAGE;
+               msg   = "cl_page_find failed";
+               GOTO(out, rc = PTR_ERR(page));
+       }
+
        lu_ref_add(&page->cp_reference, "ra", current);
+       cl_page_assume(env, io, page);
        vpg = cl2vvp_page(cl_object_page_slice(clob, page));
        if (!vpg->vpg_defer_uptodate && !PageUptodate(vmpage)) {
-               CDEBUG(D_READA, "page index %lu, max_index: %lu\n",
-                      vvp_index(vpg), *max_index);
-               /* Disable the optimization on prefetching maximum readahead
-                * index because there is a race with lock cancellation. This
-                * optimization will be revived later.
-                * if (*max_index == 0 || vvp_index(vpg) > *max_index) */
-               rc = cl_page_is_under_lock(env, io, page, max_index);
-               if (rc == 0) {
-                       vpg->vpg_defer_uptodate = 1;
-                       vpg->vpg_ra_used = 0;
-                       cl_page_list_add(queue, page);
-                       rc = 1;
-               } else {
-                       cl_page_discard(env, io, page);
-                       rc = -ENOLCK;
-               }
+               vpg->vpg_defer_uptodate = 1;
+               vpg->vpg_ra_used = 0;
+               cl_page_list_add(queue, page);
        } else {
                /* skip completed pages */
                cl_page_unassume(env, io, page);
+               /* This page is already uptodate; return a positive
+                * number to tell the caller about it. */
+               rc = 1;
        }
+
        lu_ref_del(&page->cp_reference, "ra", current);
        cl_page_put(env, page);
-       RETURN(rc);
-}
-
-/**
- * Initiates read-ahead of a page with given index.
- *
- * \retval     +ve: page was added to \a queue.
- *
- * \retval -ENOLCK: there is no extent lock for this part of a file, stop
- *                  read-ahead.
- *
- * \retval  -ve, 0: page wasn't added to \a queue for other reason.
- */
-static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
-                             struct cl_page_list *queue,
-                             pgoff_t index, pgoff_t *max_index)
-{
-       struct cl_object *clob  = io->ci_obj;
-       struct inode     *inode = vvp_object_inode(clob);
-        struct page      *vmpage;
-        struct cl_page   *page;
-        enum ra_stat      which = _NR_RA_STAT; /* keep gcc happy */
-       gfp_t             gfp_mask;
-        int               rc    = 0;
-        const char       *msg   = NULL;
 
-        ENTRY;
-
-        gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
-#ifdef __GFP_NOWARN
-        gfp_mask |= __GFP_NOWARN;
-#endif
-       vmpage = grab_cache_page_nowait(inode->i_mapping, index);
+out:
        if (vmpage != NULL) {
-               /* Check if vmpage was truncated or reclaimed */
-               if (vmpage->mapping == inode->i_mapping) {
-                       page = cl_page_find(env, clob, vmpage->index,
-                                           vmpage, CPT_CACHEABLE);
-                       if (!IS_ERR(page)) {
-                               rc = cl_read_ahead_page(env, io, queue,
-                                                       page, clob, max_index);
-                                if (rc == -ENOLCK) {
-                                        which = RA_STAT_FAILED_MATCH;
-                                        msg   = "lock match failed";
-                                }
-                        } else {
-                                which = RA_STAT_FAILED_GRAB_PAGE;
-                                msg   = "cl_page_find failed";
-                        }
-                } else {
-                        which = RA_STAT_WRONG_GRAB_PAGE;
-                        msg   = "g_c_p_n returned invalid page";
-                }
-                if (rc != 1)
-                        unlock_page(vmpage);
-                page_cache_release(vmpage);
-        } else {
-                which = RA_STAT_FAILED_GRAB_PAGE;
-                msg   = "g_c_p_n failed";
-        }
+               if (rc != 0)
+                       unlock_page(vmpage);
+               page_cache_release(vmpage);
+       }
        if (msg != NULL) {
                ll_ra_stats_inc(inode, which);
                CDEBUG(D_READA, "%s\n", msg);
+
        }
+
        RETURN(rc);
 }
 
@@ -394,15 +365,15 @@ static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
 }
 
 static int ll_read_ahead_pages(const struct lu_env *env,
-                               struct cl_io *io, struct cl_page_list *queue,
-                               struct ra_io_arg *ria,
-                               unsigned long *reserved_pages,
-                               unsigned long *ra_end)
+                              struct cl_io *io, struct cl_page_list *queue,
+                              struct ra_io_arg *ria,
+                              unsigned long *reserved_pages,
+                              pgoff_t *ra_end)
 {
+       struct cl_read_ahead ra = { 0 };
        int rc, count = 0;
        bool stride_ria;
        pgoff_t page_idx;
-       pgoff_t max_index = 0;
 
        LASSERT(ria != NULL);
        RIA_DEBUG(ria);
@@ -411,14 +382,24 @@ static int ll_read_ahead_pages(const struct lu_env *env,
        for (page_idx = ria->ria_start;
             page_idx <= ria->ria_end && *reserved_pages > 0; page_idx++) {
                if (ras_inside_ra_window(page_idx, ria)) {
+                       if (ra.cra_end == 0 || ra.cra_end < page_idx) {
+                               cl_read_ahead_release(env, &ra);
+
+                               rc = cl_io_read_ahead(env, io, page_idx, &ra);
+                               if (rc < 0)
+                                       break;
+
+                               LASSERTF(ra.cra_end >= page_idx,
+                                        "object: %p, indices %lu / %lu\n",
+                                        io->ci_obj, ra.cra_end, page_idx);
+                       }
+
                        /* If the page is inside the read-ahead window*/
-                       rc = ll_read_ahead_page(env, io, queue,
-                                               page_idx, &max_index);
-                        if (rc == 1) {
-                                (*reserved_pages)--;
-                                count++;
-                        } else if (rc == -ENOLCK)
-                                break;
+                       rc = ll_read_ahead_page(env, io, queue, page_idx);
+                       if (rc == 0) {
+                               (*reserved_pages)--;
+                               count++;
+                       }
                 } else if (stride_ria) {
                         /* If it is not in the read-ahead window, and it is
                          * read-ahead mode, then check whether it should skip
@@ -442,19 +423,22 @@ static int ll_read_ahead_pages(const struct lu_env *env,
                         }
                 }
         }
-        *ra_end = page_idx;
-        return count;
+
+       cl_read_ahead_release(env, &ra);
+
+       *ra_end = page_idx;
+       return count;
 }
 
-int ll_readahead(const struct lu_env *env, struct cl_io *io,
-                struct cl_page_list *queue, struct ll_readahead_state *ras,
-                bool hit)
+static int ll_readahead(const struct lu_env *env, struct cl_io *io,
+                       struct cl_page_list *queue,
+                       struct ll_readahead_state *ras, bool hit)
 {
        struct vvp_io *vio = vvp_env_io(env);
        struct vvp_thread_info *vti = vvp_env_info(env);
        struct cl_attr *attr = ccc_env_thread_attr(env);
-       unsigned long start = 0, end = 0, reserved;
-       unsigned long ra_end, len, mlen = 0;
+       unsigned long len, mlen = 0, reserved;
+       pgoff_t ra_end, start = 0, end = 0;
        struct inode *inode;
        struct ra_io_arg *ria = &vti->vti_ria;
        struct cl_object *clob;
@@ -591,8 +575,8 @@ int ll_readahead(const struct lu_env *env, struct cl_io *io,
         * next read-ahead tries from where we left off.  we only do so
         * if the region we failed to issue read-ahead on is still ahead
         * of the app and behind the next index to start read-ahead from */
-       CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
-              ra_end, end, ria->ria_end);
+       CDEBUG(D_READA, "ra_end = %lu end = %lu stride end = %lu pages = %d\n",
+              ra_end, end, ria->ria_end, ret);
 
        if (ra_end != end + 1) {
                ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
@@ -758,9 +742,9 @@ static void ras_increase_window(struct inode *inode,
                                          ra->ra_max_pages_per_file);
 }
 
-void ras_update(struct ll_sb_info *sbi, struct inode *inode,
-               struct ll_readahead_state *ras, unsigned long index,
-               unsigned hit)
+static void ras_update(struct ll_sb_info *sbi, struct inode *inode,
+                      struct ll_readahead_state *ras, unsigned long index,
+                      unsigned hit)
 {
        struct ll_ra_info *ra = &sbi->ll_ra_info;
        int zero = 0, stride_detect = 0, ra_miss = 0;
@@ -1101,6 +1085,57 @@ void ll_cl_remove(struct file *file, const struct lu_env *env)
        write_unlock(&fd->fd_lock);
 }
 
+static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
+                          struct cl_page *page)
+{
+       struct inode              *inode  = vvp_object_inode(page->cp_obj);
+       struct ll_sb_info         *sbi    = ll_i2sbi(inode);
+       struct ll_file_data       *fd     = vvp_env_io(env)->vui_fd;
+       struct ll_readahead_state *ras    = &fd->fd_ras;
+       struct cl_2queue          *queue  = &io->ci_queue;
+       struct vvp_page           *vpg;
+       int                        rc = 0;
+       ENTRY;
+
+       vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
+       if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
+           sbi->ll_ra_info.ra_max_pages > 0)
+               ras_update(sbi, inode, ras, vvp_index(vpg),
+                          vpg->vpg_defer_uptodate);
+
+       if (vpg->vpg_defer_uptodate) {
+               vpg->vpg_ra_used = 1;
+               cl_page_export(env, page, 1);
+       }
+
+       cl_2queue_init(queue);
+       /*
+        * Add page into the queue even when it is marked uptodate above.
+        * This will unlock it automatically as part of cl_page_list_disown().
+        */
+       cl_2queue_add(queue, page);
+       if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
+           sbi->ll_ra_info.ra_max_pages > 0) {
+               int rc2;
+
+               rc2 = ll_readahead(env, io, &queue->c2_qin, ras,
+                                  vpg->vpg_defer_uptodate);
+               CDEBUG(D_READA, DFID "%d pages read ahead at %lu\n",
+                      PFID(ll_inode2fid(inode)), rc2, vvp_index(vpg));
+       }
+
+       if (queue->c2_qin.pl_nr > 0)
+               rc = cl_io_submit_rw(env, io, CRT_READ, queue);
+
+       /*
+        * Unlock unsent pages in case of error.
+        */
+       cl_page_list_disown(env, io, &queue->c2_qin);
+       cl_2queue_fini(env, queue);
+
+       RETURN(rc);
+}
+
 int ll_readpage(struct file *file, struct page *vmpage)
 {
        struct cl_object *clob = ll_i2info(file->f_dentry->d_inode)->lli_clob;
@@ -1126,7 +1161,7 @@ int ll_readpage(struct file *file, struct page *vmpage)
                LASSERT(page->cp_type == CPT_CACHEABLE);
                if (likely(!PageUptodate(vmpage))) {
                        cl_page_assume(env, io, page);
-                       result = cl_io_read_page(env, io, page);
+                       result = ll_io_read_page(env, io, page);
                } else {
                        /* Page from a non-object file. */
                        unlock_page(vmpage);
diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c
index e6605d8..3e75ffd 100644
@@ -1288,42 +1288,24 @@ static int vvp_io_fsync_start(const struct lu_env *env,
        return 0;
 }
 
-static int vvp_io_read_page(const struct lu_env *env,
-                            const struct cl_io_slice *ios,
-                            const struct cl_page_slice *slice)
+static int vvp_io_read_ahead(const struct lu_env *env,
+                            const struct cl_io_slice *ios,
+                            pgoff_t start, struct cl_read_ahead *ra)
 {
-       struct cl_io              *io     = ios->cis_io;
-       struct vvp_page           *vpg    = cl2vvp_page(slice);
-       struct cl_page            *page   = slice->cpl_page;
-       struct inode              *inode  = vvp_object_inode(slice->cpl_obj);
-       struct ll_sb_info         *sbi    = ll_i2sbi(inode);
-       struct ll_file_data       *fd     = cl2vvp_io(env, ios)->vui_fd;
-       struct ll_readahead_state *ras    = &fd->fd_ras;
-       struct cl_2queue          *queue  = &io->ci_queue;
-
+       int result = 0;
        ENTRY;
 
-       if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
-           sbi->ll_ra_info.ra_max_pages > 0)
-               ras_update(sbi, inode, ras, vvp_index(vpg),
-                          vpg->vpg_defer_uptodate);
+       if (ios->cis_io->ci_type == CIT_READ ||
+           ios->cis_io->ci_type == CIT_FAULT) {
+               struct vvp_io *vio = cl2vvp_io(env, ios);
 
-       if (vpg->vpg_defer_uptodate) {
-               vpg->vpg_ra_used = 1;
-               cl_page_export(env, page, 1);
+               if (unlikely(vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
+                       ra->cra_end = CL_PAGE_EOF;
+                       result = +1; /* no need to call down */
+               }
        }
 
-       /*
-        * Add page into the queue even when it is marked uptodate above.
-        * this will unlock it automatically as part of cl_page_list_disown().
-        */
-       cl_2queue_add(queue, page);
-       if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
-           sbi->ll_ra_info.ra_max_pages > 0)
-               ll_readahead(env, io, &queue->c2_qin, ras,
-                            vpg->vpg_defer_uptodate);
-
-       RETURN(0);
+       RETURN(result);
 }
 
 static void vvp_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
@@ -1370,7 +1352,7 @@ static const struct cl_io_operations vvp_io_ops = {
                         .cio_fini   = vvp_io_fini
                 }
         },
-        .cio_read_page     = vvp_io_read_page,
+       .cio_read_ahead = vvp_io_read_ahead
 };
 
 int vvp_io_init(const struct lu_env *env, struct cl_object *obj,
diff --git a/lustre/llite/vvp_page.c b/lustre/llite/vvp_page.c
index 8cc68ef..56ee513 100644
@@ -346,23 +346,6 @@ static int vvp_page_make_ready(const struct lu_env *env,
        RETURN(result);
 }
 
-static int vvp_page_is_under_lock(const struct lu_env *env,
-                                 const struct cl_page_slice *slice,
-                                 struct cl_io *io, pgoff_t *max_index)
-{
-       ENTRY;
-
-       if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
-           io->ci_type == CIT_FAULT) {
-               struct vvp_io *vio = vvp_env_io(env);
-
-               if (unlikely(vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED))
-                       *max_index = CL_PAGE_EOF;
-       }
-       RETURN(0);
-}
-
-
 static int vvp_page_print(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          void *cookie, lu_printer_t printer)
@@ -409,7 +392,6 @@ static const struct cl_page_operations vvp_page_ops = {
        .cpo_is_vmlocked   = vvp_page_is_vmlocked,
        .cpo_fini          = vvp_page_fini,
        .cpo_print         = vvp_page_print,
-       .cpo_is_under_lock = vvp_page_is_under_lock,
        .io = {
                [CRT_READ] = {
                        .cpo_prep       = vvp_page_prep_read,
@@ -520,7 +502,6 @@ static const struct cl_page_operations vvp_transient_page_ops = {
        .cpo_fini               = vvp_transient_page_fini,
        .cpo_is_vmlocked        = vvp_transient_page_is_vmlocked,
        .cpo_print              = vvp_page_print,
-       .cpo_is_under_lock      = vvp_page_is_under_lock,
        .io = {
                [CRT_READ] = {
                        .cpo_prep       = vvp_transient_page_prep,
diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c
index 8448721..a24a984 100644
@@ -578,6 +578,65 @@ static void lov_io_unlock(const struct lu_env *env,
         EXIT;
 }
 
+static int lov_io_read_ahead(const struct lu_env *env,
+                            const struct cl_io_slice *ios,
+                            pgoff_t start, struct cl_read_ahead *ra)
+{
+       struct lov_io           *lio = cl2lov_io(env, ios);
+       struct lov_object       *loo = lio->lis_object;
+       struct cl_object        *obj = lov2cl(loo);
+       struct lov_layout_raid0 *r0 = lov_r0(loo);
+       struct lov_io_sub       *sub;
+       obd_off                  suboff;
+       pgoff_t                  ra_end;
+       unsigned int             pps; /* pages per stripe */
+       int                      stripe;
+       int                      rc;
+       ENTRY;
+
+       stripe = lov_stripe_number(loo->lo_lsm, cl_offset(obj, start));
+       if (unlikely(r0->lo_sub[stripe] == NULL))
+               RETURN(-EIO);
+
+       sub = lov_sub_get(env, lio, stripe);
+
+       lov_stripe_offset(loo->lo_lsm, cl_offset(obj, start), stripe, &suboff);
+       rc = cl_io_read_ahead(sub->sub_env, sub->sub_io,
+                             cl_index(lovsub2cl(r0->lo_sub[stripe]), suboff),
+                             ra);
+       lov_sub_put(sub);
+
+       CDEBUG(D_READA, DFID " cra_end = %lu, stripes = %d, rc = %d\n",
+              PFID(lu_object_fid(lov2lu(loo))), ra->cra_end, r0->lo_nr, rc);
+       if (rc != 0)
+               RETURN(rc);
+
+       /**
+        * Adjust cra_end by the layout of raid0. ra->cra_end is the maximum
+        * page index covered by an underlying DLM lock.
+        * This function converts cra_end from stripe level to file level and
+        * makes sure it does not go beyond the stripe boundary.
+        */
+       if (r0->lo_nr == 1) /* single stripe file */
+               RETURN(0);
+
+       /* cra_end is stripe level, convert it into file level */
+       ra_end = ra->cra_end;
+       if (ra_end != CL_PAGE_EOF)
+               ra_end = lov_stripe_pgoff(loo->lo_lsm, ra_end, stripe);
+
+       pps = loo->lo_lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
+
+       CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, "
+              "stripe_size = %u, stripe no = %u, start index = %lu\n",
+              PFID(lu_object_fid(lov2lu(loo))), ra_end, pps,
+              loo->lo_lsm->lsm_stripe_size, stripe, start);
+
+       /* never exceed the end of the stripe */
+       ra->cra_end = min_t(pgoff_t, ra_end, start + pps - start % pps - 1);
+       RETURN(0);
+}
+
 /**
  * lov implementation of cl_operations::cio_submit() method. It takes a list
  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
@@ -826,6 +885,7 @@ static const struct cl_io_operations lov_io_ops = {
                        .cio_fini      = lov_io_fini
                }
        },
+       .cio_read_ahead                = lov_io_read_ahead,
        .cio_submit                    = lov_io_submit,
        .cio_commit_async              = lov_io_commit_async,
 };
diff --git a/lustre/lov/lov_page.c b/lustre/lov/lov_page.c
index 121ad67..cbfc245 100644
  *
  */
 
-/**
- * Adjust the stripe index by layout of raid0. @max_index is the maximum
- * page index covered by an underlying DLM lock.
- * This function converts max_index from stripe level to file level, and make
- * sure it's not beyond one stripe.
- */
-static int lov_raid0_page_is_under_lock(const struct lu_env *env,
-                                       const struct cl_page_slice *slice,
-                                       struct cl_io *unused,
-                                       pgoff_t *max_index)
-{
-       struct lov_object *loo = cl2lov(slice->cpl_obj);
-       struct lov_layout_raid0 *r0 = lov_r0(loo);
-       pgoff_t index = *max_index;
-       unsigned int pps; /* pages per stripe */
-       ENTRY;
-
-       CDEBUG(D_READA, DFID "*max_index = %lu, nr = %d\n",
-              PFID(lu_object_fid(lov2lu(loo))), index, r0->lo_nr);
-
-       if (index == 0) /* the page is not covered by any lock */
-               RETURN(0);
-
-       if (r0->lo_nr == 1) /* single stripe file */
-               RETURN(0);
-
-       /* max_index is stripe level, convert it into file level */
-       if (index != CL_PAGE_EOF) {
-               int stripeno = lov_page_stripe(slice->cpl_page);
-               *max_index = lov_stripe_pgoff(loo->lo_lsm, index, stripeno);
-       }
-
-       /* calculate the end of current stripe */
-       pps = loo->lo_lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
-       index = slice->cpl_index + pps - slice->cpl_index % pps - 1;
-
-       CDEBUG(D_READA, DFID "*max_index = %lu, index = %lu, pps = %u, "
-              "stripe_size = %u, stripe no = %u, page index = %lu\n",
-              PFID(lu_object_fid(lov2lu(loo))), *max_index, index, pps,
-              loo->lo_lsm->lsm_stripe_size, lov_page_stripe(slice->cpl_page),
-              slice->cpl_index);
-
-       /* never exceed the end of the stripe */
-       *max_index = min_t(pgoff_t, *max_index, index);
-       RETURN(0);
-}
-
 static int lov_raid0_page_print(const struct lu_env *env,
                                const struct cl_page_slice *slice,
                                void *cookie, lu_printer_t printer)
@@ -110,7 +63,6 @@ static int lov_raid0_page_print(const struct lu_env *env,
 }
 
 static const struct cl_page_operations lov_raid0_page_ops = {
-       .cpo_is_under_lock = lov_raid0_page_is_under_lock,
        .cpo_print = lov_raid0_page_print
 };
 
diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c
index d885932..2d8a11b 100644
@@ -606,68 +606,33 @@ void cl_io_end(const struct lu_env *env, struct cl_io *io)
 }
 EXPORT_SYMBOL(cl_io_end);
 
-static const struct cl_page_slice *
-cl_io_slice_page(const struct cl_io_slice *ios, struct cl_page *page)
-{
-        const struct cl_page_slice *slice;
-
-        slice = cl_page_at(page, ios->cis_obj->co_lu.lo_dev->ld_type);
-        LINVRNT(slice != NULL);
-        return slice;
-}
-
 /**
- * Called by read io, when page has to be read from the server.
+ * Called by read IO to decide the read-ahead extent
  *
- * \see cl_io_operations::cio_read_page()
+ * \see cl_io_operations::cio_read_ahead()
  */
-int cl_io_read_page(const struct lu_env *env, struct cl_io *io,
-                    struct cl_page *page)
+int cl_io_read_ahead(const struct lu_env *env, struct cl_io *io,
+                    pgoff_t start, struct cl_read_ahead *ra)
 {
-        const struct cl_io_slice *scan;
-        struct cl_2queue         *queue;
-        int                       result = 0;
-
-        LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
-        LINVRNT(cl_page_is_owned(page, io));
-        LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
-        LINVRNT(cl_io_invariant(io));
-        ENTRY;
+       const struct cl_io_slice *scan;
+       int                       result = 0;
 
-        queue = &io->ci_queue;
+       LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
+       LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
+       LINVRNT(cl_io_invariant(io));
+       ENTRY;
 
-        cl_2queue_init(queue);
-        /*
-         * ->cio_read_page() methods called in the loop below are supposed to
-         * never block waiting for network (the only subtle point is the
-         * creation of new pages for read-ahead that might result in cache
-         * shrinking, but currently only clean pages are shrunk and this
-         * requires no network io).
-         *
-         * Should this ever starts blocking, retry loop would be needed for
-         * "parallel io" (see CLO_REPEAT loops in cl_lock.c).
-         */
-        cl_io_for_each(scan, io) {
-                if (scan->cis_iop->cio_read_page != NULL) {
-                        const struct cl_page_slice *slice;
+       cl_io_for_each(scan, io) {
+               if (scan->cis_iop->cio_read_ahead == NULL)
+                       continue;
 
-                        slice = cl_io_slice_page(scan, page);
-                        LINVRNT(slice != NULL);
-                        result = scan->cis_iop->cio_read_page(env, scan, slice);
-                        if (result != 0)
-                                break;
-                }
-        }
-       if (result == 0 && queue->c2_qin.pl_nr > 0)
-               result = cl_io_submit_rw(env, io, CRT_READ, queue);
-        /*
-         * Unlock unsent pages in case of error.
-         */
-        cl_page_list_disown(env, io, &queue->c2_qin);
-        cl_2queue_fini(env, queue);
-        RETURN(result);
+               result = scan->cis_iop->cio_read_ahead(env, scan, start, ra);
+               if (result != 0)
+                       break;
+       }
+       RETURN(result > 0 ? 0 : result);
 }
-EXPORT_SYMBOL(cl_io_read_page);
+EXPORT_SYMBOL(cl_io_read_ahead);
 
 /**
  * Commit a list of contiguous pages into writeback cache.
diff --git a/lustre/obdclass/cl_page.c b/lustre/obdclass/cl_page.c
index b361c23..da96ab2 100644
@@ -451,30 +451,6 @@ EXPORT_SYMBOL(cl_page_at);
        __result;                                                       \
 })
 
-#define CL_PAGE_INVOKE_REVERSE(_env, _page, _op, _proto, ...)          \
-({                                                                     \
-       const struct lu_env        *__env  = (_env);                    \
-       struct cl_page             *__page = (_page);                   \
-       const struct cl_page_slice *__scan;                             \
-       int                         __result;                           \
-       ptrdiff_t                   __op   = (_op);                     \
-       int                       (*__method)_proto;                    \
-                                                                       \
-       __result = 0;                                                   \
-       list_for_each_entry_reverse(__scan, &__page->cp_layers,         \
-                                   cpl_linkage) {                      \
-               __method = *(void **)((char *)__scan->cpl_ops +  __op); \
-               if (__method != NULL) {                                 \
-                       __result = (*__method)(__env, __scan, ## __VA_ARGS__); \
-                       if (__result != 0)                              \
-                               break;                                  \
-               }                                                       \
-       }                                                               \
-       if (__result > 0)                                               \
-               __result = 0;                                           \
-       __result;                                                       \
-})
-
 #define CL_PAGE_INVOID(_env, _page, _op, _proto, ...)                  \
 do {                                                                   \
        const struct lu_env        *__env  = (_env);                    \
@@ -1026,30 +1002,6 @@ int cl_page_flush(const struct lu_env *env, struct cl_io *io,
 EXPORT_SYMBOL(cl_page_flush);
 
 /**
- * Checks whether page is protected by any extent lock is at least required
- * mode.
- *
- * \return the same as in cl_page_operations::cpo_is_under_lock() method.
- * \see cl_page_operations::cpo_is_under_lock()
- */
-int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
-                          struct cl_page *page, pgoff_t *max_index)
-{
-       int rc;
-
-       PINVRNT(env, page, cl_page_invariant(page));
-
-       ENTRY;
-       rc = CL_PAGE_INVOKE_REVERSE(env, page, CL_PAGE_OP(cpo_is_under_lock),
-                                   (const struct lu_env *,
-                                    const struct cl_page_slice *,
-                                    struct cl_io *, pgoff_t *),
-                                   io, max_index);
-       RETURN(rc);
-}
-EXPORT_SYMBOL(cl_page_is_under_lock);
-
-/**
  * Tells transfer engine that only part of a page is to be transmitted.
  *
  * \see cl_page_operations::cpo_clip()
diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index 50c9a4e..73b1fad 100644
@@ -3115,7 +3115,8 @@ static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
                struct cl_page *page = ops->ops_cl.cpl_page;
 
                /* refresh non-overlapped index */
-               tmp = osc_dlmlock_at_pgoff(env, osc, index, 0, 0);
+               tmp = osc_dlmlock_at_pgoff(env, osc, index,
+                                          OSC_DAP_FL_TEST_LOCK);
                if (tmp != NULL) {
                        __u64 end = tmp->l_policy_data.l_extent.end;
                        /* Cache the first-non-overlapped index so as to skip
diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h
index c0d8d7c..9e44778 100644
@@ -206,10 +206,24 @@ int osc_quotactl(struct obd_device *unused, struct obd_export *exp,
 int osc_quotacheck(struct obd_device *unused, struct obd_export *exp,
                    struct obd_quotactl *oqctl);
 int osc_quota_poll_check(struct obd_export *exp, struct if_quotacheck *qchk);
-struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
-                                      struct osc_object *obj, pgoff_t index,
-                                      int pending, int canceling);
 void osc_inc_unstable_pages(struct ptlrpc_request *req);
 void osc_dec_unstable_pages(struct ptlrpc_request *req);
 bool osc_over_unstable_soft_limit(struct client_obd *cli);
+/**
+ * Bit flags for osc_dlmlock_at_pgoff().
+ */
+enum osc_dap_flags {
+       /**
+        * Just check whether the desired lock exists; no reference is
+        * taken on the lock.
+        */
+       OSC_DAP_FL_TEST_LOCK = 1 << 0,
+       /**
+        * Return the lock even if it is being canceled.
+        */
+       OSC_DAP_FL_CANCELING = 1 << 1
+};
+struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
+                                      struct osc_object *obj, pgoff_t index,
+                                      enum osc_dap_flags flags);
 #endif /* OSC_INTERNAL_H */
diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c
index 24f86ab..de25635 100644
@@ -92,6 +92,45 @@ static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
 {
 }
 
+static void osc_read_ahead_release(const struct lu_env *env,
+                                  void *cbdata)
+{
+       struct ldlm_lock *dlmlock = cbdata;
+       struct lustre_handle lockh;
+
+       ldlm_lock2handle(dlmlock, &lockh);
+       ldlm_lock_decref(&lockh, LCK_PR);
+       LDLM_LOCK_PUT(dlmlock);
+}
+
+static int osc_io_read_ahead(const struct lu_env *env,
+                            const struct cl_io_slice *ios,
+                            pgoff_t start, struct cl_read_ahead *ra)
+{
+       struct osc_object       *osc = cl2osc(ios->cis_obj);
+       struct ldlm_lock        *dlmlock;
+       int                     result = -ENODATA;
+       ENTRY;
+
+       dlmlock = osc_dlmlock_at_pgoff(env, osc, start, 0);
+       if (dlmlock != NULL) {
+               if (dlmlock->l_req_mode != LCK_PR) {
+                       struct lustre_handle lockh;
+                       ldlm_lock2handle(dlmlock, &lockh);
+                       ldlm_lock_addref(&lockh, LCK_PR);
+                       ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
+               }
+
+               ra->cra_end = cl_index(osc2cl(osc),
+                                      dlmlock->l_policy_data.l_extent.end);
+               ra->cra_release = osc_read_ahead_release;
+               ra->cra_cbdata = dlmlock;
+               result = 0;
+       }
+
+       RETURN(result);
+}
+
 /**
  * An implementation of cl_io_operations::cio_io_submit() method for osc
  * layer. Iterates over pages in the in-queue, prepares each for io by calling
@@ -730,6 +769,7 @@ static const struct cl_io_operations osc_io_ops = {
                        .cio_fini   = osc_io_fini
                }
        },
+       .cio_read_ahead             = osc_io_read_ahead,
        .cio_submit                 = osc_io_submit,
        .cio_commit_async           = osc_io_commit_async
 };
@@ -804,7 +844,7 @@ static void osc_req_attr_set(const struct lu_env *env,
                                     struct cl_page, cp_flight);
                opg = osc_cl_page_osc(apage, NULL);
                lock = osc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg),
-                                           1, 1);
+                               OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_CANCELING);
                if (lock == NULL && !opg->ops_srvlock) {
                        struct ldlm_resource *res;
                        struct ldlm_res_id *resname;
diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index 6c7d8df..64168b1 100644
@@ -1195,7 +1195,7 @@ int osc_lock_init(const struct lu_env *env,
  */
 struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
                                       struct osc_object *obj, pgoff_t index,
-                                      int pending, int canceling)
+                                      enum osc_dap_flags dap_flags)
 {
        struct osc_thread_info *info = osc_env_info(env);
        struct ldlm_res_id     *resname = &info->oti_resname;
@@ -1211,9 +1211,9 @@ struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
        osc_index2policy(policy, osc2cl(obj), index, index);
        policy->l_extent.gid = LDLM_GID_ANY;
 
-       flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
-       if (pending)
-               flags |= LDLM_FL_CBPENDING;
+       flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
+       if (dap_flags & OSC_DAP_FL_TEST_LOCK)
+               flags |= LDLM_FL_TEST_LOCK;
        /*
         * It is fine to match any group lock since there could be only one
         * with a uniq gid and it conflicts with all other lock modes too
@@ -1221,7 +1221,8 @@ struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
 again:
        mode = ldlm_lock_match(osc_export(obj)->exp_obd->obd_namespace,
                               flags, resname, LDLM_EXTENT, policy,
-                              LCK_PR | LCK_PW | LCK_GROUP, &lockh, canceling);
+                              LCK_PR | LCK_PW | LCK_GROUP, &lockh,
+                              dap_flags & OSC_DAP_FL_CANCELING);
        if (mode != 0) {
                lock = ldlm_handle2lock(&lockh);
                /* RACE: the lock is cancelled so let's try again */
diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c
index 9f9f2d0..08eb661 100644
@@ -232,26 +232,6 @@ void osc_index2policy(ldlm_policy_data_t *policy, const struct cl_object *obj,
         policy->l_extent.end   = cl_offset(obj, end + 1) - 1;
 }
 
-static int osc_page_is_under_lock(const struct lu_env *env,
-                                 const struct cl_page_slice *slice,
-                                 struct cl_io *unused, pgoff_t *max_index)
-{
-       struct osc_page         *opg = cl2osc_page(slice);
-       struct ldlm_lock        *dlmlock;
-       int                     result = -ENODATA;
-
-       ENTRY;
-       dlmlock = osc_dlmlock_at_pgoff(env, cl2osc(slice->cpl_obj),
-                                      osc_index(opg), 1, 0);
-       if (dlmlock != NULL) {
-               *max_index = cl_index(slice->cpl_obj,
-                                     dlmlock->l_policy_data.l_extent.end);
-               LDLM_LOCK_PUT(dlmlock);
-               result = 0;
-       }
-       RETURN(result);
-}
-
 static const char *osc_list(struct list_head *head)
 {
        return list_empty(head) ? "-" : "+";
@@ -406,7 +386,6 @@ static int osc_page_flush(const struct lu_env *env,
 static const struct cl_page_operations osc_page_ops = {
        .cpo_print         = osc_page_print,
        .cpo_delete        = osc_page_delete,
-       .cpo_is_under_lock = osc_page_is_under_lock,
        .cpo_clip           = osc_page_clip,
        .cpo_cancel         = osc_page_cancel,
        .cpo_flush          = osc_page_flush