From f49d4116040818a8b3888df55dacb1f16b0b65c6 Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Fri, 22 Jul 2011 20:52:46 -0700 Subject: [PATCH] LU-523: No prepare_write for lockless IO For a page-unaligned write, CLIO does prepare_write even for lockless IO and then writes the full page back; this causes data corruption because the data is not covered by a lock. In this patch, we skip prepare_write for lockless IO and submit only the exact bytes in the page to the OST. Change-Id: I4aa5afeb82cb717de499c8a8c004078b279302c7 Signed-off-by: Jinshan Xiong Reviewed-on: http://review.whamcloud.com/1130 Tested-by: Hudson Reviewed-by: Oleg Drokin Reviewed-by: Niu Yawei Tested-by: Maloo --- lustre/obdclass/cl_io.c | 3 +++ lustre/osc/osc_internal.h | 5 ++--- lustre/osc/osc_io.c | 19 +++++++++++++++++-- lustre/osc/osc_page.c | 7 ++++--- lustre/osc/osc_request.c | 21 ++++++++++++--------- 5 files changed, 38 insertions(+), 17 deletions(-) diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c index 3b9b876..d5d52d9 100644 --- a/lustre/obdclass/cl_io.c +++ b/lustre/obdclass/cl_io.c @@ -1547,6 +1547,9 @@ void cl_req_page_add(const struct lu_env *env, LASSERT(cfs_list_empty(&page->cp_flight)); LASSERT(page->cp_req == NULL); + CL_PAGE_DEBUG(D_PAGE, env, page, "req %p, %d, %u\n", + req, req->crq_type, req->crq_nrpages); + cfs_list_add_tail(&page->cp_flight, &req->crq_pages); ++req->crq_nrpages; page->cp_req = req; diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h index 305ddd3..2a94ebb 100644 --- a/lustre/osc/osc_internal.h +++ b/lustre/osc/osc_internal.h @@ -159,9 +159,8 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli); int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, - struct osc_async_page *oap, int cmd, obd_off off, - int count, obd_flag brw_flags, - enum async_flags async_flags); + struct osc_async_page *oap, int cmd, int off, + int count, obd_flag brw_flags, enum 
async_flags async_flags); int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, struct osc_async_page *oap); int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg); diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 9e77fc6..4cae4a2 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -299,14 +299,24 @@ static int osc_io_prepare_write(const struct lu_env *env, { struct osc_device *dev = lu2osc_dev(slice->cpl_obj->co_lu.lo_dev); struct obd_import *imp = class_exp2cliimp(dev->od_exp); - + struct osc_io *oio = cl2osc_io(env, ios); + int result = 0; ENTRY; /* * This implements OBD_BRW_CHECK logic from old client. */ - RETURN(imp == NULL || imp->imp_invalid ? -EIO : 0); + if (imp == NULL || imp->imp_invalid) + result = -EIO; + if (result == 0 && oio->oi_lockless) + /* this page contains `invalid' data, but who cares? + * nobody can access the invalid data. + * in osc_io_commit_write(), we're going to write exact + * [from, to) bytes of this page to OST. -jay */ + cl_page_export(env, slice->cpl_page, 1); + + RETURN(result); } static int osc_io_commit_write(const struct lu_env *env, @@ -314,6 +324,7 @@ static int osc_io_commit_write(const struct lu_env *env, const struct cl_page_slice *slice, unsigned from, unsigned to) { + struct osc_io *oio = cl2osc_io(env, ios); struct osc_page *opg = cl2osc_page(slice); struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); struct osc_async_page *oap = &opg->ops_oap; @@ -331,6 +342,10 @@ static int osc_io_commit_write(const struct lu_env *env, cfs_capable(CFS_CAP_SYS_RESOURCE)) oap->oap_brw_flags |= OBD_BRW_NOQUOTA; + if (oio->oi_lockless) + /* see osc_io_prepare_write() for lockless io handling. 
*/ + cl_page_clip(env, slice->cpl_page, from, to); + RETURN(0); } diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c index d7f7772..d5e24e2 100644 --- a/lustre/osc/osc_page.c +++ b/lustre/osc/osc_page.c @@ -232,7 +232,8 @@ static int osc_page_cache_add(const struct lu_env *env, osc_page_transfer_get(opg, "transfer\0cache"); result = osc_queue_async_io(env, osc_export(obj), NULL, obj->oo_oinfo, &opg->ops_oap, OBD_BRW_WRITE | noquota, - 0, 0, brw_flags, 0); + opg->ops_from, opg->ops_to - opg->ops_from, + brw_flags, 0); if (result != 0) osc_page_transfer_put(env, opg); else @@ -365,7 +366,7 @@ static int osc_page_print(const struct lu_env *env, return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: " "1< %#x %d %u %s %s %s > " - "2< "LPU64" %u %#x %#x | %p %p %p %p %p > " + "2< "LPU64" %u %u %#x %#x | %p %p %p %p %p > " "3< %s %p %d %lu %d > " "4< %d %d %d %lu %s | %s %s %s %s > " "5< %s %s %s %s | %d %s %s | %d %s %s>\n", @@ -377,7 +378,7 @@ static int osc_page_print(const struct lu_env *env, osc_list(&oap->oap_urgent_item), osc_list(&oap->oap_rpc_item), /* 2 */ - oap->oap_obj_off, oap->oap_page_off, + oap->oap_obj_off, oap->oap_page_off, oap->oap_count, oap->oap_async_flags, oap->oap_brw_flags, oap->oap_request, oap->oap_cli, oap->oap_loi, oap->oap_caller_ops, diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index a7cc433..b4abf57 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2436,7 +2436,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, CFS_LIST_HEAD(rpc_list); CFS_LIST_HEAD(tmp_list); unsigned int ending_offset; - unsigned starting_offset = 0; + obd_off starting_offset = OBD_OBJECT_EOF; + int starting_page_off = 0; int srvlock = 0, mem_tight = 0; struct cl_object *clob = NULL; ENTRY; @@ -2482,7 +2483,13 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, /* If there is a gap at the start of this page, it can't merge * with any previous page, so we'll hand the network a * 
"fragmented" page array that it can't transfer in 1 RDMA */ - if (page_count != 0 && oap->oap_page_off != 0) + if (oap->oap_obj_off < starting_offset) { + if (starting_page_off != 0) + break; + + starting_page_off = oap->oap_page_off; + starting_offset = oap->oap_obj_off + starting_page_off; + } else if (oap->oap_page_off != 0) break; /* in llite being 'ready' equates to the page being locked @@ -2560,10 +2567,6 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, lop_update_pending(cli, lop, cmd, -1); cfs_list_del_init(&oap->oap_urgent_item); - if (page_count == 0) - starting_offset = (oap->oap_obj_off+oap->oap_page_off) & - (PTLRPC_MAX_BRW_SIZE - 1); - /* ask the caller for the size of the io as the rpc leaves. */ if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) { oap->oap_count = @@ -2628,6 +2631,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, aa = ptlrpc_req_async_args(req); + starting_offset &= PTLRPC_MAX_BRW_SIZE - 1; if (cmd == OBD_BRW_READ) { lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); @@ -2968,9 +2972,8 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, - struct osc_async_page *oap, int cmd, obd_off off, - int count, obd_flag brw_flags, - enum async_flags async_flags) + struct osc_async_page *oap, int cmd, int off, + int count, obd_flag brw_flags, enum async_flags async_flags) { struct client_obd *cli = &exp->exp_obd->u.cli; int rc = 0; -- 1.8.3.1