From 1205771e26c9cda227819ecb55f1e06eb1388f2b Mon Sep 17 00:00:00 2001 From: nikita Date: Tue, 20 Jan 2009 20:06:45 +0000 Subject: [PATCH] For extent locks (that are `asynchronous', that is, enqueued through ptlrpcd) CLIO checks lock->l_granted_mode and accesses lock->l_lvb_data in its completion AST and in the lock reply upcall (called from osc_enqueue_fini()). As such things were never done in the pre-existing code, execution of completion AST and reply upcall is not serialized. Take lock and resource spin-locks manually to avoid races. b=17927 i=oleg.drokin@sun.com i=h.huang@sun.com --- lustre/include/lustre_dlm.h | 1 + lustre/ldlm/ldlm_lock.c | 9 +++++++-- lustre/ldlm/ldlm_lockd.c | 3 ++- lustre/osc/osc_lock.c | 29 ++++++++++++++++++++++++++--- 4 files changed, 36 insertions(+), 6 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 96c5543..6b38baf 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -963,6 +963,7 @@ int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_allow_match(struct ldlm_lock *lock); +void ldlm_lock_allow_match_locked(struct ldlm_lock *lock); ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, const struct ldlm_res_id *, ldlm_type_t type, ldlm_policy_data_t *, ldlm_mode_t mode, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index c0c566a..050c45c 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -998,11 +998,16 @@ static struct ldlm_lock *search_queue(struct list_head *queue, return NULL; } -void ldlm_lock_allow_match(struct ldlm_lock *lock) +void ldlm_lock_allow_match_locked(struct ldlm_lock *lock) { - lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_LVB_READY; cfs_waitq_signal(&lock->l_waitq); +} + +void ldlm_lock_allow_match(struct ldlm_lock *lock) +{ + lock_res_and_lock(lock); + ldlm_lock_allow_match_locked(lock); unlock_res_and_lock(lock); } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 4b55123..dec536a 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1814,7 +1814,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) RETURN(0); } - if ((lock->l_flags & LDLM_FL_FAIL_LOC) && + if ((lock->l_flags & LDLM_FL_FAIL_LOC) && lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE); @@ -2494,6 +2494,7 @@ EXPORT_SYMBOL(ldlm_lock_dump); EXPORT_SYMBOL(ldlm_lock_dump_handle); EXPORT_SYMBOL(ldlm_cancel_locks_for_export); EXPORT_SYMBOL(ldlm_reprocess_all_ns); +EXPORT_SYMBOL(ldlm_lock_allow_match_locked); EXPORT_SYMBOL(ldlm_lock_allow_match); EXPORT_SYMBOL(ldlm_lock_downgrade); EXPORT_SYMBOL(ldlm_lock_convert); diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index 8f2b2b5..0e3521e 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -312,6 +312,8 @@ static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck) * * This can be optimized to not update attributes when lock is a result of a * local match. + * + * Called under lock and resource spin-locks. */ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck, int rc) @@ -344,6 +346,8 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck, dlmlock = olck->ols_lock; LASSERT(dlmlock != NULL); + /* re-grab LVB from a dlm lock under DLM spin-locks. */ + *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data; size = lvb->lvb_size; /* Extend KMS up to the end of this lock and no further * A lock on [x,y] means a KMS of up to y + 1 bytes! */ @@ -360,7 +364,7 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck, lvb->lvb_size, oinfo->loi_kms, dlmlock->l_policy_data.l_extent.end); } - ldlm_lock_allow_match(dlmlock); + ldlm_lock_allow_match_locked(dlmlock); } else if (rc == -ENAVAIL && olck->ols_glimpse) { CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" " kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms); @@ -375,6 +379,13 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck, EXIT; } +/** + * Called when a lock is granted, from an upcall (when server returned a + * granted lock), or from completion AST, when server returned a blocked lock. + * + * Called under lock and resource spin-locks, that are released temporarily + * here. + */ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck, struct ldlm_lock *dlmlock, int rc) { @@ -399,10 +410,19 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck, * tell upper layers the extent of the lock that was actually * granted */ - cl_lock_modify(env, lock, descr); LINVRNT(osc_lock_invariant(olck)); olck->ols_state = OLS_GRANTED; osc_lock_lvb_update(env, olck, rc); + + /* release DLM spin-locks to allow cl_lock_modify() to take a + * semaphore on a parent lock. This is safe, because + * spin-locks are needed to protect consistency of + * dlmlock->l_*_mode and LVB, and we have finished processing + * them. */ + unlock_res_and_lock(dlmlock); + cl_lock_modify(env, lock, descr); + lock_res_and_lock(dlmlock); + cl_lock_signal(env, lock); } EXIT; @@ -424,7 +444,6 @@ static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck) LASSERT(olck->ols_lock == NULL); olck->ols_lock = dlmlock; spin_unlock(&osc_ast_guard); - unlock_res_and_lock(dlmlock); /* * Lock might be not yet granted. In this case, completion ast @@ -433,6 +452,8 @@ static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck) */ if (dlmlock->l_granted_mode == dlmlock->l_req_mode) osc_lock_granted(env, olck, dlmlock, 0); + unlock_res_and_lock(dlmlock); + /* * osc_enqueue_interpret() decrefs asynchronous locks, counter * this. @@ -751,6 +772,7 @@ static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock, * to lock->l_lvb_data, store it in osc_lock. */ LASSERT(dlmlock->l_lvb_data != NULL); + lock_res_and_lock(dlmlock); olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data; if (olck->ols_lock == NULL) /* @@ -767,6 +789,7 @@ static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock, osc_lock_granted(env, olck, dlmlock, dlmrc); if (dlmrc != 0) cl_lock_error(env, lock, dlmrc); + unlock_res_and_lock(dlmlock); cl_lock_mutex_put(env, lock); osc_ast_data_put(env, olck); result = 0; -- 1.8.3.1