__u64 cld_gid;
/** Lock mode. */
enum cl_lock_mode cld_mode;
+ /**
+ * Flags used to enqueue the lock: a combination of bit-flags from
+ * enum cl_enq_flags.
+ */
+ __u32 cld_enq_flags;
};
#define DDESCR "%s(%d):[%lu, %lu]"
* usual return values of lock state-machine methods, this can return
* -ESTALE to indicate that lock cannot be returned to the cache, and
* has to be re-initialized.
+ * unuse is a one-shot operation, so it must NOT return CLO_WAIT.
*
* \see ccc_lock_unuse(), lov_lock_unuse(), osc_lock_unuse()
*/
struct list_head cill_linkage;
struct cl_lock_descr cill_descr;
struct cl_lock *cill_lock;
- /**
- * flags to enqueue lock for this IO. A combination of bit-flags from
- * enum cl_enq_flags.
- */
- __u32 cill_enq_flags;
/** optional destructor */
void (*cill_fini)(const struct lu_env *env,
struct cl_io_lock_link *link);
const char *scope, const void *source);
struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
const struct cl_lock_descr *need,
- __u32 enqflags,
const char *scope, const void *source);
struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
struct cl_page *page, struct cl_lock *except,
int cl_io_lock_add (const struct lu_env *env, struct cl_io *io,
struct cl_io_lock_link *link);
int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
- struct cl_lock_descr *descr, int enqflags);
+ struct cl_lock_descr *descr);
int cl_io_read_page (const struct lu_env *env, struct cl_io *io,
struct cl_page *page);
int cl_io_prepare_write(const struct lu_env *env, struct cl_io *io,
*descr = whole_file;
descr->cld_obj = clob;
descr->cld_mode = CLM_PHANTOM;
+ descr->cld_enq_flags = CEF_ASYNC | CEF_MUST;
cio->cui_glimpse = 1;
/*
* CEF_ASYNC is used because glimpse sub-locks cannot
* CEF_MUST protects glimpse lock from conversion into
* a lockless mode.
*/
- lock = cl_lock_request(env, io, descr,
- CEF_ASYNC|CEF_MUST,
- "glimpse", cfs_current());
+ lock = cl_lock_request(env, io, descr, "glimpse",
+ cfs_current());
cio->cui_glimpse = 0;
if (!IS_ERR(lock)) {
result = cl_wait(env, lock);
descr->cld_obj = obj;
descr->cld_start = start;
descr->cld_end = end;
+ descr->cld_enq_flags = enqflags;
- cio->cui_link.cill_enq_flags = enqflags;
cl_io_lock_add(env, io, &cio->cui_link);
RETURN(0);
}
descr->cld_mode = CLM_GROUP;
enqflags = CEF_MUST | (nonblock ? CEF_NONBLOCK : 0);
- lock = cl_lock_request(env, io, descr, enqflags,
- GROUPLOCK_SCOPE, cfs_current());
+ descr->cld_enq_flags = enqflags;
+
+ lock = cl_lock_request(env, io, descr, GROUPLOCK_SCOPE, cfs_current());
if (IS_ERR(lock)) {
cl_io_fini(env, io);
cl_env_put(env, &refcheck);
policy.l_extent.start);
descr->cld_end = cl_index(descr->cld_obj,
policy.l_extent.end);
- result = cl_io_lock_alloc_add(env, io, descr, flags);
+ descr->cld_enq_flags = flags;
+ result = cl_io_lock_alloc_add(env, io, descr);
if (result < 0)
RETURN(result);
LASSERT(cl_lock_is_mutexed(child));
sublock->lss_active = parent;
- if (unlikely(child->cll_state == CLS_FREEING)) {
+ if (unlikely((child->cll_state == CLS_FREEING) ||
+ (child->cll_flags & CLF_CANCELLED))) {
struct lov_lock_link *link;
/*
* we could race with lock deletion which temporarily
descr->cld_end = cl_index(descr->cld_obj, end);
descr->cld_mode = parent->cll_descr.cld_mode;
descr->cld_gid = parent->cll_descr.cld_gid;
+ descr->cld_enq_flags = parent->cll_descr.cld_enq_flags;
/* XXX has no effect */
lck->lls_sub[nr].sub_got = *descr;
lck->lls_sub[nr].sub_stripe = i;
result = PTR_ERR(sublock);
break;
}
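+ /* pin the sub-lock with an extra reference while both mutexes
+ * are taken below; dropped by cl_lock_put() once the sub-lock
+ * mutex is released */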
+ cl_lock_get_trust(sublock);
cl_lock_mutex_get(env, sublock);
cl_lock_mutex_get(env, parent);
/*
"lov-parent", parent);
}
cl_lock_mutex_put(env, sublock);
+ cl_lock_put(env, sublock);
}
}
/*
cl_lock_mutex_get(env, parent);
if (!IS_ERR(sublock)) {
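+ /* hold an extra reference on the sub-lock until the
+ * cl_lock_put() below */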
+ cl_lock_get_trust(sublock);
if (parent->cll_state == CLS_QUEUING &&
- lck->lls_sub[idx].sub_lock == NULL)
+ lck->lls_sub[idx].sub_lock == NULL) {
lov_sublock_adopt(env, lck, sublock, idx, link);
- else {
+ } else {
OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
/* other thread allocated sub-lock, or enqueue is no
* longer going on */
cl_lock_mutex_get(env, parent);
}
cl_lock_mutex_put(env, sublock);
+ cl_lock_put(env, sublock);
result = CLO_REPEAT;
} else
result = PTR_ERR(sublock);
if (lls->sub_flags & LSF_HELD) {
LASSERT(sublock->cll_state == CLS_HELD);
rc = cl_unuse_try(subenv->lse_env, sublock);
- if (rc != CLO_WAIT)
- rc = lov_sublock_release(env, lck,
- i, 0, rc);
+ rc = lov_sublock_release(env, lck, i, 0, rc);
}
lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
- if (result < 0)
- break;
}
if (result == 0 && lck->lls_cancel_race) {
RETURN(result);
}
+
+static void lov_lock_cancel(const struct lu_env *env,
+ const struct cl_lock_slice *slice)
+{
+ struct lov_lock *lck = cl2lov_lock(slice);
+ struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
+ int i;
+ int result;
+
+ ENTRY;
+
+ for (result = 0, i = 0; i < lck->lls_nr; ++i) {
+ int rc;
+ struct lovsub_lock *sub;
+ struct cl_lock *sublock;
+ struct lov_lock_sub *lls;
+ struct lov_sublock_env *subenv;
+
+ /* top-lock state cannot change concurrently, because a single
+ * thread (the one that released the last hold) carries unlocking
+ * through to completion. */
+ lls = &lck->lls_sub[i];
+ sub = lls->sub_lock;
+ if (sub == NULL)
+ continue;
+
+ sublock = sub->lss_cl.cls_lock;
+ rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
+ if (rc == 0) {
+ if (!(lls->sub_flags & LSF_HELD)) {
+ lov_sublock_unlock(env, sub, closure, subenv);
+ continue;
+ }
+
+ switch (sublock->cll_state) {
+ case CLS_HELD:
+ rc = cl_unuse_try(subenv->lse_env,
+ sublock);
+ lov_sublock_release(env, lck, i, 0, 0);
+ break;
+ case CLS_ENQUEUED:
+ /* TODO: cancelling this lock is not ideal,
+ * because the lock itself is innocent, but it
+ * is acceptable for now. A better approach
+ * would be a new lock method that unholds the
+ * DLM lock. */
+ cl_lock_cancel(env, sublock);
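+ /* fall through */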
+ default:
+ lov_sublock_release(env, lck, i, 1, 0);
+ break;
+ }
+ lov_sublock_unlock(env, sub, closure, subenv);
+ }
+
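+ /* CLO_REPEAT: the sub-lock could not be locked, retry this stripe */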
+ if (rc == CLO_REPEAT) {
+ --i;
+ continue;
+ }
+
+ result = lov_subresult(result, rc);
+ }
+
+ if (result)
+ CL_LOCK_DEBUG(D_ERROR, env, slice->cls_lock,
+ "lov_lock_cancel fails with %d.\n", result);
+
+ cl_lock_closure_fini(closure);
+}
+
static int lov_lock_wait(const struct lu_env *env,
const struct cl_lock_slice *slice)
{
.clo_wait = lov_lock_wait,
.clo_use = lov_lock_use,
.clo_unuse = lov_lock_unuse,
+ .clo_cancel = lov_lock_cancel,
.clo_fits_into = lov_lock_fits_into,
.clo_delete = lov_lock_delete,
.clo_print = lov_lock_print
int result;
ENTRY;
- parent = lov->lls_cl.cls_lock;
- result = 0;
+ parent = lov->lls_cl.cls_lock;
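+ /* if the top-lock has already failed, there is nothing to update */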
+ if (parent->cll_error)
+ RETURN(0);
+ result = 0;
switch (parent->cll_state) {
case CLS_NEW:
case CLS_QUEUING:
ENTRY;
- lock = cl_lock_request(env, io, &link->cill_descr, link->cill_enq_flags,
- "io", io);
+ lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
if (!IS_ERR(lock)) {
link->cill_lock = lock;
list_move(&link->cill_linkage, &set->cls_curr);
- if (!(link->cill_enq_flags & CEF_ASYNC)) {
+ if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) {
result = cl_wait(env, lock);
if (result == 0)
list_move(&link->cill_linkage, &set->cls_done);
* Allocates new lock link, and uses it to add a lock to a lockset.
*/
int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
- struct cl_lock_descr *descr, int enqflags)
+ struct cl_lock_descr *descr)
{
struct cl_io_lock_link *link;
int result;
OBD_ALLOC_PTR(link);
if (link != NULL) {
link->cill_descr = *descr;
- link->cill_enq_flags = enqflags;
link->cill_fini = cl_free_io_lock_link;
result = cl_io_lock_add(env, io, link);
if (result) /* lock match */
static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
{
cl_lock_mutex_get(env, lock);
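+ /* cancel before deleting so the layers can release any remaining
+ * lock state first */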
+ cl_lock_cancel(env, lock);
cl_lock_delete(env, lock);
cl_lock_mutex_put(env, lock);
cl_lock_put(env, lock);
LASSERT(cl_is_lock(lock));
matched = cl_lock_ext_match(&lock->cll_descr, need) &&
- lock->cll_state < CLS_FREEING &&
- !(lock->cll_flags & CLF_CANCELLED) &&
- cl_lock_fits_into(env, lock, need, io);
+ lock->cll_state < CLS_FREEING &&
+ lock->cll_error == 0 &&
+ !(lock->cll_flags & CLF_CANCELLED) &&
+ cl_lock_fits_into(env, lock, need, io);
CDEBUG(D_DLMTRACE, "has: "DDESCR"(%i) need: "DDESCR": %d\n",
PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
matched);
ENTRY;
if (lock->cll_state < CLS_FREEING) {
+ LASSERT(lock->cll_state != CLS_INTRANSIT);
cl_lock_state_set(env, lock, CLS_FREEING);
head = cl_object_header(lock->cll_descr.cld_obj);
do {
result = 0;
- if (lock->cll_error != 0)
- break;
-
LINVRNT(cl_lock_is_mutexed(lock));
LINVRNT(cl_lock_invariant(env, lock));
LASSERT(lock->cll_state == CLS_INTRANSIT);
LASSERT(result != -ENOSYS);
} while (result == CLO_REPEAT);
- return result ?: lock->cll_error;
+ return result;
}
/**
ENTRY;
cl_lock_trace(D_DLMTRACE, env, "use lock", lock);
+ LASSERT(lock->cll_state == CLS_CACHED);
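+ /* a cached lock that already carries an error cannot be re-used */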
+ if (lock->cll_error)
+ RETURN(lock->cll_error);
+
result = -ENOSYS;
state = cl_lock_intransit(env, lock);
list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
}
LASSERT(result != -ENOSYS);
- LASSERT(lock->cll_state == CLS_INTRANSIT);
+ LASSERTF(lock->cll_state == CLS_INTRANSIT, "Wrong state %d.\n",
+ lock->cll_state);
if (result == 0) {
state = CLS_HELD;
/* @atomic means back-off-on-failure. */
if (atomic) {
int rc;
-
- do {
- rc = cl_unuse_try_internal(env, lock);
- if (rc == 0)
- break;
- if (rc == CLO_WAIT)
- rc = cl_lock_state_wait(env, lock);
- if (rc < 0)
- break;
- } while(1);
-
+ rc = cl_unuse_try_internal(env, lock);
/* Vet the results. */
if (rc < 0 && result > 0)
result = rc;
} while (1);
if (result != 0) {
cl_lock_user_del(env, lock);
- if (result != -EINTR)
- cl_lock_error(env, lock, result);
+ cl_lock_error(env, lock, result);
}
LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
lock->cll_state == CLS_HELD));
*
* This function is called repeatedly by cl_unuse() until either lock is
* unlocked, or error occurs.
+ * cl_unuse_try is a one-shot operation, so it must NOT return CLO_WAIT.
*
- * \pre lock->cll_state <= CLS_HELD || cl_lock_is_intransit(lock)
+ * \pre lock->cll_state == CLS_HELD || lock->cll_state == CLS_ENQUEUED
*
* \post ergo(result == 0, lock->cll_state == CLS_CACHED)
*
ENTRY;
cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock);
- if (lock->cll_state != CLS_INTRANSIT) {
- if (lock->cll_users > 1) {
- cl_lock_user_del(env, lock);
- RETURN(0);
- }
- /*
- * New lock users (->cll_users) are not protecting unlocking
- * from proceeding. From this point, lock eventually reaches
- * CLS_CACHED, is reinitialized to CLS_NEW or fails into
- * CLS_FREEING.
- */
- state = cl_lock_intransit(env, lock);
+ LASSERT(lock->cll_state == CLS_HELD || lock->cll_state == CLS_ENQUEUED);
+ if (lock->cll_users > 1) {
+ cl_lock_user_del(env, lock);
+ RETURN(0);
}
+ /*
+ * New lock users (->cll_users) are not protecting unlocking
+ * from proceeding. From this point, lock eventually reaches
+ * CLS_CACHED, is reinitialized to CLS_NEW or fails into
+ * CLS_FREEING.
+ */
+ state = cl_lock_intransit(env, lock);
+
result = cl_unuse_try_internal(env, lock);
LASSERT(lock->cll_state == CLS_INTRANSIT);
- if (result != CLO_WAIT)
- /*
- * Once there is no more need to iterate ->clo_unuse() calls,
- * remove lock user. This is done even if unrecoverable error
- * happened during unlocking, because nothing else can be
- * done.
- */
- cl_lock_user_del(env, lock);
+ LASSERT(result != CLO_WAIT);
+ cl_lock_user_del(env, lock);
if (result == 0 || result == -ESTALE) {
/*
* Return lock back to the cache. This is the only
* re-initialized. This happens e.g., when a sub-lock was
* canceled while unlocking was in progress.
*/
- state = result == 0 ? CLS_CACHED : CLS_NEW;
+ if (state == CLS_HELD && result == 0)
+ state = CLS_CACHED;
+ else
+ state = CLS_NEW;
cl_lock_extransit(env, lock, state);
/*
*/
result = 0;
} else {
- CWARN("result = %d, this is unlikely!\n", result);
+ CERROR("result = %d, this is unlikely!\n", result);
cl_lock_extransit(env, lock, state);
}
static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
{
+ int result;
ENTRY;
- LASSERT(lock->cll_state <= CLS_HELD);
- do {
- int result;
- result = cl_unuse_try(env, lock);
- if (result == CLO_WAIT) {
- result = cl_lock_state_wait(env, lock);
- if (result == 0)
- continue;
- }
- break;
- } while (1);
+ result = cl_unuse_try(env, lock);
+ if (result)
+ CL_LOCK_DEBUG(D_ERROR, env, lock, "unuse returned %d\n", result);
+
EXIT;
}
}
}
LASSERT(result != -ENOSYS);
- if (result == 0)
+ if (result == 0) {
+ LASSERT(lock->cll_state != CLS_INTRANSIT);
cl_lock_state_set(env, lock, CLS_HELD);
+ }
} while (result == CLO_REPEAT);
RETURN(result ?: lock->cll_error);
}
cl_lock_mutex_get(env, lock);
LINVRNT(cl_lock_invariant(env, lock));
- LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD);
+ LASSERTF(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD,
+ "Wrong state %d \n", lock->cll_state);
LASSERT(lock->cll_holds > 0);
cl_lock_trace(D_DLMTRACE, env, "wait lock", lock);
} while (1);
if (result < 0) {
cl_lock_user_del(env, lock);
- if (result != -EINTR)
- cl_lock_error(env, lock, result);
+ cl_lock_error(env, lock, result);
cl_lock_lockdep_release(env, lock);
}
cl_lock_mutex_put(env, lock);
need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
* not PHANTOM */
need->cld_start = need->cld_end = page->cp_index;
+ need->cld_enq_flags = 0;
spin_lock(&head->coh_lock_guard);
/* It is fine to match any group lock since there could be only one
if (IS_ERR(lock))
break;
cl_lock_mutex_get(env, lock);
- if (lock->cll_state < CLS_FREEING) {
+ if (lock->cll_state < CLS_FREEING &&
+ !(lock->cll_flags & CLF_CANCELLED)) {
cl_lock_hold_mod(env, lock, +1);
lu_ref_add(&lock->cll_holders, scope, source);
lu_ref_add(&lock->cll_reference, scope, source);
*/
struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
const struct cl_lock_descr *need,
- __u32 enqflags,
const char *scope, const void *source)
{
struct cl_lock *lock;
const struct lu_fid *fid;
int rc;
int iter;
- int warn;
+ __u32 enqflags = need->cld_enq_flags;
ENTRY;
fid = lu_object_fid(&io->ci_obj->co_lu);
iter = 0;
do {
- warn = iter >= 16 && IS_PO2(iter);
- CDEBUG(warn ? D_WARNING : D_DLMTRACE,
- DDESCR"@"DFID" %i %08x `%s'\n",
- PDESCR(need), PFID(fid), iter, enqflags, scope);
lock = cl_lock_hold_mutex(env, io, need, scope, source);
if (!IS_ERR(lock)) {
rc = cl_enqueue_locked(env, lock, io, enqflags);
cl_lock_lockdep_acquire(env,
lock, enqflags);
break;
- } else if (warn)
- CL_LOCK_DEBUG(D_WARNING, env, lock,
- "got (see bug 17665)\n");
+ }
cl_unuse_locked(env, lock);
}
cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock);
descr->cld_start = cl_index(obj, start);
descr->cld_end = cl_index(obj, end);
descr->cld_mode = mode == LCK_PW ? CLM_WRITE : CLM_READ;
+ descr->cld_enq_flags = CEF_ASYNC | enqflags;
io->ci_obj = obj;
- lck = cl_lock_request(env, io, descr, CEF_ASYNC | enqflags,
- "ec enqueue", eco);
+ lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
if (lck) {
struct echo_client_obd *ec = eco->eo_dev->ed_ec;
struct echo_lock *el;
{
struct osc_lock *ols = cl2osc_lock(slice);
- /* If the lock hasn't ever enqueued, it can't be matched because
- * enqueue process brings in many information which can be used to
- * determine things such as lockless, CEF_MUST, etc.
- */
- if (ols->ols_state < OLS_ENQUEUED)
- return 0;
-
- /* Don't match this lock if the lock is able to become lockless lock.
- * This is because the new lock might be covering a mmap region and
- * so that it must have a cached at the local side. */
- if (ols->ols_state < OLS_UPCALL_RECEIVED && ols->ols_locklessable)
- return 0;
-
- /* If the lock is going to be canceled, no reason to match it as well */
- if (ols->ols_state > OLS_RELEASED)
+ if (need->cld_enq_flags & CEF_NEVER)
return 0;
- /* go for it. */
+ if (need->cld_mode == CLM_PHANTOM) {
+ /*
+ * Note: a QUEUED lock can't be matched here, otherwise
+ * it may cause a deadlock. For example:
+ * P1: enqueues a read lock, creating sublock1.
+ * P2: enqueues a write lock, creating sublock2
+ * (which conflicts with sublock1).
+ * P1: read lock is granted.
+ * P1: enqueues a glimpse lock (while holding sublock1
+ * for read), matches sublock2, and waits for sublock2
+ * to be granted. But sublock2 can never be granted,
+ * because P1 will not release sublock1. Deadlock!
+ */
+ if (ols->ols_state < OLS_GRANTED ||
+ ols->ols_state > OLS_RELEASED)
+ return 0;
+ } else if (need->cld_enq_flags & CEF_MUST) {
+ /*
+ * If the lock has never been enqueued, it can't be matched,
+ * because the enqueue process provides the information needed
+ * to determine things such as lockless mode, CEF_MUST, etc.
+ */
+ if (ols->ols_state < OLS_GRANTED ||
+ ols->ols_state > OLS_RELEASED)
+ return 0;
+ if (ols->ols_state < OLS_UPCALL_RECEIVED &&
+ ols->ols_locklessable)
+ return 0;
+ }
return 1;
}