From 24cb5819c38c16656eb4f4d20a6e73bf918eca01 Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Thu, 2 Aug 2012 09:30:45 +0800 Subject: [PATCH] LU-1683 agl: increase lock cll_holds for AGL upcall If without additional cll_holds held for AGL upcall, the AGL lock may be cancelled/deleted by the AGL thread if the AGL lock cannot be granted at once. The osc_lock_wait() will re-trigger lock enqueue for non-granted AGL lock and return 'CLO_REENQUEUED' to the caller. Original lov lock enqueue logic ignored such case for lov_lock_enqueue_one(), so may cause unexpected LASSERT when checking the result. Signed-off-by: Fan Yong Change-Id: I37305aba9d9f9ad525decc20badac4afbe7aedb0 Reviewed-on: http://review.whamcloud.com/3249 Reviewed-by: Jinshan Xiong Tested-by: Hudson Tested-by: Maloo Reviewed-by: James Simmons Reviewed-by: Oleg Drokin --- lustre/include/cl_object.h | 2 ++ lustre/lov/lov_lock.c | 17 +++++++++++------ lustre/obdclass/cl_lock.c | 18 +++++++++++------- lustre/osc/osc_lock.c | 40 ++++++++++++++++++++++------------------ lustre/osc/osc_request.c | 5 ++++- 5 files changed, 50 insertions(+), 32 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index d8f6f17..b0c11e3 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -2885,6 +2885,8 @@ void cl_lock_get_trust (struct cl_lock *lock); void cl_lock_put (const struct lu_env *env, struct cl_lock *lock); void cl_lock_hold_add (const struct lu_env *env, struct cl_lock *lock, const char *scope, const void *source); +void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock, + const char *scope, const void *source); void cl_lock_unhold (const struct lu_env *env, struct cl_lock *lock, const char *scope, const void *source); void cl_lock_release (const struct lu_env *env, struct cl_lock *lock, diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index 1fc696c..907baae 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -269,8 +269,10 @@ static int lov_subresult(int result, int rc) ENTRY; - LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT); - LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT); + LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT, + "result = %d", result); + LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT, + "rc = %d\n", rc); CLASSERT(CLO_WAIT < CLO_REPEAT); /* calculate ranks in the ordering above */ @@ -524,10 +526,13 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck, /* first, try to enqueue a sub-lock ... */ result = cl_enqueue_try(env, sublock, io, enqflags); - if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) - /* if it is enqueued, try to `wait' on it---maybe it's already - * granted */ - result = cl_wait_try(env, sublock); + if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) { + /* if it is enqueued, try to `wait' on it---maybe it's already + * granted */ + result = cl_wait_try(env, sublock); + if (result == CLO_REENQUEUED) + result = CLO_WAIT; + } /* * If CEF_ASYNC flag is set, then all sub-locks can be enqueued in * parallel, otherwise---enqueue has to wait until sub-lock is granted diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index d5efa9b..84ee04f 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -884,8 +884,8 @@ static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock, } } -static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock, - const char *scope, const void *source) +void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock, + const char *scope, const void *source) { LINVRNT(cl_lock_is_mutexed(lock)); LINVRNT(cl_lock_invariant(env, lock)); @@ -917,6 +917,7 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock, } EXIT; } +EXPORT_SYMBOL(cl_lock_hold_release); /** * Waits until lock state is changed. @@ -1196,7 +1197,9 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock, case CLS_QUEUING: /* kick layers. */ result = cl_enqueue_kick(env, lock, io, flags); - if (result == 0) + /* For AGL case, the cl_lock::cll_state may + * become CLS_HELD already. */ + if (result == 0 && lock->cll_state == CLS_QUEUING) cl_lock_state_set(env, lock, CLS_ENQUEUED); break; case CLS_INTRANSIT: @@ -1456,9 +1459,11 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock) do { LINVRNT(cl_lock_is_mutexed(lock)); LINVRNT(cl_lock_invariant(env, lock)); - LASSERT(lock->cll_state == CLS_ENQUEUED || - lock->cll_state == CLS_HELD || - lock->cll_state == CLS_INTRANSIT); + LASSERTF(lock->cll_state == CLS_QUEUING || + lock->cll_state == CLS_ENQUEUED || + lock->cll_state == CLS_HELD || + lock->cll_state == CLS_INTRANSIT, + "lock state: %d\n", lock->cll_state); LASSERT(lock->cll_users > 0); LASSERT(lock->cll_holds > 0); @@ -2038,7 +2043,6 @@ void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel) again: cl_lock_mutex_get(env, lock); if (lock->cll_state < CLS_FREEING) { - LASSERT(lock->cll_holds == 0); LASSERT(lock->cll_users <= 1); if (unlikely(lock->cll_users == 1)) { struct l_wait_info lwi = { 0 }; diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index 467295f..a4b9be03 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -583,17 +583,21 @@ static int osc_lock_upcall(void *cookie, int errcode) cl_lock_error(env, lock, rc); } - cl_lock_mutex_put(env, lock); - - /* release cookie reference, acquired by osc_lock_enqueue() */ - lu_ref_del(&lock->cll_reference, "upcall", lock); - cl_lock_put(env, lock); - - cl_env_nested_put(&nest, env); - } else - /* should never happen, similar to osc_ldlm_blocking_ast(). */ - LBUG(); - RETURN(errcode); + /* release cookie reference, acquired by osc_lock_enqueue() */ + cl_lock_hold_release(env, lock, "upcall", lock); + cl_lock_mutex_put(env, lock); + + lu_ref_del(&lock->cll_reference, "upcall", lock); + /* This maybe the last reference, so must be called after + * cl_lock_mutex_put(). */ + cl_lock_put(env, lock); + + cl_env_nested_put(&nest, env); + } else { + /* should never happen, similar to osc_ldlm_blocking_ast(). */ + LBUG(); + } + RETURN(errcode); } /** @@ -1179,7 +1183,9 @@ static int osc_lock_enqueue(const struct lu_env *env, if (enqflags & CEF_AGL) { ols->ols_flags |= LDLM_FL_BLOCK_NOWAIT; ols->ols_agl = 1; - } + } else { + ols->ols_agl = 0; + } if (ols->ols_flags & LDLM_FL_HAS_INTENT) ols->ols_glimpse = 1; if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST)) @@ -1198,9 +1204,9 @@ static int osc_lock_enqueue(const struct lu_env *env, if (ols->ols_locklessable) ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION; - /* a reference for lock, passed as an upcall cookie */ - cl_lock_get(lock); - lu_ref_add(&lock->cll_reference, "upcall", lock); + /* lock will be passed as upcall cookie, + * hold ref to prevent to be released. */ + cl_lock_hold_add(env, lock, "upcall", lock); /* a user for lock also */ cl_lock_user_add(env, lock); ols->ols_state = OLS_ENQUEUED; @@ -1221,9 +1227,7 @@ static int osc_lock_enqueue(const struct lu_env *env, PTLRPCD_SET, 1, ols->ols_agl); if (result != 0) { cl_lock_user_del(env, lock); - lu_ref_del(&lock->cll_reference, - "upcall", lock); - cl_lock_put(env, lock); + cl_lock_unhold(env, lock, "upcall", lock); if (unlikely(result == -ECANCELED)) { ols->ols_state = OLS_NEW; result = 0; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index b288b04..d693fb6 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2495,7 +2495,10 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, * are explained in lov_enqueue() */ } - /* We already have a lock, and it's referenced */ + /* We already have a lock, and it's referenced. + * + * At this point, the cl_lock::cll_state is CLS_QUEUING, + * AGL upcall may change it to CLS_HELD directly. */ (*upcall)(cookie, ELDLM_OK); if (einfo->ei_mode != mode) -- 1.8.3.1