From c9d2c07ef22f968eaf7183062ed5ace23086d2f4 Mon Sep 17 00:00:00 2001 From: jxiong Date: Thu, 3 Dec 2009 03:22:59 +0000 Subject: [PATCH] b=21434 r=wangdi,eric.mei We can't use env and io in osc page operations because they are not correctly setup. To support lockless IO, I choose to set srvlock flag at page initialization time. --- lustre/lov/lov_page.c | 111 ++++++++++++++++++++++++------------------- lustre/osc/osc_cl_internal.h | 6 ++- lustre/osc/osc_io.c | 1 + lustre/osc/osc_page.c | 12 +++-- 4 files changed, 75 insertions(+), 55 deletions(-) diff --git a/lustre/lov/lov_page.c b/lustre/lov/lov_page.c index 4609de2..2e293bb 100644 --- a/lustre/lov/lov_page.c +++ b/lustre/lov/lov_page.c @@ -140,60 +140,73 @@ struct cl_page *lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj, struct cl_page *page, cfs_page_t *vmpage) { - struct lov_page *lpg; struct lov_object *loo = cl2lov(obj); + struct lov_io *lio = lov_env_io(env); int result; + loff_t offset; + int stripe; + obd_off suboff; + struct cl_page *subpage; + struct cl_object *subobj; + struct lov_layout_raid0 *r0 = lov_r0(loo); + struct lov_io_sub *sub; + ENTRY; - OBD_SLAB_ALLOC_PTR_GFP(lpg, lov_page_kmem, CFS_ALLOC_IO); - if (lpg != NULL) { - loff_t offset; - int stripe; - obd_off suboff; - struct cl_page *subpage; - struct cl_object *subobj; - struct lov_layout_raid0 *r0 = lov_r0(loo); - - offset = cl_offset(obj, page->cp_index); - stripe = lov_stripe_number(r0->lo_lsm, offset); - result = lov_stripe_offset(r0->lo_lsm, offset, stripe, - &suboff); - LASSERT(stripe < r0->lo_nr); - LASSERT(result == 0); - - subobj = lovsub2cl(r0->lo_sub[stripe]); - subpage = cl_page_find(env, subobj, - cl_index(subobj, suboff), vmpage, - page->cp_type); - if (!IS_ERR(subpage)) { - if (subpage->cp_parent != NULL) { - /* - * This is only possible when TRANSIENT page - * is being created, and CACHEABLE sub-page - * (attached to already existing top-page) has - * been found. Tell cl_page_find() to use - * existing page. - */ - LASSERT(subpage->cp_type == CPT_CACHEABLE); - LASSERT(page->cp_type == CPT_TRANSIENT); - lpg->lps_invalid = 1; - cl_page_put(env, subpage); - /* - * XXX This assumes that lov is in the topmost - * cl_page. - */ - result = PTR_ERR(cl_page_top(subpage)); - } else { - lu_ref_add(&subpage->cp_reference, "lov", page); - subpage->cp_parent = page; - page->cp_child = subpage; - } - cl_page_slice_add(page, &lpg->lps_cl, - obj, &lov_page_ops); - } else - result = PTR_ERR(subpage); + + offset = cl_offset(obj, page->cp_index); + stripe = lov_stripe_number(r0->lo_lsm, offset); + LASSERT(stripe < r0->lo_nr); + result = lov_stripe_offset(r0->lo_lsm, offset, stripe, + &suboff); + LASSERT(result == 0); + + subobj = lovsub2cl(r0->lo_sub[stripe]); + sub = lov_sub_get(env, lio, stripe); + if (IS_ERR(sub)) + GOTO(out, result = PTR_ERR(sub)); + + subpage = cl_page_find(sub->sub_env, subobj, + cl_index(subobj, suboff), vmpage, + page->cp_type); + lov_sub_put(sub); + if (!IS_ERR(subpage)) { + struct lov_page *lpg; + + OBD_SLAB_ALLOC_PTR_GFP(lpg, lov_page_kmem, CFS_ALLOC_IO); + if (lpg == NULL) { + cl_page_put(env, subpage); + GOTO(out, result = -ENOMEM); + } + + if (subpage->cp_parent != NULL) { + /* + * This is only possible when TRANSIENT page + * is being created, and CACHEABLE sub-page + * (attached to already existing top-page) has + * been found. Tell cl_page_find() to use + * existing page. + */ + LASSERT(subpage->cp_type == CPT_CACHEABLE); + LASSERT(page->cp_type == CPT_TRANSIENT); + lpg->lps_invalid = 1; + cl_page_put(env, subpage); + /* + * XXX This assumes that lov is in the topmost + * cl_page. + */ + result = PTR_ERR(cl_page_top(subpage)); + } else { + lu_ref_add(&subpage->cp_reference, "lov", page); + subpage->cp_parent = page; + page->cp_child = subpage; + } + cl_page_slice_add(page, &lpg->lps_cl, + obj, &lov_page_ops); } else - result = -ENOMEM; + result = PTR_ERR(subpage); + +out: RETURN(ERR_PTR(result)); } diff --git a/lustre/osc/osc_cl_internal.h b/lustre/osc/osc_cl_internal.h index 7e0dfe5..7f91fb1 100644 --- a/lustre/osc/osc_cl_internal.h +++ b/lustre/osc/osc_cl_internal.h @@ -281,7 +281,11 @@ struct osc_page { * True for a `temporary page' created by read-ahead code, probably * outside of any DLM lock. */ - ops_temp:1; + ops_temp:1, + /** + * Set if the page must be transferred with OBD_BRW_SRVLOCK. + */ + ops_srvlock:1; /** * Linkage into a per-osc_object list of pages in flight. For * debugging. diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 294f415..3d3fe9f 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -653,6 +653,7 @@ static void osc_req_attr_set(const struct lu_env *env, olck = osc_lock_at(lock); LASSERT(olck != NULL); + LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL)); /* check for lockless io. */ if (olck->ols_lock != NULL) { oa->o_handle = olck->ols_lock->l_remote_handle; diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c index 2d5b63f..38581f5 100644 --- a/lustre/osc/osc_page.c +++ b/lustre/osc/osc_page.c @@ -212,7 +212,6 @@ static int osc_page_cache_add(const struct lu_env *env, { struct osc_page *opg = cl2osc_page(slice); struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); - struct osc_io *oio = osc_env_io(env); int result; int brw_flags; int noquota = 0; @@ -221,7 +220,7 @@ static int osc_page_cache_add(const struct lu_env *env, ENTRY; /* Set the OBD_BRW_SRVLOCK before the page is queued. */ - brw_flags = osc_io_srvlock(oio) ? OBD_BRW_SRVLOCK : 0; + brw_flags = opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0; if (!client_is_remote(osc_export(obj)) && cfs_capable(CFS_CAP_SYS_RESOURCE)) { brw_flags |= OBD_BRW_NOQUOTA; @@ -303,7 +302,7 @@ static int osc_page_print(const struct lu_env *env, return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: " "1< %#x %d %u %s %s %s > " "2< %llu %u %#x %#x | %p %p %p %p %p > " - "3< %s %p %d %lu > " + "3< %s %p %d %lu %d > " "4< %d %d %d %lu %s | %s %s %s %s > " "5< %s %s %s %s | %d %s %s | %d %s %s>\n", opg, @@ -322,7 +321,7 @@ static int osc_page_print(const struct lu_env *env, /* 3 */ osc_list(&opg->ops_inflight), opg->ops_submitter, opg->ops_transfer_pinned, - osc_submit_duration(opg), + osc_submit_duration(opg), opg->ops_srvlock, /* 4 */ cli->cl_r_in_flight, cli->cl_w_in_flight, cli->cl_max_rpcs_in_flight, @@ -571,9 +570,12 @@ struct cl_page *osc_page_init(const struct lu_env *env, cl_offset(obj, page->cp_index), &osc_async_page_ops, opg, (void **)&oap, 1, NULL); - if (result == 0) + if (result == 0) { + struct osc_io *oio = osc_env_io(env); + opg->ops_srvlock = osc_io_srvlock(oio); cl_page_slice_add(page, &opg->ops_cl, obj, &osc_page_ops); + } /* * Cannot assert osc_page_protected() here as read-ahead * creates temporary pages outside of a lock. -- 1.8.3.1