static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
{
cl_lock_mutex_get(env, lock);
+ cl_lock_cancel(env, lock);
cl_lock_delete(env, lock);
cl_lock_mutex_put(env, lock);
cl_lock_put(env, lock);
LASSERT(cl_is_lock(lock));
matched = cl_lock_ext_match(&lock->cll_descr, need) &&
- lock->cll_state < CLS_FREEING &&
- !(lock->cll_flags & CLF_CANCELLED) &&
- cl_lock_fits_into(env, lock, need, io);
+ lock->cll_state < CLS_FREEING &&
+ lock->cll_error == 0 &&
+ !(lock->cll_flags & CLF_CANCELLED) &&
+ cl_lock_fits_into(env, lock, need, io);
CDEBUG(D_DLMTRACE, "has: "DDESCR"(%i) need: "DDESCR": %d\n",
PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
matched);
if (matched) {
cl_lock_get_trust(lock);
- /* move the lock to the LRU head */
- list_move(&lock->cll_linkage, &head->coh_locks);
atomic_inc(&cl_object_site(obj)->cs_locks.cs_hit);
RETURN(lock);
}
spin_lock(&head->coh_lock_guard);
ghost = cl_lock_lookup(env, obj, io, need);
if (ghost == NULL) {
- list_add(&lock->cll_linkage, &head->coh_locks);
+ list_add_tail(&lock->cll_linkage, &head->coh_locks);
spin_unlock(&head->coh_lock_guard);
atomic_inc(&site->cs_locks.cs_busy);
} else {
ENTRY;
if (lock->cll_state < CLS_FREEING) {
+ LASSERT(lock->cll_state != CLS_INTRANSIT);
cl_lock_state_set(env, lock, CLS_FREEING);
head = cl_object_header(lock->cll_descr.cld_obj);
do {
result = 0;
- if (lock->cll_error != 0)
- break;
-
LINVRNT(cl_lock_is_mutexed(lock));
LINVRNT(cl_lock_invariant(env, lock));
LASSERT(lock->cll_state == CLS_INTRANSIT);
- LASSERT(lock->cll_users > 0);
- LASSERT(lock->cll_holds > 0);
result = -ENOSYS;
list_for_each_entry_reverse(slice, &lock->cll_layers,
LASSERT(result != -ENOSYS);
} while (result == CLO_REPEAT);
- return result ?: lock->cll_error;
+ return result;
}
/**
ENTRY;
cl_lock_trace(D_DLMTRACE, env, "use lock", lock);
+ LASSERT(lock->cll_state == CLS_CACHED);
+ if (lock->cll_error)
+ RETURN(lock->cll_error);
+
result = -ENOSYS;
state = cl_lock_intransit(env, lock);
list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
}
LASSERT(result != -ENOSYS);
- LASSERT(lock->cll_state == CLS_INTRANSIT);
+ LASSERTF(lock->cll_state == CLS_INTRANSIT, "Wrong state %d.\n",
+ lock->cll_state);
if (result == 0) {
state = CLS_HELD;
/* @atomic means back-off-on-failure. */
if (atomic) {
int rc;
-
- do {
- rc = cl_unuse_try_internal(env, lock);
- if (rc == 0)
- break;
- if (rc == CLO_WAIT)
- rc = cl_lock_state_wait(env, lock);
- if (rc < 0)
- break;
- } while(1);
-
+ rc = cl_unuse_try_internal(env, lock);
/* Vet the results. */
if (rc < 0 && result > 0)
result = rc;
} while (1);
if (result != 0) {
cl_lock_user_del(env, lock);
- if (result != -EINTR)
- cl_lock_error(env, lock, result);
+ cl_lock_error(env, lock, result);
}
LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
lock->cll_state == CLS_HELD));
*
* This function is called repeatedly by cl_unuse() until either lock is
* unlocked, or error occurs.
+ * cl_unuse_try is a one-shot operation, so it must NOT return CLO_WAIT.
*
- * \pre lock->cll_state <= CLS_HELD || cl_lock_is_intransit(lock)
+ * \pre lock->cll_state == CLS_HELD
*
* \post ergo(result == 0, lock->cll_state == CLS_CACHED)
*
ENTRY;
cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock);
- if (lock->cll_state != CLS_INTRANSIT) {
- if (lock->cll_users > 1) {
- cl_lock_user_del(env, lock);
- RETURN(0);
- }
- /*
- * New lock users (->cll_users) are not protecting unlocking
- * from proceeding. From this point, lock eventually reaches
- * CLS_CACHED, is reinitialized to CLS_NEW or fails into
- * CLS_FREEING.
- */
- state = cl_lock_intransit(env, lock);
+ LASSERT(lock->cll_state == CLS_HELD || lock->cll_state == CLS_ENQUEUED);
+ if (lock->cll_users > 1) {
+ cl_lock_user_del(env, lock);
+ RETURN(0);
}
+ /*
+ * New lock users (->cll_users) are not protecting unlocking
+ * from proceeding. From this point, lock eventually reaches
+ * CLS_CACHED, is reinitialized to CLS_NEW or fails into
+ * CLS_FREEING.
+ */
+ state = cl_lock_intransit(env, lock);
+
result = cl_unuse_try_internal(env, lock);
LASSERT(lock->cll_state == CLS_INTRANSIT);
- if (result != CLO_WAIT)
- /*
- * Once there is no more need to iterate ->clo_unuse() calls,
- * remove lock user. This is done even if unrecoverable error
- * happened during unlocking, because nothing else can be
- * done.
- */
- cl_lock_user_del(env, lock);
+ LASSERT(result != CLO_WAIT);
+ cl_lock_user_del(env, lock);
if (result == 0 || result == -ESTALE) {
/*
* Return lock back to the cache. This is the only
* re-initialized. This happens e.g., when a sub-lock was
* canceled while unlocking was in progress.
*/
- state = result == 0 ? CLS_CACHED : CLS_NEW;
+ if (state == CLS_HELD && result == 0)
+ state = CLS_CACHED;
+ else
+ state = CLS_NEW;
cl_lock_extransit(env, lock, state);
/*
*/
result = 0;
} else {
- CWARN("result = %d, this is unlikely!\n", result);
+ CERROR("result = %d, this is unlikely!\n", result);
cl_lock_extransit(env, lock, state);
}
static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
{
+ int result;
ENTRY;
- LASSERT(lock->cll_state <= CLS_HELD);
- do {
- int result;
- result = cl_unuse_try(env, lock);
- if (result == CLO_WAIT) {
- result = cl_lock_state_wait(env, lock);
- if (result == 0)
- continue;
- }
- break;
- } while (1);
+ result = cl_unuse_try(env, lock);
+ if (result)
+ CL_LOCK_DEBUG(D_ERROR, env, lock, "unuse return %d\n", result);
+
EXIT;
}
}
}
LASSERT(result != -ENOSYS);
- if (result == 0)
+ if (result == 0) {
+ LASSERT(lock->cll_state != CLS_INTRANSIT);
cl_lock_state_set(env, lock, CLS_HELD);
+ }
} while (result == CLO_REPEAT);
RETURN(result ?: lock->cll_error);
}
cl_lock_mutex_get(env, lock);
LINVRNT(cl_lock_invariant(env, lock));
- LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD);
+ LASSERTF(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD,
+ "Wrong state %d \n", lock->cll_state);
LASSERT(lock->cll_holds > 0);
cl_lock_trace(D_DLMTRACE, env, "wait lock", lock);
} while (1);
if (result < 0) {
cl_lock_user_del(env, lock);
- if (result != -EINTR)
- cl_lock_error(env, lock, result);
+ cl_lock_error(env, lock, result);
cl_lock_lockdep_release(env, lock);
}
cl_lock_mutex_put(env, lock);
need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
* not PHANTOM */
need->cld_start = need->cld_end = page->cp_index;
+ need->cld_enq_flags = 0;
spin_lock(&head->coh_lock_guard);
/* It is fine to match any group lock since there could be only one
if (IS_ERR(lock))
break;
cl_lock_mutex_get(env, lock);
- if (lock->cll_state < CLS_FREEING) {
+ if (lock->cll_state < CLS_FREEING &&
+ !(lock->cll_flags & CLF_CANCELLED)) {
cl_lock_hold_mod(env, lock, +1);
lu_ref_add(&lock->cll_holders, scope, source);
lu_ref_add(&lock->cll_reference, scope, source);
*/
struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
const struct cl_lock_descr *need,
- __u32 enqflags,
const char *scope, const void *source)
{
struct cl_lock *lock;
const struct lu_fid *fid;
int rc;
int iter;
- int warn;
+ __u32 enqflags = need->cld_enq_flags;
ENTRY;
fid = lu_object_fid(&io->ci_obj->co_lu);
iter = 0;
do {
- warn = iter >= 16 && IS_PO2(iter);
- CDEBUG(warn ? D_WARNING : D_DLMTRACE,
- DDESCR"@"DFID" %i %08x `%s'\n",
- PDESCR(need), PFID(fid), iter, enqflags, scope);
lock = cl_lock_hold_mutex(env, io, need, scope, source);
if (!IS_ERR(lock)) {
rc = cl_enqueue_locked(env, lock, io, enqflags);
cl_lock_lockdep_acquire(env,
lock, enqflags);
break;
- } else if (warn)
- CL_LOCK_DEBUG(D_WARNING, env, lock,
- "got (see bug 17665)\n");
+ }
cl_unuse_locked(env, lock);
}
cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock);
}
EXPORT_SYMBOL(cl_lock_user_del);
-/**
- * Check if two lock's mode are compatible.
- *
- * This returns true iff en-queuing \a lock2 won't cause cancellation of \a
- * lock1 even when these locks overlap.
- */
-int cl_lock_compatible(const struct cl_lock *lock1, const struct cl_lock *lock2)
-{
- enum cl_lock_mode mode1;
- enum cl_lock_mode mode2;
-
- ENTRY;
- mode1 = lock1->cll_descr.cld_mode;
- mode2 = lock2->cll_descr.cld_mode;
- RETURN(mode2 == CLM_PHANTOM ||
- (mode1 == CLM_READ && mode2 == CLM_READ));
-}
-EXPORT_SYMBOL(cl_lock_compatible);
-
const char *cl_lock_mode_name(const enum cl_lock_mode mode)
{
static const char *names[] = {