From 40ac868676c42015e2aaec49242de54c80bc5c92 Mon Sep 17 00:00:00 2001 From: jxiong Date: Tue, 20 Oct 2009 09:20:45 +0000 Subject: [PATCH] b=20824 r=wangdi,eric.mei Reimplement cl_lock_peek(). --- lustre/include/cl_object.h | 45 +++++--- lustre/lov/lov_cl_internal.h | 4 +- lustre/lov/lov_lock.c | 55 ++++++---- lustre/lov/lovsub_lock.c | 7 +- lustre/obdclass/cl_lock.c | 239 ++++++++++++++++++++++++++++++++----------- lustre/obdclass/cl_object.c | 2 +- lustre/osc/osc_lock.c | 2 +- 7 files changed, 251 insertions(+), 103 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 9fd2d88..7536f80 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -1337,15 +1337,15 @@ const char *cl_lock_mode_name(const enum cl_lock_mode mode); * | | V * | | HELD<---------+ * | | | | - * | | | | + * | | | | cl_use_try() * | | cl_unuse_try() | | * | | | | - * | | V | cached - * | +------------>UNLOCKING (*) | lock found - * | | | - * | cl_unuse_try() | | + * | | V ---+ + * | +------------>INTRANSIT (D) <--+ * | | | + * | cl_unuse_try() | | cached lock found * | | | cl_use_try() + * | | | * | V | * +------------------CACHED---------+ * | @@ -1364,6 +1364,8 @@ const char *cl_lock_mode_name(const enum cl_lock_mode mode); * * (C) is the point where Cancellation call-back is invoked. * + * (D) is the transit state which means the lock is changing. + * * Transition to FREEING state is possible from any other state in the * diagram in case of unrecoverable error. * @@ -1382,9 +1384,6 @@ const char *cl_lock_mode_name(const enum cl_lock_mode mode); * handled, and is in ENQUEUED state after enqueue to S2 has been sent (note * that in this case, sub-locks move from state to state, and top-lock remains * in the same state). - * - * Separate UNLOCKING state is needed to maintain an invariant that in HELD - * state lock is immediately ready for use. */ enum cl_lock_state { /** @@ -1406,10 +1405,16 @@ enum cl_lock_state { */ CLS_HELD, /** - * Lock is in the transition from CLS_HELD to CLS_CACHED. Lock is in - * this state only while cl_unuse() is executing against it. + * This state is used to mark the lock is being used, or unused. + * We need this state because the lock may have several sublocks, + * so it's impossible to have an atomic way to bring all sublocks + * into CLS_HELD state at use case, or all sublocks to CLS_CACHED + * at unuse case. + * If a thread is referring to a lock, and it sees the lock is in this + * state, it must wait for the lock. + * See state diagram for details. */ - CLS_UNLOCKING, + CLS_INTRANSIT, /** * Lock granted, not used. */ @@ -1430,9 +1435,7 @@ enum cl_lock_flags { /** cancellation is pending for this lock. */ CLF_CANCELPEND = 1 << 1, /** destruction is pending for this lock. */ - CLF_DOOMED = 1 << 2, - /** State update is pending. */ - CLF_STATE = 1 << 3 + CLF_DOOMED = 1 << 2 }; /** @@ -1530,6 +1533,10 @@ struct cl_lock { cfs_task_t *cll_guarder; int cll_depth; + /** + * the owner for INTRANSIT state + */ + cfs_task_t *cll_intransit_owner; int cll_error; /** * Number of holds on a lock. A hold prevents a lock from being @@ -2779,6 +2786,14 @@ int cl_lock_user_del (const struct lu_env *env, struct cl_lock *lock); int cl_lock_compatible(const struct cl_lock *lock1, const struct cl_lock *lock2); +enum cl_lock_state cl_lock_intransit(const struct lu_env *env, + struct cl_lock *lock); + +void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock, + enum cl_lock_state state); + +int cl_lock_is_intransit(struct cl_lock *lock); + /** \name statemachine statemachine * Interface to lock state machine consists of 3 parts: * @@ -2819,7 +2834,7 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock, struct cl_io *io, __u32 flags); int cl_unuse_try (const struct lu_env *env, struct cl_lock *lock); int cl_wait_try (const struct lu_env *env, struct cl_lock *lock); -int cl_use_try (const struct lu_env *env, struct cl_lock *lock); +int cl_use_try (const struct lu_env *env, struct cl_lock *lock, int atomic); /** @} statemachine */ void cl_lock_signal (const struct lu_env *env, struct cl_lock *lock); diff --git a/lustre/lov/lov_cl_internal.h b/lustre/lov/lov_cl_internal.h index 5401f6b..dbc63dc 100644 --- a/lustre/lov/lov_cl_internal.h +++ b/lustre/lov/lov_cl_internal.h @@ -283,9 +283,9 @@ struct lov_lock { unsigned lls_nr_filled; /** * Set when sub-lock was canceled, while top-lock was being - * unlocked. + * used, or unused. */ - int lls_unuse_race; + int lls_cancel_race:1; /** * An array of sub-locks * diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index e2b1520..bc6ab44 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -49,6 +49,8 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env, struct cl_lock *parent); +static int lov_lock_unuse(const struct lu_env *env, + const struct cl_lock_slice *slice); /***************************************************************************** * * Lov lock operations. @@ -226,6 +228,7 @@ static int lov_sublock_lock(const struct lu_env *env, LASSERT(link != NULL); lov_lock_unlink(env, link, sublock); lov_sublock_unlock(env, sublock, closure, NULL); + lck->lls_cancel_race = 1; result = CLO_REPEAT; } else if (lsep) { struct lov_sublock_env *subenv; @@ -644,7 +647,7 @@ static int lov_lock_unuse(const struct lu_env *env, /* top-lock state cannot change concurrently, because single * thread (one that released the last hold) carries unlocking * to the completion. */ - LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING); + LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT); lls = &lck->lls_sub[i]; sub = lls->sub_lock; if (sub == NULL) @@ -653,7 +656,7 @@ static int lov_lock_unuse(const struct lu_env *env, sublock = sub->lss_cl.cls_lock; rc = lov_sublock_lock(env, lck, lls, closure, &subenv); if (rc == 0) { - if (lck->lls_sub[i].sub_flags & LSF_HELD) { + if (lls->sub_flags & LSF_HELD) { LASSERT(sublock->cll_state == CLS_HELD); rc = cl_unuse_try(subenv->lse_env, sublock); if (rc != CLO_WAIT) @@ -666,8 +669,9 @@ static int lov_lock_unuse(const struct lu_env *env, if (result < 0) break; } - if (result == 0 && lck->lls_unuse_race) { - lck->lls_unuse_race = 0; + + if (result == 0 && lck->lls_cancel_race) { + lck->lls_cancel_race = 0; result = -ESTALE; } cl_lock_closure_fini(closure); @@ -721,7 +725,7 @@ static int lov_lock_use(const struct lu_env *env, int result; int i; - LASSERT(slice->cls_lock->cll_state == CLS_CACHED); + LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT); ENTRY; for (result = 0, i = 0; i < lck->lls_nr; ++i) { @@ -731,37 +735,48 @@ static int lov_lock_use(const struct lu_env *env, struct lov_lock_sub *lls; struct lov_sublock_env *subenv; - if (slice->cls_lock->cll_state != CLS_CACHED) { - /* see comment in lov_lock_enqueue(). */ - LASSERT(i > 0 && result != 0); - break; - } - /* - * if a sub-lock was destroyed while top-lock was in - * CLS_CACHED state, top-lock would have been moved into - * CLS_NEW state, so all sub-locks have to be in place. - */ + LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT); + lls = &lck->lls_sub[i]; sub = lls->sub_lock; - LASSERT(sub != NULL); + if (sub == NULL) { + /* + * Sub-lock might have been canceled, while top-lock was + * cached. + */ + result = -ESTALE; + break; + } + sublock = sub->lss_cl.cls_lock; rc = lov_sublock_lock(env, lck, lls, closure, &subenv); if (rc == 0) { LASSERT(sublock->cll_state != CLS_FREEING); lov_sublock_hold(env, lck, i); if (sublock->cll_state == CLS_CACHED) { - rc = cl_use_try(subenv->lse_env, sublock); + rc = cl_use_try(subenv->lse_env, sublock, 0); if (rc != 0) rc = lov_sublock_release(env, lck, i, 1, rc); - } else - rc = 0; + } lov_sublock_unlock(env, sub, closure, subenv); } result = lov_subresult(result, rc); if (result != 0) break; } + + if (lck->lls_cancel_race) { + /* + * If there is unlocking happened at the same time, then + * sublock_lock state should be FREEING, and lov_sublock_lock + * should return CLO_REPEAT. In this case, it should return + * ESTALE, and up layer should reset the lock state to be NEW. + */ + lck->lls_cancel_race = 0; + LASSERT(result != 0); + result = -ESTALE; + } cl_lock_closure_fini(closure); RETURN(result); } @@ -984,7 +999,7 @@ static void lov_lock_delete(const struct lu_env *env, sublock = lsl->lss_cl.cls_lock; rc = lov_sublock_lock(env, lck, lls, closure, NULL); if (rc == 0) { - if (lck->lls_sub[i].sub_flags & LSF_HELD) + if (lls->sub_flags & LSF_HELD) lov_sublock_release(env, lck, i, 1, 0); if (sublock->cll_state < CLS_FREEING) { struct lov_lock_link *link; diff --git a/lustre/lov/lovsub_lock.c b/lustre/lov/lovsub_lock.c index c97cb35..e4ff065 100644 --- a/lustre/lov/lovsub_lock.c +++ b/lustre/lov/lovsub_lock.c @@ -344,7 +344,7 @@ static int lovsub_lock_delete_one(const struct lu_env *env, case CLS_FREEING: cl_lock_signal(env, parent); break; - case CLS_UNLOCKING: + case CLS_INTRANSIT: /* * Here lies a problem: a sub-lock is canceled while top-lock * is being unlocked. Top-lock cannot be moved into CLS_NEW @@ -356,13 +356,14 @@ static int lovsub_lock_delete_one(const struct lu_env *env, * to be reused immediately). Nor can we wait for top-lock * state to change, because this can be synchronous to the * current thread. - * + * * We know for sure that lov_lock_unuse() will be called at * least one more time to finish un-using, so leave a mark on * the top-lock, that will be seen by the next call to * lov_lock_unuse(). */ - lov->lls_unuse_race = 1; + if (cl_lock_is_intransit(parent)) + lov->lls_cancel_race = 1; break; case CLS_CACHED: /* diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index 5316965..5b9ca93 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -399,6 +399,57 @@ static struct cl_lock *cl_lock_alloc(const struct lu_env *env, } /** + * Transfer the lock into INTRANSIT state and return the original state. + * + * \pre state: CLS_CACHED, CLS_HELD or CLS_ENQUEUED + * \post state: CLS_INTRANSIT + * \see CLS_INTRANSIT + */ +enum cl_lock_state cl_lock_intransit(const struct lu_env *env, + struct cl_lock *lock) +{ + enum cl_lock_state state = lock->cll_state; + + LASSERT(cl_lock_is_mutexed(lock)); + LASSERT(state != CLS_INTRANSIT); + LASSERTF(state >= CLS_ENQUEUED && state <= CLS_CACHED, + "Malformed lock state %d.\n", state); + + cl_lock_state_set(env, lock, CLS_INTRANSIT); + lock->cll_intransit_owner = cfs_current(); + cl_lock_hold_add(env, lock, "intransit", cfs_current()); + return state; +} +EXPORT_SYMBOL(cl_lock_intransit); + +/** + * Exit the intransit state and restore the lock state to the original state + */ +void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock, + enum cl_lock_state state) +{ + LASSERT(cl_lock_is_mutexed(lock)); + LASSERT(lock->cll_state == CLS_INTRANSIT); + LASSERT(state != CLS_INTRANSIT); + LASSERT(lock->cll_intransit_owner == cfs_current()); + + lock->cll_intransit_owner = NULL; + cl_lock_state_set(env, lock, state); + cl_lock_unhold(env, lock, "intransit", cfs_current()); +} +EXPORT_SYMBOL(cl_lock_extransit); + +/** + * Checking whether the lock is intransit state + */ +int cl_lock_is_intransit(struct cl_lock *lock) +{ + LASSERT(cl_lock_is_mutexed(lock)); + return lock->cll_state == CLS_INTRANSIT && + lock->cll_intransit_owner != cfs_current(); +} +EXPORT_SYMBOL(cl_lock_is_intransit); +/** * Returns true iff lock is "suitable" for given io. E.g., locks acquired by * truncate and O_APPEND cannot be reused for read/non-append-write, as they * cover multiple stripes and can trigger cascading timeouts. @@ -524,6 +575,7 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io, struct cl_object_header *head; struct cl_object *obj; struct cl_lock *lock; + int ok; obj = need->cld_obj; head = cl_object_header(obj); @@ -532,24 +584,30 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io, lock = cl_lock_lookup(env, obj, io, need); spin_unlock(&head->coh_lock_guard); - if (lock != NULL) { - int ok; + if (lock == NULL) + return NULL; - cl_lock_mutex_get(env, lock); - if (lock->cll_state == CLS_CACHED) - cl_use_try(env, lock); - ok = lock->cll_state == CLS_HELD; - if (ok) { - cl_lock_hold_add(env, lock, scope, source); - cl_lock_user_add(env, lock); - cl_lock_put(env, lock); - } - cl_lock_mutex_put(env, lock); - if (!ok) { - cl_lock_put(env, lock); - lock = NULL; - } + cl_lock_mutex_get(env, lock); + if (lock->cll_state == CLS_INTRANSIT) + cl_lock_state_wait(env, lock); /* Don't care return value. */ + if (lock->cll_state == CLS_CACHED) { + int result; + result = cl_use_try(env, lock, 1); + if (result < 0) + cl_lock_error(env, lock, result); } + ok = lock->cll_state == CLS_HELD; + if (ok) { + cl_lock_hold_add(env, lock, scope, source); + cl_lock_user_add(env, lock); + cl_lock_put(env, lock); + } + cl_lock_mutex_put(env, lock); + if (!ok) { + cl_lock_put(env, lock); + lock = NULL; + } + return lock; } EXPORT_SYMBOL(cl_lock_peek); @@ -666,7 +724,7 @@ int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock) EXPORT_SYMBOL(cl_lock_mutex_try); /** - * Unlocks cl_lock object. + {* Unlocks cl_lock object. * * \pre cl_lock_is_mutexed(lock) * @@ -885,7 +943,7 @@ int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock) LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */ result = lock->cll_error; - if (result == 0 && !(lock->cll_flags & CLF_STATE)) { + if (result == 0) { cfs_waitlink_init(&waiter); cfs_waitq_add(&lock->cll_wq, &waiter); set_current_state(CFS_TASK_INTERRUPTIBLE); @@ -899,7 +957,6 @@ int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock) cfs_waitq_del(&lock->cll_wq, &waiter); result = cfs_signal_pending() ? -EINTR : 0; } - lock->cll_flags &= ~CLF_STATE; RETURN(result); } EXPORT_SYMBOL(cl_lock_state_wait); @@ -916,7 +973,6 @@ static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock, list_for_each_entry(slice, &lock->cll_layers, cls_linkage) if (slice->cls_ops->clo_state != NULL) slice->cls_ops->clo_state(env, slice, state); - lock->cll_flags |= CLF_STATE; cfs_waitq_broadcast(&lock->cll_wq); EXIT; } @@ -955,9 +1011,10 @@ void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock, LASSERT(lock->cll_state <= state || (lock->cll_state == CLS_CACHED && (state == CLS_HELD || /* lock found in cache */ - state == CLS_NEW /* sub-lock canceled */)) || - /* sub-lock canceled during unlocking */ - (lock->cll_state == CLS_UNLOCKING && state == CLS_NEW)); + state == CLS_NEW || /* sub-lock canceled */ + state == CLS_INTRANSIT)) || + /* lock is in transit state */ + lock->cll_state == CLS_INTRANSIT); if (lock->cll_state != state) { atomic_dec(&site->cs_locks_state[lock->cll_state]); @@ -970,17 +1027,54 @@ void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock, } EXPORT_SYMBOL(cl_lock_state_set); +static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock) +{ + const struct cl_lock_slice *slice; + int result; + + do { + result = 0; + + if (lock->cll_error != 0) + break; + + LINVRNT(cl_lock_is_mutexed(lock)); + LINVRNT(cl_lock_invariant(env, lock)); + LASSERT(lock->cll_state == CLS_INTRANSIT); + LASSERT(lock->cll_users > 0); + LASSERT(lock->cll_holds > 0); + + result = -ENOSYS; + list_for_each_entry_reverse(slice, &lock->cll_layers, + cls_linkage) { + if (slice->cls_ops->clo_unuse != NULL) { + result = slice->cls_ops->clo_unuse(env, slice); + if (result != 0) + break; + } + } + LASSERT(result != -ENOSYS); + } while (result == CLO_REPEAT); + + return result ?: lock->cll_error; +} + /** * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling * cl_lock_operations::clo_use() top-to-bottom to notify layers. + * @atomic = 1, it must unuse the lock to recovery the lock to keep the + * use process atomic */ -int cl_use_try(const struct lu_env *env, struct cl_lock *lock) +int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic) { - int result; const struct cl_lock_slice *slice; + int result; + enum cl_lock_state state; ENTRY; result = -ENOSYS; + + state = cl_lock_intransit(env, lock); list_for_each_entry(slice, &lock->cll_layers, cls_linkage) { if (slice->cls_ops->clo_use != NULL) { result = slice->cls_ops->clo_use(env, slice); @@ -989,8 +1083,43 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock) } } LASSERT(result != -ENOSYS); - if (result == 0) - cl_lock_state_set(env, lock, CLS_HELD); + + LASSERT(lock->cll_state == CLS_INTRANSIT); + + if (result == 0) { + state = CLS_HELD; + } else { + if (result == -ESTALE) { + /* + * ESTALE means sublock being cancelled + * at this time, and set lock state to + * be NEW here and ask the caller to repeat. + */ + state = CLS_NEW; + result = CLO_REPEAT; + } + + /* @atomic means back-off-on-failure. */ + if (atomic) { + int rc; + + do { + rc = cl_unuse_try_internal(env, lock); + if (rc == 0) + break; + if (rc == CLO_WAIT) + rc = cl_lock_state_wait(env, lock); + if (rc < 0) + break; + } while(1); + + /* Vet the results. */ + if (rc < 0 && result > 0) + result = rc; + } + + } + cl_lock_extransit(env, lock, state); RETURN(result); } EXPORT_SYMBOL(cl_use_try); @@ -1056,14 +1185,13 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock, if (result == 0) cl_lock_state_set(env, lock, CLS_ENQUEUED); break; - case CLS_UNLOCKING: - /* wait until unlocking finishes, and enqueue lock - * afresh. */ + case CLS_INTRANSIT: + LASSERT(cl_lock_is_intransit(lock)); result = CLO_WAIT; break; case CLS_CACHED: /* yank lock from the cache. */ - result = cl_use_try(env, lock); + result = cl_use_try(env, lock, 0); break; case CLS_ENQUEUED: case CLS_HELD: @@ -1150,7 +1278,7 @@ EXPORT_SYMBOL(cl_enqueue); * This function is called repeatedly by cl_unuse() until either lock is * unlocked, or error occurs. * - * \pre lock->cll_state <= CLS_HELD || lock->cll_state == CLS_UNLOCKING + * \pre lock->cll_state <= CLS_HELD || cl_lock_is_intransit(lock) * * \post ergo(result == 0, lock->cll_state == CLS_CACHED) * @@ -1159,11 +1287,11 @@ EXPORT_SYMBOL(cl_enqueue); */ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) { - const struct cl_lock_slice *slice; int result; + enum cl_lock_state state = CLS_NEW; ENTRY; - if (lock->cll_state != CLS_UNLOCKING) { + if (lock->cll_state != CLS_INTRANSIT) { if (lock->cll_users > 1) { cl_lock_user_del(env, lock); RETURN(0); @@ -1174,31 +1302,11 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) * CLS_CACHED, is reinitialized to CLS_NEW or fails into * CLS_FREEING. */ - cl_lock_state_set(env, lock, CLS_UNLOCKING); + state = cl_lock_intransit(env, lock); } - do { - result = 0; - - if (lock->cll_error != 0) - break; - - LINVRNT(cl_lock_is_mutexed(lock)); - LINVRNT(cl_lock_invariant(env, lock)); - LASSERT(lock->cll_state == CLS_UNLOCKING); - LASSERT(lock->cll_users > 0); - LASSERT(lock->cll_holds > 0); - result = -ENOSYS; - list_for_each_entry_reverse(slice, &lock->cll_layers, - cls_linkage) { - if (slice->cls_ops->clo_unuse != NULL) { - result = slice->cls_ops->clo_unuse(env, slice); - if (result != 0) - break; - } - } - LASSERT(result != -ENOSYS); - } while (result == CLO_REPEAT); + result = cl_unuse_try_internal(env, lock); + LASSERT(lock->cll_state == CLS_INTRANSIT); if (result != CLO_WAIT) /* * Once there is no more need to iterate ->clo_unuse() calls, @@ -1208,8 +1316,6 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) */ cl_lock_user_del(env, lock); if (result == 0 || result == -ESTALE) { - enum cl_lock_state state; - /* * Return lock back to the cache. This is the only * place where lock is moved into CLS_CACHED state. @@ -1220,7 +1326,7 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) * canceled while unlocking was in progress. */ state = result == 0 ? CLS_CACHED : CLS_NEW; - cl_lock_state_set(env, lock, state); + cl_lock_extransit(env, lock, state); /* * Hide -ESTALE error. @@ -1232,7 +1338,11 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) * pages won't be written to OSTs. -jay */ result = 0; + } else { + CWARN("result = %d, this is unlikely!\n", result); + cl_lock_extransit(env, lock, state); } + result = result ?: lock->cll_error; if (result < 0) cl_lock_error(env, lock, result); @@ -1292,13 +1402,20 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock) LINVRNT(cl_lock_is_mutexed(lock)); LINVRNT(cl_lock_invariant(env, lock)); LASSERT(lock->cll_state == CLS_ENQUEUED || - lock->cll_state == CLS_HELD); + lock->cll_state == CLS_HELD || + lock->cll_state == CLS_INTRANSIT); LASSERT(lock->cll_users > 0); LASSERT(lock->cll_holds > 0); result = 0; if (lock->cll_error != 0) break; + + if (cl_lock_is_intransit(lock)) { + result = CLO_WAIT; + break; + } + if (lock->cll_state == CLS_HELD) /* nothing to do */ break; diff --git a/lustre/obdclass/cl_object.c b/lustre/obdclass/cl_object.c index 7ca27c6..4838a0a 100644 --- a/lustre/obdclass/cl_object.c +++ b/lustre/obdclass/cl_object.c @@ -480,7 +480,7 @@ int cl_site_stats_print(const struct cl_site *site, char *page, int count) [CLS_QUEUING] = "q", [CLS_ENQUEUED] = "e", [CLS_HELD] = "h", - [CLS_UNLOCKING] = "u", + [CLS_INTRANSIT] = "t", [CLS_CACHED] = "c", [CLS_FREEING] = "f" }; diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index 2cc82bc..6ca2014 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -1361,7 +1361,7 @@ static int osc_lock_use(const struct lu_env *env, * cl_lock mutex. */ lock = slice->cls_lock; - LASSERT(lock->cll_state == CLS_CACHED); + LASSERT(lock->cll_state == CLS_INTRANSIT); LASSERT(lock->cll_users > 0); /* set a flag for osc_dlm_blocking_ast0() to signal the * lock.*/ -- 1.8.3.1