static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
struct cl_lock *parent);
+static int lov_lock_unuse(const struct lu_env *env,
+ const struct cl_lock_slice *slice);
/*****************************************************************************
*
* Lov lock operations.
LASSERT(link != NULL);
lov_lock_unlink(env, link, sublock);
lov_sublock_unlock(env, sublock, closure, NULL);
+ lck->lls_cancel_race = 1;
result = CLO_REPEAT;
} else if (lsep) {
struct lov_sublock_env *subenv;
/* top-lock state cannot change concurrently, because single
* thread (one that released the last hold) carries unlocking
* to the completion. */
- LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING);
+ LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
lls = &lck->lls_sub[i];
sub = lls->sub_lock;
if (sub == NULL)
sublock = sub->lss_cl.cls_lock;
rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
if (rc == 0) {
- if (lck->lls_sub[i].sub_flags & LSF_HELD) {
+ if (lls->sub_flags & LSF_HELD) {
LASSERT(sublock->cll_state == CLS_HELD);
rc = cl_unuse_try(subenv->lse_env, sublock);
if (rc != CLO_WAIT)
if (result < 0)
break;
}
- if (result == 0 && lck->lls_unuse_race) {
- lck->lls_unuse_race = 0;
+
+ if (result == 0 && lck->lls_cancel_race) {
+ lck->lls_cancel_race = 0;
result = -ESTALE;
}
cl_lock_closure_fini(closure);
int result;
int i;
- LASSERT(slice->cls_lock->cll_state == CLS_CACHED);
+ LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
ENTRY;
for (result = 0, i = 0; i < lck->lls_nr; ++i) {
struct lov_lock_sub *lls;
struct lov_sublock_env *subenv;
- if (slice->cls_lock->cll_state != CLS_CACHED) {
- /* see comment in lov_lock_enqueue(). */
- LASSERT(i > 0 && result != 0);
- break;
- }
- /*
- * if a sub-lock was destroyed while top-lock was in
- * CLS_CACHED state, top-lock would have been moved into
- * CLS_NEW state, so all sub-locks have to be in place.
- */
+ LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
+
lls = &lck->lls_sub[i];
sub = lls->sub_lock;
- LASSERT(sub != NULL);
+ if (sub == NULL) {
+ /*
+ * Sub-lock might have been canceled, while top-lock was
+ * cached.
+ */
+ result = -ESTALE;
+ break;
+ }
+
sublock = sub->lss_cl.cls_lock;
rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
if (rc == 0) {
LASSERT(sublock->cll_state != CLS_FREEING);
lov_sublock_hold(env, lck, i);
if (sublock->cll_state == CLS_CACHED) {
- rc = cl_use_try(subenv->lse_env, sublock);
+ rc = cl_use_try(subenv->lse_env, sublock, 0);
if (rc != 0)
rc = lov_sublock_release(env, lck,
i, 1, rc);
- } else
- rc = 0;
+ }
lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
if (result != 0)
break;
}
+
+ if (lck->lls_cancel_race) {
+ /*
+ * If there is unlocking happened at the same time, then
+ * sublock_lock state should be FREEING, and lov_sublock_lock
+ * should return CLO_REPEAT. In this case, it should return
+ * ESTALE, and up layer should reset the lock state to be NEW.
+ */
+ lck->lls_cancel_race = 0;
+ LASSERT(result != 0);
+ result = -ESTALE;
+ }
cl_lock_closure_fini(closure);
RETURN(result);
}
sublock = lsl->lss_cl.cls_lock;
rc = lov_sublock_lock(env, lck, lls, closure, NULL);
if (rc == 0) {
- if (lck->lls_sub[i].sub_flags & LSF_HELD)
+ if (lls->sub_flags & LSF_HELD)
lov_sublock_release(env, lck, i, 1, 0);
if (sublock->cll_state < CLS_FREEING) {
struct lov_lock_link *link;