return &info->clt_counters[nesting];
}
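+/*
+ * Dump the state of a cl_lock: reference count, mutex owner and depth,
+ * state, error, holds, users and flags, together with the environment,
+ * object nesting level, number of mutexed locks, and the call site.
+ */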
+static void cl_lock_trace0(int level, const struct lu_env *env,
+ const char *prefix, const struct cl_lock *lock,
+ const char *func, const int line)
+{
+ struct cl_object_header *h = cl_object_header(lock->cll_descr.cld_obj);
+
+ CDEBUG(level, "%s: %p@(%d %p %d %d %d %d %d %lx)"
+ "(%p/%d/%d) at %s():%d\n",
+ prefix, lock,
+ atomic_read(&lock->cll_ref), lock->cll_guarder, lock->cll_depth,
+ lock->cll_state, lock->cll_error, lock->cll_holds,
+ lock->cll_users, lock->cll_flags,
+ env, h->coh_nesting, cl_lock_nr_mutexed(env),
+ func, line);
+}
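+/* convenience wrapper that records the caller's function name and line */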
+#define cl_lock_trace(level, env, prefix, lock) \
+ cl_lock_trace0(level, env, prefix, lock, __func__, __LINE__)
+
#define RETIP ((unsigned long)__builtin_return_address(0))
#ifdef CONFIG_LOCKDEP
LINVRNT(!cl_lock_is_mutexed(lock));
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "free lock", lock);
might_sleep();
while (!list_empty(&lock->cll_layers)) {
struct cl_lock_slice *slice;
}
EXPORT_SYMBOL(cl_lock_at);
-static void cl_lock_trace(struct cl_thread_counters *counters,
- const char *prefix, const struct cl_lock *lock)
-{
- CDEBUG(D_DLMTRACE|D_TRACE, "%s: %i@%p %p %i %i\n", prefix,
- atomic_read(&lock->cll_ref), lock, lock->cll_guarder,
- lock->cll_depth, counters->ctc_nr_locks_locked);
-}
-
static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
{
struct cl_thread_counters *counters;
lock->cll_depth++;
counters->ctc_nr_locks_locked++;
lu_ref_add(&counters->ctc_locks_locked, "cll_guard", lock);
- cl_lock_trace(counters, "got mutex", lock);
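+ /* mutex get/put is a hot path; trace it at the quieter D_TRACE level */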
+ cl_lock_trace(D_TRACE, env, "got mutex", lock);
}
/**
counters = cl_lock_counters(env, lock);
LINVRNT(counters->ctc_nr_locks_locked > 0);
- cl_lock_trace(counters, "put mutex", lock);
+ cl_lock_trace(D_TRACE, env, "put mutex", lock);
lu_ref_del(&counters->ctc_locks_locked, "cll_guard", lock);
counters->ctc_nr_locks_locked--;
if (--lock->cll_depth == 0) {
LASSERT(lock->cll_holds > 0);
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "hold release lock", lock);
lu_ref_del(&lock->cll_holders, scope, source);
cl_lock_hold_mod(env, lock, -1);
if (lock->cll_holds == 0) {
LASSERT(lock->cll_depth == 1);
LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
+ cl_lock_trace(D_DLMTRACE, env, "state wait lock", lock);
result = lock->cll_error;
if (result == 0) {
cfs_waitlink_init(&waiter);
void cl_lock_signal(const struct lu_env *env, struct cl_lock *lock)
{
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "state signal lock", lock);
cl_lock_state_signal(env, lock, lock->cll_state);
EXIT;
}
enum cl_lock_state state;
ENTRY;
- result = -ENOSYS;
+ cl_lock_trace(D_DLMTRACE, env, "use lock", lock);
+ result = -ENOSYS;
state = cl_lock_intransit(env, lock);
list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
if (slice->cls_ops->clo_use != NULL) {
int result;
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "enqueue lock", lock);
do {
result = 0;
enum cl_lock_state state = CLS_NEW;
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock);
+
if (lock->cll_state != CLS_INTRANSIT) {
if (lock->cll_users > 1) {
cl_lock_user_del(env, lock);
int result;
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "wait lock try", lock);
do {
LINVRNT(cl_lock_is_mutexed(lock));
LINVRNT(cl_lock_invariant(env, lock));
LINVRNT(cl_lock_invariant(env, lock));
LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD);
LASSERT(lock->cll_holds > 0);
+ cl_lock_trace(D_DLMTRACE, env, "wait lock", lock);
do {
result = cl_wait_try(env, lock);
int result;
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "modify lock", lock);
/* don't allow object to change */
LASSERT(obj == desc->cld_obj);
LINVRNT(cl_lock_is_mutexed(lock));
int cl_lock_enclosure(const struct lu_env *env, struct cl_lock *lock,
struct cl_lock_closure *closure)
{
- int result;
+ int result = 0;
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "enclosure lock", lock);
if (!cl_lock_mutex_try(env, lock)) {
/*
* If lock->cll_inclosure is not empty, lock is already in
struct cl_lock *scan;
struct cl_lock *temp;
+ cl_lock_trace(D_DLMTRACE, env, "disclosure lock", closure->clc_origin);
list_for_each_entry_safe(scan, temp, &closure->clc_list, cll_inclosure){
list_del_init(&scan->cll_inclosure);
cl_lock_mutex_put(env, scan);
cl_lock_nr_mutexed(env) == 1));
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "delete lock", lock);
if (lock->cll_holds == 0)
cl_lock_delete0(env, lock);
else
LINVRNT(cl_lock_invariant(env, lock));
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "set lock error", lock);
if (lock->cll_error == 0 && error != 0) {
lock->cll_error = error;
cl_lock_signal(env, lock);
LINVRNT(cl_lock_invariant(env, lock));
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "cancel lock", lock);
if (lock->cll_holds == 0)
cl_lock_cancel0(env, lock);
else
"got (see bug 17665)\n");
cl_unuse_locked(env, lock);
}
+ cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock);
cl_lock_hold_release(env, lock, scope, source);
cl_lock_mutex_put(env, lock);
lu_ref_del(&lock->cll_reference, scope, source);
{
LINVRNT(cl_lock_invariant(env, lock));
ENTRY;
+ cl_lock_trace(D_DLMTRACE, env, "release lock", lock);
cl_lock_mutex_get(env, lock);
cl_lock_hold_release(env, lock, scope, source);
cl_lock_mutex_put(env, lock);
const char *cl_lock_mode_name(const enum cl_lock_mode mode)
{
static const char *names[] = {
- [CLM_PHANTOM] = "PHANTOM",
- [CLM_READ] = "READ",
- [CLM_WRITE] = "WRITE",
- [CLM_GROUP] = "GROUP"
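+ /* single-letter mode names keep lock trace output compact */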
+ [CLM_PHANTOM] = "P",
+ [CLM_READ] = "R",
+ [CLM_WRITE] = "W",
+ [CLM_GROUP] = "G"
};
if (0 <= mode && mode < ARRAY_SIZE(names))
return names[mode];
else
- return "UNKNW";
+ return "U";
}
EXPORT_SYMBOL(cl_lock_mode_name);
{
struct ldlm_lock *dlmlock;
+ /* reset the osc lock's state: it may be enqueued again */
+ olck->ols_state = OLS_NEW;
spin_lock(&osc_ast_guard);
dlmlock = olck->ols_lock;
if (dlmlock == NULL) {
unlock_res_and_lock(dlmlock);
/* release a reference taken in osc_lock_upcall0(). */
+ LASSERT(olck->ols_has_ref);
lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
LDLM_LOCK_RELEASE(dlmlock);
+ olck->ols_has_ref = 0;
+}
+
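+/*
+ * Release the hold on the underlying ldlm lock, if osc_lock holds one.
+ * Returns the result of osc_cancel_base(), or 0 when there was no hold.
+ */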
+static int osc_lock_unhold(struct osc_lock *ols)
+{
+ int result = 0;
+
+ if (ols->ols_hold) {
+ ols->ols_hold = 0;
+ result = osc_cancel_base(&ols->ols_handle,
+ ols->ols_einfo.ei_mode);
+ }
+ return result;
}
static int osc_lock_unuse(const struct lu_env *env,
const struct cl_lock_slice *slice)
{
struct osc_lock *ols = cl2osc_lock(slice);
- int result;
LASSERT(ols->ols_state == OLS_GRANTED ||
ols->ols_state == OLS_UPCALL_RECEIVED);
* e.g., for liblustre) sees that lock is released.
*/
ols->ols_state = OLS_RELEASED;
- ols->ols_hold = 0;
- result = osc_cancel_base(&ols->ols_handle, ols->ols_einfo.ei_mode);
- ols->ols_has_ref = 0;
- return result;
+ return osc_lock_unhold(ols);
}
static void osc_lock_fini(const struct lu_env *env,
* to the lock), before reply from a server was received. In this case
* lock is destroyed immediately after upcall.
*/
- if (ols->ols_hold)
- osc_lock_unuse(env, slice);
+ osc_lock_unhold(ols);
LASSERT(ols->ols_lock == NULL);
OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
* this.
*/
ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
- olck->ols_hold = olck->ols_has_ref = 1;
+ olck->ols_hold = 1;
/* lock reference taken by ldlm_handle2lock_long() is owned by
* osc_lock and released in osc_lock_detach() */
lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
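+ /* mark that osc_lock now owns the dlmlock reference recorded above */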
+ olck->ols_has_ref = 1;
}
/**
CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
LASSERT(!osc_lock_is_lockless(olck));
- if (olck->ols_hold)
- /*
- * Lock might be still addref-ed here, if e.g., blocking ast
- * is sent for a failed lock.
- */
- osc_lock_unuse(env, &olck->ols_cl);
+ /*
+ * The lock might still be addref-ed here if, e.g., a blocking
+ * AST was sent for a failed lock.
+ */
+ osc_lock_unhold(olck);
if (blocking && olck->ols_state < OLS_BLOCKED)
/*
LASSERT(dlmlock->l_lvb_data != NULL);
lock_res_and_lock(dlmlock);
olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
- if (olck->ols_lock == NULL)
+ if (olck->ols_lock == NULL) {
/*
* upcall (osc_lock_upcall()) hasn't yet been
* called. Do nothing now, upcall will bind
* and ldlm_lock are always bound when
* osc_lock is in OLS_GRANTED state.
*/
- ;
- else if (dlmlock->l_granted_mode != LCK_MINMODE)
+ } else if (dlmlock->l_granted_mode ==
+ dlmlock->l_req_mode) {
osc_lock_granted(env, olck, dlmlock, dlmrc);
+ }
unlock_res_and_lock(dlmlock);
- if (dlmrc != 0)
+
+ if (dlmrc != 0) {
+ CL_LOCK_DEBUG(D_ERROR, env, lock,
+ "dlmlock returned %d\n", dlmrc);
cl_lock_error(env, lock, dlmrc);
+ }
cl_lock_mutex_put(env, lock);
osc_ast_data_put(env, olck);
result = 0;
int rc;
LASSERT(!olck->ols_hold);
+
/*
* Atomically check for LDLM_FL_CBPENDING and addref a lock if this
* flag is not set. This protects us from a concurrent blocking ast.
*/
rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
if (rc == 0) {
- olck->ols_hold = olck->ols_has_ref = 1;
+ olck->ols_hold = 1;
olck->ols_state = OLS_GRANTED;
} else {
struct cl_lock *lock;
discard = dlmlock->l_flags & LDLM_FL_DISCARD_DATA;
result = osc_lock_flush(olck, discard);
- if (olck->ols_hold)
- osc_lock_unuse(env, slice);
+ osc_lock_unhold(olck);
lock_res_and_lock(dlmlock);
/* Now that we're the only user of dlm read/write reference,
LINVRNT(osc_lock_invariant(olck));
LINVRNT(!osc_lock_has_pages(olck));
- if (olck->ols_hold)
- osc_lock_unuse(env, slice);
+ osc_lock_unhold(olck);
osc_lock_detach(env, olck);
}
/*
* XXX print ldlm lock and einfo properly.
*/
- (*p)(env, cookie, "%p %08x "LPU64" %d %p ",
+ (*p)(env, cookie, "%p %08x "LPX64" %d %p ",
lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
lock->ols_state, lock->ols_owner);
osc_lvb_print(env, cookie, p, &lock->ols_lvb);