X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_lock.c;h=0b03ea7c32b33e6b4b69de37197be41d1949acae;hp=8bd2dc7ca28ed8c576261aa4d72d4e5e05a84a08;hb=d2dbff42e78d7ebca4db534df7e1c19f6b674a22;hpb=0a859380c36ac24871f221b35042f76c56b04438

diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index 8bd2dc7..0b03ea7 100644
--- a/lustre/osc/osc_lock.c
+++ b/lustre/osc/osc_lock.c
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
@@ -54,6 +54,8 @@
  *  @{
  */
 
+#define _PAGEREF_MAGIC (-10000000)
+
 /*****************************************************************************
  *
  * Type conversions.
  *
@@ -153,18 +155,19 @@ static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
         if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
                 struct cl_object *obj = olck->ols_cl.cls_obj;
                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
-                __u64 old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
+                __u64 old_kms;
+                cl_object_attr_lock(obj);
+                /* Must get the value under the lock to avoid possible races. */
+                old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
                 /* Update the kms. Need to loop all granted locks.
                  * Not a problem for the client */
                 attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);
-                unlock_res_and_lock(dlmlock);
-                cl_object_attr_lock(obj);
                 cl_object_attr_set(env, obj, attr, CAT_KMS);
                 cl_object_attr_unlock(obj);
-        } else
-                unlock_res_and_lock(dlmlock);
+        }
+        unlock_res_and_lock(dlmlock);
 
         /* release a reference taken in osc_lock_upcall0(). */
         LASSERT(olck->ols_has_ref);
@@ -223,6 +226,8 @@ static void osc_lock_fini(const struct lu_env *env,
          */
         osc_lock_unhold(ols);
         LASSERT(ols->ols_lock == NULL);
+        LASSERT(cfs_atomic_read(&ols->ols_pageref) == 0 ||
+                cfs_atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);
         OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
 }
 
@@ -408,7 +413,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
         LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);
         ENTRY;
 
-        if (olck->ols_state != OLS_GRANTED) {
+        if (olck->ols_state < OLS_GRANTED) {
                 lock  = olck->ols_cl.cls_lock;
                 ext   = &dlmlock->l_policy_data.l_extent;
                 descr = &osc_env_info(env)->oti_descr;
@@ -506,7 +511,7 @@ static int osc_lock_upcall(void *cookie, int errcode)
         } else if (olck->ols_state == OLS_CANCELLED) {
                 rc = -EIO;
         } else {
-                CERROR("Impossible state: %i\n", olck->ols_state);
+                CERROR("Impossible state: %d\n", olck->ols_state);
                 LBUG();
         }
         if (rc) {
@@ -1100,12 +1105,26 @@ static int osc_lock_enqueue_wait(const struct lu_env *env,
         cfs_spin_unlock(&hdr->coh_lock_guard);
 
         if (conflict) {
-                CDEBUG(D_DLMTRACE, "lock %p is confliced with %p, will wait\n",
-                       lock, conflict);
-                lu_ref_add(&conflict->cll_reference, "cancel-wait", lock);
-                LASSERT(lock->cll_conflict == NULL);
-                lock->cll_conflict = conflict;
-                rc = CLO_WAIT;
+                if (lock->cll_descr.cld_mode == CLM_GROUP) {
+                        /* we want a group lock but a previous lock request
+                         * conflicts, we do not wait but return 0 so the
+                         * request is send to the server
+                         */
+                        CDEBUG(D_DLMTRACE, "group lock %p is conflicted "
+                                           "with %p, no wait, send to server\n",
+                               lock, conflict);
+                        cl_lock_put(env, conflict);
+                        rc = 0;
+                } else {
+                        CDEBUG(D_DLMTRACE, "lock %p is conflicted with %p, "
+                                           "will wait\n",
+                               lock, conflict);
+                        LASSERT(lock->cll_conflict == NULL);
+                        lu_ref_add(&conflict->cll_reference, "cancel-wait",
+                                   lock);
+                        lock->cll_conflict = conflict;
+                        rc = CLO_WAIT;
+                }
         }
         RETURN(rc);
 }
@@ -1130,11 +1149,6 @@ static int osc_lock_enqueue(const struct lu_env *env,
 {
         struct osc_lock          *ols  = cl2osc_lock(slice);
         struct cl_lock           *lock = ols->ols_cl.cls_lock;
-        struct osc_object        *obj  = cl2osc(slice->cls_obj);
-        struct osc_thread_info   *info = osc_env_info(env);
-        struct ldlm_res_id       *resname = &info->oti_resname;
-        ldlm_policy_data_t       *policy  = &info->oti_policy;
-        struct ldlm_enqueue_info *einfo = &ols->ols_einfo;
         int result;
         ENTRY;
 
@@ -1142,18 +1156,22 @@ static int osc_lock_enqueue(const struct lu_env *env,
         LASSERT(lock->cll_state == CLS_QUEUING);
         LASSERT(ols->ols_state == OLS_NEW);
 
-        osc_lock_build_res(env, obj, resname);
-        osc_lock_build_policy(env, lock, policy);
         ols->ols_flags = osc_enq2ldlm_flags(enqflags);
         if (ols->ols_flags & LDLM_FL_HAS_INTENT)
                 ols->ols_glimpse = 1;
-        if (!(enqflags & CEF_MUST))
+        if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST))
                 /* try to convert this lock to a lockless lock */
                 osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER));
 
         result = osc_lock_enqueue_wait(env, ols);
         if (result == 0) {
                 if (!osc_lock_is_lockless(ols)) {
+                        struct osc_object        *obj = cl2osc(slice->cls_obj);
+                        struct osc_thread_info   *info = osc_env_info(env);
+                        struct ldlm_res_id       *resname = &info->oti_resname;
+                        ldlm_policy_data_t       *policy  = &info->oti_policy;
+                        struct ldlm_enqueue_info *einfo = &ols->ols_einfo;
+
                         if (ols->ols_locklessable)
                                 ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
@@ -1167,6 +1185,8 @@ static int osc_lock_enqueue(const struct lu_env *env,
                          * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
                          * LDLM_CP_CALLBACK.
                          */
+                        osc_lock_build_res(env, obj, resname);
+                        osc_lock_build_policy(env, lock, policy);
                         result = osc_enqueue_base(osc_export(obj), resname,
                                                   &ols->ols_flags, policy,
                                                   &ols->ols_lvb,
@@ -1497,14 +1517,6 @@ static const struct cl_lock_operations osc_lock_ops = {
         .clo_fits_into           = osc_lock_fits_into,
 };
 
-static int osc_lock_lockless_enqueue(const struct lu_env *env,
-                                     const struct cl_lock_slice *slice,
-                                     struct cl_io *unused, __u32 enqflags)
-{
-        LBUG();
-        return 0;
-}
-
 static int osc_lock_lockless_unuse(const struct lu_env *env,
                                    const struct cl_lock_slice *slice)
 {
@@ -1580,7 +1592,7 @@ static int osc_lock_lockless_fits_into(const struct lu_env *env,
 
 static const struct cl_lock_operations osc_lock_lockless_ops = {
         .clo_fini      = osc_lock_fini,
-        .clo_enqueue   = osc_lock_lockless_enqueue,
+        .clo_enqueue   = osc_lock_enqueue,
         .clo_wait      = osc_lock_lockless_wait,
         .clo_unuse     = osc_lock_lockless_unuse,
         .clo_state     = osc_lock_lockless_state,
@@ -1599,6 +1611,7 @@ int osc_lock_init(const struct lu_env *env,
         OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO);
         if (clk != NULL) {
                 osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
+                cfs_atomic_set(&clk->ols_pageref, 0);
                 clk->ols_state = OLS_NEW;
                 cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
                 result = 0;
@@ -1607,5 +1620,26 @@ int osc_lock_init(const struct lu_env *env,
         return result;
 }
 
+int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
+{
+        struct osc_lock *olock;
+        int rc = 0;
+
+        cfs_spin_lock(&osc_ast_guard);
+        olock = dlm->l_ast_data;
+        /*
+         * there's a very rare race with osc_page_addref_lock(), but that
+         * doesn't matter because in the worst case we don't cancel a lock
+         * which we actually can, that's no harm.
+         */
+        if (olock != NULL &&
+            cfs_atomic_add_return(_PAGEREF_MAGIC,
+                                  &olock->ols_pageref) != _PAGEREF_MAGIC) {
+                cfs_atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
+                rc = 1;
+        }
+        cfs_spin_unlock(&osc_ast_guard);
+        return rc;
+}
+
 /** @} osc */
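
Note on the new pageref accounting (a minimal standalone sketch, not part of the patch above): osc_dlm_lock_pageref() both reads and "claims" the per-lock page reference counter in one atomic step by adding the large negative sentinel _PAGEREF_MAGIC; any result other than the sentinel means pages still reference the lock, so the add is undone and the lock is reported busy. That is also why osc_lock_fini() can assert ols_pageref is either 0 or exactly _PAGEREF_MAGIC. The sketch uses C11 atomics instead of the libcfs cfs_atomic_* primitives; the names pageref and lock_pages_in_use() are illustrative only.

#include <stdatomic.h>
#include <stdio.h>

#define PAGEREF_MAGIC (-10000000)

static atomic_int pageref;                     /* stand-in for ols_pageref */

/* Return 1 if pages still hold references (do not cancel the lock);
 * return 0 if the counter was zero and is now left at PAGEREF_MAGIC. */
static int lock_pages_in_use(void)
{
        /* atomic_fetch_add() returns the old value, so old + PAGEREF_MAGIC
         * is the new value, mirroring cfs_atomic_add_return(). */
        if (atomic_fetch_add(&pageref, PAGEREF_MAGIC) + PAGEREF_MAGIC !=
            PAGEREF_MAGIC) {
                /* counter was non-zero: undo the sentinel, report busy */
                atomic_fetch_sub(&pageref, PAGEREF_MAGIC);
                return 1;
        }
        return 0;
}

int main(void)
{
        atomic_fetch_add(&pageref, 1);               /* a page takes a ref */
        printf("busy: %d\n", lock_pages_in_use());   /* prints "busy: 1"   */
        atomic_fetch_sub(&pageref, 1);               /* the page drops it  */
        printf("busy: %d\n", lock_pages_in_use());   /* prints "busy: 0"   */
        return 0;
}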