X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flov%2Flov_lock.c;h=40192cecadaa4381abd1d7956da518c8fef7ba3d;hp=d0cdbed8b51fd1efb8a7e1bcbfee0db357e07cbc;hb=1364ab2c166d69bc857a729f3eff2c965db847c9;hpb=f95393b0d0a59cf3dc2f29cffc35dcc4cc9d7728 diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index d0cdbed..40192ce 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -28,6 +26,8 @@ /* * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -267,12 +267,12 @@ static int lov_subresult(int result, int rc) int result_rank; int rc_rank; + ENTRY; + LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT); LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT); CLASSERT(CLO_WAIT < CLO_REPEAT); - ENTRY; - /* calculate ranks in the ordering above */ result_rank = result < 0 ? 1 + CLO_REPEAT : result; rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc; @@ -321,7 +321,7 @@ static int lov_lock_sub_init(const struct lu_env *env, nr++; } LASSERT(nr > 0); - OBD_ALLOC(lck->lls_sub, nr * sizeof lck->lls_sub[0]); + OBD_ALLOC_LARGE(lck->lls_sub, nr * sizeof lck->lls_sub[0]); if (lck->lls_sub == NULL) RETURN(-ENOMEM); @@ -485,56 +485,27 @@ static void lov_lock_fini(const struct lu_env *env, * a reference on its parent. */ LASSERT(lck->lls_sub[i].sub_lock == NULL); - OBD_FREE(lck->lls_sub, lck->lls_nr * sizeof lck->lls_sub[0]); + OBD_FREE_LARGE(lck->lls_sub, + lck->lls_nr * sizeof lck->lls_sub[0]); } OBD_SLAB_FREE_PTR(lck, lov_lock_kmem); EXIT; } -/** - * - * \retval 0 if state-transition can proceed - * \retval -ve otherwise. - */ static int lov_lock_enqueue_wait(const struct lu_env *env, struct lov_lock *lck, struct cl_lock *sublock) { - struct cl_lock *lock = lck->lls_cl.cls_lock; - struct cl_lock *conflict = sublock->cll_conflict; - int result = CLO_REPEAT; + struct cl_lock *lock = lck->lls_cl.cls_lock; + int result; ENTRY; LASSERT(cl_lock_is_mutexed(lock)); - LASSERT(cl_lock_is_mutexed(sublock)); - LASSERT(sublock->cll_state == CLS_QUEUING); - LASSERT(conflict != NULL); - sublock->cll_conflict = NULL; cl_lock_mutex_put(env, lock); - cl_lock_mutex_put(env, sublock); - - LASSERT(cl_lock_nr_mutexed(env) == 0); - - cl_lock_mutex_get(env, conflict); - cl_lock_cancel(env, conflict); - cl_lock_delete(env, conflict); - while (conflict->cll_state != CLS_FREEING) { - int rc = 0; - - rc = cl_lock_state_wait(env, conflict); - if (rc == 0) - continue; - - result = lov_subresult(result, rc); - break; - } - cl_lock_mutex_put(env, conflict); - lu_ref_del(&conflict->cll_reference, "cancel-wait", sublock); - cl_lock_put(env, conflict); - + result = cl_lock_enqueue_wait(env, sublock, 0); cl_lock_mutex_get(env, lock); - RETURN(result); + RETURN(result ?: CLO_REPEAT); } /** @@ -553,7 +524,7 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck, /* first, try to enqueue a sub-lock ... */ result = cl_enqueue_try(env, sublock, io, enqflags); - if (sublock->cll_state == CLS_ENQUEUED) + if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) /* if it is enqueued, try to `wait' on it---maybe it's already * granted */ result = cl_wait_try(env, sublock); @@ -562,8 +533,8 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck, * parallel, otherwise---enqueue has to wait until sub-lock is granted * before proceeding to the next one. */ - if (result == CLO_WAIT && sublock->cll_state <= CLS_HELD && - enqflags & CEF_ASYNC && !last) + if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) && + (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL))) result = 0; RETURN(result); } @@ -726,7 +697,8 @@ static int lov_lock_unuse(const struct lu_env *env, rc = lov_sublock_lock(env, lck, lls, closure, &subenv); if (rc == 0) { if (lls->sub_flags & LSF_HELD) { - LASSERT(sublock->cll_state == CLS_HELD); + LASSERT(sublock->cll_state == CLS_HELD || + sublock->cll_state == CLS_ENQUEUED); rc = cl_unuse_try(subenv->lse_env, sublock); rc = lov_sublock_release(env, lck, i, 0, rc); } @@ -779,17 +751,9 @@ static void lov_lock_cancel(const struct lu_env *env, switch(sublock->cll_state) { case CLS_HELD: - rc = cl_unuse_try(subenv->lse_env, - sublock); + rc = cl_unuse_try(subenv->lse_env, sublock); lov_sublock_release(env, lck, i, 0, 0); break; - case CLS_ENQUEUED: - /* TODO: it's not a good idea to cancel this - * lock because it's innocent. But it's - * acceptable. The better way would be to - * define a new lock method to unhold the - * dlm lock. */ - cl_lock_cancel(env, sublock); default: lov_sublock_release(env, lck, i, 1, 0); break; @@ -818,12 +782,15 @@ static int lov_lock_wait(const struct lu_env *env, struct lov_lock *lck = cl2lov_lock(slice); struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock); enum cl_lock_state minstate; + int reenqueued; int result; int i; ENTRY; - for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) { +again: + for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0; + i < lck->lls_nr; ++i) { int rc; struct lovsub_lock *sub; struct cl_lock *sublock; @@ -843,10 +810,18 @@ static int lov_lock_wait(const struct lu_env *env, minstate = min(minstate, sublock->cll_state); lov_sublock_unlock(env, sub, closure, subenv); } + if (rc == CLO_REENQUEUED) { + reenqueued++; + rc = 0; + } result = lov_subresult(result, rc); if (result != 0) break; } + /* Each sublock only can be reenqueued once, so will not loop for + * ever. */ + if (result == 0 && reenqueued != 0) + goto again; cl_lock_closure_fini(closure); RETURN(result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT); } @@ -892,6 +867,11 @@ static int lov_lock_use(const struct lu_env *env, if (rc != 0) rc = lov_sublock_release(env, lck, i, 1, rc); + } else if (sublock->cll_state == CLS_NEW) { + /* Sub-lock might have been canceled, while + * top-lock was cached. */ + result = -ESTALE; + lov_sublock_release(env, lck, i, 1, result); } lov_sublock_unlock(env, sub, closure, subenv); } @@ -1050,7 +1030,7 @@ static int lov_lock_fits_into(const struct lu_env *env, * match against original lock extent. */ result = cl_lock_ext_match(&lov->lls_orig, need); - CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %i %i/%i: %i\n", + CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %d %d/%d: %d\n", PDESCR(&lov->lls_orig), PDESCR(&lov->lls_sub[0].sub_got), lov->lls_sub[0].sub_stripe, lov->lls_nr, lov_r0(obj)->lo_nr, result); @@ -1114,44 +1094,40 @@ static void lov_lock_delete(const struct lu_env *env, { struct lov_lock *lck = cl2lov_lock(slice); struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock); - int i; + struct lov_lock_link *link; + int rc; + int i; LASSERT(slice->cls_lock->cll_state == CLS_FREEING); ENTRY; for (i = 0; i < lck->lls_nr; ++i) { - struct lov_lock_sub *lls; - struct lovsub_lock *lsl; - struct cl_lock *sublock; - int rc; + struct lov_lock_sub *lls = &lck->lls_sub[i]; + struct lovsub_lock *lsl = lls->sub_lock; - lls = &lck->lls_sub[i]; - lsl = lls->sub_lock; - if (lsl == NULL) + if (lsl == NULL) /* already removed */ continue; - sublock = lsl->lss_cl.cls_lock; rc = lov_sublock_lock(env, lck, lls, closure, NULL); - if (rc == 0) { - if (lls->sub_flags & LSF_HELD) - lov_sublock_release(env, lck, i, 1, 0); - if (sublock->cll_state < CLS_FREEING) { - struct lov_lock_link *link; - - link = lov_lock_link_find(env, lck, lsl); - LASSERT(link != NULL); - lov_lock_unlink(env, link, lsl); - LASSERT(lck->lls_sub[i].sub_lock == NULL); - } - lov_sublock_unlock(env, lsl, closure, NULL); - } else if (rc == CLO_REPEAT) { - --i; /* repeat with this lock */ - } else { - CL_LOCK_DEBUG(D_ERROR, env, sublock, - "Cannot get sub-lock for delete: %i\n", - rc); + if (rc == CLO_REPEAT) { + --i; + continue; } + + LASSERT(rc == 0); + LASSERT(lsl->lss_cl.cls_lock->cll_state < CLS_FREEING); + + if (lls->sub_flags & LSF_HELD) + lov_sublock_release(env, lck, i, 1, 0); + + link = lov_lock_link_find(env, lck, lsl); + LASSERT(link != NULL); + lov_lock_unlink(env, link, lsl); + LASSERT(lck->lls_sub[i].sub_lock == NULL); + + lov_sublock_unlock(env, lsl, closure, NULL); } + cl_lock_closure_fini(closure); EXIT; }