X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosc%2Fosc_lock.c;h=a1dc1bfbe092e181f55bc49cdc7e0daa72143819;hb=f0d608786a27dfb8dddf06d6b086b491749557f1;hp=0b03ea7c32b33e6b4b69de37197be41d1949acae;hpb=d2dbff42e78d7ebca4db534df7e1c19f6b674a22;p=fs%2Flustre-release.git

diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index 0b03ea7..a1dc1bf 100644
--- a/lustre/osc/osc_lock.c
+++ b/lustre/osc/osc_lock.c
@@ -28,6 +28,9 @@
 /*
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -193,23 +196,32 @@ static int osc_lock_unuse(const struct lu_env *env,
 {
         struct osc_lock *ols = cl2osc_lock(slice);
 
-        LASSERT(ols->ols_state == OLS_GRANTED ||
-                ols->ols_state == OLS_UPCALL_RECEIVED);
         LINVRNT(osc_lock_invariant(ols));
 
-        if (ols->ols_glimpse) {
-                LASSERT(ols->ols_hold == 0);
+        switch (ols->ols_state) {
+        case OLS_NEW:
+                LASSERT(!ols->ols_hold);
+                LASSERT(ols->ols_agl);
+                return 0;
+        case OLS_UPCALL_RECEIVED:
+                LASSERT(!ols->ols_hold);
+                ols->ols_state = OLS_NEW;
                 return 0;
+        case OLS_GRANTED:
+                LASSERT(!ols->ols_glimpse);
+                LASSERT(ols->ols_hold);
+                /*
+                 * Move lock into OLS_RELEASED state before calling
+                 * osc_cancel_base() so that possible synchronous cancellation
+                 * (that always happens e.g., for liblustre) sees that lock is
+                 * released.
+                 */
+                ols->ols_state = OLS_RELEASED;
+                return osc_lock_unhold(ols);
+        default:
+                CERROR("Impossible state: %d\n", ols->ols_state);
+                LBUG();
         }
-        LASSERT(ols->ols_hold);
-
-        /*
-         * Move lock into OLS_RELEASED state before calling osc_cancel_base()
-         * so that possible synchronous cancellation (that always happens
-         * e.g., for liblustre) sees that lock is released.
-         */
-        ols->ols_state = OLS_RELEASED;
-        return osc_lock_unhold(ols);
 }
 
 static void osc_lock_fini(const struct lu_env *env,
@@ -343,10 +355,8 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
 
         ENTRY;
 
-        if (!(olck->ols_flags & LDLM_FL_LVB_READY)) {
-                EXIT;
-                return;
-        }
+        if (!(olck->ols_flags & LDLM_FL_LVB_READY))
+                RETURN_EXIT;
 
         lvb = &olck->ols_lvb;
         obj = olck->ols_cl.cls_obj;
@@ -525,12 +535,15 @@ static int osc_lock_upcall(void *cookie, int errcode)
                         dlmlock->l_ast_data = NULL;
                         olck->ols_handle.cookie = 0ULL;
                         cfs_spin_unlock(&osc_ast_guard);
+                        ldlm_lock_fail_match_locked(dlmlock);
                         unlock_res_and_lock(dlmlock);
                         LDLM_LOCK_PUT(dlmlock);
                 }
         } else {
-                if (olck->ols_glimpse)
+                if (olck->ols_glimpse) {
                         olck->ols_glimpse = 0;
+                        olck->ols_agl = 0 ;
+                }
                 osc_lock_upcall0(env, olck);
         }
 
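Note on the osc_lock_unuse() hunk above: the two blanket LASSERTs are replaced
by an explicit dispatch on ols_state because, with asynchronous glimpse locks
(AGL) in the picture, unuse can now legitimately see OLS_NEW (the lock was
never enqueued) and OLS_UPCALL_RECEIVED (the enqueue was answered but the lock
is not held), not only OLS_GRANTED. The standalone sketch below models just
that state machine. It is illustrative only: the mock_* names are hypothetical
and none of this is the real Lustre API.

/* Illustrative sketch, not Lustre code: the unuse state machine. */
#include <assert.h>
#include <stdio.h>

enum ols_state { OLS_NEW, OLS_UPCALL_RECEIVED, OLS_GRANTED, OLS_RELEASED };

struct mock_lock {
        enum ols_state state;
        int hold;               /* stands in for ols_hold */
        int agl;                /* stands in for ols_agl */
};

/* Mirrors the new switch: each unusable state is either a no-op,
 * a rewind to OLS_NEW, or a real release. */
static int mock_unuse(struct mock_lock *lk)
{
        switch (lk->state) {
        case OLS_NEW:                   /* AGL lock never enqueued */
                assert(!lk->hold && lk->agl);
                return 0;
        case OLS_UPCALL_RECEIVED:       /* answered but not held */
                assert(!lk->hold);
                lk->state = OLS_NEW;    /* may be re-enqueued later */
                return 0;
        case OLS_GRANTED:               /* really held: release it */
                assert(lk->hold);
                lk->state = OLS_RELEASED;
                lk->hold = 0;
                return 0;
        default:
                return -1;              /* impossible state */
        }
}

int main(void)
{
        struct mock_lock lk = { .state = OLS_NEW, .hold = 0, .agl = 1 };

        mock_unuse(&lk);                /* no-op from OLS_NEW */
        lk.state = OLS_GRANTED;
        lk.hold  = 1;
        mock_unuse(&lk);                /* release from OLS_GRANTED */
        printf("final state: %d\n", lk.state);
        return 0;
}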
@@ -553,17 +566,22 @@
                         rc = 0;
                 }
 
-                if (rc == 0)
-                        /* on error, lock was signaled by cl_lock_error() */
+                if (rc == 0) {
                         cl_lock_signal(env, lock);
-                else
+                        /* del user for lock upcall cookie */
+                        cl_unuse_try(env, lock);
+                } else {
+                        /* del user for lock upcall cookie */
+                        cl_lock_user_del(env, lock);
                         cl_lock_error(env, lock, rc);
+                }
 
                 cl_lock_mutex_put(env, lock);
 
                 /* release cookie reference, acquired by osc_lock_enqueue() */
                 lu_ref_del(&lock->cll_reference, "upcall", lock);
                 cl_lock_put(env, lock);
+                cl_env_nested_put(&nest, env);
         } else
                 /* should never happen, similar to osc_ldlm_blocking_ast().
                  */
@@ -1049,7 +1067,6 @@ static int osc_lock_enqueue_wait(const struct lu_env *env,
         ENTRY;
 
         LASSERT(cl_lock_is_mutexed(lock));
-        LASSERT(lock->cll_state == CLS_QUEUING);
 
         /* make it enqueue anyway for glimpse lock, because we actually
          * don't need to cancel any conflicting locks. */
@@ -1153,10 +1170,14 @@ static int osc_lock_enqueue(const struct lu_env *env,
         ENTRY;
 
         LASSERT(cl_lock_is_mutexed(lock));
-        LASSERT(lock->cll_state == CLS_QUEUING);
-        LASSERT(ols->ols_state == OLS_NEW);
+        LASSERTF(ols->ols_state == OLS_NEW,
+                 "Impossible state: %d\n", ols->ols_state);
 
         ols->ols_flags = osc_enq2ldlm_flags(enqflags);
+        if (enqflags & CEF_AGL) {
+                ols->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
+                ols->ols_agl = 1;
+        }
         if (ols->ols_flags & LDLM_FL_HAS_INTENT)
                 ols->ols_glimpse = 1;
         if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST))
@@ -1178,6 +1199,8 @@
                 /* a reference for lock, passed as an upcall cookie */
                 cl_lock_get(lock);
                 lu_ref_add(&lock->cll_reference, "upcall", lock);
+                /* a user for lock also */
+                cl_lock_user_add(env, lock);
                 ols->ols_state = OLS_ENQUEUED;
 
                 /*
@@ -1193,11 +1216,16 @@
                                   obj->oo_oinfo->loi_kms_valid,
                                   osc_lock_upcall,
                                   ols, einfo, &ols->ols_handle,
-                                  PTLRPCD_SET, 1);
+                                  PTLRPCD_SET, 1, ols->ols_agl);
                 if (result != 0) {
+                        cl_lock_user_del(env, lock);
                         lu_ref_del(&lock->cll_reference, "upcall", lock);
                         cl_lock_put(env, lock);
+                        if (unlikely(result == -ECANCELED)) {
+                                ols->ols_state = OLS_NEW;
+                                result = 0;
+                        }
                 }
         } else {
                 ols->ols_state = OLS_GRANTED;
@@ -1215,8 +1243,34 @@ static int osc_lock_wait(const struct lu_env *env,
         struct cl_lock *lock = olck->ols_cl.cls_lock;
 
         LINVRNT(osc_lock_invariant(olck));
-        if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED)
-                return 0;
+
+        if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
+                if (olck->ols_flags & LDLM_FL_LVB_READY) {
+                        return 0;
+                } else if (olck->ols_agl) {
+                        olck->ols_state = OLS_NEW;
+                } else {
+                        LASSERT(lock->cll_error);
+                        return lock->cll_error;
+                }
+        }
+
+        if (olck->ols_state == OLS_NEW) {
+                if (lock->cll_descr.cld_enq_flags & CEF_NO_REENQUEUE) {
+                        return -ENAVAIL;
+                } else {
+                        int rc;
+
+                        LASSERT(olck->ols_agl);
+
+                        rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC |
+                                                                CEF_MUST);
+                        if (rc != 0)
+                                return rc;
+                        else
+                                return CLO_REENQUEUED;
+                }
+        }
 
         LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
                      lock->cll_error == 0, olck->ols_lock != NULL));
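The enqueue/wait hunks above implement the AGL retry protocol: CEF_AGL adds
LDLM_FL_BLOCK_NOWAIT, a refused speculative enqueue comes back as -ECANCELED
and is quietly rewound to OLS_NEW, and osc_lock_wait() then re-enqueues with
CEF_ASYNC | CEF_MUST and returns CLO_REENQUEUED so the caller goes around
again. A minimal standalone sketch of that control flow follows; the mock_*
names and constant values are hypothetical stand-ins, not the Lustre client
code.

/* Illustrative sketch, not Lustre code: the AGL re-enqueue loop. */
#include <stdio.h>

#define CLO_REENQUEUED  1               /* stand-in "retry" value */

enum ols_state { OLS_NEW, OLS_ENQUEUED, OLS_UPCALL_RECEIVED, OLS_GRANTED };

struct mock_lock {
        enum ols_state state;
        int lvb_ready;                  /* stands in for LDLM_FL_LVB_READY */
        int agl;
};

/* A speculative (non-blocking) AGL enqueue may be refused by the server;
 * the refusal maps back to OLS_NEW, as in the -ECANCELED hunk above. */
static int mock_enqueue(struct mock_lock *lk, int blocking)
{
        if (!blocking && lk->agl) {
                lk->state = OLS_NEW;    /* refused, harmless */
                return 0;
        }
        lk->state = OLS_GRANTED;
        lk->lvb_ready = 1;
        return 0;
}

static int mock_wait(struct mock_lock *lk)
{
        if (lk->lvb_ready)
                return 0;
        if (lk->state == OLS_NEW) {     /* AGL fell back: try harder */
                mock_enqueue(lk, 1);    /* the CEF_ASYNC | CEF_MUST path */
                return CLO_REENQUEUED;
        }
        return -1;
}

int main(void)
{
        struct mock_lock lk = { .state = OLS_NEW, .lvb_ready = 0, .agl = 1 };
        int rc;

        mock_enqueue(&lk, 0);           /* speculative AGL enqueue */
        while ((rc = mock_wait(&lk)) == CLO_REENQUEUED)
                ;                       /* caller loops until granted */
        printf("rc=%d state=%d\n", rc, lk.state);
        return 0;
}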
@@ -1313,7 +1367,7 @@ static void osc_lock_cancel(const struct lu_env *env,
         if (dlmlock != NULL) {
                 int do_cancel;
 
-                discard = dlmlock->l_flags & LDLM_FL_DISCARD_DATA;
+                discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
                 result = osc_lock_flush(olck, discard);
                 osc_lock_unhold(olck);
@@ -1334,14 +1388,31 @@
                               lock, result);
         }
         olck->ols_state = OLS_CANCELLED;
+        olck->ols_flags &= ~LDLM_FL_LVB_READY;
         osc_lock_detach(env, olck);
 }
 
-void cl_lock_page_list_fixup(const struct lu_env *env,
-                             struct cl_io *io, struct cl_lock *lock,
-                             struct cl_page_list *queue);
-
 #ifdef INVARIANT_CHECK
+static int check_cb(const struct lu_env *env, struct cl_io *io,
+                    struct cl_page *page, void *cbdata)
+{
+        struct cl_lock *lock = cbdata;
+
+        if (lock->cll_descr.cld_mode == CLM_READ) {
+                struct cl_lock *tmp;
+                tmp = cl_lock_at_page(env, lock->cll_descr.cld_obj,
+                                      page, lock, 1, 0);
+                if (tmp != NULL) {
+                        cl_lock_put(env, tmp);
+                        return CLP_GANG_OKAY;
+                }
+        }
+
+        CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
+        CL_PAGE_DEBUG(D_ERROR, env, page, "\n");
+        return CLP_GANG_ABORT;
+}
+
 /**
  * Returns true iff there are pages under \a olck not protected by other
  * locks.
@@ -1352,45 +1423,39 @@ static int osc_lock_has_pages(struct osc_lock *olck)
 {
         struct cl_lock       *lock;
         struct cl_lock_descr *descr;
         struct cl_object     *obj;
         struct osc_object    *oob;
-        struct cl_page_list  *plist;
-        struct cl_page       *page;
         struct cl_env_nest    nest;
         struct cl_io         *io;
         struct lu_env        *env;
         int                   result;
 
         env = cl_env_nested_get(&nest);
-        if (!IS_ERR(env)) {
-                obj   = olck->ols_cl.cls_obj;
-                oob   = cl2osc(obj);
-                io    = &oob->oo_debug_io;
-                lock  = olck->ols_cl.cls_lock;
-                descr = &lock->cll_descr;
-                plist = &osc_env_info(env)->oti_plist;
-                cl_page_list_init(plist);
-
-                cfs_mutex_lock(&oob->oo_debug_mutex);
-
-                io->ci_obj = cl_object_top(obj);
-                cl_io_init(env, io, CIT_MISC, io->ci_obj);
-                cl_page_gang_lookup(env, obj, io,
-                                    descr->cld_start, descr->cld_end, plist, 0,
-                                    NULL);
-                cl_lock_page_list_fixup(env, io, lock, plist);
-                if (plist->pl_nr > 0) {
-                        CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
-                        cl_page_list_for_each(page, plist)
-                                CL_PAGE_DEBUG(D_ERROR, env, page, "\n");
-                }
-                result = plist->pl_nr > 0;
-                cl_page_list_disown(env, io, plist);
-                cl_page_list_fini(env, plist);
-                cl_io_fini(env, io);
-                cfs_mutex_unlock(&oob->oo_debug_mutex);
-                cl_env_nested_put(&nest, env);
-        } else
-                result = 0;
-        return result;
+        if (IS_ERR(env))
+                return 0;
+
+        obj  = olck->ols_cl.cls_obj;
+        oob  = cl2osc(obj);
+        io   = &oob->oo_debug_io;
+        lock = olck->ols_cl.cls_lock;
+        descr = &lock->cll_descr;
+
+        cfs_mutex_lock(&oob->oo_debug_mutex);
+
+        io->ci_obj = cl_object_top(obj);
+        cl_io_init(env, io, CIT_MISC, io->ci_obj);
+        do {
+                result = cl_page_gang_lookup(env, obj, io,
+                                             descr->cld_start, descr->cld_end,
+                                             check_cb, (void *)lock);
+                if (result == CLP_GANG_ABORT)
+                        break;
+                if (result == CLP_GANG_RESCHED)
+                        cfs_cond_resched();
+        } while (result != CLP_GANG_OKAY);
+        cl_io_fini(env, io);
+        cfs_mutex_unlock(&oob->oo_debug_mutex);
+        cl_env_nested_put(&nest, env);
+
+        return (result == CLP_GANG_ABORT);
 }
 #else
 static int osc_lock_has_pages(struct osc_lock *olck)
@@ -1473,6 +1538,9 @@ static int osc_lock_fits_into(const struct lu_env *env,
                 return 0;
 
         if (need->cld_mode == CLM_PHANTOM) {
+                if (ols->ols_agl)
+                        return !(ols->ols_state > OLS_RELEASED);
+
                 /*
                  * Note: the QUEUED lock can't be matched here, otherwise
                  * it might cause the deadlocks.
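A closing note on osc_lock_has_pages(): the rewrite above drops the
collect-into-a-page-list pattern in favor of a callback-driven
cl_page_gang_lookup() that the caller restarts until the walk completes,
cooperatively rescheduling in between, and check_cb() aborts the walk as soon
as it meets a page not protected by some other lock. The standalone sketch
below mirrors that loop shape with hypothetical names (gang_lookup,
check_page); it illustrates the pattern, not the CLIO interface.

/* Illustrative sketch, not Lustre code: a restartable gang lookup. */
#include <stdio.h>

enum gang_res { CLP_GANG_OKAY, CLP_GANG_RESCHED, CLP_GANG_ABORT };

typedef enum gang_res (*gang_cb)(int page, void *cbdata);

/* Stand-in walker: visits pages [start, end], stops early if the
 * callback aborts, and pretends to need a reschedule halfway through. */
static enum gang_res gang_lookup(int start, int end, int *cursor,
                                 gang_cb cb, void *cbdata)
{
        int mid = start + (end - start) / 2;

        for (; *cursor <= end; (*cursor)++) {
                if (cb(*cursor, cbdata) == CLP_GANG_ABORT)
                        return CLP_GANG_ABORT;
                if (*cursor == mid) {
                        (*cursor)++;
                        return CLP_GANG_RESCHED;
                }
        }
        return CLP_GANG_OKAY;
}

/* Like check_cb(): abort as soon as an unprotected page is seen. */
static enum gang_res check_page(int page, void *cbdata)
{
        int bad_page = *(int *)cbdata;

        return page == bad_page ? CLP_GANG_ABORT : CLP_GANG_OKAY;
}

int main(void)
{
        int bad = -1;                   /* no unprotected page this run */
        int cursor = 0;
        enum gang_res res;

        do {
                res = gang_lookup(0, 7, &cursor, check_page, &bad);
                if (res == CLP_GANG_ABORT)
                        break;
                /* the real loop calls cfs_cond_resched() on RESCHED */
        } while (res != CLP_GANG_OKAY);

        printf("has pages: %d\n", res == CLP_GANG_ABORT);
        return 0;
}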