From: Jinshan Xiong
Date: Tue, 27 Sep 2011 23:02:22 +0000 (-0700)
Subject: LU-848 clio: page writeback support
X-Git-Tag: 2.1.53~16
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=c5361360e51de22a59d4427327bddf9fd398f352

LU-848 clio: page writeback support

Add page writeback support for clio: release the vmpage lock during
transfer. Sync IO still holds the page lock.

Change-Id: I5137542b5b6b6eaaa4464804b45d372fd12e5c22
Signed-off-by: Jinshan Xiong
Reviewed-on: http://review.whamcloud.com/1456
Tested-by: Hudson
Tested-by: Maloo
Reviewed-by: Johann Lombardi
Tested-by: Jinshan Xiong
Reviewed-by: Niu Yawei
Reviewed-by: Oleg Drokin
---
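[Editor's note, placed below the "---" fold so it is not part of the commit
message: the heart of this patch is a change to the page-locking protocol.
Pages submitted for asynchronous writeback now drop PG_locked as soon as the
transfer is prepared in vvp_page_prep_write(), while synchronous IO keeps the
page locked until the waiter is notified through cl_page_completion(). The
userspace toy model below sketches that protocol; it is not Lustre code and
every name in it is hypothetical.]

/* toy_wb_lock.c - minimal model of the new locking protocol.
 * Build with: cc toy_wb_lock.c */
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

struct toy_page {
        bool  locked;     /* models PG_locked */
        bool  writeback;  /* models PG_writeback */
        void *sync_io;    /* non-NULL models cl_page::cp_sync_io */
};

/* models vvp_page_prep_write(): start writeback, unlock async pages */
static void toy_prep_write(struct toy_page *pg)
{
        assert(pg->locked);
        pg->writeback = true;
        if (pg->sync_io == NULL)        /* only async write drops the lock */
                pg->locked = false;
}

/* models vvp_page_completion_write() plus cl_page_completion() */
static void toy_complete_write(struct toy_page *pg)
{
        /* a sync page must still be locked when the transfer completes */
        assert(pg->sync_io == NULL || pg->locked);
        assert(pg->writeback);
        pg->writeback = false;          /* models end_page_writeback() */
        if (pg->sync_io != NULL) {
                pg->sync_io = NULL;     /* wake the sync waiter ...     */
                pg->locked = false;     /* ... which unlocks the page   */
        }
}

int main(void)
{
        int waiter;
        struct toy_page async_pg = { .locked = true, .sync_io = NULL };
        struct toy_page sync_pg  = { .locked = true, .sync_io = &waiter };

        toy_prep_write(&async_pg);
        printf("async page locked during transfer: %d\n", async_pg.locked);
        toy_complete_write(&async_pg);

        toy_prep_write(&sync_pg);
        printf("sync page locked during transfer:  %d\n", sync_pg.locked);
        toy_complete_write(&sync_pg);
        return 0;
}

[The design consequence, visible throughout the diff: assertions of the form
cl_page_is_vmlocked() are removed from paths that async pages now reach
unlocked, and the check survives only on the sync-IO path.]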
diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c
index 065fbc7..2243aab 100644
--- a/lustre/llite/rw.c
+++ b/lustre/llite/rw.c
@@ -1130,7 +1130,7 @@ out_unlock:
         return;
 }
 
-int ll_writepage(struct page *vmpage, struct writeback_control *unused)
+int ll_writepage(struct page *vmpage, struct writeback_control *wbc)
 {
         struct inode *inode = vmpage->mapping->host;
         struct lu_env *env;
@@ -1177,21 +1177,18 @@ int ll_writepage(struct page *vmpage, struct writeback_control *unused)
                 cl_2queue_init_page(queue, page);
                 result = cl_io_submit_rw(env, io, CRT_WRITE, queue, CRP_NORMAL);
-                cl_page_list_disown(env, io, &queue->c2_qin);
                 if (result != 0) {
                         /*
-                         * There is no need to clear PG_writeback, as
-                         * cl_io_submit_rw() calls completion callback
-                         * on failure.
-                         */
-                        /*
                          * Re-dirty page on error so it retries write,
                          * but not in case when IO has actually
                          * occurred and completed with an error.
                          */
-                        if (!PageError(vmpage))
-                                set_page_dirty(vmpage);
+                        if (!PageError(vmpage)) {
+                                redirty_page_for_writepage(wbc, vmpage);
+                                result = 0;
+                        }
                 }
+                cl_page_list_disown(env, io, &queue->c2_qin);
                 LASSERT(!cl_page_is_owned(page, io));
                 lu_ref_del(&page->cp_reference,
                            "writepage", cfs_current());
diff --git a/lustre/llite/vvp_page.c b/lustre/llite/vvp_page.c
index deae49f..1a5dc9b 100644
--- a/lustre/llite/vvp_page.c
+++ b/lustre/llite/vvp_page.c
@@ -225,13 +225,18 @@ static int vvp_page_prep_write(const struct lu_env *env,
                                const struct cl_page_slice *slice,
                                struct cl_io *unused)
 {
-        cfs_page_t *vmpage = cl2vm_page(slice);
+        struct cl_page *cp     = slice->cpl_page;
+        cfs_page_t     *vmpage = cl2vm_page(slice);
         int result;
 
         if (clear_page_dirty_for_io(vmpage)) {
                 set_page_writeback(vmpage);
                 vvp_write_pending(cl2ccc(slice->cpl_obj), cl2ccc_page(slice));
                 result = 0;
+
+                /* only turn on writeback for async write. */
+                if (cp->cp_sync_io == NULL)
+                        unlock_page(vmpage);
         } else
                 result = -EALREADY;
         return result;
@@ -256,59 +261,47 @@ static void vvp_vmpage_error(struct inode *inode, cfs_page_t *vmpage, int ioret)
         }
 }
 
-static void vvp_page_completion_common(const struct lu_env *env,
-                                       struct ccc_page *cp, int ioret)
-{
-        struct cl_page    *clp    = cp->cpg_cl.cpl_page;
-        cfs_page_t        *vmpage = cp->cpg_page;
-        struct inode      *inode  = ccc_object_inode(clp->cp_obj);
-
-        LINVRNT(cl_page_is_vmlocked(env, clp));
-
-        if (!clp->cp_sync_io && clp->cp_type == CPT_CACHEABLE) {
-                /*
-                 * Only mark the page error only when it's a cacheable page
-                 * and NOT a sync io.
-                 *
-                 * For sync IO and direct IO(CPT_TRANSIENT), the error is able
-                 * to be seen by application, so we don't need to mark a page
-                 * as error at all.
-                 */
-                vvp_vmpage_error(inode, vmpage, ioret);
-                unlock_page(vmpage);
-        }
-}
-
 static void vvp_page_completion_read(const struct lu_env *env,
                                      const struct cl_page_slice *slice,
                                      int ioret)
 {
-        struct ccc_page *cp     = cl2ccc_page(slice);
-        struct cl_page  *page   = cl_page_top(slice->cpl_page);
-        struct inode    *inode  = ccc_object_inode(page->cp_obj);
+        struct ccc_page *cp     = cl2ccc_page(slice);
+        cfs_page_t      *vmpage = cp->cpg_page;
+        struct cl_page  *page   = cl_page_top(slice->cpl_page);
+        struct inode    *inode  = ccc_object_inode(page->cp_obj);
         ENTRY;
+        LASSERT(PageLocked(vmpage));
 
         CL_PAGE_HEADER(D_PAGE, env, page, "completing READ with %d\n", ioret);
 
         if (cp->cpg_defer_uptodate)
                 ll_ra_count_put(ll_i2sbi(inode), 1);
 
         if (ioret == 0) {
-                /* XXX: do we need this for transient pages? */
                 if (!cp->cpg_defer_uptodate)
                         cl_page_export(env, page, 1);
         } else
                 cp->cpg_defer_uptodate = 0;
-        vvp_page_completion_common(env, cp, ioret);
+
+        if (page->cp_sync_io == NULL)
+                unlock_page(vmpage);
 
         EXIT;
 }
 
-static void vvp_page_completion_write_common(const struct lu_env *env,
-                                             const struct cl_page_slice *slice,
-                                             int ioret)
+static void vvp_page_completion_write(const struct lu_env *env,
+                                      const struct cl_page_slice *slice,
+                                      int ioret)
 {
-        struct ccc_page *cp = cl2ccc_page(slice);
+        struct ccc_page *cp     = cl2ccc_page(slice);
+        struct cl_page  *pg     = slice->cpl_page;
+        cfs_page_t      *vmpage = cp->cpg_page;
+        ENTRY;
+
+        LASSERT(ergo(pg->cp_sync_io != NULL, PageLocked(vmpage)));
+        LASSERT(PageWriteback(vmpage));
+
+        CL_PAGE_HEADER(D_PAGE, env, pg, "completing WRITE with %d\n", ioret);
 
         /*
          * TODO: Actually it makes sense to add the page into oap pending
@@ -319,28 +312,17 @@ static void vvp_page_completion_write_common(const struct lu_env *env,
          * ->cpo_completion method. The underlying transfer should be notified
          * and then re-add the page into pending transfer queue. -jay
          */
+
         cp->cpg_write_queued = 0;
         vvp_write_complete(cl2ccc(slice->cpl_obj), cp);
-        vvp_page_completion_common(env, cp, ioret);
-}
-
-static void vvp_page_completion_write(const struct lu_env *env,
-                                      const struct cl_page_slice *slice,
-                                      int ioret)
-{
-        struct ccc_page *cp     = cl2ccc_page(slice);
-        struct cl_page  *pg     = slice->cpl_page;
-        cfs_page_t      *vmpage = cp->cpg_page;
-
-        ENTRY;
-
-        LINVRNT(cl_page_is_vmlocked(env, pg));
-        LASSERT(PageWriteback(vmpage));
-
-        CL_PAGE_HEADER(D_PAGE, env, pg, "completing WRITE with %d\n", ioret);
+        /*
+         * Only mark the page error when it's an async write, because
+         * applications won't wait for IO to finish.
+         */
+        if (pg->cp_sync_io == NULL)
+                vvp_vmpage_error(ccc_object_inode(pg->cp_obj), vmpage, ioret);
 
-        vvp_page_completion_write_common(env, slice, ioret);
         end_page_writeback(vmpage);
         EXIT;
 }
@@ -388,6 +370,7 @@ static int vvp_page_make_ready(const struct lu_env *env,
                  * Page was concurrently truncated.
                  */
                 LASSERT(pg->cp_state == CPS_FREEING);
+                unlock_page(vmpage);
         }
         RETURN(result);
 }
@@ -506,15 +489,13 @@ static int vvp_transient_page_is_vmlocked(const struct lu_env *env,
 }
 
 static void
-vvp_transient_page_completion_write(const struct lu_env *env,
-                                    const struct cl_page_slice *slice,
-                                    int ioret)
+vvp_transient_page_completion(const struct lu_env *env,
+                              const struct cl_page_slice *slice,
+                              int ioret)
 {
         vvp_transient_page_verify(slice->cpl_page);
-        vvp_page_completion_write_common(env, slice, ioret);
 }
 
-
 static void vvp_transient_page_fini(const struct lu_env *env,
                                     struct cl_page_slice *slice)
 {
@@ -541,11 +522,11 @@ static const struct cl_page_operations vvp_transient_page_ops = {
         .io = {
                 [CRT_READ] = {
                         .cpo_prep       = ccc_transient_page_prep,
-                        .cpo_completion = vvp_page_completion_read,
+                        .cpo_completion = vvp_transient_page_completion,
                 },
                 [CRT_WRITE] = {
                         .cpo_prep       = ccc_transient_page_prep,
-                        .cpo_completion = vvp_transient_page_completion_write,
+                        .cpo_completion = vvp_transient_page_completion,
                 }
         }
 };
diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c
index d5d52d9..cf18605 100644
--- a/lustre/obdclass/cl_io.c
+++ b/lustre/obdclass/cl_io.c
@@ -1543,7 +1543,6 @@ void cl_req_page_add(const struct lu_env *env,
         ENTRY;
 
         page = cl_page_top(page);
-        LINVRNT(cl_page_is_vmlocked(env, page));
         LASSERT(cfs_list_empty(&page->cp_flight));
         LASSERT(page->cp_req == NULL);
 
@@ -1578,7 +1577,6 @@ void cl_req_page_done(const struct lu_env *env, struct cl_page *page)
         ENTRY;
 
         page = cl_page_top(page);
-        LINVRNT(cl_page_is_vmlocked(env, page));
         LASSERT(!cfs_list_empty(&page->cp_flight));
         LASSERT(req->crq_nrpages > 0);
 
diff --git a/lustre/obdclass/cl_page.c b/lustre/obdclass/cl_page.c
index 1296469..5c8973f 100644
--- a/lustre/obdclass/cl_page.c
+++ b/lustre/obdclass/cl_page.c
@@ -423,7 +423,7 @@ static struct cl_page *cl_page_find0(const struct lu_env *env,
         struct cl_site *site = cl_object_site(o);
         int err;
 
-        LINVRNT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
+        LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
         cfs_might_sleep();
 
         ENTRY;
@@ -560,7 +560,7 @@ static inline int cl_page_invariant(const struct cl_page *pg)
                  * Either page is early in initialization (has neither child
                  * nor parent yet), or it is in the object radix tree.
                  */
-                ergo(pg->cp_state < CPS_FREEING,
+                ergo(pg->cp_state < CPS_FREEING && pg->cp_type == CPT_CACHEABLE,
                      (void *)radix_tree_lookup(&header->coh_tree,
                                                pg->cp_index) == pg ||
                      (child == NULL && parent == NULL));
@@ -637,7 +637,6 @@ static void cl_page_state_set0(const struct lu_env *env,
 static void cl_page_state_set(const struct lu_env *env,
                               struct cl_page *page, enum cl_page_state state)
 {
-        PINVRNT(env, page, cl_page_invariant(page));
         cl_page_state_set0(env, page, state);
 }
 
@@ -1044,7 +1043,6 @@ EXPORT_SYMBOL(cl_page_own_try);
 void cl_page_assume(const struct lu_env *env,
                     struct cl_io *io, struct cl_page *pg)
 {
-        PASSERT(env, pg, pg->cp_state < CPS_OWNED);
         PASSERT(env, pg, pg->cp_owner == NULL);
         PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));
         PINVRNT(env, pg, cl_page_invariant(pg));
@@ -1363,7 +1361,6 @@ void cl_page_completion(const struct lu_env *env,
         /* cl_page::cp_req already cleared by the caller (osc_completion()) */
         PASSERT(env, pg, pg->cp_req == NULL);
         PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
-        PINVRNT(env, pg, cl_page_invariant(pg));
 
         ENTRY;
         CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, ioret);
@@ -1377,6 +1374,7 @@ void cl_page_completion(const struct lu_env *env,
                        (const struct lu_env *,
                         const struct cl_page_slice *, int), ioret);
         if (anchor) {
+                LASSERT(cl_page_is_vmlocked(env, pg));
                 LASSERT(pg->cp_sync_io == anchor);
                 pg->cp_sync_io = NULL;
                 cl_sync_io_note(anchor, ioret);
diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c
index 21080e3..055f8fb 100644
--- a/lustre/osc/osc_io.c
+++ b/lustre/osc/osc_io.c
@@ -100,7 +100,6 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc,
 {
         loi_list_maint(cli, osc->oo_oinfo);
         osc_check_rpcs(env, cli);
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
 }
 
 /**
@@ -153,26 +152,24 @@ static int osc_io_submit(const struct lu_env *env,
                         oap->oap_async_flags |= ASYNC_HP;
                         cfs_spin_unlock(&oap->oap_lock);
                 }
-                /*
-                 * This can be checked without cli->cl_loi_list_lock, because
-                 * ->oap_*_item are always manipulated when the page is owned.
-                 */
-                if (!cfs_list_empty(&oap->oap_urgent_item) ||
-                    !cfs_list_empty(&oap->oap_rpc_item)) {
-                        result = -EBUSY;
-                        break;
-                }
 
                 if (osc0 == NULL) { /* first iteration */
                         cli = &exp->exp_obd->u.cli;
                         osc0 = osc;
+                        client_obd_list_lock(&cli->cl_loi_list_lock);
                 } else /* check that all pages are against the same object
                         * (for now) */
                         LASSERT(osc == osc0);
-                if (queued++ == 0)
-                        client_obd_list_lock(&cli->cl_loi_list_lock);
+
+                if (!cfs_list_empty(&oap->oap_urgent_item) ||
+                    !cfs_list_empty(&oap->oap_rpc_item)) {
+                        result = -EBUSY;
+                        break;
+                }
+
                 result = cl_page_prep(env, io, page, crt);
                 if (result == 0) {
+                        ++queued;
                         cl_page_list_move(qout, qin, page);
                         if (cfs_list_empty(&oap->oap_pending_item)) {
                                 osc_io_submit_page(env, cl2osc_io(env, ios),
@@ -224,6 +221,8 @@ static int osc_io_submit(const struct lu_env *env,
 
         if (queued > 0)
                 osc_io_unplug(env, osc, cli);
+        if (osc0)
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
         CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
         return qout->pl_nr > 0 ? 0 : result;
 }
 
diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c
index 0a3e480..55bb24c 100644
--- a/lustre/osc/osc_page.c
+++ b/lustre/osc/osc_page.c
@@ -201,8 +201,6 @@ static void osc_page_transfer_add(const struct lu_env *env,
 {
         struct osc_object *obj;
 
-        LINVRNT(cl_page_is_vmlocked(env, opg->ops_cl.cpl_page));
-
         obj = cl2osc(opg->ops_cl.cpl_obj);
         cfs_spin_lock(&obj->oo_seatbelt);
         cfs_list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
@@ -549,9 +547,9 @@ static int osc_completion(const struct lu_env *env,
         struct cl_page    *page = cl_page_top(opg->ops_cl.cpl_page);
         struct osc_object *obj  = cl2osc(opg->ops_cl.cpl_obj);
         enum cl_req_type   crt;
+        int srvlock;
 
         LINVRNT(osc_page_protected(env, opg, CLM_READ, 1));
-        LINVRNT(cl_page_is_vmlocked(env, page));
 
         ENTRY;
 
@@ -584,11 +582,12 @@ static int osc_completion(const struct lu_env *env,
         cfs_spin_unlock(&obj->oo_seatbelt);
 
         opg->ops_submit_time = 0;
+        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
 
         cl_page_completion(env, page, crt, rc);
 
         /* statistic */
-        if (rc == 0 && oap->oap_brw_flags & OBD_BRW_SRVLOCK) {
+        if (rc == 0 && srvlock) {
                 struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
                 struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
                 int bytes = oap->oap_count;
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index c5e4bd0..27fc136 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -2203,9 +2203,9 @@ static void osc_ap_completion(const struct lu_env *env,
         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
                                                 oap->oap_cmd, oa, rc);
 
-        /* ll_ap_completion (from llite) drops PG_locked. so, a new
-         * I/O on the page could start, but OSC calls it under lock
-         * and thus we can add oap back to pending safely */
+        /* cl_page_completion() drops PG_locked, so a new I/O on the page
+         * could start, but OSC calls it under lock and thus we can add oap
+         * back to pending safely */
         if (rc)
                 /* upper layer wants to leave the page on pending queue */
                 osc_oap_to_pending(oap);
@@ -2539,27 +2539,6 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                 }
                 if (oap == NULL)
                         break;
-                /*
-                 * Page submitted for IO has to be locked. Either by
-                 * ->ap_make_ready() or by higher layers.
-                 */
-#if defined(__KERNEL__) && defined(__linux__)
-                {
-                        struct cl_page *page;
-
-                        page = osc_oap2cl_page(oap);
-
-                        if (page->cp_type == CPT_CACHEABLE &&
-                            !(PageLocked(oap->oap_page) &&
-                              (CheckWriteback(oap->oap_page, cmd)))) {
-                                CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
-                                       oap->oap_page,
-                                       (long)oap->oap_page->flags,
-                                       oap->oap_async_flags);
-                                LBUG();
-                        }
-                }
-#endif
 
                 /* take the page out of our book-keeping */
                 cfs_list_del_init(&oap->oap_pending_item);
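
[Editor's addendum, not part of the patch: note the new error convention in
ll_writepage() above. When submission fails transiently and the page has not
hit a hard IO error, the page is re-dirtied with redirty_page_for_writepage()
and the function returns 0, so the kernel's writeback machinery simply retries
the page later instead of recording a failure. A minimal userspace sketch of
that ->writepage convention follows; it is not Lustre code and all names in it
are hypothetical.]

/* toy_writepage.c - model of the ->writepage error convention.
 * Build with: cc toy_writepage.c */
#include <stdbool.h>
#include <stdio.h>

struct toy_wb_page {
        bool dirty;   /* models PG_dirty */
        bool error;   /* models PG_error: IO actually ran and failed */
};

/* models a transient submission failure from cl_io_submit_rw() */
static int failing_submit(struct toy_wb_page *pg)
{
        (void)pg;
        return -16;   /* e.g. -EBUSY */
}

/* models the new ll_writepage() behavior */
static int toy_writepage(struct toy_wb_page *pg,
                         int (*submit)(struct toy_wb_page *))
{
        int result = submit(pg);

        if (result != 0 && !pg->error) {
                /* transient failure: re-dirty the page so writeback retries
                 * later; models redirty_page_for_writepage(wbc, vmpage) */
                pg->dirty = true;
                result = 0;   /* report success so the VM does not give up */
        }
        return result;
}

int main(void)
{
        struct toy_wb_page pg = { .dirty = false, .error = false };

        printf("writepage -> %d, redirtied: %d\n",
               toy_writepage(&pg, failing_submit), pg.dirty);
        return 0;
}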