-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
LASSERT(!ols->ols_hold);
LASSERT(ols->ols_agl);
return 0;
- case OLS_UPCALL_RECEIVED:
- LASSERT(!ols->ols_hold);
- ols->ols_state = OLS_NEW;
- return 0;
+ case OLS_UPCALL_RECEIVED:
+ osc_lock_unhold(ols);
+ case OLS_ENQUEUED:
+ LASSERT(!ols->ols_hold);
+ osc_lock_detach(env, ols);
+ ols->ols_state = OLS_NEW;
+ return 0;
case OLS_GRANTED:
LASSERT(!ols->ols_glimpse);
LASSERT(ols->ols_hold);
cl_lock_error(env, lock, rc);
}
- cl_lock_mutex_put(env, lock);
-
- /* release cookie reference, acquired by osc_lock_enqueue() */
- lu_ref_del(&lock->cll_reference, "upcall", lock);
- cl_lock_put(env, lock);
-
- cl_env_nested_put(&nest, env);
- } else
- /* should never happen, similar to osc_ldlm_blocking_ast(). */
- LBUG();
- RETURN(errcode);
+ /* release cookie reference, acquired by osc_lock_enqueue() */
+ cl_lock_hold_release(env, lock, "upcall", lock);
+ cl_lock_mutex_put(env, lock);
+
+ lu_ref_del(&lock->cll_reference, "upcall", lock);
+	/* This may be the last reference, so it must be released after
+ * cl_lock_mutex_put(). */
+ cl_lock_put(env, lock);
+
+ cl_env_nested_put(&nest, env);
+ } else {
+ /* should never happen, similar to osc_ldlm_blocking_ast(). */
+ LBUG();
+ }
+ RETURN(errcode);
}
/**
env = cl_env_nested_get(&nest);
if (!IS_ERR(env)) {
- /*
- * osc_ast_data_get() has to go after environment is
+ /* osc_ast_data_get() has to go after environment is
* allocated, because osc_ast_data() acquires a
* reference to a lock, and it can only be released in
* environment.
olck = osc_ast_data_get(dlmlock);
if (olck != NULL) {
lock = olck->ols_cl.cls_lock;
- cl_lock_mutex_get(env, lock);
+ /* Do not grab the mutex of cl_lock for glimpse.
+ * See LU-1274 for details.
+ * BTW, it's okay for cl_lock to be cancelled during
+ * this period because server can handle this race.
+ * See ldlm_server_glimpse_ast() for details.
+ * cl_lock_mutex_get(env, lock); */
cap = &req->rq_pill;
req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
obj = lock->cll_descr.cld_obj;
result = cl_object_glimpse(env, obj, lvb);
}
- cl_lock_mutex_put(env, lock);
osc_ast_data_put(env, olck);
} else {
/*
if (!lockless && osc_lock_compatible(olck, scan_ols))
continue;
- /* Now @scan is conflicting with @lock, this means current
- * thread have to sleep for @scan being destroyed. */
- if (scan_ols->ols_owner == osc_env_io(env)) {
- CERROR("DEADLOCK POSSIBLE!\n");
- CL_LOCK_DEBUG(D_ERROR, env, scan, "queued.\n");
- CL_LOCK_DEBUG(D_ERROR, env, lock, "queuing.\n");
- libcfs_debug_dumpstack(NULL);
- }
cl_lock_get_trust(scan);
conflict = scan;
break;
if (enqflags & CEF_AGL) {
ols->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
ols->ols_agl = 1;
- }
+ } else {
+ ols->ols_agl = 0;
+ }
if (ols->ols_flags & LDLM_FL_HAS_INTENT)
ols->ols_glimpse = 1;
if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST))
if (ols->ols_locklessable)
ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
- /* a reference for lock, passed as an upcall cookie */
- cl_lock_get(lock);
- lu_ref_add(&lock->cll_reference, "upcall", lock);
+ /* lock will be passed as upcall cookie,
+	 * hold a reference to prevent it from being released. */
+ cl_lock_hold_add(env, lock, "upcall", lock);
/* a user for lock also */
cl_lock_user_add(env, lock);
ols->ols_state = OLS_ENQUEUED;
PTLRPCD_SET, 1, ols->ols_agl);
if (result != 0) {
cl_lock_user_del(env, lock);
- lu_ref_del(&lock->cll_reference,
- "upcall", lock);
- cl_lock_put(env, lock);
+ cl_lock_unhold(env, lock, "upcall", lock);
if (unlikely(result == -ECANCELED)) {
ols->ols_state = OLS_NEW;
result = 0;
static int osc_lock_flush(struct osc_lock *ols, int discard)
{
- struct cl_lock *lock = ols->ols_cl.cls_lock;
- struct cl_env_nest nest;
- struct lu_env *env;
- int result = 0;
+ struct cl_lock *lock = ols->ols_cl.cls_lock;
+ struct cl_env_nest nest;
+ struct lu_env *env;
+ int result = 0;
+ ENTRY;
+
+ env = cl_env_nested_get(&nest);
+ if (!IS_ERR(env)) {
+ struct osc_object *obj = cl2osc(ols->ols_cl.cls_obj);
+ struct cl_lock_descr *descr = &lock->cll_descr;
+ int rc = 0;
+
+ if (descr->cld_mode >= CLM_WRITE) {
+ result = osc_cache_writeback_range(env, obj,
+ descr->cld_start, descr->cld_end,
+ 1, discard);
+ CDEBUG(D_DLMTRACE, "write out %d pages for lock %p.\n",
+ result, lock);
+ if (result > 0)
+ result = 0;
+ }
+
+ rc = cl_lock_discard_pages(env, lock);
+ if (result == 0 && rc < 0)
+ result = rc;
- env = cl_env_nested_get(&nest);
- if (!IS_ERR(env)) {
- result = cl_lock_page_out(env, lock, discard);
cl_env_nested_put(&nest, env);
} else
result = PTR_ERR(env);
ols->ols_flush = 1;
LINVRNT(!osc_lock_has_pages(ols));
}
- return result;
+ RETURN(result);
}
/**
cfs_mutex_lock(&oob->oo_debug_mutex);
io->ci_obj = cl_object_top(obj);
+ io->ci_ignore_layout = 1;
cl_io_init(env, io, CIT_MISC, io->ci_obj);
do {
result = cl_page_gang_lookup(env, obj, io,
if (need->cld_enq_flags & CEF_NEVER)
return 0;
+ if (ols->ols_state >= OLS_CANCELLED)
+ return 0;
+
if (need->cld_mode == CLM_PHANTOM) {
if (ols->ols_agl)
return !(ols->ols_state > OLS_RELEASED);