-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
LASSERT(idx < lck->lls_nr);
ENTRY;
- OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, CFS_ALLOC_IO);
+ OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, __GFP_IO);
if (link != NULL) {
struct lov_sublock_env *subenv;
struct lov_lock_sub *lls;
parent = lck->lls_cl.cls_lock;
lls = &lck->lls_sub[idx];
- descr = &lls->sub_descr;
+ descr = &lls->sub_got;
subenv = lov_sublock_env_get(env, parent, lls);
if (!IS_ERR(subenv)) {
ENTRY;
- LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT);
- LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT);
+ LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
+ "result = %d", result);
+ LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
+ "rc = %d\n", rc);
CLASSERT(CLO_WAIT < CLO_REPEAT);
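+ /* the rank calculation below relies on CLO_WAIT ordering before
+ * CLO_REPEAT */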
/* calculate ranks in the ordering above */
* XXX for wide striping a smarter algorithm is desirable,
* breaking out of the loop early.
*/
- if (lov_stripe_intersects(r0->lo_lsm, i,
+ if (lov_stripe_intersects(loo->lo_lsm, i,
file_start, file_end, &start, &end))
nr++;
}
* top-lock.
*/
for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
- if (lov_stripe_intersects(r0->lo_lsm, i,
+ if (lov_stripe_intersects(loo->lo_lsm, i,
file_start, file_end, &start, &end)) {
struct cl_lock_descr *descr;
/* first, try to enqueue a sub-lock ... */
result = cl_enqueue_try(env, sublock, io, enqflags);
- if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL))
- /* if it is enqueued, try to `wait' on it---maybe it's already
- * granted */
- result = cl_wait_try(env, sublock);
+ if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
+ /* if it is enqueued, try to `wait' on it---maybe it's already
+ * granted */
+ result = cl_wait_try(env, sublock);
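+ /* the sublock may have been reenqueued while we waited;
+ * report CLO_WAIT so that the caller retries the wait */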
+ if (result == CLO_REENQUEUED)
+ result = CLO_WAIT;
+ }
/*
* If CEF_ASYNC flag is set, then all sub-locks can be enqueued in
* parallel, otherwise---enqueue has to wait until sub-lock is granted
sublock);
break;
case CLS_CACHED:
+ cl_lock_get(sublock);
+ /* take recursive mutex of sublock */
+ cl_lock_mutex_get(env, sublock);
+ /* need to release all locks in closure
+ * otherwise it may deadlock. LU-2683. */
+ lov_sublock_unlock(env, sub, closure,
+ subenv);
+ /* sublock and parent are held. */
rc = lov_sublock_release(env, lck, i,
1, rc);
+ cl_lock_mutex_put(env, sublock);
+ cl_lock_put(env, sublock);
+ break;
default:
lov_sublock_unlock(env, sub, closure,
subenv);
if (lls->sub_flags & LSF_HELD) {
LASSERT(sublock->cll_state == CLS_HELD ||
sublock->cll_state == CLS_ENQUEUED);
- /* For AGL case, the sublock state maybe not
- * match the lower layer state, so sync them
- * before unuse. */
- if (sublock->cll_users == 1 &&
- sublock->cll_state == CLS_ENQUEUED) {
- __u32 save;
-
- save = sublock->cll_descr.cld_enq_flags;
- sublock->cll_descr.cld_enq_flags |=
- CEF_NO_REENQUEUE;
- cl_wait_try(env, sublock);
- sublock->cll_descr.cld_enq_flags = save;
- }
rc = cl_unuse_try(subenv->lse_env, sublock);
rc = lov_sublock_release(env, lck, i, 0, rc);
}
switch(sublock->cll_state) {
case CLS_HELD:
- rc = cl_unuse_try(subenv->lse_env,
- sublock);
+ rc = cl_unuse_try(subenv->lse_env, sublock);
lov_sublock_release(env, lck, i, 0, 0);
break;
- case CLS_ENQUEUED:
- /* TODO: it's not a good idea to cancel this
- * lock because it's innocent. But it's
- * acceptable. The better way would be to
- * define a new lock method to unhold the
- * dlm lock. */
- cl_lock_cancel(env, sublock);
default:
lov_sublock_release(env, lck, i, 1, 0);
break;
if (sub->sub_lock == NULL)
continue;
subobj = sub->sub_descr.cld_obj;
- if (!lov_stripe_intersects(r0->lo_lsm, sub->sub_stripe,
+ if (!lov_stripe_intersects(loo->lo_lsm, sub->sub_stripe,
fstart, fend, &start, &end))
continue;
subneed->cld_start = cl_index(subobj, start);
const struct cl_lock_descr *child,
const struct cl_lock_descr *descr)
{
- struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
+ struct lov_stripe_md *lsm = lov->lo_lsm;
obd_off start;
obd_off end;
int result;
ENTRY;
+ /* for a top-lock it is necessary to match the enqueue flags, otherwise
+ * matching runs into trouble if a sublock is missing and has to be
+ * reenqueued. */
+ if (need->cld_enq_flags != lov->lls_orig.cld_enq_flags)
+ return 0;
+
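+ /* a lock that has ever been canceled cannot be matched: some of its
+ * sublocks may already be gone */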
+ if (lov->lls_ever_canceled)
+ return 0;
+
if (need->cld_mode == CLM_GROUP)
/*
* always allow to match group lock.
int result;
ENTRY;
- OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, CFS_ALLOC_IO);
+ OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO);
if (lck != NULL) {
cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
result = lov_lock_sub_init(env, lck, io);
RETURN(result);
}
+static void lov_empty_lock_fini(const struct lu_env *env,
+ struct cl_lock_slice *slice)
+{
+ struct lov_lock *lck = cl2lov_lock(slice);
+
+ OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
+}
+
+static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
+ lu_printer_t p, const struct cl_lock_slice *slice)
+{
+ (*p)(env, cookie, "empty\n");
+ return 0;
+}
+
+/* XXX: more methods will be added later. */
+static const struct cl_lock_operations lov_empty_lock_ops = {
+ .clo_fini = lov_empty_lock_fini,
+ .clo_print = lov_empty_lock_print
+};
+
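+/* Set up the lov slice of a lock on a stripeless object: there are no
+ * sub-locks to manage, so only trivial fini/print methods are needed. */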
+int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
+ struct cl_lock *lock, const struct cl_io *io)
+{
+ struct lov_lock *lck;
+ int result = -ENOMEM;
+
+ ENTRY;
+ OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO);
+ if (lck != NULL) {
+ cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
+ lck->lls_orig = lock->cll_descr;
+ result = 0;
+ }
+ RETURN(result);
+}
+
static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
struct cl_lock *parent)
{