-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
/*
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
int result_rank;
int rc_rank;
+ ENTRY;
+
LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT);
LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT);
CLASSERT(CLO_WAIT < CLO_REPEAT);
- ENTRY;
-
/* calculate ranks in the ordering above */
result_rank = result < 0 ? 1 + CLO_REPEAT : result;
rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;
nr++;
}
LASSERT(nr > 0);
- OBD_ALLOC(lck->lls_sub, nr * sizeof lck->lls_sub[0]);
+ OBD_ALLOC_LARGE(lck->lls_sub, nr * sizeof lck->lls_sub[0]);
if (lck->lls_sub == NULL)
RETURN(-ENOMEM);
* a reference on its parent.
*/
LASSERT(lck->lls_sub[i].sub_lock == NULL);
- OBD_FREE(lck->lls_sub, lck->lls_nr * sizeof lck->lls_sub[0]);
+ OBD_FREE_LARGE(lck->lls_sub,
+ lck->lls_nr * sizeof lck->lls_sub[0]);
}
OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
EXIT;
/* first, try to enqueue a sub-lock ... */
result = cl_enqueue_try(env, sublock, io, enqflags);
- if (sublock->cll_state == CLS_ENQUEUED)
+ if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL))
/* if it is enqueued, try to `wait' on it---maybe it's already
* granted */
result = cl_wait_try(env, sublock);
* parallel, otherwise---enqueue has to wait until sub-lock is granted
* before proceeding to the next one.
*/
- if (result == CLO_WAIT && sublock->cll_state <= CLS_HELD &&
- enqflags & CEF_ASYNC && !last)
+ if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) &&
+ (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL)))
result = 0;
RETURN(result);
}
rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
if (rc == 0) {
if (lls->sub_flags & LSF_HELD) {
- LASSERT(sublock->cll_state == CLS_HELD);
+ LASSERT(sublock->cll_state == CLS_HELD ||
+ sublock->cll_state == CLS_ENQUEUED);
rc = cl_unuse_try(subenv->lse_env, sublock);
rc = lov_sublock_release(env, lck, i, 0, rc);
}
switch(sublock->cll_state) {
case CLS_HELD:
- rc = cl_unuse_try(subenv->lse_env,
- sublock);
+ rc = cl_unuse_try(subenv->lse_env, sublock);
lov_sublock_release(env, lck, i, 0, 0);
break;
- case CLS_ENQUEUED:
- /* TODO: it's not a good idea to cancel this
- * lock because it's innocent. But it's
- * acceptable. The better way would be to
- * define a new lock method to unhold the
- * dlm lock. */
- cl_lock_cancel(env, sublock);
default:
lov_sublock_release(env, lck, i, 1, 0);
break;
struct lov_lock *lck = cl2lov_lock(slice);
struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
enum cl_lock_state minstate;
+ int reenqueued;
int result;
int i;
ENTRY;
- for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
+again:
+ for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0;
+ i < lck->lls_nr; ++i) {
int rc;
struct lovsub_lock *sub;
struct cl_lock *sublock;
minstate = min(minstate, sublock->cll_state);
lov_sublock_unlock(env, sub, closure, subenv);
}
+ if (rc == CLO_REENQUEUED) {
+ reenqueued++;
+ rc = 0;
+ }
result = lov_subresult(result, rc);
if (result != 0)
break;
}
+ /* Each sublock only can be reenqueued once, so will not loop for
+ * ever. */
+ if (result == 0 && reenqueued != 0)
+ goto again;
cl_lock_closure_fini(closure);
RETURN(result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT);
}
if (rc != 0)
rc = lov_sublock_release(env, lck,
i, 1, rc);
+ } else if (sublock->cll_state == CLS_NEW) {
+ /* Sub-lock might have been canceled, while
+ * top-lock was cached. */
+ result = -ESTALE;
+ lov_sublock_release(env, lck, i, 1, result);
}
lov_sublock_unlock(env, sub, closure, subenv);
}