-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
}
static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
- const struct cl_io *io,
- struct lov_lock *lck,
- int idx, struct lov_lock_link **out)
+ const struct cl_io *io,
+ struct lov_lock *lck,
+ int idx, struct lov_lock_link **out)
{
- struct cl_lock *sublock;
- struct cl_lock *parent;
- struct lov_lock_link *link;
-
- LASSERT(idx < lck->lls_nr);
- ENTRY;
-
- OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, CFS_ALLOC_IO);
- if (link != NULL) {
- struct lov_sublock_env *subenv;
- struct lov_lock_sub *lls;
- struct cl_lock_descr *descr;
-
- parent = lck->lls_cl.cls_lock;
- lls = &lck->lls_sub[idx];
- descr = &lls->sub_descr;
-
- subenv = lov_sublock_env_get(env, parent, lls);
- if (!IS_ERR(subenv)) {
- /* CAVEAT: Don't try to add a field in lov_lock_sub
- * to remember the subio. This is because lock is able
- * to be cached, but this is not true for IO. This
- * further means a sublock might be referenced in
- * different io context. -jay */
-
- sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
- descr, "lov-parent", parent);
- lov_sublock_env_put(subenv);
- } else {
- /* error occurs. */
- sublock = (void*)subenv;
- }
-
- if (!IS_ERR(sublock))
- *out = link;
- else
- OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
- } else
- sublock = ERR_PTR(-ENOMEM);
- RETURN(sublock);
+ struct cl_lock *sublock;
+ struct cl_lock *parent;
+ struct lov_lock_link *link;
+
+ LASSERT(idx < lck->lls_nr);
+ ENTRY;
+
+ OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, GFP_NOFS);
+ if (link != NULL) {
+ struct lov_sublock_env *subenv;
+ struct lov_lock_sub *lls;
+ struct cl_lock_descr *descr;
+
+ parent = lck->lls_cl.cls_lock;
+ lls = &lck->lls_sub[idx];
+ descr = &lls->sub_got;
+
+ subenv = lov_sublock_env_get(env, parent, lls);
+ if (!IS_ERR(subenv)) {
+ /* CAVEAT: Don't try to add a field in lov_lock_sub
+ * to remember the subio. This is because lock is able
+ * to be cached, but this is not true for IO. This
+ * further means a sublock might be referenced in
+ * different io context. -jay */
+
+ sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
+ descr, "lov-parent", parent);
+ lov_sublock_env_put(subenv);
+ } else {
+ /* error occurs. */
+ sublock = (void *)subenv;
+ }
+
+ if (!IS_ERR(sublock))
+ *out = link;
+ else
+ OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
+ } else
+ sublock = ERR_PTR(-ENOMEM);
+ RETURN(sublock);
}
static void lov_sublock_unlock(const struct lu_env *env,
ENTRY;
- LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT);
- LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT);
+ LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
+ "result = %d", result);
+ LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
+ "rc = %d\n", rc);
CLASSERT(CLO_WAIT < CLO_REPEAT);
/* calculate ranks in the ordering above */
* XXX for wide striping smarter algorithm is desirable,
* breaking out of the loop, early.
*/
- if (lov_stripe_intersects(r0->lo_lsm, i,
+ if (lov_stripe_intersects(loo->lo_lsm, i,
file_start, file_end, &start, &end))
nr++;
}
* top-lock.
*/
for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
- if (lov_stripe_intersects(r0->lo_lsm, i,
+ if (lov_stripe_intersects(loo->lo_lsm, i,
file_start, file_end, &start, &end)) {
struct cl_lock_descr *descr;
}
}
LASSERT(nr == lck->lls_nr);
- /*
- * Then, create sub-locks. Once at least one sub-lock was created,
- * top-lock can be reached by other threads.
- */
- for (i = 0; i < lck->lls_nr; ++i) {
- struct cl_lock *sublock;
- struct lov_lock_link *link;
- if (lck->lls_sub[i].sub_lock == NULL) {
- sublock = lov_sublock_alloc(env, io, lck, i, &link);
- if (IS_ERR(sublock)) {
- result = PTR_ERR(sublock);
- break;
- }
- cl_lock_get_trust(sublock);
- cl_lock_mutex_get(env, sublock);
- cl_lock_mutex_get(env, parent);
- /*
- * recheck under mutex that sub-lock wasn't created
- * concurrently, and that top-lock is still alive.
- */
- if (lck->lls_sub[i].sub_lock == NULL &&
- parent->cll_state < CLS_FREEING) {
- lov_sublock_adopt(env, lck, sublock, i, link);
- cl_lock_mutex_put(env, parent);
- } else {
- OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
- cl_lock_mutex_put(env, parent);
- cl_lock_unhold(env, sublock,
- "lov-parent", parent);
- }
- cl_lock_mutex_put(env, sublock);
- cl_lock_put(env, sublock);
- }
- }
/*
* Some sub-locks can be missing at this point. This is not a problem,
* because enqueue will create them anyway. Main duty of this function
/* first, try to enqueue a sub-lock ... */
result = cl_enqueue_try(env, sublock, io, enqflags);
- if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL))
- /* if it is enqueued, try to `wait' on it---maybe it's already
- * granted */
- result = cl_wait_try(env, sublock);
+ if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
+ /* if it is enqueued, try to `wait' on it---maybe it's already
+ * granted */
+ result = cl_wait_try(env, sublock);
+ if (result == CLO_REENQUEUED)
+ result = CLO_WAIT;
+ }
/*
* If CEF_ASYNC flag is set, then all sub-locks can be enqueued in
* parallel, otherwise---enqueue has to wait until sub-lock is granted
static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
struct cl_io *io, struct lov_lock *lck, int idx)
{
- struct lov_lock_link *link;
+ struct lov_lock_link *link = NULL;
struct cl_lock *sublock;
int result;
sublock);
break;
case CLS_CACHED:
+ cl_lock_get(sublock);
+ /* take recursive mutex of sublock */
+ cl_lock_mutex_get(env, sublock);
+ /* need to release all locks in closure
+ * otherwise it may deadlock. LU-2683.*/
+ lov_sublock_unlock(env, sub, closure,
+ subenv);
+ /* sublock and parent are held. */
rc = lov_sublock_release(env, lck, i,
1, rc);
+ cl_lock_mutex_put(env, sublock);
+ cl_lock_put(env, sublock);
+ break;
default:
lov_sublock_unlock(env, sub, closure,
subenv);
if (sub == NULL)
continue;
- sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
- if (rc == 0) {
- if (lls->sub_flags & LSF_HELD) {
- LASSERT(sublock->cll_state == CLS_HELD ||
- sublock->cll_state == CLS_ENQUEUED);
- /* For AGL case, the sublock state maybe not
- * match the lower layer state, so sync them
- * before unuse. */
- if (sublock->cll_users == 1 &&
- sublock->cll_state == CLS_ENQUEUED) {
- __u32 save;
-
- save = sublock->cll_descr.cld_enq_flags;
- sublock->cll_descr.cld_enq_flags |=
- CEF_NO_REENQUEUE;
- cl_wait_try(env, sublock);
- sublock->cll_descr.cld_enq_flags = save;
- }
- rc = cl_unuse_try(subenv->lse_env, sublock);
- rc = lov_sublock_release(env, lck, i, 0, rc);
- }
- lov_sublock_unlock(env, sub, closure, subenv);
- }
- result = lov_subresult(result, rc);
+ sublock = sub->lss_cl.cls_lock;
+ rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
+ if (rc == 0) {
+ if (!(lls->sub_flags & LSF_HELD)) {
+ lov_sublock_unlock(env, sub, closure, subenv);
+ continue;
+ }
+
+ switch(sublock->cll_state) {
+ case CLS_HELD:
+ rc = cl_unuse_try(subenv->lse_env, sublock);
+ lov_sublock_release(env, lck, i, 0, 0);
+ break;
+ default:
+ cl_lock_cancel(subenv->lse_env, sublock);
+ lov_sublock_release(env, lck, i, 1, 0);
+ break;
+ }
+ lov_sublock_unlock(env, sub, closure, subenv);
+ }
+ result = lov_subresult(result, rc);
}
if (result == 0 && lck->lls_cancel_race) {
switch(sublock->cll_state) {
case CLS_HELD:
- rc = cl_unuse_try(subenv->lse_env,
- sublock);
+ rc = cl_unuse_try(subenv->lse_env, sublock);
lov_sublock_release(env, lck, i, 0, 0);
break;
- case CLS_ENQUEUED:
- /* TODO: it's not a good idea to cancel this
- * lock because it's innocent. But it's
- * acceptable. The better way would be to
- * define a new lock method to unhold the
- * dlm lock. */
- cl_lock_cancel(env, sublock);
default:
+ cl_lock_cancel(subenv->lse_env, sublock);
lov_sublock_release(env, lck, i, 1, 0);
break;
}
if (sub->sub_lock == NULL)
continue;
subobj = sub->sub_descr.cld_obj;
- if (!lov_stripe_intersects(r0->lo_lsm, sub->sub_stripe,
+ if (!lov_stripe_intersects(loo->lo_lsm, sub->sub_stripe,
fstart, fend, &start, &end))
continue;
subneed->cld_start = cl_index(subobj, start);
const struct cl_lock_descr *child,
const struct cl_lock_descr *descr)
{
- struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
+ struct lov_stripe_md *lsm = lov->lo_lsm;
obd_off start;
obd_off end;
int result;
ENTRY;
+ /* for top lock, it's necessary to match enq flags otherwise it will
+ * run into problem if a sublock is missing and reenqueue. */
+ if (need->cld_enq_flags != lov->lls_orig.cld_enq_flags)
+ return 0;
+
+ if (lov->lls_ever_canceled)
+ return 0;
+
if (need->cld_mode == CLM_GROUP)
/*
* always allow to match group lock.
};
int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
- struct cl_lock *lock, const struct cl_io *io)
+ struct cl_lock *lock, const struct cl_io *io)
{
- struct lov_lock *lck;
- int result;
+ struct lov_lock *lck;
+ int result;
+
+ ENTRY;
+ OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, GFP_NOFS);
+ if (lck != NULL) {
+ cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
+ result = lov_lock_sub_init(env, lck, io);
+ } else
+ result = -ENOMEM;
+ RETURN(result);
+}
- ENTRY;
- OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, CFS_ALLOC_IO);
- if (lck != NULL) {
- cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
- result = lov_lock_sub_init(env, lck, io);
- } else
- result = -ENOMEM;
- RETURN(result);
+static void lov_empty_lock_fini(const struct lu_env *env,
+ struct cl_lock_slice *slice)
+{
+ struct lov_lock *lck = cl2lov_lock(slice);
+ OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
+}
+
+static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
+ lu_printer_t p, const struct cl_lock_slice *slice)
+{
+ (*p)(env, cookie, "empty\n");
+ return 0;
+}
+
+/* XXX: more methods will be added later. */
+static const struct cl_lock_operations lov_empty_lock_ops = {
+ .clo_fini = lov_empty_lock_fini,
+ .clo_print = lov_empty_lock_print
+};
+
+int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
+ struct cl_lock *lock, const struct cl_io *io)
+{
+ struct lov_lock *lck;
+ int result = -ENOMEM;
+
+ ENTRY;
+ OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, GFP_NOFS);
+ if (lck != NULL) {
+ cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
+ lck->lls_orig = lock->cll_descr;
+ result = 0;
+ }
+ RETURN(result);
}
static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,