From 7d3723d80078c0852b9511523960ba5f4590643c Mon Sep 17 00:00:00 2001
From: Fan Yong
Date: Tue, 31 Jan 2012 13:51:41 +0800
Subject: [PATCH] LU-1046 ldlm: hold ldlm_cb_set_arg until the last user exit

There is a race condition between ldlm_cb_interpret() and
ldlm_run_ast_work() when testing/releasing "ldlm_cb_set_arg":
ldlm_run_ast_work() can exit before ldlm_cb_interpret() has finished
processing "ldlm_cb_set_arg::waitq", which causes invalid memory
access.

Signed-off-by: Fan Yong
Change-Id: Idfb3f41739e02a79fe4310ecbc7bb842e75a82d8
Reviewed-on: http://review.whamcloud.com/2065
Tested-by: Hudson
Reviewed-by: Jinshan Xiong
Tested-by: Maloo
Reviewed-by: Oleg Drokin
---
 lustre/ldlm/ldlm_internal.h | 10 ++++++++++
 lustre/ldlm/ldlm_lock.c     | 38 +++++++++++++++++++++++---------------
 lustre/ldlm/ldlm_lockd.c    |  9 +++++++--
 3 files changed, 40 insertions(+), 17 deletions(-)

diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h
index f6bb70e..aa0faef 100644
--- a/lustre/ldlm/ldlm_internal.h
+++ b/lustre/ldlm/ldlm_internal.h
@@ -108,9 +108,19 @@ struct ldlm_cb_set_arg {
         unsigned int threshold; /* threshold to wake up the waiting proc */
         cfs_atomic_t rpcs;      /* # of inflight rpcs in set */
         cfs_atomic_t restart;
+        cfs_atomic_t refcount;
         cfs_waitq_t waitq;
 };
 
+static inline void ldlm_csa_put(struct ldlm_cb_set_arg *arg)
+{
+        if (cfs_atomic_dec_and_test(&arg->refcount)) {
+                LASSERT(cfs_atomic_read(&arg->rpcs) == 0);
+
+                OBD_FREE_PTR(arg);
+        }
+}
+
 typedef enum {
         LDLM_WORK_BL_AST,
         LDLM_WORK_CP_AST,
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index fe7aac9..8e34904 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -1577,31 +1577,37 @@ ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                       ldlm_desc_ast_t ast_type)
 {
-        struct ldlm_cb_set_arg arg = { 0 };
         struct l_wait_info lwi = { 0 };
+        struct ldlm_cb_set_arg *arg;
         cfs_list_t *tmp, *pos;
         int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
         unsigned int max_ast_count;
+        int rc;
         ENTRY;
 
         if (cfs_list_empty(rpc_list))
                 RETURN(0);
 
-        cfs_atomic_set(&arg.restart, 0);
-        cfs_atomic_set(&arg.rpcs, 0);
-        cfs_waitq_init(&arg.waitq);
+        OBD_ALLOC_PTR(arg);
+        if (arg == NULL)
+                RETURN(-ENOMEM);
+
+        cfs_atomic_set(&arg->restart, 0);
+        cfs_atomic_set(&arg->rpcs, 0);
+        cfs_atomic_set(&arg->refcount, 1);
+        cfs_waitq_init(&arg->waitq);
 
         switch (ast_type) {
         case LDLM_WORK_BL_AST:
-                arg.type = LDLM_BL_CALLBACK;
+                arg->type = LDLM_BL_CALLBACK;
                 work_ast_lock = ldlm_work_bl_ast_lock;
                 break;
         case LDLM_WORK_CP_AST:
-                arg.type = LDLM_CP_CALLBACK;
+                arg->type = LDLM_CP_CALLBACK;
                 work_ast_lock = ldlm_work_cp_ast_lock;
                 break;
         case LDLM_WORK_REVOKE_AST:
-                arg.type = LDLM_BL_CALLBACK;
+                arg->type = LDLM_BL_CALLBACK;
                 work_ast_lock = ldlm_work_revoke_ast_lock;
                 break;
         default:
@@ -1609,22 +1615,24 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
         }
         max_ast_count = ns->ns_max_parallel_ast ?
                         : UINT_MAX;
-        arg.threshold = max_ast_count;
+        arg->threshold = max_ast_count;
 
         cfs_list_for_each_safe(tmp, pos, rpc_list) {
-                (void)work_ast_lock(tmp, &arg);
-                if (cfs_atomic_read(&arg.rpcs) < max_ast_count)
+                (void)work_ast_lock(tmp, arg);
+                if (cfs_atomic_read(&arg->rpcs) < max_ast_count)
                         continue;
 
-                l_wait_event(arg.waitq,
-                             cfs_atomic_read(&arg.rpcs) < arg.threshold,
+                l_wait_event(arg->waitq,
+                             cfs_atomic_read(&arg->rpcs) < arg->threshold,
                              &lwi);
         }
 
-        arg.threshold = 1;
-        l_wait_event(arg.waitq, cfs_atomic_read(&arg.rpcs) == 0, &lwi);
+        arg->threshold = 1;
+        l_wait_event(arg->waitq, cfs_atomic_read(&arg->rpcs) == 0, &lwi);
 
-        RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0);
+        rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
+        ldlm_csa_put(arg);
+        RETURN(rc);
 }
 
 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index d77a893..5de675d 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -650,6 +650,7 @@ static int ldlm_cb_interpret(const struct lu_env *env,
         struct ldlm_cb_async_args *ca = data;
         struct ldlm_lock *lock = ca->ca_lock;
         struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
+        int count;
         ENTRY;
 
         LASSERT(lock != NULL);
@@ -662,8 +663,11 @@ static int ldlm_cb_interpret(const struct lu_env *env,
         }
         LDLM_LOCK_RELEASE(lock);
 
-        if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
+        count = cfs_atomic_dec_return(&arg->rpcs);
+        if (count < arg->threshold)
                 cfs_waitq_signal(&arg->waitq);
+        if (count == 0)
+                ldlm_csa_put(arg);
 
         RETURN(0);
 }
@@ -682,8 +686,9 @@ static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
                 cfs_atomic_inc(&arg->restart);
         } else {
                 LDLM_LOCK_GET(lock);
+                if (cfs_atomic_inc_return(&arg->rpcs) == 1)
+                        cfs_atomic_inc(&arg->refcount);
                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
-                cfs_atomic_inc(&arg->rpcs);
         }
 
         RETURN(rc);
-- 
1.8.3.1
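
For reference, below is a minimal standalone sketch (not Lustre code, not part of the
patch) of the refcounting pattern the patch introduces. C11 atomics and a pthread
mutex/condvar stand in for cfs_atomic_t and cfs_waitq_t, and the names cb_set_arg,
csa_put(), cb_done() and the issuer code in main() are invented stand-ins for
ldlm_cb_set_arg, ldlm_csa_put(), ldlm_cb_interpret() and
ldlm_run_ast_work()/ldlm_bl_and_cp_ast_tail(). The idea is the same: the issuer holds
one reference, the set of in-flight callbacks collectively holds a second one (taken
when the in-flight count goes from 0 to 1), and whichever side drops the last
reference frees the structure, so the waiter can no longer free it while a late
callback is still touching the wait queue.

/*
 * Minimal standalone sketch -- NOT Lustre code -- of the refcounting
 * pattern above.  cb_set_arg, csa_put() and cb_done() are invented
 * stand-ins; C11 atomics and a pthread mutex/condvar replace
 * cfs_atomic_t and cfs_waitq_t.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

struct cb_set_arg {
        atomic_int      rpcs;      /* callbacks still in flight */
        atomic_int      refcount;  /* users of this struct */
        pthread_mutex_t lock;
        pthread_cond_t  waitq;     /* stands in for cfs_waitq_t */
};

/* Last user frees; mirrors what ldlm_csa_put() does in the patch. */
static void csa_put(struct cb_set_arg *arg)
{
        if (atomic_fetch_sub(&arg->refcount, 1) == 1) {
                pthread_mutex_destroy(&arg->lock);
                pthread_cond_destroy(&arg->waitq);
                free(arg);
        }
}

/* Callback side: the role ldlm_cb_interpret() plays when an RPC completes. */
static void *cb_done(void *data)
{
        struct cb_set_arg *arg = data;
        int remaining;

        pthread_mutex_lock(&arg->lock);
        remaining = atomic_fetch_sub(&arg->rpcs, 1) - 1;
        pthread_cond_broadcast(&arg->waitq);       /* wake the issuer */
        pthread_mutex_unlock(&arg->lock);

        if (remaining == 0)
                csa_put(arg);   /* drop the callbacks' shared reference */
        return NULL;
}

/* Issuer side: the roles of ldlm_run_ast_work()/ldlm_bl_and_cp_ast_tail(). */
int main(void)
{
        struct cb_set_arg *arg = calloc(1, sizeof(*arg));
        pthread_t tids[4];
        int i;

        if (arg == NULL)
                return 1;
        atomic_init(&arg->rpcs, 0);
        atomic_init(&arg->refcount, 1);            /* the issuer's reference */
        pthread_mutex_init(&arg->lock, NULL);
        pthread_cond_init(&arg->waitq, NULL);

        for (i = 0; i < 4; i++) {
                /* The first in-flight callback pins the struct for all of
                 * them, like cfs_atomic_inc_return(&arg->rpcs) == 1 above. */
                if (atomic_fetch_add(&arg->rpcs, 1) == 0)
                        atomic_fetch_add(&arg->refcount, 1);
                pthread_create(&tids[i], NULL, cb_done, arg);
        }

        /* Wait for every callback to finish, as the old code did ... */
        pthread_mutex_lock(&arg->lock);
        while (atomic_load(&arg->rpcs) > 0)
                pthread_cond_wait(&arg->waitq, &arg->lock);
        pthread_mutex_unlock(&arg->lock);

        /* ... but only drop our reference instead of letting the struct
         * vanish while a late callback may still be touching waitq. */
        csa_put(arg);

        for (i = 0; i < 4; i++)
                pthread_join(tids[i], NULL);
        return 0;
}

Built with something like cc -std=c11 -pthread sketch.c (the filename is arbitrary),
the sketch exits cleanly whether the issuer or the last callback drops the final
reference; that ordering independence is exactly what the stack-allocated
ldlm_cb_set_arg in the old code could not provide.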