diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c
index 794354d..1d1d980 100644
--- a/lustre/lov/lov_lock.c
+++ b/lustre/lov/lov_lock.c
@@ -122,7 +122,7 @@ static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
 	lck->lls_sub[idx].sub_lock = lsl;
 	lck->lls_nr_filled++;
 	LASSERT(lck->lls_nr_filled <= lck->lls_nr);
-	cfs_list_add_tail(&link->lll_list, &lsl->lss_parents);
+	list_add_tail(&link->lll_list, &lsl->lss_parents);
 	link->lll_idx = idx;
 	link->lll_super = lck;
 	cl_lock_get(parent);
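
The cfs_list_* calls above are libcfs compatibility wrappers that are essentially aliases for the kernel's native list primitives; the patch switches to the native names without changing the data structure. Each lov_lock_link is an intrusive list node chained on the sub-lock's lss_parents list, recording which slot of the parent top-lock it fills. Below is a minimal userspace sketch of that pattern, assuming only standard C: the trimmed-down list helpers and the simplified lov_lock_link layout are illustrative stand-ins, not the kernel's list.h or Lustre's definitions.

/* Userspace model of the parent-link bookkeeping in lov_sublock_adopt():
 * a link node lives on the sub-lock's parents list and records which slot
 * of the parent it fills.  Build with: cc -std=c99 -o demo demo.c */
#include <stddef.h>
#include <stdio.h>

struct list_head { struct list_head *prev, *next; }; /* as in kernel list.h */

static void list_init(struct list_head *h) { h->prev = h->next = h; }

static void list_add_tail(struct list_head *n, struct list_head *h)
{
	n->prev = h->prev;
	n->next = h;
	h->prev->next = n;
	h->prev = n;
}

#define container_of(p, type, member) \
	((type *)((char *)(p) - offsetof(type, member)))

struct lov_lock_link {			/* simplified; the real link also    */
	struct list_head lll_list;	/* ...pins references on both locks  */
	int		 lll_idx;	/* slot in the parent's lls_sub[]    */
	void		*lll_super;	/* the parent lov_lock               */
};

int main(void)
{
	struct list_head lss_parents;	/* one sub-lock shared by two parents */
	struct lov_lock_link a = { .lll_idx = 0, .lll_super = (void *)0x1 };
	struct lov_lock_link b = { .lll_idx = 3, .lll_super = (void *)0x2 };
	struct list_head *pos;

	list_init(&lss_parents);
	list_add_tail(&a.lll_list, &lss_parents);	/* lov_sublock_adopt() */
	list_add_tail(&b.lll_list, &lss_parents);

	/* lov_lock_link_find() walks the same list looking for one parent */
	for (pos = lss_parents.next; pos != &lss_parents; pos = pos->next) {
		struct lov_lock_link *scan =
			container_of(pos, struct lov_lock_link, lll_list);
		printf("parent %p fills slot %d\n",
		       scan->lll_super, scan->lll_idx);
	}
	return 0;
}

One allocation thus serves both the parent's slot bookkeeping and membership in the sub-lock's parents list, which is why lov_sublock_adopt() can take its reference on the parent in the same step.
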
@@ -136,50 +136,50 @@ static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
 }
 
 static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
-					 const struct cl_io *io,
-					 struct lov_lock *lck,
-					 int idx, struct lov_lock_link **out)
+					 const struct cl_io *io,
+					 struct lov_lock *lck,
+					 int idx, struct lov_lock_link **out)
 {
-	struct cl_lock *sublock;
-	struct cl_lock *parent;
-	struct lov_lock_link *link;
-
-	LASSERT(idx < lck->lls_nr);
-	ENTRY;
+	struct cl_lock *sublock;
+	struct cl_lock *parent;
+	struct lov_lock_link *link;
 
-	OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, __GFP_IO);
-	if (link != NULL) {
-		struct lov_sublock_env *subenv;
-		struct lov_lock_sub *lls;
-		struct cl_lock_descr *descr;
-
-		parent = lck->lls_cl.cls_lock;
-		lls = &lck->lls_sub[idx];
-		descr = &lls->sub_got;
-
-		subenv = lov_sublock_env_get(env, parent, lls);
-		if (!IS_ERR(subenv)) {
-			/* CAVEAT: Don't try to add a field in lov_lock_sub
-			 * to remember the subio. This is because lock is able
-			 * to be cached, but this is not true for IO. This
-			 * further means a sublock might be referenced in
-			 * different io context. -jay */
-
-			sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
-					       descr, "lov-parent", parent);
-			lov_sublock_env_put(subenv);
-		} else {
-			/* error occurs. */
-			sublock = (void*)subenv;
-		}
+	LASSERT(idx < lck->lls_nr);
+	ENTRY;
 
-		if (!IS_ERR(sublock))
-			*out = link;
-		else
-			OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
-	} else
-		sublock = ERR_PTR(-ENOMEM);
-	RETURN(sublock);
+	OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, GFP_NOFS);
+	if (link != NULL) {
+		struct lov_sublock_env *subenv;
+		struct lov_lock_sub *lls;
+		struct cl_lock_descr *descr;
+
+		parent = lck->lls_cl.cls_lock;
+		lls = &lck->lls_sub[idx];
+		descr = &lls->sub_got;
+
+		subenv = lov_sublock_env_get(env, parent, lls);
+		if (!IS_ERR(subenv)) {
+			/* CAVEAT: Don't try to add a field in lov_lock_sub
+			 * to remember the subio. This is because lock is able
+			 * to be cached, but this is not true for IO. This
+			 * further means a sublock might be referenced in
+			 * different io context. -jay */
+
+			sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
+					       descr, "lov-parent", parent);
+			lov_sublock_env_put(subenv);
+		} else {
+			/* error occurs. */
+			sublock = (void *)subenv;
+		}
+
+		if (!IS_ERR(sublock))
+			*out = link;
+		else
+			OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
+	} else
+		sublock = ERR_PTR(-ENOMEM);
+	RETURN(sublock);
 }
 
 static void lov_sublock_unlock(const struct lu_env *env,
@@ -205,7 +205,7 @@ static int lov_sublock_lock(const struct lu_env *env,
 	int result = 0;
 	ENTRY;
 
-	LASSERT(cfs_list_empty(&closure->clc_list));
+	LASSERT(list_empty(&closure->clc_list));
 
 	sublock = lls->sub_lock;
 	child = sublock->lss_cl.cls_lock;
@@ -270,7 +270,7 @@ static int lov_subresult(int result, int rc)
 	ENTRY;
 
 	LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
-		 "result = %d", result);
+		 "result = %d\n", result);
 	LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
 		 "rc = %d\n", rc);
 	CLASSERT(CLO_WAIT < CLO_REPEAT);
@@ -318,8 +318,9 @@ static int lov_lock_sub_init(const struct lu_env *env,
 		 * XXX for wide striping smarter algorithm is desirable,
 		 * breaking out of the loop, early.
 		 */
-		if (lov_stripe_intersects(loo->lo_lsm, i,
-					  file_start, file_end, &start, &end))
+		if (likely(r0->lo_sub[i] != NULL) &&
+		    lov_stripe_intersects(loo->lo_lsm, i,
+					  file_start, file_end, &start, &end))
 			nr++;
 	}
 	LASSERT(nr > 0);
@@ -336,8 +337,9 @@ static int lov_lock_sub_init(const struct lu_env *env,
 	 * top-lock.
 	 */
 	for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
-		if (lov_stripe_intersects(loo->lo_lsm, i,
-					  file_start, file_end, &start, &end)) {
+		if (likely(r0->lo_sub[i] != NULL) &&
+		    lov_stripe_intersects(loo->lo_lsm, i,
+					  file_start, file_end, &start, &end)) {
 			struct cl_lock_descr *descr;
 
 			descr = &lck->lls_sub[nr].sub_descr;
@@ -356,41 +358,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
 		}
 	}
 	LASSERT(nr == lck->lls_nr);
-	/*
-	 * Then, create sub-locks. Once at least one sub-lock was created,
-	 * top-lock can be reached by other threads.
-	 */
-	for (i = 0; i < lck->lls_nr; ++i) {
-		struct cl_lock *sublock;
-		struct lov_lock_link *link;
-		if (lck->lls_sub[i].sub_lock == NULL) {
-			sublock = lov_sublock_alloc(env, io, lck, i, &link);
-			if (IS_ERR(sublock)) {
-				result = PTR_ERR(sublock);
-				break;
-			}
-			cl_lock_get_trust(sublock);
-			cl_lock_mutex_get(env, sublock);
-			cl_lock_mutex_get(env, parent);
-			/*
-			 * recheck under mutex that sub-lock wasn't created
-			 * concurrently, and that top-lock is still alive.
-			 */
-			if (lck->lls_sub[i].sub_lock == NULL &&
-			    parent->cll_state < CLS_FREEING) {
-				lov_sublock_adopt(env, lck, sublock, i, link);
-				cl_lock_mutex_put(env, parent);
-			} else {
-				OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
-				cl_lock_mutex_put(env, parent);
-				cl_lock_unhold(env, sublock,
-					       "lov-parent", parent);
-			}
-			cl_lock_mutex_put(env, sublock);
-			cl_lock_put(env, sublock);
-		}
-	}
 	/*
 	 * Some sub-locks can be missing at this point. This is not a problem,
 	 * because enqueue will create them anyway. Main duty of this function
 	 * is to fill in sub-lock descriptions in a race free manner.
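
The likely(r0->lo_sub[i] != NULL) guards above make lock setup tolerate a LOV EA hole, i.e. a stripe whose backing OST object is absent: such stripes simply get no sub-lock. The deleted loop is the other half of the change: sub-locks are no longer instantiated eagerly here, enqueue creates them on demand. The counting pass and the filling pass must apply the same predicate, otherwise lls_nr would disagree with the number of descriptors actually filled. A userspace sketch of the counting pass follows; the plain RAID0 mapping, the stripe geometry, and the lo_sub[] contents are demo assumptions standing in for lov_stripe_intersects() and the real layout.

/* Two-pass scan from lov_lock_sub_init(), modeled in userspace: count the
 * stripes a byte range [start, end] touches, skipping absent subobjects
 * (LOV EA holes).  A second pass would fill one sub-lock per survivor. */
#include <stdio.h>

#define STRIPE_SIZE	(1 << 20)	/* 1 MiB, assumption for the demo */
#define STRIPE_COUNT	4

/* Does [start, end] overlap stripe i of a plain RAID0 layout?  A chunk c
 * (c = off / STRIPE_SIZE) lives on stripe c % STRIPE_COUNT. */
static int stripe_intersects(long long start, long long end, int i)
{
	long long c0 = start / STRIPE_SIZE, c1 = end / STRIPE_SIZE;

	if (c1 - c0 + 1 >= STRIPE_COUNT)	/* spans a full rotation */
		return 1;
	for (long long c = c0; c <= c1; c++)
		if (c % STRIPE_COUNT == i)
			return 1;
	return 0;
}

int main(void)
{
	/* stripe 2 is a hole: its object is missing from the LOV EA */
	const void *lo_sub[STRIPE_COUNT] = { "ost0", "ost1", NULL, "ost3" };
	long long start = 0, end = 3 * STRIPE_SIZE - 1;	/* touches 0, 1, 2 */
	int i, nr = 0;

	for (i = 0; i < STRIPE_COUNT; i++) {
		if (lo_sub[i] != NULL &&		/* the new guard */
		    stripe_intersects(start, end, i))
			nr++;
	}
	printf("%d sub-lock(s) needed\n", nr);		/* prints 2 */
	return 0;
}
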
@@ -550,7 +518,7 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
 static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
 			    struct cl_io *io, struct lov_lock *lck, int idx)
 {
-	struct lov_lock_link *link;
+	struct lov_lock_link *link = NULL;
 	struct cl_lock *sublock;
 	int result;
 
@@ -709,18 +677,27 @@ static int lov_lock_unuse(const struct lu_env *env,
 		if (sub == NULL)
 			continue;
 
-		sublock = sub->lss_cl.cls_lock;
-		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
-		if (rc == 0) {
-			if (lls->sub_flags & LSF_HELD) {
-				LASSERT(sublock->cll_state == CLS_HELD ||
-					sublock->cll_state == CLS_ENQUEUED);
-				rc = cl_unuse_try(subenv->lse_env, sublock);
-				rc = lov_sublock_release(env, lck, i, 0, rc);
-			}
-			lov_sublock_unlock(env, sub, closure, subenv);
-		}
-		result = lov_subresult(result, rc);
+		sublock = sub->lss_cl.cls_lock;
+		rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
+		if (rc == 0) {
+			if (!(lls->sub_flags & LSF_HELD)) {
+				lov_sublock_unlock(env, sub, closure, subenv);
+				continue;
+			}
+
+			switch(sublock->cll_state) {
+			case CLS_HELD:
+				rc = cl_unuse_try(subenv->lse_env, sublock);
+				lov_sublock_release(env, lck, i, 0, 0);
+				break;
+			default:
+				cl_lock_cancel(subenv->lse_env, sublock);
+				lov_sublock_release(env, lck, i, 1, 0);
+				break;
+			}
+			lov_sublock_unlock(env, sub, closure, subenv);
+		}
+		result = lov_subresult(result, rc);
 	}
 
 	if (result == 0 && lck->lls_cancel_race) {
@@ -771,6 +748,7 @@ static void lov_lock_cancel(const struct lu_env *env,
 				lov_sublock_release(env, lck, i, 0, 0);
 				break;
 			default:
+				cl_lock_cancel(subenv->lse_env, sublock);
 				lov_sublock_release(env, lck, i, 1, 0);
 				break;
 			}
@@ -974,10 +952,25 @@ static int lov_lock_stripe_is_matching(const struct lu_env *env,
 	 */
 	start = cl_offset(&lov->lo_cl, descr->cld_start);
 	end = cl_offset(&lov->lo_cl, descr->cld_end + 1) - 1;
-	result = end - start <= lsm->lsm_stripe_size &&
-		 stripe == lov_stripe_number(lsm, start) &&
-		 stripe == lov_stripe_number(lsm, end);
-	if (result) {
+
+	result = 0;
+	/* glimpse should work on the object with LOV EA hole. */
+	if ((end - start <= lsm->lsm_stripe_size) ||
+	    (descr->cld_end == CL_PAGE_EOF &&
+	     unlikely(lov->lo_lsm->lsm_pattern & LOV_PATTERN_F_HOLE))) {
+		int idx;
+
+		idx = lov_stripe_number(lsm, start);
+		if (idx == stripe ||
+		    unlikely(lov_r0(lov)->lo_sub[idx] == NULL)) {
+			idx = lov_stripe_number(lsm, end);
+			if (idx == stripe ||
+			    unlikely(lov_r0(lov)->lo_sub[idx] == NULL))
+				result = 1;
+		}
+	}
+
+	if (result != 0) {
 		struct cl_lock_descr *subd = &lov_env_info(env)->lti_ldescr;
 		obd_off sub_start;
 		obd_off sub_end;
@@ -1023,6 +1016,9 @@ static int lov_lock_fits_into(const struct lu_env *env,
 	if (need->cld_enq_flags != lov->lls_orig.cld_enq_flags)
 		return 0;
 
+	if (lov->lls_ever_canceled)
+		return 0;
+
 	if (need->cld_mode == CLM_GROUP)
 		/*
		 * always allow to match group lock.
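
The rewritten test in lov_lock_stripe_is_matching() above is the matching side of the same EA-hole support: a whole-file glimpse (cld_end == CL_PAGE_EOF) on a layout flagged LOV_PATTERN_F_HOLE may legitimately be matched against a single existing stripe, and an endpoint that lands on a missing stripe cannot veto the match because it has no sub-object to lock. Below is a userspace restatement of that predicate; the layout constants, PATTERN_HOLE, END_EOF, lo_sub[] and the helpers are demo stand-ins, not the Lustre definitions.

/* Userspace model of the reworked stripe-matching test: a parent lock can
 * be satisfied by one stripe lock if it spans at most one stripe, or if it
 * is a whole-file glimpse on a layout that contains EA holes. */
#include <stdio.h>

#define STRIPE_SIZE	(1 << 20)
#define STRIPE_COUNT	4
#define PATTERN_HOLE	0x1		/* stands in for LOV_PATTERN_F_HOLE */
#define END_EOF		(-1LL)		/* stands in for CL_PAGE_EOF        */

static const void *lo_sub[STRIPE_COUNT] = { "ost0", NULL, "ost2", "ost3" };

static int stripe_number(long long off)	/* lov_stripe_number()       */
{
	return (int)((off / STRIPE_SIZE) % STRIPE_COUNT);
}

static int stripe_is_matching(long long start, long long end,
			      long long cld_end, int pattern, int stripe)
{
	int idx;

	if (!(end - start <= STRIPE_SIZE) &&
	    !(cld_end == END_EOF && (pattern & PATTERN_HOLE)))
		return 0;			/* covers several stripes   */

	idx = stripe_number(start);		/* endpoints must be on the  */
	if (idx != stripe && lo_sub[idx] != NULL)	/* stripe, or on holes */
		return 0;
	idx = stripe_number(end);
	if (idx != stripe && lo_sub[idx] != NULL)
		return 0;
	return 1;
}

int main(void)
{
	/* an EOF glimpse on a holey layout matches stripe 0: prints 1 ... */
	printf("%d\n", stripe_is_matching(0, 100LL * STRIPE_SIZE, END_EOF,
					  PATTERN_HOLE, 0));
	/* ... but a wide lock on a dense layout does not: prints 0 */
	printf("%d\n", stripe_is_matching(0, 100LL * STRIPE_SIZE, 100, 0, 0));
	return 0;
}
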
@@ -1068,7 +1064,7 @@ void lov_lock_unlink(const struct lu_env *env, LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock)); ENTRY; - cfs_list_del_init(&link->lll_list); + list_del_init(&link->lll_list); LASSERT(lck->lls_sub[link->lll_idx].sub_lock == sub); /* yank this sub-lock from parent's array */ lck->lls_sub[link->lll_idx].sub_lock = NULL; @@ -1089,7 +1085,7 @@ struct lov_lock_link *lov_lock_link_find(const struct lu_env *env, LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock)); ENTRY; - cfs_list_for_each_entry(scan, &sub->lss_parents, lll_list) { + list_for_each_entry(scan, &sub->lss_parents, lll_list) { if (scan->lll_super == lck) RETURN(scan); } @@ -1187,19 +1183,19 @@ static const struct cl_lock_operations lov_lock_ops = { }; int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj, - struct cl_lock *lock, const struct cl_io *io) + struct cl_lock *lock, const struct cl_io *io) { - struct lov_lock *lck; - int result; + struct lov_lock *lck; + int result; - ENTRY; - OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO); - if (lck != NULL) { - cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops); - result = lov_lock_sub_init(env, lck, io); - } else - result = -ENOMEM; - RETURN(result); + ENTRY; + OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, GFP_NOFS); + if (lck != NULL) { + cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops); + result = lov_lock_sub_init(env, lck, io); + } else + result = -ENOMEM; + RETURN(result); } static void lov_empty_lock_fini(const struct lu_env *env, @@ -1223,13 +1219,13 @@ static const struct cl_lock_operations lov_empty_lock_ops = { }; int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj, - struct cl_lock *lock, const struct cl_io *io) + struct cl_lock *lock, const struct cl_io *io) { struct lov_lock *lck; int result = -ENOMEM; ENTRY; - OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, __GFP_IO); + OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, GFP_NOFS); if (lck != NULL) { cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops); lck->lls_orig = lock->cll_descr; @@ -1244,7 +1240,7 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env, struct cl_lock_closure *closure; closure = &lov_env_info(env)->lti_closure; - LASSERT(cfs_list_empty(&closure->clc_list)); + LASSERT(list_empty(&closure->clc_list)); cl_lock_closure_init(env, closure, parent, 1); return closure; }
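
Two mechanical themes run through the remaining hunks: the cfs_list_* wrappers give way to the kernel's native list calls, and the lock allocations pass GFP_NOFS instead of a bare __GFP_IO. On the kernels this code targets, GFP_NOFS is __GFP_WAIT | __GFP_IO, so the old mask, lacking __GFP_WAIT, forbade the allocator to sleep and let these allocations fail needlessly under memory pressure; GFP_NOFS allows blocking and I/O while still preventing recursion into the filesystem. The control flow of lov_lock_init_raid0()/lov_lock_init_empty() is the usual allocate-or--ENOMEM shape, sketched below in userspace with calloc() standing in for the slab cache and the cl_* layers stubbed out as hypothetical placeholders.

/* Shape of lov_lock_init_raid0(): allocate the lock slice from a cache,
 * attach it to the top lock, and report -ENOMEM on failure.  Only the
 * control flow is real; the types and the stub are demo stand-ins. */
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

struct lov_lock { int lls_nr; /* ... */ };

static int lov_lock_sub_init_stub(struct lov_lock *lck)
{
	lck->lls_nr = 1;	/* pretend one sub-lock is needed */
	return 0;
}

static int lov_lock_init(struct lov_lock **out)
{
	struct lov_lock *lck;
	int result;

	lck = calloc(1, sizeof(*lck));	/* OBD_SLAB_ALLOC_PTR_GFP(..., GFP_NOFS) */
	if (lck != NULL) {
		/* cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops); */
		result = lov_lock_sub_init_stub(lck);
	} else
		result = -ENOMEM;
	*out = lck;
	return result;
}

int main(void)
{
	struct lov_lock *lck;
	int rc = lov_lock_init(&lck);

	printf("rc = %d, lls_nr = %d\n", rc, rc == 0 ? lck->lls_nr : 0);
	free(lck);
	return 0;
}
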