-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
/*
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
LASSERT(idx < lck->lls_nr);
ENTRY;
- OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, CFS_ALLOC_IO);
+ OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, __GFP_IO);
if (link != NULL) {
struct lov_sublock_env *subenv;
struct lov_lock_sub *lls;
parent = lck->lls_cl.cls_lock;
lls = &lck->lls_sub[idx];
- descr = &lls->sub_descr;
+ descr = &lls->sub_got;
subenv = lov_sublock_env_get(env, parent, lls);
if (!IS_ERR(subenv)) {
int result_rank;
int rc_rank;
- LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT);
- LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT);
- CLASSERT(CLO_WAIT < CLO_REPEAT);
-
ENTRY;
+ LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
+ "result = %d", result);
+ LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
+ "rc = %d\n", rc);
+ CLASSERT(CLO_WAIT < CLO_REPEAT);
+
/* calculate ranks in the ordering above */
result_rank = result < 0 ? 1 + CLO_REPEAT : result;
rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;
* XXX for wide striping smarter algorithm is desirable,
* breaking out of the loop, early.
*/
- if (lov_stripe_intersects(r0->lo_lsm, i,
+ if (lov_stripe_intersects(loo->lo_lsm, i,
file_start, file_end, &start, &end))
nr++;
}
LASSERT(nr > 0);
- OBD_ALLOC(lck->lls_sub, nr * sizeof lck->lls_sub[0]);
+ OBD_ALLOC_LARGE(lck->lls_sub, nr * sizeof lck->lls_sub[0]);
if (lck->lls_sub == NULL)
RETURN(-ENOMEM);
* top-lock.
*/
for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
- if (lov_stripe_intersects(r0->lo_lsm, i,
+ if (lov_stripe_intersects(loo->lo_lsm, i,
file_start, file_end, &start, &end)) {
struct cl_lock_descr *descr;
* a reference on its parent.
*/
LASSERT(lck->lls_sub[i].sub_lock == NULL);
- OBD_FREE(lck->lls_sub, lck->lls_nr * sizeof lck->lls_sub[0]);
+ OBD_FREE_LARGE(lck->lls_sub,
+ lck->lls_nr * sizeof lck->lls_sub[0]);
}
OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
EXIT;
}
-/**
- *
- * \retval 0 if state-transition can proceed
- * \retval -ve otherwise.
- */
static int lov_lock_enqueue_wait(const struct lu_env *env,
struct lov_lock *lck,
struct cl_lock *sublock)
{
- struct cl_lock *lock = lck->lls_cl.cls_lock;
- struct cl_lock *conflict = sublock->cll_conflict;
- int result = CLO_REPEAT;
+ struct cl_lock *lock = lck->lls_cl.cls_lock;
+ int result;
ENTRY;
LASSERT(cl_lock_is_mutexed(lock));
- LASSERT(cl_lock_is_mutexed(sublock));
- LASSERT(sublock->cll_state == CLS_QUEUING);
- LASSERT(conflict != NULL);
- sublock->cll_conflict = NULL;
cl_lock_mutex_put(env, lock);
- cl_lock_mutex_put(env, sublock);
-
- LASSERT(cl_lock_nr_mutexed(env) == 0);
-
- cl_lock_mutex_get(env, conflict);
- cl_lock_cancel(env, conflict);
- cl_lock_delete(env, conflict);
- while (conflict->cll_state != CLS_FREEING) {
- int rc = 0;
-
- rc = cl_lock_state_wait(env, conflict);
- if (rc == 0)
- continue;
-
- result = lov_subresult(result, rc);
- break;
- }
- cl_lock_mutex_put(env, conflict);
- lu_ref_del(&conflict->cll_reference, "cancel-wait", sublock);
- cl_lock_put(env, conflict);
-
+ result = cl_lock_enqueue_wait(env, sublock, 0);
cl_lock_mutex_get(env, lock);
- RETURN(result);
+ RETURN(result ?: CLO_REPEAT);
}
/**
/* first, try to enqueue a sub-lock ... */
result = cl_enqueue_try(env, sublock, io, enqflags);
- if (sublock->cll_state == CLS_ENQUEUED)
- /* if it is enqueued, try to `wait' on it---maybe it's already
- * granted */
- result = cl_wait_try(env, sublock);
+ if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
+ /* if it is enqueued, try to `wait' on it---maybe it's already
+ * granted */
+ result = cl_wait_try(env, sublock);
+ if (result == CLO_REENQUEUED)
+ result = CLO_WAIT;
+ }
/*
* If CEF_ASYNC flag is set, then all sub-locks can be enqueued in
* parallel, otherwise---enqueue has to wait until sub-lock is granted
* before proceeding to the next one.
*/
- if (result == CLO_WAIT && sublock->cll_state <= CLS_HELD &&
- enqflags & CEF_ASYNC && !last)
+ if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) &&
+ (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL)))
result = 0;
RETURN(result);
}
sublock);
break;
case CLS_CACHED:
+ cl_lock_get(sublock);
+ /* take recursive mutex of sublock */
+ cl_lock_mutex_get(env, sublock);
+ /* need to release all locks in closure
+ * otherwise it may deadlock. LU-2683.*/
+ lov_sublock_unlock(env, sub, closure,
+ subenv);
+ /* sublock and parent are held. */
rc = lov_sublock_release(env, lck, i,
1, rc);
+ cl_lock_mutex_put(env, sublock);
+ cl_lock_put(env, sublock);
+ break;
default:
lov_sublock_unlock(env, sub, closure,
subenv);
rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
if (rc == 0) {
if (lls->sub_flags & LSF_HELD) {
- LASSERT(sublock->cll_state == CLS_HELD);
+ LASSERT(sublock->cll_state == CLS_HELD ||
+ sublock->cll_state == CLS_ENQUEUED);
rc = cl_unuse_try(subenv->lse_env, sublock);
rc = lov_sublock_release(env, lck, i, 0, rc);
}
switch(sublock->cll_state) {
case CLS_HELD:
- rc = cl_unuse_try(subenv->lse_env,
- sublock);
+ rc = cl_unuse_try(subenv->lse_env, sublock);
lov_sublock_release(env, lck, i, 0, 0);
break;
- case CLS_ENQUEUED:
- /* TODO: it's not a good idea to cancel this
- * lock because it's innocent. But it's
- * acceptable. The better way would be to
- * define a new lock method to unhold the
- * dlm lock. */
- cl_lock_cancel(env, sublock);
default:
lov_sublock_release(env, lck, i, 1, 0);
break;
struct lov_lock *lck = cl2lov_lock(slice);
struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
enum cl_lock_state minstate;
+ int reenqueued;
int result;
int i;
ENTRY;
- for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
+again:
+ for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0;
+ i < lck->lls_nr; ++i) {
int rc;
struct lovsub_lock *sub;
struct cl_lock *sublock;
minstate = min(minstate, sublock->cll_state);
lov_sublock_unlock(env, sub, closure, subenv);
}
+ if (rc == CLO_REENQUEUED) {
+ reenqueued++;
+ rc = 0;
+ }
result = lov_subresult(result, rc);
if (result != 0)
break;
}
+ /* Each sublock only can be reenqueued once, so will not loop for
+ * ever. */
+ if (result == 0 && reenqueued != 0)
+ goto again;
cl_lock_closure_fini(closure);
RETURN(result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT);
}
if (rc != 0)
rc = lov_sublock_release(env, lck,
i, 1, rc);
+ } else if (sublock->cll_state == CLS_NEW) {
+ /* Sub-lock might have been canceled, while
+ * top-lock was cached. */
+ result = -ESTALE;
+ lov_sublock_release(env, lck, i, 1, result);
}
lov_sublock_unlock(env, sub, closure, subenv);
}
if (sub->sub_lock == NULL)
continue;
subobj = sub->sub_descr.cld_obj;
- if (!lov_stripe_intersects(r0->lo_lsm, sub->sub_stripe,
+ if (!lov_stripe_intersects(loo->lo_lsm, sub->sub_stripe,
fstart, fend, &start, &end))
continue;
subneed->cld_start = cl_index(subobj, start);
const struct cl_lock_descr *child,
const struct cl_lock_descr *descr)
{
- struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
+ struct lov_stripe_md *lsm = lov->lo_lsm;
obd_off start;
obd_off end;
int result;
ENTRY;
+ /* for top lock, it's necessary to match enq flags otherwise it will
+ * run into problem if a sublock is missing and reenqueue. */
+ if (need->cld_enq_flags != lov->lls_orig.cld_enq_flags)
+ return 0;
+
+ if (lov->lls_ever_canceled)
+ return 0;
+
if (need->cld_mode == CLM_GROUP)
/*
* always allow to match group lock.
* match against original lock extent.
*/
result = cl_lock_ext_match(&lov->lls_orig, need);
- CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %i %i/%i: %i\n",
+ CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %d %d/%d: %d\n",
PDESCR(&lov->lls_orig), PDESCR(&lov->lls_sub[0].sub_got),
lov->lls_sub[0].sub_stripe, lov->lls_nr, lov_r0(obj)->lo_nr,
result);
int result;
ENTRY;
- OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, CFS_ALLOC_IO);
+ OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO);
if (lck != NULL) {
cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
result = lov_lock_sub_init(env, lck, io);
RETURN(result);
}
+static void lov_empty_lock_fini(const struct lu_env *env,
+ struct cl_lock_slice *slice)
+{
+ struct lov_lock *lck = cl2lov_lock(slice);
+ OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
+}
+
+static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
+ lu_printer_t p, const struct cl_lock_slice *slice)
+{
+ (*p)(env, cookie, "empty\n");
+ return 0;
+}
+
+/* XXX: more methods will be added later. */
+static const struct cl_lock_operations lov_empty_lock_ops = {
+ .clo_fini = lov_empty_lock_fini,
+ .clo_print = lov_empty_lock_print
+};
+
+int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
+ struct cl_lock *lock, const struct cl_io *io)
+{
+ struct lov_lock *lck;
+ int result = -ENOMEM;
+
+ ENTRY;
+ OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO);
+ if (lck != NULL) {
+ cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
+ lck->lls_orig = lock->cll_descr;
+ result = 0;
+ }
+ RETURN(result);
+}
+
static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
struct cl_lock *parent)
{