Whamcloud - gitweb
git://git.whamcloud.com
/
fs
/
lustre-release.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
LU-1304 osd: ->do_ah_init() to accept new object
[fs/lustre-release.git]
/
lustre
/
lov
/
lov_lock.c
diff --git
a/lustre/lov/lov_lock.c
b/lustre/lov/lov_lock.c
index
1c118f0
..
907baae
100644
(file)
--- a/
lustre/lov/lov_lock.c
+++ b/
lustre/lov/lov_lock.c
@@
-1,6
+1,4
@@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@
-28,6
+26,8
@@
/*
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
/*
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*/
/*
* This file is part of Lustre, http://www.lustre.org/
@@
-267,12
+267,14
@@
static int lov_subresult(int result, int rc)
int result_rank;
int rc_rank;
int result_rank;
int rc_rank;
- LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT);
- LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT);
- CLASSERT(CLO_WAIT < CLO_REPEAT);
-
ENTRY;
ENTRY;
+ LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
+ "result = %d", result);
+ LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
+ "rc = %d\n", rc);
+ CLASSERT(CLO_WAIT < CLO_REPEAT);
+
/* calculate ranks in the ordering above */
result_rank = result < 0 ? 1 + CLO_REPEAT : result;
rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;
/* calculate ranks in the ordering above */
result_rank = result < 0 ? 1 + CLO_REPEAT : result;
rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;
@@
-316,12
+318,12
@@
static int lov_lock_sub_init(const struct lu_env *env,
* XXX for wide striping smarter algorithm is desirable,
* breaking out of the loop, early.
*/
* XXX for wide striping smarter algorithm is desirable,
* breaking out of the loop, early.
*/
-
if (lov_stripe_intersects(r0
->lo_lsm, i,
+
if (lov_stripe_intersects(loo
->lo_lsm, i,
file_start, file_end, &start, &end))
nr++;
}
LASSERT(nr > 0);
file_start, file_end, &start, &end))
nr++;
}
LASSERT(nr > 0);
- OBD_ALLOC(lck->lls_sub, nr * sizeof lck->lls_sub[0]);
+ OBD_ALLOC
_LARGE
(lck->lls_sub, nr * sizeof lck->lls_sub[0]);
if (lck->lls_sub == NULL)
RETURN(-ENOMEM);
if (lck->lls_sub == NULL)
RETURN(-ENOMEM);
@@
-334,7
+336,7
@@
static int lov_lock_sub_init(const struct lu_env *env,
* top-lock.
*/
for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
* top-lock.
*/
for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
-
if (lov_stripe_intersects(r0
->lo_lsm, i,
+
if (lov_stripe_intersects(loo
->lo_lsm, i,
file_start, file_end, &start, &end)) {
struct cl_lock_descr *descr;
file_start, file_end, &start, &end)) {
struct cl_lock_descr *descr;
@@
-485,7
+487,8
@@
static void lov_lock_fini(const struct lu_env *env,
* a reference on its parent.
*/
LASSERT(lck->lls_sub[i].sub_lock == NULL);
* a reference on its parent.
*/
LASSERT(lck->lls_sub[i].sub_lock == NULL);
- OBD_FREE(lck->lls_sub, lck->lls_nr * sizeof lck->lls_sub[0]);
+ OBD_FREE_LARGE(lck->lls_sub,
+ lck->lls_nr * sizeof lck->lls_sub[0]);
}
OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
EXIT;
}
OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
EXIT;
@@
-523,17
+526,20
@@
static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
/* first, try to enqueue a sub-lock ... */
result = cl_enqueue_try(env, sublock, io, enqflags);
/* first, try to enqueue a sub-lock ... */
result = cl_enqueue_try(env, sublock, io, enqflags);
- if (sublock->cll_state == CLS_ENQUEUED)
- /* if it is enqueued, try to `wait' on it---maybe it's already
- * granted */
- result = cl_wait_try(env, sublock);
+ if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
+ /* if it is enqueued, try to `wait' on it---maybe it's already
+ * granted */
+ result = cl_wait_try(env, sublock);
+ if (result == CLO_REENQUEUED)
+ result = CLO_WAIT;
+ }
/*
* If CEF_ASYNC flag is set, then all sub-locks can be enqueued in
* parallel, otherwise---enqueue has to wait until sub-lock is granted
* before proceeding to the next one.
*/
/*
* If CEF_ASYNC flag is set, then all sub-locks can be enqueued in
* parallel, otherwise---enqueue has to wait until sub-lock is granted
* before proceeding to the next one.
*/
- if (
result == CLO_WAIT && sublock->cll_state <= CLS_HELD
&&
-
enqflags & CEF_ASYNC && !last
)
+ if (
(result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD)
&&
+
(enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL))
)
result = 0;
RETURN(result);
}
result = 0;
RETURN(result);
}
@@
-696,7
+702,8
@@
static int lov_lock_unuse(const struct lu_env *env,
rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
if (rc == 0) {
if (lls->sub_flags & LSF_HELD) {
rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
if (rc == 0) {
if (lls->sub_flags & LSF_HELD) {
- LASSERT(sublock->cll_state == CLS_HELD);
+ LASSERT(sublock->cll_state == CLS_HELD ||
+ sublock->cll_state == CLS_ENQUEUED);
rc = cl_unuse_try(subenv->lse_env, sublock);
rc = lov_sublock_release(env, lck, i, 0, rc);
}
rc = cl_unuse_try(subenv->lse_env, sublock);
rc = lov_sublock_release(env, lck, i, 0, rc);
}
@@
-749,17
+756,9
@@
static void lov_lock_cancel(const struct lu_env *env,
switch(sublock->cll_state) {
case CLS_HELD:
switch(sublock->cll_state) {
case CLS_HELD:
- rc = cl_unuse_try(subenv->lse_env,
- sublock);
+ rc = cl_unuse_try(subenv->lse_env, sublock);
lov_sublock_release(env, lck, i, 0, 0);
break;
lov_sublock_release(env, lck, i, 0, 0);
break;
- case CLS_ENQUEUED:
- /* TODO: it's not a good idea to cancel this
- * lock because it's innocent. But it's
- * acceptable. The better way would be to
- * define a new lock method to unhold the
- * dlm lock. */
- cl_lock_cancel(env, sublock);
default:
lov_sublock_release(env, lck, i, 1, 0);
break;
default:
lov_sublock_release(env, lck, i, 1, 0);
break;
@@
-788,12
+787,15
@@
static int lov_lock_wait(const struct lu_env *env,
struct lov_lock *lck = cl2lov_lock(slice);
struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
enum cl_lock_state minstate;
struct lov_lock *lck = cl2lov_lock(slice);
struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
enum cl_lock_state minstate;
+ int reenqueued;
int result;
int i;
ENTRY;
int result;
int i;
ENTRY;
- for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
+again:
+ for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0;
+ i < lck->lls_nr; ++i) {
int rc;
struct lovsub_lock *sub;
struct cl_lock *sublock;
int rc;
struct lovsub_lock *sub;
struct cl_lock *sublock;
@@
-813,10
+815,18
@@
static int lov_lock_wait(const struct lu_env *env,
minstate = min(minstate, sublock->cll_state);
lov_sublock_unlock(env, sub, closure, subenv);
}
minstate = min(minstate, sublock->cll_state);
lov_sublock_unlock(env, sub, closure, subenv);
}
+ if (rc == CLO_REENQUEUED) {
+ reenqueued++;
+ rc = 0;
+ }
result = lov_subresult(result, rc);
if (result != 0)
break;
}
result = lov_subresult(result, rc);
if (result != 0)
break;
}
+ /* Each sublock only can be reenqueued once, so will not loop for
+ * ever. */
+ if (result == 0 && reenqueued != 0)
+ goto again;
cl_lock_closure_fini(closure);
RETURN(result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT);
}
cl_lock_closure_fini(closure);
RETURN(result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT);
}
@@
-862,6
+872,11
@@
static int lov_lock_use(const struct lu_env *env,
if (rc != 0)
rc = lov_sublock_release(env, lck,
i, 1, rc);
if (rc != 0)
rc = lov_sublock_release(env, lck,
i, 1, rc);
+ } else if (sublock->cll_state == CLS_NEW) {
+ /* Sub-lock might have been canceled, while
+ * top-lock was cached. */
+ result = -ESTALE;
+ lov_sublock_release(env, lck, i, 1, result);
}
lov_sublock_unlock(env, sub, closure, subenv);
}
}
lov_sublock_unlock(env, sub, closure, subenv);
}
@@
-909,7
+924,7
@@
static int lock_lock_multi_match()
if (sub->sub_lock == NULL)
continue;
subobj = sub->sub_descr.cld_obj;
if (sub->sub_lock == NULL)
continue;
subobj = sub->sub_descr.cld_obj;
-
if (!lov_stripe_intersects(r0
->lo_lsm, sub->sub_stripe,
+
if (!lov_stripe_intersects(loo
->lo_lsm, sub->sub_stripe,
fstart, fend, &start, &end))
continue;
subneed->cld_start = cl_index(subobj, start);
fstart, fend, &start, &end))
continue;
subneed->cld_start = cl_index(subobj, start);
@@
-933,7
+948,7
@@
static int lov_lock_stripe_is_matching(const struct lu_env *env,
const struct cl_lock_descr *child,
const struct cl_lock_descr *descr)
{
const struct cl_lock_descr *child,
const struct cl_lock_descr *descr)
{
-
struct lov_stripe_md *lsm = lov_r0(lov)
->lo_lsm;
+
struct lov_stripe_md *lsm = lov
->lo_lsm;
obd_off start;
obd_off end;
int result;
obd_off start;
obd_off end;
int result;