Whamcloud - gitweb
LU-848 clio: page writeback support
Author: Jinshan Xiong <jay@whamcloud.com>
Tue, 27 Sep 2011 23:02:22 +0000 (16:02 -0700)
Committer: Oleg Drokin <green@whamcloud.com>
Tue, 13 Dec 2011 08:27:48 +0000 (03:27 -0500)
Page writeback support for clio - release vmpage lock in transfer.
Sync IO still holds page lock.

Change-Id: I5137542b5b6b6eaaa4464804b45d372fd12e5c22
Signed-off-by: Jinshan Xiong <jay@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/1456
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Tested-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/llite/rw.c
lustre/llite/vvp_page.c
lustre/obdclass/cl_io.c
lustre/obdclass/cl_page.c
lustre/osc/osc_io.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c

index 065fbc7..2243aab 100644 (file)
@@ -1130,7 +1130,7 @@ out_unlock:
         return;
 }
 
-int ll_writepage(struct page *vmpage, struct writeback_control *unused)
+int ll_writepage(struct page *vmpage, struct writeback_control *wbc)
 {
         struct inode           *inode = vmpage->mapping->host;
         struct lu_env          *env;
@@ -1177,21 +1177,18 @@ int ll_writepage(struct page *vmpage, struct writeback_control *unused)
                         cl_2queue_init_page(queue, page);
                         result = cl_io_submit_rw(env, io, CRT_WRITE,
                                                  queue, CRP_NORMAL);
-                        cl_page_list_disown(env, io, &queue->c2_qin);
                         if (result != 0) {
                                 /*
-                                 * There is no need to clear PG_writeback, as
-                                 * cl_io_submit_rw() calls completion callback
-                                 * on failure.
-                                 */
-                                /*
                                  * Re-dirty page on error so it retries write,
                                  * but not in case when IO has actually
                                  * occurred and completed with an error.
                                  */
-                                if (!PageError(vmpage))
-                                        set_page_dirty(vmpage);
+                                if (!PageError(vmpage)) {
+                                        redirty_page_for_writepage(wbc, vmpage);
+                                        result = 0;
+                                }
                         }
+                        cl_page_list_disown(env, io, &queue->c2_qin);
                         LASSERT(!cl_page_is_owned(page, io));
                         lu_ref_del(&page->cp_reference,
                                    "writepage", cfs_current());
index deae49f..1a5dc9b 100644 (file)
@@ -225,13 +225,18 @@ static int vvp_page_prep_write(const struct lu_env *env,
                                const struct cl_page_slice *slice,
                                struct cl_io *unused)
 {
-        cfs_page_t *vmpage = cl2vm_page(slice);
+        struct cl_page *cp     = slice->cpl_page;
+        cfs_page_t     *vmpage = cl2vm_page(slice);
         int result;
 
         if (clear_page_dirty_for_io(vmpage)) {
                 set_page_writeback(vmpage);
                 vvp_write_pending(cl2ccc(slice->cpl_obj), cl2ccc_page(slice));
                 result = 0;
+
+                /* only turn on writeback for async write. */
+                if (cp->cp_sync_io == NULL)
+                        unlock_page(vmpage);
         } else
                 result = -EALREADY;
         return result;
@@ -256,59 +261,47 @@ static void vvp_vmpage_error(struct inode *inode, cfs_page_t *vmpage, int ioret)
         }
 }
 
-static void vvp_page_completion_common(const struct lu_env *env,
-                                       struct ccc_page *cp, int ioret)
-{
-        struct cl_page    *clp    = cp->cpg_cl.cpl_page;
-        cfs_page_t        *vmpage = cp->cpg_page;
-        struct inode      *inode  = ccc_object_inode(clp->cp_obj);
-
-        LINVRNT(cl_page_is_vmlocked(env, clp));
-
-        if (!clp->cp_sync_io && clp->cp_type == CPT_CACHEABLE) {
-                /*
-                 * Only mark the page error only when it's a cacheable page
-                 * and NOT a sync io.
-                 *
-                 * For sync IO and direct IO(CPT_TRANSIENT), the error is able
-                 * to be seen by application, so we don't need to mark a page
-                 * as error at all.
-                 */
-                vvp_vmpage_error(inode, vmpage, ioret);
-                unlock_page(vmpage);
-        }
-}
-
 static void vvp_page_completion_read(const struct lu_env *env,
                                      const struct cl_page_slice *slice,
                                      int ioret)
 {
-        struct ccc_page *cp    = cl2ccc_page(slice);
-        struct cl_page  *page  = cl_page_top(slice->cpl_page);
-        struct inode    *inode = ccc_object_inode(page->cp_obj);
+        struct ccc_page *cp     = cl2ccc_page(slice);
+        cfs_page_t      *vmpage = cp->cpg_page;
+        struct cl_page  *page   = cl_page_top(slice->cpl_page);
+        struct inode    *inode  = ccc_object_inode(page->cp_obj);
         ENTRY;
 
+        LASSERT(PageLocked(vmpage));
         CL_PAGE_HEADER(D_PAGE, env, page, "completing READ with %d\n", ioret);
 
         if (cp->cpg_defer_uptodate)
                 ll_ra_count_put(ll_i2sbi(inode), 1);
 
         if (ioret == 0)  {
-                /* XXX: do we need this for transient pages? */
                 if (!cp->cpg_defer_uptodate)
                         cl_page_export(env, page, 1);
         } else
                 cp->cpg_defer_uptodate = 0;
-        vvp_page_completion_common(env, cp, ioret);
+
+        if (page->cp_sync_io == NULL)
+                unlock_page(vmpage);
 
         EXIT;
 }
 
-static void vvp_page_completion_write_common(const struct lu_env *env,
-                                             const struct cl_page_slice *slice,
-                                             int ioret)
+static void vvp_page_completion_write(const struct lu_env *env,
+                                      const struct cl_page_slice *slice,
+                                      int ioret)
 {
-        struct ccc_page *cp = cl2ccc_page(slice);
+        struct ccc_page *cp     = cl2ccc_page(slice);
+        struct cl_page  *pg     = slice->cpl_page;
+        cfs_page_t      *vmpage = cp->cpg_page;
+        ENTRY;
+
+        LASSERT(ergo(pg->cp_sync_io != NULL, PageLocked(vmpage)));
+        LASSERT(PageWriteback(vmpage));
+
+        CL_PAGE_HEADER(D_PAGE, env, pg, "completing WRITE with %d\n", ioret);
 
         /*
          * TODO: Actually it makes sense to add the page into oap pending
@@ -319,28 +312,17 @@ static void vvp_page_completion_write_common(const struct lu_env *env,
          * ->cpo_completion method. The underlying transfer should be notified
          * and then re-add the page into pending transfer queue.  -jay
          */
+
         cp->cpg_write_queued = 0;
         vvp_write_complete(cl2ccc(slice->cpl_obj), cp);
 
-        vvp_page_completion_common(env, cp, ioret);
-}
-
-static void vvp_page_completion_write(const struct lu_env *env,
-                                      const struct cl_page_slice *slice,
-                                      int ioret)
-{
-        struct ccc_page *cp     = cl2ccc_page(slice);
-        struct cl_page  *pg     = slice->cpl_page;
-        cfs_page_t      *vmpage = cp->cpg_page;
-
-        ENTRY;
-
-        LINVRNT(cl_page_is_vmlocked(env, pg));
-        LASSERT(PageWriteback(vmpage));
-
-        CL_PAGE_HEADER(D_PAGE, env, pg, "completing WRITE with %d\n", ioret);
+        /*
+         * Only mark the page error when it's an async write because
+         * applications won't wait for IO to finish.
+         */
+        if (pg->cp_sync_io == NULL)
+                vvp_vmpage_error(ccc_object_inode(pg->cp_obj), vmpage, ioret);
 
-        vvp_page_completion_write_common(env, slice, ioret);
         end_page_writeback(vmpage);
         EXIT;
 }
@@ -388,6 +370,7 @@ static int vvp_page_make_ready(const struct lu_env *env,
                          * Page was concurrently truncated.
                          */
                         LASSERT(pg->cp_state == CPS_FREEING);
+                unlock_page(vmpage);
         }
         RETURN(result);
 }
@@ -506,15 +489,13 @@ static int vvp_transient_page_is_vmlocked(const struct lu_env *env,
 }
 
 static void
-vvp_transient_page_completion_write(const struct lu_env *env,
-                                    const struct cl_page_slice *slice,
-                                    int ioret)
+vvp_transient_page_completion(const struct lu_env *env,
+                              const struct cl_page_slice *slice,
+                              int ioret)
 {
         vvp_transient_page_verify(slice->cpl_page);
-        vvp_page_completion_write_common(env, slice, ioret);
 }
 
-
 static void vvp_transient_page_fini(const struct lu_env *env,
                                     struct cl_page_slice *slice)
 {
@@ -541,11 +522,11 @@ static const struct cl_page_operations vvp_transient_page_ops = {
         .io = {
                 [CRT_READ] = {
                         .cpo_prep        = ccc_transient_page_prep,
-                        .cpo_completion  = vvp_page_completion_read,
+                        .cpo_completion  = vvp_transient_page_completion,
                 },
                 [CRT_WRITE] = {
                         .cpo_prep        = ccc_transient_page_prep,
-                        .cpo_completion  = vvp_transient_page_completion_write,
+                        .cpo_completion  = vvp_transient_page_completion,
                 }
         }
 };
index d5d52d9..cf18605 100644 (file)
@@ -1543,7 +1543,6 @@ void cl_req_page_add(const struct lu_env *env,
         ENTRY;
         page = cl_page_top(page);
 
-        LINVRNT(cl_page_is_vmlocked(env, page));
         LASSERT(cfs_list_empty(&page->cp_flight));
         LASSERT(page->cp_req == NULL);
 
@@ -1578,7 +1577,6 @@ void cl_req_page_done(const struct lu_env *env, struct cl_page *page)
         ENTRY;
         page = cl_page_top(page);
 
-        LINVRNT(cl_page_is_vmlocked(env, page));
         LASSERT(!cfs_list_empty(&page->cp_flight));
         LASSERT(req->crq_nrpages > 0);
 
index 1296469..5c8973f 100644 (file)
@@ -423,7 +423,7 @@ static struct cl_page *cl_page_find0(const struct lu_env *env,
         struct cl_site          *site = cl_object_site(o);
         int err;
 
-        LINVRNT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
+        LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
         cfs_might_sleep();
 
         ENTRY;
@@ -560,7 +560,7 @@ static inline int cl_page_invariant(const struct cl_page *pg)
                  * Either page is early in initialization (has neither child
                  * nor parent yet), or it is in the object radix tree.
                  */
-                ergo(pg->cp_state < CPS_FREEING,
+                ergo(pg->cp_state < CPS_FREEING && pg->cp_type == CPT_CACHEABLE,
                      (void *)radix_tree_lookup(&header->coh_tree,
                                                pg->cp_index) == pg ||
                      (child == NULL && parent == NULL));
@@ -637,7 +637,6 @@ static void cl_page_state_set0(const struct lu_env *env,
 static void cl_page_state_set(const struct lu_env *env,
                               struct cl_page *page, enum cl_page_state state)
 {
-        PINVRNT(env, page, cl_page_invariant(page));
         cl_page_state_set0(env, page, state);
 }
 
@@ -1044,7 +1043,6 @@ EXPORT_SYMBOL(cl_page_own_try);
 void cl_page_assume(const struct lu_env *env,
                     struct cl_io *io, struct cl_page *pg)
 {
-        PASSERT(env, pg, pg->cp_state < CPS_OWNED);
         PASSERT(env, pg, pg->cp_owner == NULL);
         PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));
         PINVRNT(env, pg, cl_page_invariant(pg));
@@ -1363,7 +1361,6 @@ void cl_page_completion(const struct lu_env *env,
         /* cl_page::cp_req already cleared by the caller (osc_completion()) */
         PASSERT(env, pg, pg->cp_req == NULL);
         PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
-        PINVRNT(env, pg, cl_page_invariant(pg));
 
         ENTRY;
         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, ioret);
@@ -1377,6 +1374,7 @@ void cl_page_completion(const struct lu_env *env,
                                (const struct lu_env *,
                                 const struct cl_page_slice *, int), ioret);
         if (anchor) {
+                LASSERT(cl_page_is_vmlocked(env, pg));
                 LASSERT(pg->cp_sync_io == anchor);
                 pg->cp_sync_io = NULL;
                 cl_sync_io_note(anchor, ioret);
index 21080e3..055f8fb 100644 (file)
@@ -100,7 +100,6 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc,
 {
         loi_list_maint(cli, osc->oo_oinfo);
         osc_check_rpcs(env, cli);
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
 }
 
 /**
@@ -153,26 +152,24 @@ static int osc_io_submit(const struct lu_env *env,
                         oap->oap_async_flags |= ASYNC_HP;
                         cfs_spin_unlock(&oap->oap_lock);
                 }
-                /*
-                 * This can be checked without cli->cl_loi_list_lock, because
-                 * ->oap_*_item are always manipulated when the page is owned.
-                 */
-                if (!cfs_list_empty(&oap->oap_urgent_item) ||
-                    !cfs_list_empty(&oap->oap_rpc_item)) {
-                        result = -EBUSY;
-                        break;
-                }
 
                 if (osc0 == NULL) { /* first iteration */
                         cli = &exp->exp_obd->u.cli;
                         osc0 = osc;
+                        client_obd_list_lock(&cli->cl_loi_list_lock);
                 } else /* check that all pages are against the same object
                         * (for now) */
                         LASSERT(osc == osc0);
-                if (queued++ == 0)
-                        client_obd_list_lock(&cli->cl_loi_list_lock);
+
+                if (!cfs_list_empty(&oap->oap_urgent_item) ||
+                    !cfs_list_empty(&oap->oap_rpc_item)) {
+                        result = -EBUSY;
+                        break;
+                }
+
                 result = cl_page_prep(env, io, page, crt);
                 if (result == 0) {
+                        ++queued;
                         cl_page_list_move(qout, qin, page);
                         if (cfs_list_empty(&oap->oap_pending_item)) {
                                 osc_io_submit_page(env, cl2osc_io(env, ios),
@@ -224,6 +221,8 @@ static int osc_io_submit(const struct lu_env *env,
 
         if (queued > 0)
                 osc_io_unplug(env, osc, cli);
+        if (osc0)
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
         CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
         return qout->pl_nr > 0 ? 0 : result;
 }
index 0a3e480..55bb24c 100644 (file)
@@ -201,8 +201,6 @@ static void osc_page_transfer_add(const struct lu_env *env,
 {
         struct osc_object *obj;
 
-        LINVRNT(cl_page_is_vmlocked(env, opg->ops_cl.cpl_page));
-
         obj = cl2osc(opg->ops_cl.cpl_obj);
         cfs_spin_lock(&obj->oo_seatbelt);
         cfs_list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
@@ -549,9 +547,9 @@ static int osc_completion(const struct lu_env *env,
         struct cl_page        *page = cl_page_top(opg->ops_cl.cpl_page);
         struct osc_object     *obj  = cl2osc(opg->ops_cl.cpl_obj);
         enum cl_req_type crt;
+        int srvlock;
 
         LINVRNT(osc_page_protected(env, opg, CLM_READ, 1));
-        LINVRNT(cl_page_is_vmlocked(env, page));
 
         ENTRY;
 
@@ -584,11 +582,12 @@ static int osc_completion(const struct lu_env *env,
         cfs_spin_unlock(&obj->oo_seatbelt);
 
         opg->ops_submit_time = 0;
+        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
 
         cl_page_completion(env, page, crt, rc);
 
         /* statistic */
-        if (rc == 0 && oap->oap_brw_flags & OBD_BRW_SRVLOCK) {
+        if (rc == 0 && srvlock) {
                 struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
                 struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
                 int bytes = oap->oap_count;
index c5e4bd0..27fc136 100644 (file)
@@ -2203,9 +2203,9 @@ static void osc_ap_completion(const struct lu_env *env,
         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
                                                 oap->oap_cmd, oa, rc);
 
-        /* ll_ap_completion (from llite) drops PG_locked. so, a new
-         * I/O on the page could start, but OSC calls it under lock
-         * and thus we can add oap back to pending safely */
+        /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
+         * start, but OSC calls it under lock and thus we can add oap back to
+         * pending safely */
         if (rc)
                 /* upper layer wants to leave the page on pending queue */
                 osc_oap_to_pending(oap);
@@ -2539,27 +2539,6 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                 }
                 if (oap == NULL)
                         break;
-                /*
-                 * Page submitted for IO has to be locked. Either by
-                 * ->ap_make_ready() or by higher layers.
-                 */
-#if defined(__KERNEL__) && defined(__linux__)
-                {
-                        struct cl_page *page;
-
-                        page = osc_oap2cl_page(oap);
-
-                        if (page->cp_type == CPT_CACHEABLE &&
-                            !(PageLocked(oap->oap_page) &&
-                              (CheckWriteback(oap->oap_page, cmd)))) {
-                                CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
-                                       oap->oap_page,
-                                       (long)oap->oap_page->flags,
-                                       oap->oap_async_flags);
-                                LBUG();
-                        }
-                }
-#endif
 
                 /* take the page out of our book-keeping */
                 cfs_list_del_init(&oap->oap_pending_item);