From e482463ea5d52e696f772ab581650659ac2ab096 Mon Sep 17 00:00:00 2001 From: Jinshan Date: Fri, 19 Oct 2012 12:28:00 -0400 Subject: [PATCH] LU-2170 osc: set osc_lock attribute only once Set osc_lock's attribute by lock allocator, otherwise if this lock is matched and enqueued by a glimpse thread, the osc_lock's ols_glimpse will be set to true and the lock state will be messed in osc_lock_upcall(). Signed-off-by: Jinshan Xiong Change-Id: Ib8492fa159a43dad11febe5a01f8c4ef72b8c4f3 Reviewed-on: http://review.whamcloud.com/4316 Tested-by: Hudson Reviewed-by: Fan Yong Reviewed-by: Niu Yawei Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/lov/lov_lock.c | 2 +- lustre/osc/osc_lock.c | 49 ++++++++++++++++++++++++++----------------------- lustre/osc/osc_page.c | 16 ++++++++-------- 3 files changed, 35 insertions(+), 32 deletions(-) diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index 907baae..6d66adb 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -155,7 +155,7 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env, parent = lck->lls_cl.cls_lock; lls = &lck->lls_sub[idx]; - descr = &lls->sub_descr; + descr = &lls->sub_got; subenv = lov_sublock_env_get(env, parent, lls); if (!IS_ERR(subenv)) { diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index f517631..08b3a96 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -1005,14 +1005,12 @@ static void osc_lock_to_lockless(const struct lu_env *env, struct osc_lock *ols, int force) { struct cl_lock_slice *slice = &ols->ols_cl; - struct cl_lock *lock = slice->cls_lock; LASSERT(ols->ols_state == OLS_NEW || ols->ols_state == OLS_UPCALL_RECEIVED); if (force) { ols->ols_locklessable = 1; - LASSERT(cl_lock_is_mutexed(lock)); slice->cls_ops = &osc_lock_lockless_ops; } else { struct osc_io *oio = osc_env_io(env); @@ -1181,18 +1179,8 @@ static int osc_lock_enqueue(const struct lu_env *env, LASSERTF(ols->ols_state == OLS_NEW, "Impossible state: %d\n", ols->ols_state); - ols->ols_flags = osc_enq2ldlm_flags(enqflags); - if (enqflags & CEF_AGL) { - ols->ols_flags |= LDLM_FL_BLOCK_NOWAIT; - ols->ols_agl = 1; - } else { - ols->ols_agl = 0; - } - if (ols->ols_flags & LDLM_FL_HAS_INTENT) - ols->ols_glimpse = 1; - if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST)) - /* try to convert this lock to a lockless lock */ - osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER)); + LASSERTF(ergo(ols->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ), + "lock = %p, ols = %p\n", lock, ols); result = osc_lock_enqueue_wait(env, ols); if (result == 0) { @@ -1203,9 +1191,6 @@ static int osc_lock_enqueue(const struct lu_env *env, ldlm_policy_data_t *policy = &info->oti_policy; struct ldlm_enqueue_info *einfo = &ols->ols_einfo; - if (ols->ols_locklessable) - ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION; - /* lock will be passed as upcall cookie, * hold ref to prevent to be released. */ cl_lock_hold_add(env, lock, "upcall", lock); @@ -1272,7 +1257,7 @@ static int osc_lock_wait(const struct lu_env *env, int rc; LASSERT(olck->ols_agl); - + olck->ols_agl = 0; rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC | CEF_MUST); if (rc != 0) return rc; @@ -1394,7 +1379,8 @@ static void osc_lock_cancel(const struct lu_env *env, int do_cancel; discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA); - result = osc_lock_flush(olck, discard); + if (olck->ols_state >= OLS_GRANTED) + result = osc_lock_flush(olck, discard); osc_lock_unhold(olck); lock_res_and_lock(dlmlock); @@ -1708,10 +1694,27 @@ int osc_lock_init(const struct lu_env *env, OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO); if (clk != NULL) { - osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo); - cfs_atomic_set(&clk->ols_pageref, 0); - clk->ols_state = OLS_NEW; - cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops); + __u32 enqflags = lock->cll_descr.cld_enq_flags; + + osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo); + cfs_atomic_set(&clk->ols_pageref, 0); + clk->ols_state = OLS_NEW; + + clk->ols_flags = osc_enq2ldlm_flags(enqflags); + clk->ols_agl = !!(enqflags & CEF_AGL); + if (clk->ols_agl) + clk->ols_flags |= LDLM_FL_BLOCK_NOWAIT; + if (clk->ols_flags & LDLM_FL_HAS_INTENT) + clk->ols_glimpse = 1; + + cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops); + + if (!(enqflags & CEF_MUST)) + /* try to convert this lock to a lockless lock */ + osc_lock_to_lockless(env, clk, (enqflags & CEF_NEVER)); + if (clk->ols_locklessable && !(enqflags & CEF_DISCARD_DATA)) + clk->ols_flags |= LDLM_FL_DENY_ON_CONTENTION; + result = 0; } else result = -ENOMEM; diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c index c46db69..29774de 100644 --- a/lustre/osc/osc_page.c +++ b/lustre/osc/osc_page.c @@ -264,9 +264,9 @@ static int osc_page_addref_lock(const struct lu_env *env, olock = osc_lock_at(lock); if (cfs_atomic_inc_return(&olock->ols_pageref) <= 0) { cfs_atomic_dec(&olock->ols_pageref); - cl_lock_put(env, lock); - rc = 1; + rc = -ENODATA; } else { + cl_lock_get(lock); opg->ops_lock = lock; rc = 0; } @@ -293,16 +293,16 @@ static int osc_page_is_under_lock(const struct lu_env *env, struct cl_io *unused) { struct cl_lock *lock; - int result; + int result = -ENODATA; ENTRY; lock = cl_lock_at_page(env, slice->cpl_obj, slice->cpl_page, NULL, 1, 0); - if (lock != NULL && - osc_page_addref_lock(env, cl2osc_page(slice), lock) == 0) - result = -EBUSY; - else - result = -ENODATA; + if (lock != NULL) { + if (osc_page_addref_lock(env, cl2osc_page(slice), lock) == 0) + result = -EBUSY; + cl_lock_put(env, lock); + } RETURN(result); } -- 1.8.3.1