From: Eric Mei
Date: Tue, 21 Sep 2010 18:20:49 +0000 (+0400)
Subject: b=16774 cancel unused osc locks before replay.
X-Git-Tag: 2.0.53.0~33
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=6fd5e00ff03d41b427eec5d70efaef4bbdd8d59c

b=16774 cancel unused osc locks before replay.
r=oleg.drokin
r=di.wang
---

diff --git a/lustre/osc/osc_cl_internal.h b/lustre/osc/osc_cl_internal.h
index 16cd956..6478c26 100644
--- a/lustre/osc/osc_cl_internal.h
+++ b/lustre/osc/osc_cl_internal.h
@@ -196,6 +196,17 @@ struct osc_lock {
         struct lustre_handle     ols_handle;
         struct ldlm_enqueue_info ols_einfo;
         enum osc_lock_state      ols_state;
+
+        /**
+         * How many pages are using this lock for io, currently only used by
+         * read-ahead. If non-zero, the underlying dlm lock won't be
+         * cancelled during recovery, to avoid deadlock. See bz16774.
+         *
+         * \see osc_page::ops_lock
+         * \see osc_page_addref_lock(), osc_page_putref_lock()
+         */
+        cfs_atomic_t             ols_pageref;
+
         /**
          * true, if ldlm_lock_addref() was called against
          * osc_lock::ols_lock. This is used for sanity checking.
@@ -301,6 +312,16 @@ struct osc_page {
          * Submit time - the time when the page is starting RPC. For debugging.
          */
         cfs_time_t            ops_submit_time;
+
+        /**
+         * A lock of which we hold a reference covers this page. Only used by
+         * read-ahead: for a readahead page we hold its covering lock to
+         * prevent it from being canceled during recovery.
+         *
+         * \see osc_lock::ols_pageref
+         * \see osc_page_addref_lock(), osc_page_putref_lock().
+         */
+        struct cl_lock       *ops_lock;
 };
 
 extern cfs_mem_cache_t *osc_page_kmem;
diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h
index 8bf8039..9928169 100644
--- a/lustre/osc/osc_internal.h
+++ b/lustre/osc/osc_internal.h
@@ -227,5 +227,6 @@ static inline struct osc_device *obd2osc_dev(const struct obd_device *d)
         return container_of0(d->obd_lu_dev, struct osc_device,
                              od_cl.cd_lu_dev);
 }
 
+int osc_dlm_lock_pageref(struct ldlm_lock *dlm);
 #endif /* OSC_INTERNAL_H */
diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index 73f6083..42de3de 100644
--- a/lustre/osc/osc_lock.c
+++ b/lustre/osc/osc_lock.c
@@ -54,6 +54,8 @@
  *  @{
  */
 
+#define _PAGEREF_MAGIC  (-10000000)
+
 /*****************************************************************************
  *
  * Type conversions.
@@ -223,6 +225,8 @@ static void osc_lock_fini(const struct lu_env *env,
          */
         osc_lock_unhold(ols);
         LASSERT(ols->ols_lock == NULL);
+        LASSERT(cfs_atomic_read(&ols->ols_pageref) == 0 ||
+                cfs_atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);
 
         OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
 }
@@ -1599,6 +1603,7 @@ int osc_lock_init(const struct lu_env *env,
         OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO);
         if (clk != NULL) {
                 osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
+                cfs_atomic_set(&clk->ols_pageref, 0);
                 clk->ols_state = OLS_NEW;
                 cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
                 result = 0;
@@ -1607,5 +1612,26 @@ int osc_lock_init(const struct lu_env *env,
         return result;
 }
 
+int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
+{
+        struct osc_lock *olock;
+        int              rc = 0;
+
+        cfs_spin_lock(&osc_ast_guard);
+        olock = dlm->l_ast_data;
+        /*
+         * There is a very rare race with osc_page_addref_lock() here, but
+         * it doesn't matter: in the worst case we merely skip cancelling a
+         * lock that we actually could cancel, which does no harm.
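+         *
+         * The check below atomically adds _PAGEREF_MAGIC (a large negative
+         * value) to ols_pageref: if the result equals _PAGEREF_MAGIC, no
+         * pages held a reference and the magic value is left in place, so
+         * any later osc_page_addref_lock() sees a non-positive count and
+         * fails; otherwise the magic value is subtracted back out and the
+         * lock is reported as busy.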
+         */
+        if (olock != NULL &&
+            cfs_atomic_add_return(_PAGEREF_MAGIC,
+                                  &olock->ols_pageref) != _PAGEREF_MAGIC) {
+                cfs_atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
+                rc = 1;
+        }
+        cfs_spin_unlock(&osc_ast_guard);
+        return rc;
+}
+
 /** @} osc */
diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c
index 167e3eb..d7f7772 100644
--- a/lustre/osc/osc_page.c
+++ b/lustre/osc/osc_page.c
@@ -162,6 +162,7 @@ static void osc_page_fini(const struct lu_env *env,
 {
         struct osc_page *opg = cl2osc_page(slice);
 
         CDEBUG(D_TRACE, "%p\n", opg);
+        LASSERT(opg->ops_lock == NULL);
         OBD_SLAB_FREE_PTR(opg, osc_page_kmem);
 }
@@ -247,6 +248,48 @@ void osc_index2policy(ldlm_policy_data_t *policy, const struct cl_object *obj,
         policy->l_extent.end   = cl_offset(obj, end + 1) - 1;
 }
 
+static int osc_page_addref_lock(const struct lu_env *env,
+                                struct osc_page *opg,
+                                struct cl_lock *lock)
+{
+        struct osc_lock *olock;
+        int              rc;
+
+        LASSERT(opg->ops_lock == NULL);
+
+        olock = osc_lock_at(lock);
+        if (cfs_atomic_inc_return(&olock->ols_pageref) <= 0) {
+                cfs_atomic_dec(&olock->ols_pageref);
+                cl_lock_put(env, lock);
+                rc = 1;
+        } else {
+                opg->ops_lock = lock;
+                rc = 0;
+        }
+        return rc;
+}
+
+static void osc_page_putref_lock(const struct lu_env *env,
+                                 struct osc_page *opg)
+{
+        struct cl_lock  *lock = opg->ops_lock;
+        struct osc_lock *olock;
+
+        LASSERT(lock != NULL);
+        olock = osc_lock_at(lock);
+
+        cfs_atomic_dec(&olock->ols_pageref);
+        opg->ops_lock = NULL;
+
+        /*
+         * Note: usually this won't be the last reference of the lock, but
+         * if it is, then all cl_lock_put() does is free some memory, so
+         * it is OK even if the caller is holding spinlocks.
+         */
+        LASSERT(cfs_atomic_read(&lock->cll_ref) > 1 || olock->ols_hold == 0);
+        cl_lock_put(env, lock);
+}
+
 static int osc_page_is_under_lock(const struct lu_env *env,
                                   const struct cl_page_slice *slice,
                                   struct cl_io *unused)
@@ -257,14 +300,34 @@ static int osc_page_is_under_lock(const struct lu_env *env,
         ENTRY;
         lock = cl_lock_at_page(env, slice->cpl_obj, slice->cpl_page,
                                NULL, 1, 0);
-        if (lock != NULL) {
-                cl_lock_put(env, lock);
+        if (lock != NULL &&
+            osc_page_addref_lock(env, cl2osc_page(slice), lock) == 0)
                 result = -EBUSY;
-        } else
+        else
                 result = -ENODATA;
         RETURN(result);
 }
 
+static void osc_page_disown(const struct lu_env *env,
+                            const struct cl_page_slice *slice,
+                            struct cl_io *io)
+{
+        struct osc_page *opg = cl2osc_page(slice);
+
+        if (unlikely(opg->ops_lock))
+                osc_page_putref_lock(env, opg);
+}
+
+static void osc_page_completion_read(const struct lu_env *env,
+                                     const struct cl_page_slice *slice,
+                                     int ioret)
+{
+        struct osc_page *opg = cl2osc_page(slice);
+
+        if (likely(opg->ops_lock))
+                osc_page_putref_lock(env, opg);
+}
+
 static int osc_page_fail(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          struct cl_io *unused)
@@ -410,12 +473,14 @@ static const struct cl_page_operations osc_page_ops = {
         .cpo_print         = osc_page_print,
         .cpo_delete        = osc_page_delete,
         .cpo_is_under_lock = osc_page_is_under_lock,
+        .cpo_disown        = osc_page_disown,
         .io = {
                 [CRT_READ] = {
-                        .cpo_cache_add = osc_page_fail
+                        .cpo_cache_add  = osc_page_fail,
+                        .cpo_completion = osc_page_completion_read
                 },
                 [CRT_WRITE] = {
-                        .cpo_cache_add = osc_page_cache_add
+                        .cpo_cache_add  = osc_page_cache_add
                 }
         },
         .cpo_clip          = osc_page_clip,
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 404bca2..7144f0b 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -4362,6 +4362,32 @@ static int osc_import_event(struct obd_device *obd,
         RETURN(rc);
 }
 
+/**
+ * Determine whether the lock can be canceled before being replayed during
+ * recovery; see bug 16774 for detailed information.
+ *
+ * \retval zero    the lock can't be canceled
+ * \retval other   ok to cancel
+ */
+static int osc_cancel_for_recovery(struct ldlm_lock *lock)
+{
+        check_res_locked(lock->l_resource);
+
+        /*
+         * Cancel all unused extent locks granted in LCK_PR or LCK_CR mode.
+         *
+         * XXX As a future improvement, we could also cancel an unused write
+         * lock if it has no dirty data and no active mmaps.
+         */
+        if (lock->l_resource->lr_type == LDLM_EXTENT &&
+            (lock->l_granted_mode == LCK_PR ||
+             lock->l_granted_mode == LCK_CR) &&
+            (osc_dlm_lock_pageref(lock) == 0))
+                RETURN(1);
+
+        RETURN(0);
+}
+
 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         int rc;
@@ -4400,6 +4426,8 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
                 cfs_sema_init(&cli->cl_grant_sem, 1);
+
+                ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
         }
 
         RETURN(rc);
diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh
index e8c790a..a7e3512 100755
--- a/lustre/tests/replay-single.sh
+++ b/lustre/tests/replay-single.sh
@@ -2097,6 +2097,37 @@ test_85a() { #bug 16774
 }
 run_test 85a "check the cancellation of unused locks during recovery(IBITS)"
 
+test_85b() { #bug 16774
+    lctl set_param -n ldlm.cancel_unused_locks_before_replay "1"
+
+    lfs setstripe -o 0 -c 1 $DIR
+
+    for i in `seq 100`; do
+        dd if=/dev/urandom of=$DIR/$tfile-$i bs=4096 count=32 >/dev/null 2>&1
+    done
+
+    cancel_lru_locks osc
+
+    for i in `seq 100`; do
+        dd if=$DIR/$tfile-$i of=/dev/null bs=4096 count=32 >/dev/null 2>&1
+    done
+
+    lov_id=`lctl dl | grep "clilov"`
+    addr=`echo $lov_id | awk '{print $4}' | awk -F '-' '{print $3}'`
+    count=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count`
+    echo "before recovery: unused locks count = $count"
+
+    fail ost1
+
+    count2=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count`
+    echo "after recovery: unused locks count = $count2"
+
+    if [ $count2 -ge $count ]; then
+        error "unused locks are not canceled"
+    fi
+}
+run_test 85b "check the cancellation of unused locks during recovery(EXTENT)"
+
 test_86() {
     local clients=${CLIENTS:-$HOSTNAME}