Whamcloud - gitweb
LU-10994 clio: remove cpo_own and cpo_disown 72/47372/4
authorJohn L. Hammond <jhammond@whamcloud.com>
Wed, 4 May 2022 20:44:03 +0000 (15:44 -0500)
committerOleg Drokin <green@whamcloud.com>
Mon, 27 Jun 2022 04:49:46 +0000 (04:49 +0000)
Remove the cpo_own and cpo_disown methods from struct
cl_page_operations. These methods were only implemented by the vvp
layer so they can be inlined into cl_page_own0() and
cl_page_disown(). Move most of vvp_page_discard() and all of
vvp_transient_page_discard() into cl_page_discard().

Signed-off-by: John L. Hammond <jhammond@whamcloud.com>
Change-Id: I3f156d6ca3e4ea11c050b2addda38e84a84634b9
Reviewed-on: https://review.whamcloud.com/47372
Reviewed-by: Patrick Farrell <pfarrell@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/cl_object.h
lustre/llite/llite_lib.c
lustre/llite/rw.c
lustre/llite/rw26.c
lustre/llite/vvp_dev.c
lustre/llite/vvp_internal.h
lustre/llite/vvp_page.c
lustre/obdclass/cl_internal.h
lustre/obdclass/cl_io.c
lustre/obdclass/cl_page.c

index 37aca77..a5422af 100644 (file)
@@ -763,6 +763,9 @@ struct cl_page {
          * creation.
          */
        enum cl_page_type       cp_type:CP_TYPE_BITS;
+       unsigned                cp_defer_uptodate:1,
+                               cp_ra_updated:1,
+                               cp_ra_used:1;
        /* which slab kmem index this memory allocated from */
        short int               cp_kmem_index;
 
@@ -822,7 +825,7 @@ enum cl_req_type {
  *
  * Methods taking an \a io argument are for the activity happening in the
  * context of given \a io. Page is assumed to be owned by that io, except for
- * the obvious cases (like cl_page_operations::cpo_own()).
+ * the obvious cases.
  *
  * \see vvp_page_ops, lov_page_ops, osc_page_ops
  */
@@ -834,25 +837,6 @@ struct cl_page_operations {
          */
 
         /**
-         * Called when \a io acquires this page into the exclusive
-         * ownership. When this method returns, it is guaranteed that the is
-         * not owned by other io, and no transfer is going on against
-         * it. Optional.
-         *
-         * \see cl_page_own()
-         * \see vvp_page_own(), lov_page_own()
-         */
-        int  (*cpo_own)(const struct lu_env *env,
-                        const struct cl_page_slice *slice,
-                        struct cl_io *io, int nonblock);
-        /** Called when ownership it yielded. Optional.
-         *
-         * \see cl_page_disown()
-         * \see vvp_page_disown()
-         */
-        void (*cpo_disown)(const struct lu_env *env,
-                           const struct cl_page_slice *slice, struct cl_io *io);
-        /**
          * Called for a page that is already "owned" by \a io from VM point of
          * view. Optional.
          *
@@ -2525,7 +2509,7 @@ void cl_page_list_splice(struct cl_page_list *list,
 void cl_page_list_del(const struct lu_env *env,
                      struct cl_page_list *plist, struct cl_page *page);
 void cl_page_list_disown(const struct lu_env *env,
-                        struct cl_io *io, struct cl_page_list *plist);
+                        struct cl_page_list *plist);
 void cl_page_list_assume(const struct lu_env *env,
                         struct cl_io *io, struct cl_page_list *plist);
 void cl_page_list_discard(const struct lu_env *env,
@@ -2535,8 +2519,7 @@ void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist);
 void cl_2queue_init(struct cl_2queue *queue);
 void cl_2queue_add(struct cl_2queue *queue, struct cl_page *page,
                   bool get_ref);
-void cl_2queue_disown(const struct lu_env *env, struct cl_io *io,
-                     struct cl_2queue *queue);
+void cl_2queue_disown(const struct lu_env *env, struct cl_2queue *queue);
 void cl_2queue_assume(const struct lu_env *env, struct cl_io *io,
                      struct cl_2queue *queue);
 void cl_2queue_discard(const struct lu_env *env, struct cl_io *io,
index a68983d..99f436d 100644 (file)
@@ -2055,7 +2055,7 @@ int ll_io_zero_page(struct inode *inode, pgoff_t index, pgoff_t offset,
 queuefini2:
                cl_2queue_discard(env, io, queue);
 queuefini1:
-               cl_2queue_disown(env, io, queue);
+               cl_2queue_disown(env, queue);
                cl_2queue_fini(env, queue);
        }
 
index d707a93..b93ed5f 100644 (file)
@@ -197,8 +197,7 @@ static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
        struct cl_object *clob  = io->ci_obj;
        struct inode     *inode = vvp_object_inode(clob);
        struct page      *vmpage = NULL;
-       struct cl_page   *page;
-       struct vvp_page  *vpg;
+       struct cl_page   *cp;
        enum ra_stat      which = _NR_RA_STAT; /* keep gcc happy */
        int               rc    = 0;
        const char       *msg   = NULL;
@@ -224,7 +223,7 @@ static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
                /* should not come here */
                GOTO(out, rc = -EINVAL);
        }
+
        /* Check if vmpage was truncated or reclaimed */
        if (vmpage->mapping != inode->i_mapping) {
                which = RA_STAT_WRONG_GRAB_PAGE;
@@ -232,32 +231,33 @@ static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
                GOTO(out, rc = -EBUSY);
        }
 
-       page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
-       if (IS_ERR(page)) {
+       cp = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
+       if (IS_ERR(cp)) {
                which = RA_STAT_FAILED_GRAB_PAGE;
                msg   = "cl_page_find failed";
-               GOTO(out, rc = PTR_ERR(page));
+               GOTO(out, rc = PTR_ERR(cp));
        }
 
-       lu_ref_add(&page->cp_reference, "ra", current);
-       cl_page_assume(env, io, page);
-       vpg = cl2vvp_page(cl_object_page_slice(clob, page));
-       if (!vpg->vpg_defer_uptodate && !PageUptodate(vmpage)) {
+       lu_ref_add(&cp->cp_reference, "ra", current);
+       cl_page_assume(env, io, cp);
+
+       if (!cp->cp_defer_uptodate && !PageUptodate(vmpage)) {
                if (hint == MAYNEED) {
-                       vpg->vpg_defer_uptodate = 1;
-                       vpg->vpg_ra_used = 0;
+                       cp->cp_defer_uptodate = 1;
+                       cp->cp_ra_used = 0;
                }
-               cl_page_list_add(queue, page, true);
+
+               cl_page_list_add(queue, cp, true);
        } else {
                /* skip completed pages */
-               cl_page_unassume(env, io, page);
+               cl_page_unassume(env, io, cp);
                /* This page is already uptodate, returning a positive number
                 * to tell the callers about this */
                rc = 1;
        }
 
-       lu_ref_del(&page->cp_reference, "ra", current);
-       cl_page_put(env, page);
+       lu_ref_del(&cp->cp_reference, "ra", current);
+       cl_page_put(env, cp);
 
 out:
        if (vmpage != NULL) {
@@ -685,7 +685,7 @@ static void ll_readahead_handle_work(struct work_struct *wq)
        cl_page_list_discard(env, io, &queue->c2_qin);
 
        /* Unlock unsent read pages in case of error. */
-       cl_page_list_disown(env, io, &queue->c2_qin);
+       cl_page_list_disown(env, &queue->c2_qin);
 
        cl_2queue_fini(env, queue);
 out_io_fini:
@@ -1648,9 +1648,9 @@ int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
                unlockpage = false;
 
        vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
-       uptodate = vpg->vpg_defer_uptodate;
+       uptodate = page->cp_defer_uptodate;
 
-       if (ll_readahead_enabled(sbi) && !vpg->vpg_ra_updated && ras) {
+       if (ll_readahead_enabled(sbi) && !page->cp_ra_updated && ras) {
                enum ras_update_flags flags = 0;
 
                if (uptodate)
@@ -1662,7 +1662,7 @@ int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
 
        cl_2queue_init(queue);
        if (uptodate) {
-               vpg->vpg_ra_used = 1;
+               page->cp_ra_used = 1;
                SetPageUptodate(page->cp_vmpage);
                cl_page_disown(env, io, page);
        } else {
@@ -1738,7 +1738,7 @@ int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
        cl_page_list_discard(env, io, &queue->c2_qin);
 
        /* Unlock unsent read pages in case of error. */
-       cl_page_list_disown(env, io, &queue->c2_qin);
+       cl_page_list_disown(env, &queue->c2_qin);
 
        cl_2queue_fini(env, queue);
 
@@ -1879,7 +1879,7 @@ int ll_readpage(struct file *file, struct page *vmpage)
                }
 
                vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
-               if (vpg->vpg_defer_uptodate) {
+               if (page->cp_defer_uptodate) {
                        enum ras_update_flags flags = LL_RAS_HIT;
 
                        if (lcc && lcc->lcc_type == LCC_MMAP)
@@ -1890,7 +1890,7 @@ int ll_readpage(struct file *file, struct page *vmpage)
                         * case will be handled by slow read later. */
                        ras_update(sbi, inode, ras, vvp_index(vpg), flags, io);
                        /* avoid duplicate ras_update() call */
-                       vpg->vpg_ra_updated = 1;
+                       page->cp_ra_updated = 1;
 
                        if (ll_use_fast_io(file, ras, vvp_index(vpg)))
                                result = 0;
@@ -1903,11 +1903,12 @@ int ll_readpage(struct file *file, struct page *vmpage)
 
                /* export the page and skip io stack */
                if (result == 0) {
-                       vpg->vpg_ra_used = 1;
+                       page->cp_ra_used = 1;
                        SetPageUptodate(vmpage);
                } else {
                        ll_ra_stats_inc_sbi(sbi, RA_STAT_FAILED_FAST_READ);
                }
+
                /* release page refcount before unlocking the page to ensure
                 * the object won't be destroyed in the calling path of
                 * cl_page_put(). Please see comment in ll_releasepage(). */
index e2a3049..11c5a55 100644 (file)
@@ -379,7 +379,7 @@ ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
        }
 
        cl_2queue_discard(env, io, queue);
-       cl_2queue_disown(env, io, queue);
+       cl_2queue_disown(env, queue);
        cl_2queue_fini(env, queue);
        RETURN(rc);
 }
@@ -599,8 +599,8 @@ static int ll_prepare_partial_page(const struct lu_env *env, struct cl_io *io,
                GOTO(out, result = 0);
        }
 
-       if (vpg->vpg_defer_uptodate) {
-               vpg->vpg_ra_used = 1;
+       if (pg->cp_defer_uptodate) {
+               pg->cp_ra_used = 1;
                GOTO(out, result = 0);
        }
 
index 4d7995f..1ea1fde 100644 (file)
@@ -447,11 +447,10 @@ static void vvp_pgcache_page_show(const struct lu_env *env,
 
        vpg = cl2vvp_page(cl_page_at(page, &vvp_device_type));
        vmpage = vpg->vpg_page;
-       seq_printf(seq, " %5i | %p %p %s %s %s | %p "DFID"(%p) %lu %u [",
+       seq_printf(seq, " %5i | %p %p %s %s | %p "DFID"(%p) %lu %u [",
                   0 /* gen */,
                   vpg, page,
                   "none",
-                  vpg->vpg_defer_uptodate ? "du" : "- ",
                   PageWriteback(vmpage) ? "wb" : "-",
                   vmpage,
                   PFID(ll_inode2fid(vmpage->mapping->host)),
index fa4f512..f350e3f 100644 (file)
@@ -212,9 +212,6 @@ struct vvp_object {
  */
 struct vvp_page {
        struct cl_page_slice vpg_cl;
-       unsigned        vpg_defer_uptodate:1,
-                       vpg_ra_updated:1,
-                       vpg_ra_used:1;
        /** VM page */
        struct page     *vpg_page;
 };
index f87aa0e..1e6238c 100644 (file)
@@ -73,34 +73,6 @@ static void vvp_page_fini(const struct lu_env *env,
        }
 }
 
-static int vvp_page_own(const struct lu_env *env,
-                       const struct cl_page_slice *slice, struct cl_io *io,
-                       int nonblock)
-{
-       struct vvp_page *vpg    = cl2vvp_page(slice);
-       struct page     *vmpage = vpg->vpg_page;
-
-       ENTRY;
-
-       LASSERT(vmpage != NULL);
-       if (nonblock) {
-               if (!trylock_page(vmpage))
-                       return -EAGAIN;
-
-               if (unlikely(PageWriteback(vmpage))) {
-                       unlock_page(vmpage);
-                       return -EAGAIN;
-               }
-
-               return 0;
-       }
-
-       lock_page(vmpage);
-       wait_on_page_writeback(vmpage);
-
-       RETURN(0);
-}
-
 static void vvp_page_assume(const struct lu_env *env,
                            const struct cl_page_slice *slice,
                            struct cl_io *unused)
@@ -122,35 +94,15 @@ static void vvp_page_unassume(const struct lu_env *env,
        LASSERT(PageLocked(vmpage));
 }
 
-static void vvp_page_disown(const struct lu_env *env,
-                           const struct cl_page_slice *slice, struct cl_io *io)
-{
-       struct page *vmpage = cl2vm_page(slice);
-
-       ENTRY;
-
-       LASSERT(vmpage != NULL);
-       LASSERT(PageLocked(vmpage));
-
-       unlock_page(cl2vm_page(slice));
-
-       EXIT;
-}
-
 static void vvp_page_discard(const struct lu_env *env,
                             const struct cl_page_slice *slice,
                             struct cl_io *unused)
 {
-       struct page     *vmpage = cl2vm_page(slice);
-       struct vvp_page *vpg    = cl2vvp_page(slice);
+       struct cl_page *cp = slice->cpl_page;
+       struct page *vmpage = cp->cp_vmpage;
 
-       LASSERT(vmpage != NULL);
-       LASSERT(PageLocked(vmpage));
-
-       if (vpg->vpg_defer_uptodate && !vpg->vpg_ra_used && vmpage->mapping)
+       if (cp->cp_defer_uptodate && !cp->cp_ra_used && vmpage->mapping != NULL)
                ll_ra_stats_inc(vmpage->mapping->host, RA_STAT_DISCARDED);
-
-       generic_error_remove_page(vmpage->mapping, vmpage);
 }
 
 static void vvp_page_delete(const struct lu_env *env,
@@ -237,23 +189,22 @@ static void vvp_page_completion_read(const struct lu_env *env,
                                     const struct cl_page_slice *slice,
                                     int ioret)
 {
-       struct vvp_page *vpg    = cl2vvp_page(slice);
-       struct page     *vmpage = vpg->vpg_page;
-       struct cl_page  *page   = slice->cpl_page;
-       struct inode    *inode  = vvp_object_inode(page->cp_obj);
+       struct cl_page *cp = slice->cpl_page;
+       struct page *vmpage = cp->cp_vmpage;
+       struct inode *inode = vvp_object_inode(cp->cp_obj);
 
        ENTRY;
        LASSERT(PageLocked(vmpage));
-       CL_PAGE_HEADER(D_PAGE, env, page, "completing READ with %d\n", ioret);
+       CL_PAGE_HEADER(D_PAGE, env, cp, "completing READ with %d\n", ioret);
 
-       if (vpg->vpg_defer_uptodate)
+       if (cp->cp_defer_uptodate)
                ll_ra_count_put(ll_i2sbi(inode), 1);
 
        if (ioret == 0)  {
-               if (!vpg->vpg_defer_uptodate)
+               if (!cp->cp_defer_uptodate)
                        SetPageUptodate(vmpage);
-       } else if (vpg->vpg_defer_uptodate) {
-               vpg->vpg_defer_uptodate = 0;
+       } else if (cp->cp_defer_uptodate) {
+               cp->cp_defer_uptodate = 0;
                if (ioret == -EAGAIN) {
                        /* mirror read failed, it needs to destroy the page
                         * because subpage would be from wrong osc when trying
@@ -263,7 +214,7 @@ static void vvp_page_completion_read(const struct lu_env *env,
                }
        }
 
-       if (page->cp_sync_io == NULL)
+       if (cp->cp_sync_io == NULL)
                unlock_page(vmpage);
 
        EXIT;
@@ -347,8 +298,7 @@ static int vvp_page_print(const struct lu_env *env,
        struct page     *vmpage = vpg->vpg_page;
 
        (*printer)(env, cookie,
-                  LUSTRE_VVP_NAME"-page@%p(%d:%d) vm@%p ",
-                  vpg, vpg->vpg_defer_uptodate, vpg->vpg_ra_used, vmpage);
+                  LUSTRE_VVP_NAME"-page@%p vm@%p ", vpg, vmpage);
 
        if (vmpage != NULL) {
                (*printer)(env, cookie, "%lx %d:%d %lx %lu %slru",
@@ -375,10 +325,8 @@ static int vvp_page_fail(const struct lu_env *env,
 }
 
 static const struct cl_page_operations vvp_page_ops = {
-       .cpo_own           = vvp_page_own,
        .cpo_assume        = vvp_page_assume,
        .cpo_unassume      = vvp_page_unassume,
-       .cpo_disown        = vvp_page_disown,
        .cpo_discard       = vvp_page_discard,
        .cpo_delete        = vvp_page_delete,
        .cpo_fini          = vvp_page_fini,
@@ -397,20 +345,7 @@ static const struct cl_page_operations vvp_page_ops = {
        },
 };
 
-static void vvp_transient_page_discard(const struct lu_env *env,
-                                      const struct cl_page_slice *slice,
-                                      struct cl_io *unused)
-{
-       struct cl_page *page = slice->cpl_page;
-
-       /*
-        * For transient pages, remove it from the radix tree.
-        */
-       cl_page_delete(env, page);
-}
-
 static const struct cl_page_operations vvp_transient_page_ops = {
-       .cpo_discard            = vvp_transient_page_discard,
        .cpo_print              = vvp_page_print,
 };
 
index e075a05..63b1dab 100644 (file)
@@ -50,7 +50,6 @@ extern struct kmem_cache *cl_page_kmem_array[16];
 extern unsigned short cl_page_kmem_size_array[16];
 
 struct cl_thread_info *cl_env_info(const struct lu_env *env);
-void cl_page_disown0(const struct lu_env *env,
-                    struct cl_io *io, struct cl_page *pg);
+void cl_page_disown0(const struct lu_env *env, struct cl_page *pg);
 
 #endif /* _CL_INTERNAL_H */
index 97dfda1..6c7e7e4 100644 (file)
@@ -948,13 +948,11 @@ EXPORT_SYMBOL(cl_page_list_splice);
 /**
  * Disowns pages in a queue.
  */
-void cl_page_list_disown(const struct lu_env *env,
-                        struct cl_io *io, struct cl_page_list *plist)
+void cl_page_list_disown(const struct lu_env *env, struct cl_page_list *plist)
 {
        struct cl_page *page;
        struct cl_page *temp;
 
-
        ENTRY;
        cl_page_list_for_each_safe(page, temp, plist) {
                LASSERT(plist->pl_nr > 0);
@@ -969,7 +967,7 @@ void cl_page_list_disown(const struct lu_env *env,
                /*
                 * XXX cl_page_disown0() will fail if page is not locked.
                 */
-               cl_page_disown0(env, io, page);
+               cl_page_disown0(env, page);
                lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
                              plist);
                cl_page_put(env, page);
@@ -1047,12 +1045,11 @@ EXPORT_SYMBOL(cl_2queue_add);
 /**
  * Disown pages in both lists of a 2-queue.
  */
-void cl_2queue_disown(const struct lu_env *env,
-                      struct cl_io *io, struct cl_2queue *queue)
+void cl_2queue_disown(const struct lu_env *env, struct cl_2queue *queue)
 {
         ENTRY;
-        cl_page_list_disown(env, io, &queue->c2_qin);
-        cl_page_list_disown(env, io, &queue->c2_qout);
+        cl_page_list_disown(env, &queue->c2_qin);
+        cl_page_list_disown(env, &queue->c2_qout);
         EXIT;
 }
 EXPORT_SYMBOL(cl_2queue_disown);
index 7a646e0..1b589bc 100644 (file)
@@ -569,31 +569,25 @@ static void cl_page_owner_set(struct cl_page *page)
        EXIT;
 }
 
-void cl_page_disown0(const struct lu_env *env,
-                    struct cl_io *io, struct cl_page *cl_page)
+void cl_page_disown0(const struct lu_env *env, struct cl_page *cp)
 {
-       const struct cl_page_slice *slice;
+       struct page *vmpage;
        enum cl_page_state state;
-       int i;
 
-        ENTRY;
-       state = cl_page->cp_state;
-       PINVRNT(env, cl_page, state == CPS_OWNED ||
-               state == CPS_FREEING);
-       PINVRNT(env, cl_page, cl_page_invariant(cl_page) ||
-               state == CPS_FREEING);
-       cl_page_owner_clear(cl_page);
+       ENTRY;
+       state = cp->cp_state;
+       PINVRNT(env, cp, state == CPS_OWNED || state == CPS_FREEING);
+       PINVRNT(env, cp, cl_page_invariant(cp) || state == CPS_FREEING);
+       cl_page_owner_clear(cp);
 
        if (state == CPS_OWNED)
-               cl_page_state_set(env, cl_page, CPS_CACHED);
-        /*
-        * Completion call-backs are executed in the bottom-up order, so that
-        * uppermost layer (llite), responsible for VFS/VM interaction runs
-        * last and can release locks safely.
-        */
-       cl_page_slice_for_each_reverse(cl_page, slice, i) {
-               if (slice->cpl_ops->cpo_disown != NULL)
-                       (*slice->cpl_ops->cpo_disown)(env, slice, io);
+               cl_page_state_set(env, cp, CPS_CACHED);
+
+       if (cp->cp_type == CPT_CACHEABLE) {
+               vmpage = cp->cp_vmpage;
+               LASSERT(vmpage != NULL);
+               LASSERT(PageLocked(vmpage));
+               unlock_page(vmpage);
        }
 
        EXIT;
@@ -627,48 +621,55 @@ EXPORT_SYMBOL(cl_page_is_owned);
  *             or, page was owned by another thread, or in IO.
  *
  * \see cl_page_disown()
- * \see cl_page_operations::cpo_own()
  * \see cl_page_own_try()
  * \see cl_page_own
  */
 static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
                        struct cl_page *cl_page, int nonblock)
 {
-       const struct cl_page_slice *slice;
-       int result = 0;
-       int i;
+       struct page *vmpage = cl_page->cp_vmpage;
+       int result;
 
        ENTRY;
        PINVRNT(env, cl_page, !cl_page_is_owned(cl_page, io));
-        io = cl_io_top(io);
 
        if (cl_page->cp_state == CPS_FREEING) {
                result = -ENOENT;
                goto out;
        }
 
-       cl_page_slice_for_each(cl_page, slice, i) {
-               if (slice->cpl_ops->cpo_own)
-                       result = (*slice->cpl_ops->cpo_own)(env, slice,
-                                                           io, nonblock);
-               if (result != 0)
-                       break;
-       }
-       if (result > 0)
-               result = 0;
+       LASSERT(vmpage != NULL);
 
-       if (result == 0) {
-               PASSERT(env, cl_page, cl_page->cp_owner == NULL);
-               cl_page->cp_owner = cl_io_top(io);
-               cl_page_owner_set(cl_page);
-               if (cl_page->cp_state != CPS_FREEING) {
-                       cl_page_state_set(env, cl_page, CPS_OWNED);
-               } else {
-                       cl_page_disown0(env, io, cl_page);
-                       result = -ENOENT;
+       if (cl_page->cp_type == CPT_TRANSIENT) {
+               /* OK */
+       } else if (nonblock) {
+               if (!trylock_page(vmpage)) {
+                       result = -EAGAIN;
+                       goto out;
                }
+
+               if (unlikely(PageWriteback(vmpage))) {
+                       unlock_page(vmpage);
+                       result = -EAGAIN;
+                       goto out;
+                }
+       } else {
+               lock_page(vmpage);
+               wait_on_page_writeback(vmpage);
        }
 
+       PASSERT(env, cl_page, cl_page->cp_owner == NULL);
+       cl_page->cp_owner = cl_io_top(io);
+       cl_page_owner_set(cl_page);
+
+       if (cl_page->cp_state == CPS_FREEING) {
+               cl_page_disown0(env, cl_page);
+               result = -ENOENT;
+               goto out;
+       }
+
+       cl_page_state_set(env, cl_page, CPS_OWNED);
+       result = 0;
 out:
        PINVRNT(env, cl_page, ergo(result == 0,
                cl_page_invariant(cl_page)));
@@ -777,7 +778,6 @@ EXPORT_SYMBOL(cl_page_unassume);
  * \post !cl_page_is_owned(pg, io)
  *
  * \see cl_page_own()
- * \see cl_page_operations::cpo_disown()
  */
 void cl_page_disown(const struct lu_env *env,
                     struct cl_io *io, struct cl_page *pg)
@@ -785,10 +785,7 @@ void cl_page_disown(const struct lu_env *env,
        PINVRNT(env, pg, cl_page_is_owned(pg, io) ||
                pg->cp_state == CPS_FREEING);
 
-       ENTRY;
-       io = cl_io_top(io);
-       cl_page_disown0(env, io, pg);
-       EXIT;
+       cl_page_disown0(env, pg);
 }
 EXPORT_SYMBOL(cl_page_disown);
 
@@ -803,18 +800,28 @@ EXPORT_SYMBOL(cl_page_disown);
  * \see cl_page_operations::cpo_discard()
  */
 void cl_page_discard(const struct lu_env *env,
-                    struct cl_io *io, struct cl_page *cl_page)
+                    struct cl_io *io, struct cl_page *cp)
 {
+       struct page *vmpage;
        const struct cl_page_slice *slice;
        int i;
 
-       PINVRNT(env, cl_page, cl_page_is_owned(cl_page, io));
-       PINVRNT(env, cl_page, cl_page_invariant(cl_page));
+       PINVRNT(env, cp, cl_page_is_owned(cp, io));
+       PINVRNT(env, cp, cl_page_invariant(cp));
 
-       cl_page_slice_for_each(cl_page, slice, i) {
+       cl_page_slice_for_each(cp, slice, i) {
                if (slice->cpl_ops->cpo_discard != NULL)
                        (*slice->cpl_ops->cpo_discard)(env, slice, io);
        }
+
+       if (cp->cp_type == CPT_CACHEABLE) {
+               vmpage = cp->cp_vmpage;
+               LASSERT(vmpage != NULL);
+               LASSERT(PageLocked(vmpage));
+               generic_error_remove_page(vmpage->mapping, vmpage);
+       } else {
+               cl_page_delete(env, cp);
+       }
 }
 EXPORT_SYMBOL(cl_page_discard);
 
@@ -943,7 +950,7 @@ int cl_page_prep(const struct lu_env *env, struct cl_io *io,
 
        if (cl_page->cp_type != CPT_TRANSIENT) {
                cl_page_slice_for_each(cl_page, slice, i) {
-                       if (slice->cpl_ops->cpo_own)
+                       if (slice->cpl_ops->io[crt].cpo_prep)
                                result =
                                 (*slice->cpl_ops->io[crt].cpo_prep)(env,
                                                                     slice,