void cl_lock_put (const struct lu_env *env, struct cl_lock *lock);
void cl_lock_hold_add (const struct lu_env *env, struct cl_lock *lock,
const char *scope, const void *source);
+void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
+ const char *scope, const void *source);
void cl_lock_unhold (const struct lu_env *env, struct cl_lock *lock,
const char *scope, const void *source);
void cl_lock_release (const struct lu_env *env, struct cl_lock *lock,
ENTRY;
- LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT);
- LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT);
+ LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
+ "result = %d\n", result);
+ LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
+ "rc = %d\n", rc);
CLASSERT(CLO_WAIT < CLO_REPEAT);
/* calculate ranks in the ordering above */
/* first, try to enqueue a sub-lock ... */
result = cl_enqueue_try(env, sublock, io, enqflags);
- if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL))
- /* if it is enqueued, try to `wait' on it---maybe it's already
- * granted */
- result = cl_wait_try(env, sublock);
+ if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
+ /* if it is enqueued, try to `wait' on it---maybe it's already
+ * granted */
+ result = cl_wait_try(env, sublock);
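+ /* a re-enqueued sub-lock is not granted yet; translate
+ * CLO_REENQUEUED into CLO_WAIT so the caller simply waits
+ * and retries. */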
+ if (result == CLO_REENQUEUED)
+ result = CLO_WAIT;
+ }
/*
* If CEF_ASYNC flag is set, then all sub-locks can be enqueued in
* parallel, otherwise---enqueue has to wait until sub-lock is granted
}
}
-static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
- const char *scope, const void *source)
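+/**
+ * Releases a hold obtained earlier by cl_lock_hold_add(). The caller must
+ * hold the lock mutex; the lock reference itself is dropped separately
+ * (see cl_lock_unhold()).
+ */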
+void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
+ const char *scope, const void *source)
{
LINVRNT(cl_lock_is_mutexed(lock));
LINVRNT(cl_lock_invariant(env, lock));
}
EXIT;
}
+EXPORT_SYMBOL(cl_lock_hold_release);
/**
* Waits until lock state is changed.
case CLS_QUEUING:
/* kick layers. */
result = cl_enqueue_kick(env, lock, io, flags);
- if (result == 0)
+ /* In the AGL case the enqueue upcall may already have
+ * changed cl_lock::cll_state to CLS_HELD. */
+ if (result == 0 && lock->cll_state == CLS_QUEUING)
cl_lock_state_set(env, lock, CLS_ENQUEUED);
break;
case CLS_INTRANSIT:
do {
LINVRNT(cl_lock_is_mutexed(lock));
LINVRNT(cl_lock_invariant(env, lock));
- LASSERT(lock->cll_state == CLS_ENQUEUED ||
- lock->cll_state == CLS_HELD ||
- lock->cll_state == CLS_INTRANSIT);
+ LASSERTF(lock->cll_state == CLS_QUEUING ||
+ lock->cll_state == CLS_ENQUEUED ||
+ lock->cll_state == CLS_HELD ||
+ lock->cll_state == CLS_INTRANSIT,
+ "lock state: %d\n", lock->cll_state);
LASSERT(lock->cll_users > 0);
LASSERT(lock->cll_holds > 0);
again:
cl_lock_mutex_get(env, lock);
if (lock->cll_state < CLS_FREEING) {
- LASSERT(lock->cll_holds == 0);
LASSERT(lock->cll_users <= 1);
if (unlikely(lock->cll_users == 1)) {
struct l_wait_info lwi = { 0 };
cl_lock_error(env, lock, rc);
}
- cl_lock_mutex_put(env, lock);
-
- /* release cookie reference, acquired by osc_lock_enqueue() */
- lu_ref_del(&lock->cll_reference, "upcall", lock);
- cl_lock_put(env, lock);
-
- cl_env_nested_put(&nest, env);
- } else
- /* should never happen, similar to osc_ldlm_blocking_ast(). */
- LBUG();
- RETURN(errcode);
+ /* release the cookie hold, acquired by osc_lock_enqueue()
+ * via cl_lock_hold_add() */
+ cl_lock_hold_release(env, lock, "upcall", lock);
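+ /* cl_lock_hold_release() requires the lock mutex (it asserts
+ * cl_lock_is_mutexed()), hence it is called before the mutex
+ * is dropped. */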
+ cl_lock_mutex_put(env, lock);
+
+ lu_ref_del(&lock->cll_reference, "upcall", lock);
+ /* This may be the last reference, so cl_lock_put() must be
+ * called after cl_lock_mutex_put(). */
+ cl_lock_put(env, lock);
+
+ cl_env_nested_put(&nest, env);
+ } else {
+ /* should never happen, similar to osc_ldlm_blocking_ast(). */
+ LBUG();
+ }
+ RETURN(errcode);
}
/**
if (enqflags & CEF_AGL) {
ols->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
ols->ols_agl = 1;
- }
+ } else {
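+ /* not an AGL request: clear ols_agl explicitly, since an
+ * osc_lock first enqueued with CEF_AGL may later be
+ * re-enqueued without it (e.g. after the -ECANCELED restart
+ * below). */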
+ ols->ols_agl = 0;
+ }
if (ols->ols_flags & LDLM_FL_HAS_INTENT)
ols->ols_glimpse = 1;
if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST))
if (ols->ols_locklessable)
ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
- /* a reference for lock, passed as an upcall cookie */
- cl_lock_get(lock);
- lu_ref_add(&lock->cll_reference, "upcall", lock);
+ /* the lock is passed as the upcall cookie; hold a reference
+ * so that it cannot be released before the upcall runs. */
+ cl_lock_hold_add(env, lock, "upcall", lock);
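+ /* the hold is released by cl_lock_hold_release() in the enqueue
+ * upcall, or by cl_lock_unhold() below if sending the enqueue
+ * request fails. */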
/* a user for lock also */
cl_lock_user_add(env, lock);
ols->ols_state = OLS_ENQUEUED;
PTLRPCD_SET, 1, ols->ols_agl);
if (result != 0) {
cl_lock_user_del(env, lock);
- lu_ref_del(&lock->cll_reference,
- "upcall", lock);
- cl_lock_put(env, lock);
+ cl_lock_unhold(env, lock, "upcall", lock);
if (unlikely(result == -ECANCELED)) {
ols->ols_state = OLS_NEW;
result = 0;
* are explained in lov_enqueue() */
}
- /* We already have a lock, and it's referenced */
+ /* We already have a lock, and it's referenced.
+ *
+ * At this point cl_lock::cll_state is CLS_QUEUING; the AGL
+ * upcall may change it to CLS_HELD directly. */
(*upcall)(cookie, ELDLM_OK);
if (einfo->ei_mode != mode)