X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flov%2Flov_lock.c;h=547bdca89eb543103046706ccd7e990f4a7b4643;hb=0515ccaef79c0ce8af1fbc918803a1638f148674;hp=563b625f3c27aa7dcd3470da94338f2333a02835;hpb=6e3ec5812ebd1b5ecf7cae584f429b013ffe7431;p=fs%2Flustre-release.git

diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c
index 563b625..547bdca 100644
--- a/lustre/lov/lov_lock.c
+++ b/lustre/lov/lov_lock.c
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -26,8 +24,10 @@
  * GPL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -147,7 +147,7 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
         LASSERT(idx < lck->lls_nr);
         ENTRY;

-        OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, CFS_ALLOC_IO);
+        OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, __GFP_IO);
         if (link != NULL) {
                 struct lov_sublock_env *subenv;
                 struct lov_lock_sub *lls;
@@ -155,7 +155,7 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,

                 parent = lck->lls_cl.cls_lock;
                 lls = &lck->lls_sub[idx];
-                descr = &lls->sub_descr;
+                descr = &lls->sub_got;

                 subenv = lov_sublock_env_get(env, parent, lls);
                 if (!IS_ERR(subenv)) {
@@ -267,12 +267,14 @@ static int lov_subresult(int result, int rc)
         int result_rank;
         int rc_rank;

-        LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT);
-        LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT);
-        CLASSERT(CLO_WAIT < CLO_REPEAT);
-
         ENTRY;

+        LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
+                 "result = %d", result);
+        LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
+                 "rc = %d\n", rc);
+        CLASSERT(CLO_WAIT < CLO_REPEAT);
+
         /* calculate ranks in the ordering above */
         result_rank = result < 0 ? 1 + CLO_REPEAT : result;
         rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;
@@ -316,12 +318,12 @@ static int lov_lock_sub_init(const struct lu_env *env,
                  * XXX for wide striping smarter algorithm is desirable,
                  * breaking out of the loop, early.
                  */
-                if (lov_stripe_intersects(r0->lo_lsm, i,
+                if (lov_stripe_intersects(loo->lo_lsm, i,
                                           file_start, file_end, &start, &end))
                         nr++;
         }
         LASSERT(nr > 0);
-        OBD_ALLOC(lck->lls_sub, nr * sizeof lck->lls_sub[0]);
+        OBD_ALLOC_LARGE(lck->lls_sub, nr * sizeof lck->lls_sub[0]);
         if (lck->lls_sub == NULL)
                 RETURN(-ENOMEM);

@@ -334,7 +336,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
          * top-lock.
          */
         for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
-                if (lov_stripe_intersects(r0->lo_lsm, i,
+                if (lov_stripe_intersects(loo->lo_lsm, i,
                                           file_start, file_end, &start, &end)) {
                         struct cl_lock_descr *descr;

@@ -485,56 +487,27 @@ static void lov_lock_fini(const struct lu_env *env,
                          * a reference on its parent.
                          */
                         LASSERT(lck->lls_sub[i].sub_lock == NULL);
-                OBD_FREE(lck->lls_sub, lck->lls_nr * sizeof lck->lls_sub[0]);
+                OBD_FREE_LARGE(lck->lls_sub,
+                               lck->lls_nr * sizeof lck->lls_sub[0]);
         }
         OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
         EXIT;
 }

-/**
- *
- * \retval 0 if state-transition can proceed
- * \retval -ve otherwise.
- */
 static int lov_lock_enqueue_wait(const struct lu_env *env,
                                  struct lov_lock *lck,
                                  struct cl_lock *sublock)
 {
-        struct cl_lock *lock = lck->lls_cl.cls_lock;
-        struct cl_lock *conflict = sublock->cll_conflict;
-        int result = CLO_REPEAT;
+        struct cl_lock *lock = lck->lls_cl.cls_lock;
+        int result;
         ENTRY;

         LASSERT(cl_lock_is_mutexed(lock));
-        LASSERT(cl_lock_is_mutexed(sublock));
-        LASSERT(sublock->cll_state == CLS_QUEUING);
-        LASSERT(conflict != NULL);

-        sublock->cll_conflict = NULL;
         cl_lock_mutex_put(env, lock);
-        cl_lock_mutex_put(env, sublock);
-
-        LASSERT(cl_lock_nr_mutexed(env) == 0);
-
-        cl_lock_mutex_get(env, conflict);
-        cl_lock_cancel(env, conflict);
-        cl_lock_delete(env, conflict);
-        while (conflict->cll_state != CLS_FREEING) {
-                int rc = 0;
-
-                rc = cl_lock_state_wait(env, conflict);
-                if (rc == 0)
-                        continue;
-
-                result = lov_subresult(result, rc);
-                break;
-        }
-        cl_lock_mutex_put(env, conflict);
-        lu_ref_del(&conflict->cll_reference, "cancel-wait", sublock);
-        cl_lock_put(env, conflict);
-
+        result = cl_lock_enqueue_wait(env, sublock, 0);
         cl_lock_mutex_get(env, lock);
-        RETURN(result);
+        RETURN(result ?: CLO_REPEAT);
 }

 /**
@@ -553,17 +526,20 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,

         /* first, try to enqueue a sub-lock ... */
         result = cl_enqueue_try(env, sublock, io, enqflags);
-        if (sublock->cll_state == CLS_ENQUEUED)
-                /* if it is enqueued, try to `wait' on it---maybe it's already
-                 * granted */
-                result = cl_wait_try(env, sublock);
+        if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
+                /* if it is enqueued, try to `wait' on it---maybe it's already
+                 * granted */
+                result = cl_wait_try(env, sublock);
+                if (result == CLO_REENQUEUED)
+                        result = CLO_WAIT;
+        }
         /*
          * If CEF_ASYNC flag is set, then all sub-locks can be enqueued in
          * parallel, otherwise---enqueue has to wait until sub-lock is granted
          * before proceeding to the next one.
          */
-        if (result == CLO_WAIT && sublock->cll_state <= CLS_HELD &&
-            enqflags & CEF_ASYNC && !last)
+        if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) &&
+            (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL)))
                 result = 0;
         RETURN(result);
 }
@@ -676,8 +652,19 @@ static int lov_lock_enqueue(const struct lu_env *env,
                                                          sublock);
                                 break;
                         case CLS_CACHED:
+                                cl_lock_get(sublock);
+                                /* take recursive mutex of sublock */
+                                cl_lock_mutex_get(env, sublock);
+                                /* need to release all locks in closure
+                                 * otherwise it may deadlock. LU-2683.*/
+                                lov_sublock_unlock(env, sub, closure,
+                                                   subenv);
+                                /* sublock and parent are held. */
                                 rc = lov_sublock_release(env, lck, i,
                                                          1, rc);
+                                cl_lock_mutex_put(env, sublock);
+                                cl_lock_put(env, sublock);
+                                break;
                         default:
                                 lov_sublock_unlock(env, sub, closure,
                                                    subenv);
@@ -726,7 +713,8 @@ static int lov_lock_unuse(const struct lu_env *env,
                 rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
                         if (lls->sub_flags & LSF_HELD) {
-                                LASSERT(sublock->cll_state == CLS_HELD);
+                                LASSERT(sublock->cll_state == CLS_HELD ||
+                                        sublock->cll_state == CLS_ENQUEUED);
                                 rc = cl_unuse_try(subenv->lse_env, sublock);
                                 rc = lov_sublock_release(env, lck, i, 0, rc);
                         }
@@ -779,17 +767,9 @@ static void lov_lock_cancel(const struct lu_env *env,

                         switch(sublock->cll_state) {
                         case CLS_HELD:
-                                rc = cl_unuse_try(subenv->lse_env,
-                                                  sublock);
+                                rc = cl_unuse_try(subenv->lse_env, sublock);
                                 lov_sublock_release(env, lck, i, 0, 0);
                                 break;
-                        case CLS_ENQUEUED:
-                                /* TODO: it's not a good idea to cancel this
-                                 * lock because it's innocent. But it's
-                                 * acceptable. The better way would be to
-                                 * define a new lock method to unhold the
-                                 * dlm lock. */
-                                cl_lock_cancel(env, sublock);
                         default:
                                 lov_sublock_release(env, lck, i, 1, 0);
                                 break;
@@ -818,12 +798,15 @@ static int lov_lock_wait(const struct lu_env *env,
         struct lov_lock *lck = cl2lov_lock(slice);
         struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
         enum cl_lock_state minstate;
+        int reenqueued;
         int result;
         int i;

         ENTRY;

-        for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
+again:
+        for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0;
+             i < lck->lls_nr; ++i) {
                 int rc;
                 struct lovsub_lock *sub;
                 struct cl_lock *sublock;
@@ -843,10 +826,18 @@ static int lov_lock_wait(const struct lu_env *env,
                         minstate = min(minstate, sublock->cll_state);
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
+                if (rc == CLO_REENQUEUED) {
+                        reenqueued++;
+                        rc = 0;
+                }
                 result = lov_subresult(result, rc);
                 if (result != 0)
                         break;
         }
+        /* Each sublock only can be reenqueued once, so will not loop for
+         * ever. */
+        if (result == 0 && reenqueued != 0)
+                goto again;
         cl_lock_closure_fini(closure);
         RETURN(result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT);
 }
@@ -892,6 +883,11 @@ static int lov_lock_use(const struct lu_env *env,
                                 if (rc != 0)
                                         rc = lov_sublock_release(env, lck,
                                                                  i, 1, rc);
+                        } else if (sublock->cll_state == CLS_NEW) {
+                                /* Sub-lock might have been canceled, while
+                                 * top-lock was cached. */
+                                result = -ESTALE;
+                                lov_sublock_release(env, lck, i, 1, result);
                         }
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
@@ -939,7 +935,7 @@ static int lock_lock_multi_match()
                 if (sub->sub_lock == NULL)
                         continue;
                 subobj = sub->sub_descr.cld_obj;
-                if (!lov_stripe_intersects(r0->lo_lsm, sub->sub_stripe,
+                if (!lov_stripe_intersects(loo->lo_lsm, sub->sub_stripe,
                                            fstart, fend, &start, &end))
                         continue;
                 subneed->cld_start = cl_index(subobj, start);
@@ -963,7 +959,7 @@ static int lov_lock_stripe_is_matching(const struct lu_env *env,
                                        const struct cl_lock_descr *child,
                                        const struct cl_lock_descr *descr)
 {
-        struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
+        struct lov_stripe_md *lsm = lov->lo_lsm;
         obd_off start;
         obd_off end;
         int result;
@@ -1022,6 +1018,14 @@ static int lov_lock_fits_into(const struct lu_env *env,

         ENTRY;

+        /* for top lock, it's necessary to match enq flags otherwise it will
+         * run into problem if a sublock is missing and reenqueue. */
+        if (need->cld_enq_flags != lov->lls_orig.cld_enq_flags)
+                return 0;
+
+        if (lov->lls_ever_canceled)
+                return 0;
+
         if (need->cld_mode == CLM_GROUP)
                 /*
                  * always allow to match group lock.
@@ -1033,7 +1037,7 @@ static int lov_lock_fits_into(const struct lu_env *env,
                                                    cl2lov(slice->cls_obj),
                                                    lov->lls_sub[0].sub_stripe,
                                                    got, need);
-        } else if (io->ci_type != CIT_TRUNC && io->ci_type != CIT_MISC &&
+        } else if (io->ci_type != CIT_SETATTR && io->ci_type != CIT_MISC &&
                    !cl_io_is_append(io) && need->cld_mode != CLM_PHANTOM)
                 /*
                  * Multi-stripe locks are only suitable for `quick' IO and for
@@ -1050,7 +1054,7 @@ static int lov_lock_fits_into(const struct lu_env *env,
                  * match against original lock extent.
                  */
                 result = cl_lock_ext_match(&lov->lls_orig, need);
-        CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %i %i/%i: %i\n",
+        CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %d %d/%d: %d\n",
                PDESCR(&lov->lls_orig), PDESCR(&lov->lls_sub[0].sub_got),
                lov->lls_sub[0].sub_stripe, lov->lls_nr, lov_r0(obj)->lo_nr,
                result);
@@ -1114,44 +1118,40 @@ static void lov_lock_delete(const struct lu_env *env,
 {
         struct lov_lock *lck = cl2lov_lock(slice);
         struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
-        int i;
+        struct lov_lock_link *link;
+        int rc;
+        int i;

         LASSERT(slice->cls_lock->cll_state == CLS_FREEING);
         ENTRY;

         for (i = 0; i < lck->lls_nr; ++i) {
-                struct lov_lock_sub *lls;
-                struct lovsub_lock *lsl;
-                struct cl_lock *sublock;
-                int rc;
+                struct lov_lock_sub *lls = &lck->lls_sub[i];
+                struct lovsub_lock *lsl = lls->sub_lock;

-                lls = &lck->lls_sub[i];
-                lsl = lls->sub_lock;
-                if (lsl == NULL)
+                if (lsl == NULL) /* already removed */
                         continue;

-                sublock = lsl->lss_cl.cls_lock;
                 rc = lov_sublock_lock(env, lck, lls, closure, NULL);
-                if (rc == 0) {
-                        if (lls->sub_flags & LSF_HELD)
-                                lov_sublock_release(env, lck, i, 1, 0);
-                        if (sublock->cll_state < CLS_FREEING) {
-                                struct lov_lock_link *link;
-
-                                link = lov_lock_link_find(env, lck, lsl);
-                                LASSERT(link != NULL);
-                                lov_lock_unlink(env, link, lsl);
-                                LASSERT(lck->lls_sub[i].sub_lock == NULL);
-                        }
-                        lov_sublock_unlock(env, lsl, closure, NULL);
-                } else if (rc == CLO_REPEAT) {
-                        --i; /* repeat with this lock */
-                } else {
-                        CL_LOCK_DEBUG(D_ERROR, env, sublock,
-                                      "Cannot get sub-lock for delete: %i\n",
-                                      rc);
+                if (rc == CLO_REPEAT) {
+                        --i;
+                        continue;
                 }
+
+                LASSERT(rc == 0);
+                LASSERT(lsl->lss_cl.cls_lock->cll_state < CLS_FREEING);
+
+                if (lls->sub_flags & LSF_HELD)
+                        lov_sublock_release(env, lck, i, 1, 0);
+
+                link = lov_lock_link_find(env, lck, lsl);
+                LASSERT(link != NULL);
+                lov_lock_unlink(env, link, lsl);
+                LASSERT(lck->lls_sub[i].sub_lock == NULL);
+
+                lov_sublock_unlock(env, lsl, closure, NULL);
         }
+
         cl_lock_closure_fini(closure);
         EXIT;
 }
@@ -1196,7 +1196,7 @@ int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
         int result;

         ENTRY;
-        OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, CFS_ALLOC_IO);
+        OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO);
         if (lck != NULL) {
                 cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
                 result = lov_lock_sub_init(env, lck, io);
@@ -1205,6 +1205,42 @@ int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
         RETURN(result);
 }

+static void lov_empty_lock_fini(const struct lu_env *env,
+                                struct cl_lock_slice *slice)
+{
+        struct lov_lock *lck = cl2lov_lock(slice);
+        OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
+}
+
+static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
+                                lu_printer_t p, const struct cl_lock_slice *slice)
+{
+        (*p)(env, cookie, "empty\n");
+        return 0;
+}
+
+/* XXX: more methods will be added later. */
+static const struct cl_lock_operations lov_empty_lock_ops = {
+        .clo_fini = lov_empty_lock_fini,
+        .clo_print = lov_empty_lock_print
+};
+
+int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
+                        struct cl_lock *lock, const struct cl_io *io)
+{
+        struct lov_lock *lck;
+        int result = -ENOMEM;
+
+        ENTRY;
+        OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO);
+        if (lck != NULL) {
+                cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
+                lck->lls_orig = lock->cll_descr;
+                result = 0;
+        }
+        RETURN(result);
+}
+
 static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
                                                struct cl_lock *parent)
 {