From f0d608786a27dfb8dddf06d6b086b491749557f1 Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Thu, 2 Feb 2012 19:17:41 +0800 Subject: [PATCH] LU-1061 agl: cl_locks_prune() waits for the last user The AGL sponsor holds user reference count on the cl_lock before triggering AGL RPC. The user reference count on the cl_lock will be released by AGL RPC reply upcall. Such AGL mechanism conflict with cl_locks_prune(), which requires no lock is in active using when the last iput() called. So the cl_locks_prune() should wait for the last user reference count to be released by the enqueue reply upcall. Signed-off-by: Fan Yong Change-Id: I8998c0a247448b1b6c6e99995c9d956b1666279b Reviewed-on: http://review.whamcloud.com/2079 Tested-by: Hudson Reviewed-by: James Simmons Tested-by: Maloo Reviewed-by: Jinshan Xiong Reviewed-by: Niu Yawei Reviewed-by: Oleg Drokin --- lustre/include/cl_object.h | 2 +- lustre/include/lustre_dlm.h | 31 ++++++++++++++++--------------- lustre/ldlm/ldlm_lock.c | 22 +++++++++++----------- lustre/obdclass/cl_lock.c | 20 +++++++++++++++++--- lustre/osc/osc_lock.c | 6 ++++-- lustre/osc/osc_request.c | 2 +- 6 files changed, 50 insertions(+), 33 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 628bfbf..0760bf8 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -2815,7 +2815,7 @@ void cl_lock_unhold (const struct lu_env *env, struct cl_lock *lock, void cl_lock_release (const struct lu_env *env, struct cl_lock *lock, const char *scope, const void *source); void cl_lock_user_add (const struct lu_env *env, struct cl_lock *lock); -int cl_lock_user_del (const struct lu_env *env, struct cl_lock *lock); +void cl_lock_user_del (const struct lu_env *env, struct cl_lock *lock); enum cl_lock_state cl_lock_intransit(const struct lu_env *env, struct cl_lock *lock); diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 3f6af4f..bd4d87d 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -699,18 +699,6 @@ struct ldlm_lock { __u64 l_flags; __u32 l_readers; __u32 l_writers; - /* - * Set for locks that were removed from class hash table and will be - * destroyed when last reference to them is released. Set by - * ldlm_lock_destroy_internal(). - * - * Protected by lock and resource locks. - */ - __u8 l_destroyed; - /** - * flag whether this is a server namespace lock - */ - __u8 l_ns_srv; /** * If the lock is granted, a process sleeps on this waitq to learn when * it's no longer in use. If the lock is not granted, a process sleeps @@ -731,11 +719,24 @@ struct ldlm_lock { struct ldlm_extent l_req_extent; + unsigned int l_failed:1, + /* + * Set for locks that were removed from class hash table and will be + * destroyed when last reference to them is released. Set by + * ldlm_lock_destroy_internal(). + * + * Protected by lock and resource locks. + */ + l_destroyed:1, + /** + * flag whether this is a server namespace lock. + */ + l_ns_srv:1; + /* * Client-side-only members. */ - int l_fail_value; /** * Temporary storage for an LVB received during an enqueue operation. */ @@ -1083,8 +1084,8 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode); int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode); -void ldlm_lock_fail_match_locked(struct ldlm_lock *lock, int rc); -void ldlm_lock_fail_match(struct ldlm_lock *lock, int rc); +void ldlm_lock_fail_match_locked(struct ldlm_lock *lock); +void ldlm_lock_fail_match(struct ldlm_lock *lock); void ldlm_lock_allow_match(struct ldlm_lock *lock); void ldlm_lock_allow_match_locked(struct ldlm_lock *lock); ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index f72248a..4c124d4 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1056,7 +1056,7 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue, if (!unref && (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED || - lock->l_fail_value != 0)) + lock->l_failed)) continue; if ((flags & LDLM_FL_LOCAL_ONLY) && @@ -1076,19 +1076,19 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue, return NULL; } -void ldlm_lock_fail_match_locked(struct ldlm_lock *lock, int rc) +void ldlm_lock_fail_match_locked(struct ldlm_lock *lock) { - if (lock->l_fail_value == 0) { - lock->l_fail_value = rc; - cfs_waitq_signal(&lock->l_waitq); + if (!lock->l_failed) { + lock->l_failed = 1; + cfs_waitq_broadcast(&lock->l_waitq); } } EXPORT_SYMBOL(ldlm_lock_fail_match_locked); -void ldlm_lock_fail_match(struct ldlm_lock *lock, int rc) +void ldlm_lock_fail_match(struct ldlm_lock *lock) { lock_res_and_lock(lock); - ldlm_lock_fail_match_locked(lock, rc); + ldlm_lock_fail_match_locked(lock); unlock_res_and_lock(lock); } EXPORT_SYMBOL(ldlm_lock_fail_match); @@ -1096,7 +1096,7 @@ EXPORT_SYMBOL(ldlm_lock_fail_match); void ldlm_lock_allow_match_locked(struct ldlm_lock *lock) { lock->l_flags |= LDLM_FL_LVB_READY; - cfs_waitq_signal(&lock->l_waitq); + cfs_waitq_broadcast(&lock->l_waitq); } void ldlm_lock_allow_match(struct ldlm_lock *lock) @@ -1206,7 +1206,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */ l_wait_event(lock->l_waitq, lock->l_flags & LDLM_FL_LVB_READY || - lock->l_fail_value != 0, + lock->l_failed, &lwi); if (!(lock->l_flags & LDLM_FL_LVB_READY)) { if (flags & LDLM_FL_TEST_LOCK) @@ -1263,7 +1263,7 @@ ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh, if (lock != NULL) { lock_res_and_lock(lock); if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED || - lock->l_fail_value != 0) + lock->l_failed) GOTO(out, mode); if (lock->l_flags & LDLM_FL_CBPENDING && @@ -1311,7 +1311,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, lock->l_req_mode = mode; lock->l_ast_data = data; lock->l_pid = cfs_curproc_pid(); - lock->l_ns_srv = ns_is_server(ns); + lock->l_ns_srv = !!ns_is_server(ns); if (cbs) { lock->l_blocking_ast = cbs->lcs_blocking; lock->l_completion_ast = cbs->lcs_completion; diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index 70d94a9..4202603 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -2076,10 +2076,22 @@ void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel) cl_lock_get_trust(lock); cfs_spin_unlock(&head->coh_lock_guard); lu_ref_add(&lock->cll_reference, "prune", cfs_current()); + +again: cl_lock_mutex_get(env, lock); if (lock->cll_state < CLS_FREEING) { LASSERT(lock->cll_holds == 0); - LASSERT(lock->cll_users == 0); + LASSERT(lock->cll_users <= 1); + if (unlikely(lock->cll_users == 1)) { + struct l_wait_info lwi = { 0 }; + + cl_lock_mutex_put(env, lock); + l_wait_event(lock->cll_wq, + lock->cll_users == 0, + &lwi); + goto again; + } + if (cancel) cl_lock_cancel(env, lock); cl_lock_delete(env, lock); @@ -2255,7 +2267,7 @@ void cl_lock_user_add(const struct lu_env *env, struct cl_lock *lock) } EXPORT_SYMBOL(cl_lock_user_add); -int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock) +void cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock) { LINVRNT(cl_lock_is_mutexed(lock)); LINVRNT(cl_lock_invariant(env, lock)); @@ -2263,7 +2275,9 @@ int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock) ENTRY; cl_lock_used_mod(env, lock, -1); - RETURN(lock->cll_users == 0); + if (lock->cll_users == 0) + cfs_waitq_broadcast(&lock->cll_wq); + EXIT; } EXPORT_SYMBOL(cl_lock_user_del); diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index f239956..a1dc1bf 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -535,13 +535,15 @@ static int osc_lock_upcall(void *cookie, int errcode) dlmlock->l_ast_data = NULL; olck->ols_handle.cookie = 0ULL; cfs_spin_unlock(&osc_ast_guard); - ldlm_lock_fail_match_locked(dlmlock, rc); + ldlm_lock_fail_match_locked(dlmlock); unlock_res_and_lock(dlmlock); LDLM_LOCK_PUT(dlmlock); } } else { - if (olck->ols_glimpse) + if (olck->ols_glimpse) { olck->ols_glimpse = 0; + olck->ols_agl = 0 ; + } osc_lock_upcall0(env, olck); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index c4ea288..2c4cef7 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -3332,7 +3332,7 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp, if (lock != NULL) { if (rc != ELDLM_OK) - ldlm_lock_fail_match(lock, rc); + ldlm_lock_fail_match(lock); LDLM_LOCK_PUT(lock); } -- 1.8.3.1