Page writeback support for clio - release the vmpage lock during transfer.
Sync IO still holds the page lock.
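
The locking rule this patch introduces, in short: PG_writeback is set for
every write, but the vmpage lock is dropped at submit time only for
asynchronous writeback; synchronous IO keeps the page locked until the
transfer completes. Below is a minimal user-space sketch of that rule
(illustrative only, not Lustre code; the struct and sketch_* names are
hypothetical):

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct sketch_page {
        bool locked;     /* models PG_locked */
        bool writeback;  /* models PG_writeback */
        void *sync_io;   /* models cl_page::cp_sync_io; NULL means async */
};

/* models the prep-write step after this patch */
static void sketch_prep_write(struct sketch_page *pg)
{
        pg->writeback = true;           /* set_page_writeback() */
        if (pg->sync_io == NULL)
                pg->locked = false;     /* async: unlock_page() at submit */
}

/* models the write-completion step after this patch */
static void sketch_completion_write(struct sketch_page *pg, int ioret)
{
        if (pg->sync_io == NULL && ioret != 0)
                fprintf(stderr, "record error %d on the page\n", ioret);
        pg->writeback = false;          /* end_page_writeback() */
        if (pg->sync_io != NULL)
                pg->locked = false;     /* the sync waiter drops the lock */
}

int main(void)
{
        struct sketch_page pg = { .locked = true, .sync_io = NULL };

        sketch_prep_write(&pg);
        printf("async: locked after submit = %d\n", pg.locked);
        sketch_completion_write(&pg, 0);
        printf("async: writeback after completion = %d\n", pg.writeback);
        return 0;
}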
Change-Id: I5137542b5b6b6eaaa4464804b45d372fd12e5c22
Signed-off-by: Jinshan Xiong <jay@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/1456
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Tested-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
return;
}
-int ll_writepage(struct page *vmpage, struct writeback_control *unused)
+int ll_writepage(struct page *vmpage, struct writeback_control *wbc)
{
struct inode *inode = vmpage->mapping->host;
struct lu_env *env;
cl_2queue_init_page(queue, page);
result = cl_io_submit_rw(env, io, CRT_WRITE,
queue, CRP_NORMAL);
- cl_page_list_disown(env, io, &queue->c2_qin);
if (result != 0) {
/*
- * There is no need to clear PG_writeback, as
- * cl_io_submit_rw() calls completion callback
- * on failure.
- */
- /*
* Re-dirty the page on error so the write is retried,
* but not in the case when IO has actually
* occurred and completed with an error.
*/
- if (!PageError(vmpage))
- set_page_dirty(vmpage);
+ if (!PageError(vmpage)) {
+ redirty_page_for_writepage(wbc, vmpage);
+ result = 0;
+ }
}
+ cl_page_list_disown(env, io, &queue->c2_qin);
LASSERT(!cl_page_is_owned(page, io));
lu_ref_del(&page->cp_reference,
"writepage", cfs_current());
const struct cl_page_slice *slice,
struct cl_io *unused)
{
- cfs_page_t *vmpage = cl2vm_page(slice);
+ struct cl_page *cp = slice->cpl_page;
+ cfs_page_t *vmpage = cl2vm_page(slice);
int result;
if (clear_page_dirty_for_io(vmpage)) {
set_page_writeback(vmpage);
vvp_write_pending(cl2ccc(slice->cpl_obj), cl2ccc_page(slice));
result = 0;
+
+ /* only unlock the page for async write; sync IO keeps the page locked. */
+ if (cp->cp_sync_io == NULL)
+ unlock_page(vmpage);
} else
result = -EALREADY;
return result;
}
}
-static void vvp_page_completion_common(const struct lu_env *env,
- struct ccc_page *cp, int ioret)
-{
- struct cl_page *clp = cp->cpg_cl.cpl_page;
- cfs_page_t *vmpage = cp->cpg_page;
- struct inode *inode = ccc_object_inode(clp->cp_obj);
-
- LINVRNT(cl_page_is_vmlocked(env, clp));
-
- if (!clp->cp_sync_io && clp->cp_type == CPT_CACHEABLE) {
- /*
- * Only mark the page error only when it's a cacheable page
- * and NOT a sync io.
- *
- * For sync IO and direct IO(CPT_TRANSIENT), the error is able
- * to be seen by application, so we don't need to mark a page
- * as error at all.
- */
- vvp_vmpage_error(inode, vmpage, ioret);
- unlock_page(vmpage);
- }
-}
-
static void vvp_page_completion_read(const struct lu_env *env,
const struct cl_page_slice *slice,
int ioret)
{
- struct ccc_page *cp = cl2ccc_page(slice);
- struct cl_page *page = cl_page_top(slice->cpl_page);
- struct inode *inode = ccc_object_inode(page->cp_obj);
+ struct ccc_page *cp = cl2ccc_page(slice);
+ cfs_page_t *vmpage = cp->cpg_page;
+ struct cl_page *page = cl_page_top(slice->cpl_page);
+ struct inode *inode = ccc_object_inode(page->cp_obj);
ENTRY;
+ LASSERT(PageLocked(vmpage));
CL_PAGE_HEADER(D_PAGE, env, page, "completing READ with %d\n", ioret);
if (cp->cpg_defer_uptodate)
ll_ra_count_put(ll_i2sbi(inode), 1);
if (ioret == 0) {
- /* XXX: do we need this for transient pages? */
if (!cp->cpg_defer_uptodate)
cl_page_export(env, page, 1);
} else
cp->cpg_defer_uptodate = 0;
- vvp_page_completion_common(env, cp, ioret);
+
+ if (page->cp_sync_io == NULL)
+ unlock_page(vmpage);
EXIT;
}
-static void vvp_page_completion_write_common(const struct lu_env *env,
- const struct cl_page_slice *slice,
- int ioret)
+static void vvp_page_completion_write(const struct lu_env *env,
+ const struct cl_page_slice *slice,
+ int ioret)
{
- struct ccc_page *cp = cl2ccc_page(slice);
+ struct ccc_page *cp = cl2ccc_page(slice);
+ struct cl_page *pg = slice->cpl_page;
+ cfs_page_t *vmpage = cp->cpg_page;
+ ENTRY;
+
+ LASSERT(ergo(pg->cp_sync_io != NULL, PageLocked(vmpage)));
+ LASSERT(PageWriteback(vmpage));
+
+ CL_PAGE_HEADER(D_PAGE, env, pg, "completing WRITE with %d\n", ioret);
/*
* TODO: Actually it makes sense to add the page back to the oap pending
* list in the ->cpo_completion method: the underlying transfer should be
* notified so it can re-add the page to the pending transfer queue. -jay
*/
+
cp->cpg_write_queued = 0;
vvp_write_complete(cl2ccc(slice->cpl_obj), cp);
- vvp_page_completion_common(env, cp, ioret);
-}
-
-static void vvp_page_completion_write(const struct lu_env *env,
- const struct cl_page_slice *slice,
- int ioret)
-{
- struct ccc_page *cp = cl2ccc_page(slice);
- struct cl_page *pg = slice->cpl_page;
- cfs_page_t *vmpage = cp->cpg_page;
-
- ENTRY;
-
- LINVRNT(cl_page_is_vmlocked(env, pg));
- LASSERT(PageWriteback(vmpage));
-
- CL_PAGE_HEADER(D_PAGE, env, pg, "completing WRITE with %d\n", ioret);
+ /*
+ * Mark the page in error only when it's an async write, because
+ * applications won't wait for the IO to finish.
+ */
+ if (pg->cp_sync_io == NULL)
+ vvp_vmpage_error(ccc_object_inode(pg->cp_obj), vmpage, ioret);
- vvp_page_completion_write_common(env, slice, ioret);
end_page_writeback(vmpage);
EXIT;
}
* Page was concurrently truncated.
*/
LASSERT(pg->cp_state == CPS_FREEING);
+ unlock_page(vmpage);
}
RETURN(result);
}
}
static void
-vvp_transient_page_completion_write(const struct lu_env *env,
- const struct cl_page_slice *slice,
- int ioret)
+vvp_transient_page_completion(const struct lu_env *env,
+ const struct cl_page_slice *slice,
+ int ioret)
{
vvp_transient_page_verify(slice->cpl_page);
- vvp_page_completion_write_common(env, slice, ioret);
}
-
static void vvp_transient_page_fini(const struct lu_env *env,
struct cl_page_slice *slice)
{
.io = {
[CRT_READ] = {
.cpo_prep = ccc_transient_page_prep,
- .cpo_completion = vvp_page_completion_read,
+ .cpo_completion = vvp_transient_page_completion,
},
[CRT_WRITE] = {
.cpo_prep = ccc_transient_page_prep,
- .cpo_completion = vvp_transient_page_completion_write,
+ .cpo_completion = vvp_transient_page_completion,
}
}
};
ENTRY;
page = cl_page_top(page);
- LINVRNT(cl_page_is_vmlocked(env, page));
LASSERT(cfs_list_empty(&page->cp_flight));
LASSERT(page->cp_req == NULL);
ENTRY;
page = cl_page_top(page);
- LINVRNT(cl_page_is_vmlocked(env, page));
LASSERT(!cfs_list_empty(&page->cp_flight));
LASSERT(req->crq_nrpages > 0);
struct cl_site *site = cl_object_site(o);
int err;
- LINVRNT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
+ LASSERT(type == CPT_CACHEABLE || type == CPT_TRANSIENT);
cfs_might_sleep();
ENTRY;
* Either page is early in initialization (has neither child
* nor parent yet), or it is in the object radix tree.
*/
- ergo(pg->cp_state < CPS_FREEING,
+ ergo(pg->cp_state < CPS_FREEING && pg->cp_type == CPT_CACHEABLE,
(void *)radix_tree_lookup(&header->coh_tree,
pg->cp_index) == pg ||
(child == NULL && parent == NULL));
static void cl_page_state_set(const struct lu_env *env,
struct cl_page *page, enum cl_page_state state)
{
- PINVRNT(env, page, cl_page_invariant(page));
cl_page_state_set0(env, page, state);
}
void cl_page_assume(const struct lu_env *env,
struct cl_io *io, struct cl_page *pg)
{
- PASSERT(env, pg, pg->cp_state < CPS_OWNED);
PASSERT(env, pg, pg->cp_owner == NULL);
PINVRNT(env, pg, cl_object_same(pg->cp_obj, io->ci_obj));
PINVRNT(env, pg, cl_page_invariant(pg));
/* cl_page::cp_req already cleared by the caller (osc_completion()) */
PASSERT(env, pg, pg->cp_req == NULL);
PASSERT(env, pg, pg->cp_state == cl_req_type_state(crt));
- PINVRNT(env, pg, cl_page_invariant(pg));
ENTRY;
CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, ioret);
(const struct lu_env *,
const struct cl_page_slice *, int), ioret);
if (anchor) {
+ LASSERT(cl_page_is_vmlocked(env, pg));
LASSERT(pg->cp_sync_io == anchor);
pg->cp_sync_io = NULL;
cl_sync_io_note(anchor, ioret);
{
loi_list_maint(cli, osc->oo_oinfo);
osc_check_rpcs(env, cli);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
}
/**
oap->oap_async_flags |= ASYNC_HP;
cfs_spin_unlock(&oap->oap_lock);
}
- /*
- * This can be checked without cli->cl_loi_list_lock, because
- * ->oap_*_item are always manipulated when the page is owned.
- */
- if (!cfs_list_empty(&oap->oap_urgent_item) ||
- !cfs_list_empty(&oap->oap_rpc_item)) {
- result = -EBUSY;
- break;
- }
if (osc0 == NULL) { /* first iteration */
cli = &exp->exp_obd->u.cli;
osc0 = osc;
+ client_obd_list_lock(&cli->cl_loi_list_lock);
} else /* check that all pages are against the same object
* (for now) */
LASSERT(osc == osc0);
- if (queued++ == 0)
- client_obd_list_lock(&cli->cl_loi_list_lock);
+
+ if (!cfs_list_empty(&oap->oap_urgent_item) ||
+ !cfs_list_empty(&oap->oap_rpc_item)) {
+ result = -EBUSY;
+ break;
+ }
+
result = cl_page_prep(env, io, page, crt);
if (result == 0) {
+ ++queued;
cl_page_list_move(qout, qin, page);
if (cfs_list_empty(&oap->oap_pending_item)) {
osc_io_submit_page(env, cl2osc_io(env, ios),
if (queued > 0)
osc_io_unplug(env, osc, cli);
+ if (osc0)
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
return qout->pl_nr > 0 ? 0 : result;
}
{
struct osc_object *obj;
- LINVRNT(cl_page_is_vmlocked(env, opg->ops_cl.cpl_page));
-
obj = cl2osc(opg->ops_cl.cpl_obj);
cfs_spin_lock(&obj->oo_seatbelt);
cfs_list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page);
struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
enum cl_req_type crt;
+ int srvlock;
LINVRNT(osc_page_protected(env, opg, CLM_READ, 1));
- LINVRNT(cl_page_is_vmlocked(env, page));
ENTRY;
cfs_spin_unlock(&obj->oo_seatbelt);
opg->ops_submit_time = 0;
+ srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
cl_page_completion(env, page, crt, rc);
/* statistic */
- if (rc == 0 && oap->oap_brw_flags & OBD_BRW_SRVLOCK) {
+ if (rc == 0 && srvlock) {
struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
int bytes = oap->oap_count;
rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
oap->oap_cmd, oa, rc);
- /* ll_ap_completion (from llite) drops PG_locked. so, a new
- * I/O on the page could start, but OSC calls it under lock
- * and thus we can add oap back to pending safely */
+ /* cl_page_completion() drops PG_locked, so a new I/O on the page could
+ * start; but OSC calls it under lock and thus we can safely add the oap
+ * back to the pending queue */
if (rc)
/* upper layer wants to leave the page on pending queue */
osc_oap_to_pending(oap);
}
if (oap == NULL)
break;
- /*
- * Page submitted for IO has to be locked. Either by
- * ->ap_make_ready() or by higher layers.
- */
-#if defined(__KERNEL__) && defined(__linux__)
- {
- struct cl_page *page;
-
- page = osc_oap2cl_page(oap);
-
- if (page->cp_type == CPT_CACHEABLE &&
- !(PageLocked(oap->oap_page) &&
- (CheckWriteback(oap->oap_page, cmd)))) {
- CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
- oap->oap_page,
- (long)oap->oap_page->flags,
- oap->oap_async_flags);
- LBUG();
- }
- }
-#endif
/* take the page out of our book-keeping */
cfs_list_del_init(&oap->oap_pending_item);