diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index 6d15dad..f3ed624 100644
--- a/lustre/osc/osc_lock.c
+++ b/lustre/osc/osc_lock.c
@@ -15,11 +15,7 @@
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2014, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -41,32 +37,16 @@
 
 #define DEBUG_SUBSYSTEM S_OSC
 
-#include <libcfs/libcfs.h>
 /* fid_build_reg_res_name() */
 #include <lustre_fid.h>
+#include <lustre_osc.h>
 
-#include "osc_cl_internal.h"
+#include "osc_internal.h"
 
 /** \addtogroup osc
  *  @{
  */
 
-/*****************************************************************************
- *
- *  Type conversions.
- *
- */
-
-static const struct cl_lock_operations osc_lock_ops;
-static const struct cl_lock_operations osc_lock_lockless_ops;
-static void osc_lock_to_lockless(const struct lu_env *env,
-                                 struct osc_lock *ols, int force);
-
-int osc_lock_is_lockless(const struct osc_lock *olck)
-{
-        return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
-}
-
 /**
  * Returns a weak pointer to the ldlm lock identified by a handle. Returned
  * pointer cannot be dereferenced, as lock is not protected from concurrent
@@ -126,7 +106,7 @@ static int osc_lock_invariant(struct osc_lock *ols)
 
         if (! ergo(ols->ols_state == OLS_GRANTED,
                    olock != NULL &&
-                   olock->l_req_mode == olock->l_granted_mode &&
+                   ldlm_is_granted(olock) &&
                    ols->ols_hold))
                 return 0;
         return 1;
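
The invariant above leans on libcfs's ergo() implication macro, where ergo(a, b) reads "a implies b". A minimal userspace model of the OLS_GRANTED check — the struct and its fields are simplified stand-ins, not the kernel types:

    #include <assert.h>
    #include <stdbool.h>

    #define ergo(a, b) (!(a) || (b))    /* "a implies b" */

    struct demo_ols {
        bool granted;       /* ols_state == OLS_GRANTED */
        bool has_dlmlock;   /* olock != NULL */
        bool dlm_granted;   /* ldlm_is_granted(olock) */
        bool hold;          /* ols_hold */
    };

    static bool demo_invariant(const struct demo_ols *o)
    {
        return ergo(o->granted,
                    o->has_dlmlock && o->dlm_granted && o->hold);
    }

    int main(void)
    {
        struct demo_ols granted = { true, true, true, true };
        struct demo_ols new_lock = { false, false, false, false };

        assert(demo_invariant(&granted));
        assert(demo_invariant(&new_lock));  /* vacuously true */
        return 0;
    }
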
@@ -138,8 +118,7 @@ static int osc_lock_invariant(struct osc_lock *ols)
  *
  */
 
-static void osc_lock_fini(const struct lu_env *env,
-                          struct cl_lock_slice *slice)
+void osc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice)
 {
         struct osc_lock *ols = cl2osc_lock(slice);
 
@@ -148,32 +127,16 @@ static void osc_lock_fini(const struct lu_env *env,
 
         OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
 }
+EXPORT_SYMBOL(osc_lock_fini);
 
 static void osc_lock_build_policy(const struct lu_env *env,
-                                  const struct cl_lock *lock,
-                                  ldlm_policy_data_t *policy)
+                                  const struct cl_lock *lock,
+                                  union ldlm_policy_data *policy)
 {
-        const struct cl_lock_descr *d = &lock->cll_descr;
+        const struct cl_lock_descr *d = &lock->cll_descr;
 
-        osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
-        policy->l_extent.gid = d->cld_gid;
-}
-
-static __u64 osc_enq2ldlm_flags(__u32 enqflags)
-{
-        __u64 result = 0;
-
-        LASSERT((enqflags & ~CEF_MASK) == 0);
-
-        if (enqflags & CEF_NONBLOCK)
-                result |= LDLM_FL_BLOCK_NOWAIT;
-        if (enqflags & CEF_ASYNC)
-                result |= LDLM_FL_HAS_INTENT;
-        if (enqflags & CEF_DISCARD_DATA)
-                result |= LDLM_FL_AST_DISCARD_DATA;
-        if (enqflags & CEF_PEEK)
-                result |= LDLM_FL_TEST_LOCK;
-        return result;
+        osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
+        policy->l_extent.gid = d->cld_gid;
 }
 
@@ -181,20 +144,17 @@ static __u64 osc_enq2ldlm_flags(__u32 enqflags)
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
  * logic.
  *
- * This can be optimized to not update attributes when lock is a result of a
- * local match.
- *
 * Called under lock and resource spin-locks.
  */
-static void osc_lock_lvb_update(const struct lu_env *env,
-                                struct osc_object *osc,
-                                struct ldlm_lock *dlmlock,
-                                struct ost_lvb *lvb)
+void osc_lock_lvb_update(const struct lu_env *env,
+                         struct osc_object *osc,
+                         struct ldlm_lock *dlmlock,
+                         struct ost_lvb *lvb)
 {
-        struct cl_object *obj = osc2cl(osc);
-        struct lov_oinfo *oinfo = osc->oo_oinfo;
-        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
-        unsigned valid;
+        struct cl_object *obj = osc2cl(osc);
+        struct lov_oinfo *oinfo = osc->oo_oinfo;
+        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+        unsigned valid, setkms = 0;
 
         ENTRY;
 
@@ -219,28 +179,33 @@ static void osc_lock_lvb_update(const struct lu_env *env,
                 if (size > dlmlock->l_policy_data.l_extent.end)
                         size = dlmlock->l_policy_data.l_extent.end + 1;
                 if (size >= oinfo->loi_kms) {
-                        LDLM_DEBUG(dlmlock, "lock acquired, setting rss="LPU64
-                                   ", kms="LPU64, lvb->lvb_size, size);
                         valid |= CAT_KMS;
                         attr->cat_kms = size;
-                } else {
-                        LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
-                                   LPU64"; leaving kms="LPU64", end="LPU64,
-                                   lvb->lvb_size, oinfo->loi_kms,
-                                   dlmlock->l_policy_data.l_extent.end);
+                        setkms = 1;
                 }
                 ldlm_lock_allow_match_locked(dlmlock);
         }
 
-        cl_object_attr_set(env, obj, attr, valid);
+        /* The size should not be less than the kms */
+        if (attr->cat_size < oinfo->loi_kms)
+                attr->cat_size = oinfo->loi_kms;
+
+        LDLM_DEBUG(dlmlock, "acquired size %llu, setting rss=%llu;%s "
+                   "kms=%llu, end=%llu", lvb->lvb_size, attr->cat_size,
+                   setkms ? "" : " leaving",
+                   setkms ? attr->cat_kms : oinfo->loi_kms,
+                   dlmlock ? dlmlock->l_policy_data.l_extent.end : -1ull);
+
+        cl_object_attr_update(env, obj, attr, valid);
         cl_object_attr_unlock(obj);
 
         EXIT;
 }
 
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
-                             struct lustre_handle *lockh, bool lvb_update)
+                             struct lustre_handle *lockh)
 {
+        struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj);
         struct ldlm_lock *dlmlock;
 
         dlmlock = ldlm_handle2lock_long(lockh, 0);
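
A userspace model of the KMS (known minimum size) update above: the size returned in the LVB is only trustworthy up to the end of the extent the lock actually covers, and a grant never lowers KMS. Types are simplified and kms_after_grant() is an illustrative helper, not a kernel function:

    #include <stdio.h>

    typedef unsigned long long u64;

    /* new KMS after a grant: trust lvb_size only up to the end of the
     * granted extent, and never let KMS move backwards */
    static u64 kms_after_grant(u64 lvb_size, u64 lock_end, u64 cur_kms)
    {
        u64 size = lvb_size;

        if (size > lock_end)
            size = lock_end + 1;
        return size >= cur_kms ? size : cur_kms;
    }

    int main(void)
    {
        /* lock covers bytes [0, 4095] of a 10000-byte file: KMS stops
         * at 4096 because bytes past the lock may change unseen */
        printf("%llu\n", kms_after_grant(10000ULL, 4095ULL, 0ULL));
        /* near-full-object lock: KMS follows the returned size */
        printf("%llu\n", kms_after_grant(10000ULL, 1ULL << 60, 4096ULL));
        return 0;
    }
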
@@ -249,7 +214,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 
         /* lock reference taken by ldlm_handle2lock_long() is
          * owned by osc_lock and released in osc_lock_detach() */
-        lu_ref_add(&dlmlock->l_reference, "osc_lock", oscl);
+        lu_ref_add_atomic(&dlmlock->l_reference, "osc_lock", oscl);
         oscl->ols_has_ref = 1;
 
         LASSERT(oscl->ols_dlmlock == NULL);
@@ -267,7 +232,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 
         /* Lock must have been granted. */
         lock_res_and_lock(dlmlock);
-        if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
+        if (ldlm_is_granted(dlmlock)) {
                 struct ldlm_extent *ext = &dlmlock->l_policy_data.l_extent;
                 struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
@@ -279,10 +244,11 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                 descr->cld_gid = ext->gid;
 
                 /* no lvb update for matched lock */
-                if (lvb_update) {
+                if (!ldlm_is_lvb_cached(dlmlock)) {
                         LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
-                        osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
-                                            dlmlock, NULL);
+                        LASSERT(osc == dlmlock->l_ast_data);
+                        osc_lock_lvb_update(env, osc, dlmlock, NULL);
+                        ldlm_set_lvb_cached(dlmlock);
                 }
                 LINVRNT(osc_lock_invariant(oscl));
         }
@@ -303,12 +269,11 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
         struct osc_lock *oscl = cookie;
         struct cl_lock_slice *slice = &oscl->ols_cl;
         struct lu_env *env;
-        struct cl_env_nest nest;
         int rc;
 
         ENTRY;
 
-        env = cl_env_nested_get(&nest);
+        env = cl_env_percpu_get();
         /* should never happen, similar to osc_ldlm_blocking_ast(). */
         LASSERT(!IS_ERR(env));
@@ -323,7 +288,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
         }
 
         if (rc == 0)
-                osc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);
+                osc_lock_granted(env, oscl, lockh);
 
         /* Error handling, some errors are tolerable. */
         if (oscl->ols_locklessable && rc == -EUSERS) {
@@ -331,7 +296,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
                  * lockless lock.
                  */
                 osc_object_set_contended(cl2osc(slice->cls_obj));
-                LASSERT(slice->cls_ops == &osc_lock_ops);
+                LASSERT(slice->cls_ops != oscl->ols_lockless_ops);
 
                 /* Change this lock to ldlmlock-less lock. */
                 osc_lock_to_lockless(env, oscl, 1);
@@ -343,25 +308,28 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
                                           NULL, &oscl->ols_lvb);
                 /* Hide the error. */
                 rc = 0;
+        } else if (rc < 0 && oscl->ols_flags & LDLM_FL_NDELAY) {
+                rc = -EWOULDBLOCK;
         }
 
         if (oscl->ols_owner != NULL)
                 cl_sync_io_note(env, oscl->ols_owner, rc);
-        cl_env_nested_put(&nest, env);
+        cl_env_percpu_put(env);
 
         RETURN(rc);
 }
 
-static int osc_lock_upcall_agl(void *cookie, struct lustre_handle *lockh,
-                               int errcode)
+static int osc_lock_upcall_speculative(void *cookie,
+                                       struct lustre_handle *lockh,
+                                       int errcode)
 {
         struct osc_object *osc = cookie;
         struct ldlm_lock *dlmlock;
         struct lu_env *env;
-        struct cl_env_nest nest;
+        __u16 refcheck;
 
         ENTRY;
 
-        env = cl_env_nested_get(&nest);
+        env = cl_env_get(&refcheck);
         LASSERT(!IS_ERR(env));
 
         if (errcode == ELDLM_LOCK_MATCHED)
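
Compressed model of the tolerable-error policy in osc_lock_upcall() after this change: -EUSERS on a locklessable lock becomes a lockless retry, glimpse errors stay hidden, and under LDLM_FL_NDELAY any remaining failure is reported as -EWOULDBLOCK. The branch order and errno values below are simplified stand-ins for illustration:

    #include <stdbool.h>

    #define DEMO_EUSERS      87
    #define DEMO_EWOULDBLOCK 11

    struct demo_upcall {
        bool locklessable;  /* oscl->ols_locklessable */
        bool glimpse;       /* oscl->ols_glimpse */
        bool ndelay;        /* oscl->ols_flags & LDLM_FL_NDELAY */
    };

    /* rc ultimately reported to the waiter via cl_sync_io_note() */
    static int demo_upcall_rc(const struct demo_upcall *u, int rc)
    {
        if (u->locklessable && rc == -DEMO_EUSERS)
            return 0;   /* server is contended: retry as lockless */
        if (u->glimpse && rc < 0)
            return 0;   /* glimpse failures are hidden */
        if (rc < 0 && u->ndelay)
            return -DEMO_EWOULDBLOCK;   /* non-blocking enqueue: busy */
        return rc;
    }
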
@@ -374,9 +342,10 @@ static int osc_lock_upcall_agl(void *cookie, struct lustre_handle *lockh,
         LASSERT(dlmlock != NULL);
 
         lock_res_and_lock(dlmlock);
-        LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);
+        LASSERT(ldlm_is_granted(dlmlock));
 
-        /* there is no osc_lock associated with AGL lock */
+        /* there is no osc_lock associated with speculative locks
+         * thus no need to set LDLM_FL_LVB_CACHED */
         osc_lock_lvb_update(env, osc, dlmlock, NULL);
 
         unlock_res_and_lock(dlmlock);
@@ -384,21 +353,21 @@ static int osc_lock_upcall_agl(void *cookie, struct lustre_handle *lockh,
 
 out:
         cl_object_put(env, osc2cl(osc));
-        cl_env_nested_put(&nest, env);
+        cl_env_put(env, &refcheck);
         RETURN(ldlm_error2errno(errcode));
 }
 
 static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end,
-                          enum cl_lock_mode mode, int discard)
+                          enum cl_lock_mode mode, bool discard)
 {
         struct lu_env *env;
-        struct cl_env_nest nest;
+        __u16 refcheck;
         int rc = 0;
         int rc2 = 0;
 
         ENTRY;
 
-        env = cl_env_nested_get(&nest);
+        env = cl_env_get(&refcheck);
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
@@ -412,11 +381,16 @@ static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end,
                 rc = 0;
         }
 
-        rc2 = osc_lock_discard_pages(env, obj, start, end, mode);
+        /*
+         * Do not try to match other locks with CLM_WRITE since we already
+         * know there're none
+         */
+        rc2 = osc_lock_discard_pages(env, obj, start, end,
+                                     mode == CLM_WRITE || discard);
         if (rc == 0 && rc2 < 0)
                 rc = rc2;
 
-        cl_env_nested_put(&nest, env);
+        cl_env_put(env, &refcheck);
         RETURN(rc);
 }
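
The new boolean passed to osc_lock_discard_pages() above folds the old mode argument into a single discard decision; a one-line model (enum values local to this sketch):

    #include <stdbool.h>

    enum cl_lock_mode { CLM_READ, CLM_WRITE, CLM_GROUP };

    /* pages under a cancelled write lock can be discarded outright: no
     * other cached lock may cover dirty data, so nothing else could
     * still need those pages written back */
    static bool demo_discard_pages(enum cl_lock_mode mode, bool discard)
    {
        return mode == CLM_WRITE || discard;
    }
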
@@ -430,14 +404,14 @@ static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end,
 {
         struct cl_object *obj = NULL;
         int result = 0;
-        int discard;
+        bool discard;
         enum cl_lock_mode mode = CLM_READ;
 
         ENTRY;
 
         LASSERT(flag == LDLM_CB_CANCELING);
         lock_res_and_lock(dlmlock);
-        if (dlmlock->l_granted_mode != dlmlock->l_req_mode) {
+        if (!ldlm_is_granted(dlmlock)) {
                 dlmlock->l_ast_data = NULL;
                 unlock_res_and_lock(dlmlock);
                 RETURN(0);
@@ -449,13 +423,13 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 
         if (dlmlock->l_ast_data != NULL) {
                 obj = osc2cl(dlmlock->l_ast_data);
-                dlmlock->l_ast_data = NULL;
-
                 cl_object_get(obj);
         }
 
         unlock_res_and_lock(dlmlock);
 
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_CANCEL, 5);
+
         /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
          * the object has been destroyed. */
         if (obj != NULL) {
@@ -471,6 +445,9 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 
                 /* losing a lock, update kms */
                 lock_res_and_lock(dlmlock);
+                /* clearing l_ast_data after flushing data,
+                 * to let glimpse ast find the lock and the object */
+                dlmlock->l_ast_data = NULL;
                 cl_object_attr_lock(obj);
                 /* Must get the value under the lock to avoid race. */
                 old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
@@ -478,7 +455,7 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
                  * Not a problem for the client */
                 attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);
 
-                cl_object_attr_set(env, obj, attr, CAT_KMS);
+                cl_object_attr_update(env, obj, attr, CAT_KMS);
                 cl_object_attr_unlock(obj);
                 unlock_res_and_lock(dlmlock);
@@ -512,7 +489,7 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
  *
  * - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_BLOCKING) to notify
  *   us that dlmlock conflicts with another lock that some client is
- *   enqueing. Lock is canceled.
+ *   enqueuing. Lock is canceled.
  *
  *   - cl_lock_cancel() is called. osc_lock_cancel() calls
  *     ldlm_cli_cancel() that calls
@@ -548,7 +525,7 @@ static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
         }
         case LDLM_CB_CANCELING: {
                 struct lu_env *env;
-                struct cl_env_nest nest;
+                __u16 refcheck;
 
                 /*
                  * This can be called in the context of outer IO, e.g.,
@@ -561,14 +538,14 @@ static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
                  * new environment has to be created to not corrupt outer
                  * context.
                  */
-                env = cl_env_nested_get(&nest);
+                env = cl_env_get(&refcheck);
                 if (IS_ERR(env)) {
                         result = PTR_ERR(env);
                         break;
                 }
 
                 result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
-                cl_env_nested_put(&nest, env);
+                cl_env_put(env, &refcheck);
                 break;
         }
         default:
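
When the blocking AST above gives up a lock, KMS has to retreat to what the surviving locks still cover. A simplified userspace reading of what ldlm_extent_shift_kms() computes — the kernel walks the resource's lock lists, here reduced to an array of remaining extent ends:

    typedef unsigned long long u64;

    /* new KMS may not exceed its old value, and must fall back to the
     * highest byte still covered by a surviving lock; ends[] holds
     * l_extent.end of the locks that remain granted */
    static u64 demo_shift_kms(const u64 *ends, int nr, u64 old_kms)
    {
        u64 kms = 0;
        int i;

        for (i = 0; i < nr; i++) {
            if (ends[i] + 1 >= old_kms)
                return old_kms;     /* old KMS still fully covered */
            if (ends[i] + 1 > kms)
                kms = ends[i] + 1;
        }
        return kms;
    }
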
@@ -577,112 +554,139 @@ static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
         RETURN(result);
 }
 
-static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
+int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
 {
         struct ptlrpc_request *req = data;
-        struct cl_env_nest nest;
         struct lu_env *env;
         struct ost_lvb *lvb;
         struct req_capsule *cap;
+        struct cl_object *obj = NULL;
+        struct ldlm_resource *res = dlmlock->l_resource;
+        struct ldlm_match_data matchdata = { 0 };
+        union ldlm_policy_data policy;
+        enum ldlm_mode mode = LCK_PW | LCK_GROUP | LCK_PR;
         int result;
+        __u16 refcheck;
 
         ENTRY;
 
         LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
 
-        env = cl_env_nested_get(&nest);
-        if (!IS_ERR(env)) {
-                struct cl_object *obj = NULL;
+        env = cl_env_get(&refcheck);
+        if (IS_ERR(env))
+                GOTO(out, result = PTR_ERR(env));
+
+        policy.l_extent.start = 0;
+        policy.l_extent.end = LUSTRE_EOF;
+
+        matchdata.lmd_mode = &mode;
+        matchdata.lmd_policy = &policy;
+        matchdata.lmd_flags = LDLM_FL_TEST_LOCK | LDLM_FL_CBPENDING;
+        matchdata.lmd_match = LDLM_MATCH_UNREF | LDLM_MATCH_AST_ANY;
+
+        LDLM_LOCK_GET(dlmlock);
+        /* If any dlmlock has l_ast_data set, we must find it or we risk
+         * missing a size update done under a different lock.
+         */
+        while (dlmlock) {
                 lock_res_and_lock(dlmlock);
-                if (dlmlock->l_ast_data != NULL) {
+                if (dlmlock->l_ast_data) {
                         obj = osc2cl(dlmlock->l_ast_data);
                         cl_object_get(obj);
                 }
                 unlock_res_and_lock(dlmlock);
+                LDLM_LOCK_RELEASE(dlmlock);
 
-                if (obj != NULL) {
-                        /* Do not grab the mutex of cl_lock for glimpse.
-                         * See LU-1274 for details.
-                         * BTW, it's okay for cl_lock to be cancelled during
-                         * this period because server can handle this race.
-                         * See ldlm_server_glimpse_ast() for details.
-                         * cl_lock_mutex_get(env, lock); */
-                        cap = &req->rq_pill;
-                        req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
-                        req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
-                                             sizeof *lvb);
-                        result = req_capsule_server_pack(cap);
-                        if (result == 0) {
-                                lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
-                                result = cl_object_glimpse(env, obj, lvb);
-                        }
-                        if (!exp_connect_lvb_type(req->rq_export))
-                                req_capsule_shrink(&req->rq_pill,
-                                                   &RMF_DLM_LVB,
-                                                   sizeof(struct ost_lvb_v1),
-                                                   RCL_SERVER);
-                        cl_object_put(env, obj);
-                } else {
-                        /*
-                         * These errors are normal races, so we don't want to
-                         * fill the console with messages by calling
-                         * ptlrpc_error()
-                         */
-                        lustre_pack_reply(req, 1, NULL, NULL);
-                        result = -ELDLM_NO_LOCK_DATA;
-                }
-                cl_env_nested_put(&nest, env);
-        } else
-                result = PTR_ERR(env);
+                dlmlock = NULL;
+
+                if (obj == NULL && res->lr_type == LDLM_EXTENT) {
+                        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA))
+                                break;
+
+                        lock_res(res);
+                        dlmlock = search_itree(res, &matchdata);
+                        unlock_res(res);
+                }
+        }
+
+        if (obj != NULL) {
+                /* Do not grab the mutex of cl_lock for glimpse.
+                 * See LU-1274 for details.
+                 * BTW, it's okay for cl_lock to be cancelled during
+                 * this period because server can handle this race.
+                 * See ldlm_server_glimpse_ast() for details.
+                 * cl_lock_mutex_get(env, lock); */
+                cap = &req->rq_pill;
+                req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
+                req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
+                                     sizeof *lvb);
+                result = req_capsule_server_pack(cap);
+                if (result == 0) {
+                        lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
+                        result = cl_object_glimpse(env, obj, lvb);
+                }
+                if (!exp_connect_lvb_type(req->rq_export))
+                        req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
                                           sizeof(struct ost_lvb_v1), RCL_SERVER);
+                cl_object_put(env, obj);
+        } else {
+                /*
+                 * These errors are normal races, so we don't want to
+                 * fill the console with messages by calling
+                 * ptlrpc_error()
+                 */
+                lustre_pack_reply(req, 1, NULL, NULL);
+                result = -ELDLM_NO_LOCK_DATA;
+        }
+        cl_env_put(env, &refcheck);
+        EXIT;
+
+out:
         req->rq_status = result;
         RETURN(result);
 }
+EXPORT_SYMBOL(osc_ldlm_glimpse_ast);
 
-static int weigh_cb(const struct lu_env *env, struct cl_io *io,
-                    struct osc_page *ops, void *cbdata)
+static bool weigh_cb(const struct lu_env *env, struct cl_io *io,
                     struct osc_page *ops, void *cbdata)
 {
         struct cl_page *page = ops->ops_cl.cpl_page;
 
-        if (cl_page_is_vmlocked(env, page)
-            || PageDirty(page->cp_vmpage) || PageWriteback(page->cp_vmpage)
-           )
-                return CLP_GANG_ABORT;
+        if (cl_page_is_vmlocked(env, page) || PageDirty(page->cp_vmpage) ||
+            PageWriteback(page->cp_vmpage))
+                return false;
 
         *(pgoff_t *)cbdata = osc_index(ops) + 1;
-        return CLP_GANG_OKAY;
+        return true;
 }
 
 static unsigned long osc_lock_weight(const struct lu_env *env,
                                      struct osc_object *oscobj,
-                                     struct ldlm_extent *extent)
+                                     loff_t start, loff_t end)
 {
-        struct cl_io *io = &osc_env_info(env)->oti_io;
+        struct cl_io *io = osc_env_thread_io(env);
         struct cl_object *obj = cl_object_top(&oscobj->oo_cl);
-        pgoff_t page_index;
-        int result;
+        pgoff_t page_index;
+        int result;
+
         ENTRY;
 
         io->ci_obj = obj;
         io->ci_ignore_layout = 1;
         result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
         if (result != 0)
-                RETURN(result);
+                RETURN(1);
 
-        page_index = cl_index(obj, extent->start);
-        do {
-                result = osc_page_gang_lookup(env, io, oscobj,
-                                              page_index,
-                                              cl_index(obj, extent->end),
-                                              weigh_cb, (void *)&page_index);
-                if (result == CLP_GANG_ABORT)
-                        break;
-                if (result == CLP_GANG_RESCHED)
-                        cond_resched();
-        } while (result != CLP_GANG_OKAY);
+        page_index = cl_index(obj, start);
+
+        if (!osc_page_gang_lookup(env, io, oscobj,
                                   page_index, cl_index(obj, end),
                                   weigh_cb, (void *)&page_index))
+                result = 1;
         cl_io_fini(env, io);
 
-        return result == CLP_GANG_ABORT ? 1 : 0;
+        return result;
 }
 
 /**
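
The headline change in the glimpse AST above: if the glimpsed lock raced with cancellation and lost its l_ast_data, the handler now keeps searching the resource for another granted lock that still points at the osc object. Shape of that loop in plain C, with a flat array scan standing in for search_itree():

    #include <stddef.h>

    struct demo_dlmlock {
        void *ast_data;     /* l_ast_data: the osc object, or NULL */
    };

    /* stand-in for search_itree(): return another granted lock on the
     * resource, or NULL when the candidates are exhausted */
    static struct demo_dlmlock *demo_search(struct demo_dlmlock *locks,
                                            int nr, int *cursor)
    {
        while (*cursor < nr)
            if (locks[(*cursor)++].ast_data)
                return &locks[*cursor - 1];
        return NULL;
    }

    /* keep looking for any lock that still knows the object, so the
     * size reply cannot miss an update done under a different lock */
    static void *demo_find_object(struct demo_dlmlock *glimpsed,
                                  struct demo_dlmlock *locks, int nr)
    {
        struct demo_dlmlock *l = glimpsed;
        int cursor = 0;

        while (l != NULL) {
            if (l->ast_data != NULL)
                return l->ast_data;
            l = demo_search(locks, nr, &cursor);
        }
        return NULL;    /* -ELDLM_NO_LOCK_DATA path */
    }
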
@@ -690,12 +694,13 @@ static unsigned long osc_lock_weight(const struct lu_env *env,
  */
 unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
 {
-        struct cl_env_nest nest;
-        struct lu_env *env;
-        struct osc_object *obj;
-        struct osc_lock *oscl;
-        unsigned long weight;
-        bool found = false;
+        struct lu_env *env;
+        struct osc_object *obj;
+        struct osc_lock *oscl;
+        unsigned long weight;
+        bool found = false;
+        __u16 refcheck;
+
         ENTRY;
 
         might_sleep();
@@ -706,21 +711,29 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
          * the upper context because cl_lock_put don't modify environment
          * variables. But just in case ..
          */
-        env = cl_env_nested_get(&nest);
+        env = cl_env_get(&refcheck);
         if (IS_ERR(env))
                 /* Mostly because lack of memory, do not eliminate this lock */
                 RETURN(1);
 
-        LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
+        LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT ||
+                dlmlock->l_resource->lr_type == LDLM_IBITS);
+
+        lock_res_and_lock(dlmlock);
         obj = dlmlock->l_ast_data;
+        if (obj)
+                cl_object_get(osc2cl(obj));
+        unlock_res_and_lock(dlmlock);
+
         if (obj == NULL)
-                GOTO(out, weight = 1);
+                GOTO(out, weight = 0);
 
         spin_lock(&obj->oo_ol_spin);
         list_for_each_entry(oscl, &obj->oo_ol_list, ols_nextlock_oscobj) {
-                if (oscl->ols_dlmlock != NULL && oscl->ols_dlmlock != dlmlock)
-                        continue;
-                found = true;
+                if (oscl->ols_dlmlock == dlmlock) {
+                        found = true;
+                        break;
+                }
         }
         spin_unlock(&obj->oo_ol_spin);
         if (found) {
@@ -730,13 +743,28 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
                 GOTO(out, weight = 1);
         }
 
-        weight = osc_lock_weight(env, obj, &dlmlock->l_policy_data.l_extent);
+        if (dlmlock->l_resource->lr_type == LDLM_EXTENT)
+                weight = osc_lock_weight(env, obj,
+                                         dlmlock->l_policy_data.l_extent.start,
+                                         dlmlock->l_policy_data.l_extent.end);
+        else if (ldlm_has_dom(dlmlock))
+                weight = osc_lock_weight(env, obj, 0, OBD_OBJECT_EOF);
+        /* The DOM bit can be cancelled at any time; in that case, we know
+         * there are no pages, so just return weight of 0
+         */
+        else
+                weight = 0;
+
         EXIT;
 
 out:
-        cl_env_nested_put(&nest, env);
+        if (obj)
+                cl_object_put(env, osc2cl(obj));
+
+        cl_env_put(env, &refcheck);
         return weight;
 }
+EXPORT_SYMBOL(osc_ldlm_weigh_ast);
 
 static void osc_lock_build_einfo(const struct lu_env *env,
                                  const struct cl_lock *lock,
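
What the reworked weighing code above amounts to: no attached object weighs 0, a still-attached osc_lock weighs 1, and otherwise the extent (or, for DOM-bit IBITS locks, the whole object) is scanned once, stopping at the first page that is vmlocked, dirty or under writeback. A simplified page-array rendering:

    #include <stdbool.h>

    struct demo_page { bool vmlocked, dirty, writeback; };

    /* weigh_cb() equivalent: false aborts the gang lookup at a busy page */
    static bool demo_page_is_idle(const struct demo_page *p)
    {
        return !(p->vmlocked || p->dirty || p->writeback);
    }

    /* weight 0: nothing cached, the lock is a cheap cancellation victim;
     * weight 1: busy pages argue for keeping the lock */
    static unsigned long demo_lock_weight(const struct demo_page *pages,
                                          int nr)
    {
        int i;

        for (i = 0; i < nr; i++)
            if (!demo_page_is_idle(&pages[i]))
                return 1;
        return 0;
    }
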
@@ -763,46 +791,46 @@ static void osc_lock_build_einfo(const struct lu_env *env,
  * Additional policy can be implemented here, e.g., never do lockless-io
  * for large extents.
  */
-static void osc_lock_to_lockless(const struct lu_env *env,
-                                 struct osc_lock *ols, int force)
+void osc_lock_to_lockless(const struct lu_env *env,
+                          struct osc_lock *ols, int force)
 {
-        struct cl_lock_slice *slice = &ols->ols_cl;
-
-        LASSERT(ols->ols_state == OLS_NEW ||
-                ols->ols_state == OLS_UPCALL_RECEIVED);
-
-        if (force) {
-                ols->ols_locklessable = 1;
-                slice->cls_ops = &osc_lock_lockless_ops;
-        } else {
-                struct osc_io *oio = osc_env_io(env);
-                struct cl_io *io = oio->oi_cl.cis_io;
-                struct cl_object *obj = slice->cls_obj;
-                struct osc_object *oob = cl2osc(obj);
-                const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
-                struct obd_connect_data *ocd;
-
-                LASSERT(io->ci_lockreq == CILR_MANDATORY ||
-                        io->ci_lockreq == CILR_MAYBE ||
-                        io->ci_lockreq == CILR_NEVER);
-
-                ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
-                ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
-                                        (io->ci_lockreq == CILR_MAYBE) &&
-                                        (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
-                if (io->ci_lockreq == CILR_NEVER ||
-                    /* lockless IO */
-                    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
-                    /* lockless truncate */
-                    (cl_io_is_trunc(io) &&
-                     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
-                     osd->od_lockless_truncate)) {
-                        ols->ols_locklessable = 1;
-                        slice->cls_ops = &osc_lock_lockless_ops;
-                }
-        }
-        LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
+        struct cl_lock_slice *slice = &ols->ols_cl;
+        struct osc_io *oio = osc_env_io(env);
+        struct cl_io *io = oio->oi_cl.cis_io;
+        struct cl_object *obj = slice->cls_obj;
+        struct osc_object *oob = cl2osc(obj);
+        const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
+        struct obd_connect_data *ocd;
+
+        LASSERT(ols->ols_state == OLS_NEW ||
+                ols->ols_state == OLS_UPCALL_RECEIVED);
+
+        if (force) {
+                ols->ols_locklessable = 1;
+                slice->cls_ops = ols->ols_lockless_ops;
+        } else {
+                LASSERT(io->ci_lockreq == CILR_MANDATORY ||
+                        io->ci_lockreq == CILR_MAYBE ||
+                        io->ci_lockreq == CILR_NEVER);
+
+                ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
+                ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
+                                        (io->ci_lockreq == CILR_MAYBE) &&
+                                        (ocd->ocd_connect_flags &
+                                         OBD_CONNECT_SRVLOCK);
+                if (io->ci_lockreq == CILR_NEVER ||
+                    /* lockless IO */
+                    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
+                    /* lockless truncate */
+                    (cl_io_is_trunc(io) && osd->od_lockless_truncate &&
+                     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK))) {
+                        ols->ols_locklessable = 1;
+                        slice->cls_ops = ols->ols_lockless_ops;
+                }
+        }
+        LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
 }
+EXPORT_SYMBOL(osc_lock_to_lockless);
 
 static bool osc_lock_compatible(const struct osc_lock *qing,
                                 const struct osc_lock *qed)
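
The decision osc_lock_to_lockless() makes, flattened into a single predicate over the state it inspects; the field names are stand-ins for the cl_io and connect-data bits referenced in the code above:

    #include <stdbool.h>

    struct demo_lockless_in {
        bool force;           /* retry after -EUSERS */
        bool lockreq_never;   /* io->ci_lockreq == CILR_NEVER */
        bool lockreq_maybe;   /* io->ci_lockreq == CILR_MAYBE */
        bool is_setattr;      /* io->ci_type == CIT_SETATTR */
        bool srvlock;         /* OBD_CONNECT_SRVLOCK advertised */
        bool contended;       /* osc_object_is_contended() */
        bool is_trunc;        /* cl_io_is_trunc() */
        bool trunclock;       /* od_lockless_truncate &&
                               * OBD_CONNECT_TRUNCLOCK */
    };

    static bool demo_go_lockless(const struct demo_lockless_in *in)
    {
        bool locklessable = !in->is_setattr && in->lockreq_maybe &&
                            in->srvlock;

        if (in->force)
            return true;
        return in->lockreq_never ||
               (locklessable && in->contended) ||
               (in->is_trunc && in->trunclock);
    }
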
@@ -810,7 +838,7 @@ static bool osc_lock_compatible(const struct osc_lock *qing,
         struct cl_lock_descr *qed_descr = &qed->ols_cl.cls_lock->cll_descr;
         struct cl_lock_descr *qing_descr = &qing->ols_cl.cls_lock->cll_descr;
 
-        if (qed->ols_glimpse)
+        if (qed->ols_glimpse || qed->ols_speculative)
                 return true;
 
         if (qing_descr->cld_mode == CLM_READ && qed_descr->cld_mode == CLM_READ)
@@ -827,9 +855,8 @@ static bool osc_lock_compatible(const struct osc_lock *qing,
         return false;
 }
 
-static void osc_lock_wake_waiters(const struct lu_env *env,
-                                  struct osc_object *osc,
-                                  struct osc_lock *oscl)
+void osc_lock_wake_waiters(const struct lu_env *env, struct osc_object *osc,
+                           struct osc_lock *oscl)
 {
         spin_lock(&osc->oo_ol_spin);
         list_del_init(&oscl->ols_nextlock_oscobj);
@@ -847,14 +874,17 @@ static void osc_lock_wake_waiters(const struct lu_env *env,
         }
         spin_unlock(&oscl->ols_lock);
 }
+EXPORT_SYMBOL(osc_lock_wake_waiters);
 
-static void osc_lock_enqueue_wait(const struct lu_env *env,
-                                  struct osc_object *obj,
-                                  struct osc_lock *oscl)
+int osc_lock_enqueue_wait(const struct lu_env *env, struct osc_object *obj,
+                          struct osc_lock *oscl)
 {
         struct osc_lock *tmp_oscl;
         struct cl_lock_descr *need = &oscl->ols_cl.cls_lock->cll_descr;
         struct cl_sync_io *waiter = &osc_env_info(env)->oti_anchor;
+        int rc = 0;
+
+        ENTRY;
 
         spin_lock(&obj->oo_ol_spin);
         list_add_tail(&oscl->ols_nextlock_oscobj, &obj->oo_ol_list);
@@ -881,7 +911,7 @@ restart:
                         continue;
 
                 /* wait for conflicting lock to be canceled */
-                cl_sync_io_init(waiter, 1, cl_sync_io_end);
+                cl_sync_io_init(waiter, 1);
                 oscl->ols_owner = waiter;
 
                 spin_lock(&tmp_oscl->ols_lock);
@@ -891,14 +921,20 @@ restart:
                 spin_unlock(&tmp_oscl->ols_lock);
                 spin_unlock(&obj->oo_ol_spin);
 
-                (void)cl_sync_io_wait(env, waiter, 0);
-
+                rc = cl_sync_io_wait(env, waiter, 0);
                 spin_lock(&obj->oo_ol_spin);
+
+                if (rc < 0)
+                        break;
+
                 oscl->ols_owner = NULL;
                 goto restart;
         }
         spin_unlock(&obj->oo_ol_spin);
+
+        RETURN(rc);
 }
+EXPORT_SYMBOL(osc_lock_enqueue_wait);
 
 /**
  * Implementation of cl_lock_operations::clo_enqueue() method for osc
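
The wait-and-rescan shape of osc_lock_enqueue_wait(), now with an int return so an interrupted cl_sync_io_wait() aborts the enqueue instead of being ignored. A toy rendering of the control flow only — the kernel holds oo_ol_spin around the scan and waits on a real cl_sync_io anchor:

    #include <stdbool.h>

    struct demo_conflict { bool pending; };

    /* stand-in for cl_sync_io_init()/cl_sync_io_wait(): block until the
     * conflicting lock's cancellation signals us; may fail if interrupted */
    static int demo_wait_cancel(struct demo_conflict *c)
    {
        c->pending = false;
        return 0;
    }

    static int demo_enqueue_wait(struct demo_conflict *list, int nr)
    {
        int rc = 0;
        int i;

    restart:
        for (i = 0; i < nr; i++) {
            if (!list[i].pending)
                continue;       /* compatible or already gone */
            rc = demo_wait_cancel(&list[i]);
            if (rc < 0)
                break;          /* new: propagate interrupted waits */
            goto restart;       /* the list changed while we slept */
        }
        return rc;
    }
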
@@ -922,9 +958,10 @@ static int osc_lock_enqueue(const struct lu_env *env,
         struct osc_io *oio = osc_env_io(env);
         struct osc_object *osc = cl2osc(slice->cls_obj);
         struct osc_lock *oscl = cl2osc_lock(slice);
+        struct obd_export *exp = osc_export(osc);
         struct cl_lock *lock = slice->cls_lock;
         struct ldlm_res_id *resname = &info->oti_resname;
-        ldlm_policy_data_t *policy = &info->oti_policy;
+        union ldlm_policy_data *policy = &info->oti_policy;
         osc_enqueue_upcall_f upcall = osc_lock_upcall;
         void *cookie = oscl;
         bool async = false;
@@ -938,16 +975,29 @@ static int osc_lock_enqueue(const struct lu_env *env,
         if (oscl->ols_state == OLS_GRANTED)
                 RETURN(0);
 
+        if ((oscl->ols_flags & LDLM_FL_NO_EXPANSION) &&
+            !exp_connect_lockahead(exp)) {
+                result = -EOPNOTSUPP;
+                CERROR("%s: server does not support lockahead/locknoexpand: rc = %d\n",
+                       exp->exp_obd->obd_name, result);
+                RETURN(result);
+        }
+
         if (oscl->ols_flags & LDLM_FL_TEST_LOCK)
                 GOTO(enqueue_base, 0);
 
-        if (oscl->ols_glimpse) {
-                LASSERT(equi(oscl->ols_agl, anchor == NULL));
+        /* For glimpse and/or speculative locks, do not wait for reply from
+         * server on LDLM request */
+        if (oscl->ols_glimpse || oscl->ols_speculative) {
+                /* Speculative and glimpse locks do not have an anchor */
+                LASSERT(equi(oscl->ols_speculative, anchor == NULL));
                 async = true;
                 GOTO(enqueue_base, 0);
         }
 
-        osc_lock_enqueue_wait(env, osc, oscl);
+        result = osc_lock_enqueue_wait(env, osc, oscl);
+        if (result < 0)
+                GOTO(out, result);
 
         /* we can grant lockless lock right after all conflicting locks
          * are canceled. */
@@ -966,39 +1016,31 @@ enqueue_base:
 
         /**
          * DLM lock's ast data must be osc_object;
-         * if glimpse or AGL lock, async of osc_enqueue_base() must be true,
+         * if glimpse or speculative lock, async of osc_enqueue_base()
+         * must be true
+         *
+         * For non-speculative locks:
          * DLM's enqueue callback set to osc_lock_upcall() with cookie as
          * osc_lock.
+         * For speculative locks:
+         * osc_lock_upcall_speculative & cookie is the osc object, since
+         * there is no osc_lock
          */
         ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
-        osc_lock_build_einfo(env, lock, osc, &oscl->ols_einfo);
         osc_lock_build_policy(env, lock, policy);
-        if (oscl->ols_agl) {
+        if (oscl->ols_speculative) {
                 oscl->ols_einfo.ei_cbdata = NULL;
                 /* hold a reference for callback */
                 cl_object_get(osc2cl(osc));
-                upcall = osc_lock_upcall_agl;
+                upcall = osc_lock_upcall_speculative;
                 cookie = osc;
         }
-        result = osc_enqueue_base(osc_export(osc), resname, &oscl->ols_flags,
+        result = osc_enqueue_base(exp, resname, &oscl->ols_flags,
                                   policy, &oscl->ols_lvb,
-                                  osc->oo_oinfo->loi_kms_valid,
                                   upcall, cookie,
                                   &oscl->ols_einfo, PTLRPCD_SET, async,
-                                  oscl->ols_agl);
-        if (result != 0) {
-                oscl->ols_state = OLS_CANCELLED;
-                osc_lock_wake_waiters(env, osc, oscl);
-
-                /* hide error for AGL lock. */
-                if (oscl->ols_agl) {
-                        cl_object_put(env, osc2cl(osc));
-                        result = 0;
-                }
-
-                if (anchor != NULL)
-                        cl_sync_io_note(env, anchor, result);
-        } else {
+                                  oscl->ols_speculative);
+        if (result == 0) {
                 if (osc_lock_is_lockless(oscl)) {
                         oio->oi_lockless = 1;
                 } else if (!async) {
@@ -1006,6 +1048,21 @@ enqueue_base:
                         LASSERT(oscl->ols_hold);
                         LASSERT(oscl->ols_dlmlock != NULL);
                 }
+        } else if (oscl->ols_speculative) {
+                cl_object_put(env, osc2cl(osc));
+                if (oscl->ols_glimpse) {
+                        /* hide error for AGL request */
+                        result = 0;
+                }
+        }
+
+out:
+        if (result < 0) {
+                oscl->ols_state = OLS_CANCELLED;
+                osc_lock_wake_waiters(env, osc, oscl);
+
+                if (anchor != NULL)
+                        cl_sync_io_note(env, anchor, result);
         }
         RETURN(result);
 }
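
Sketch of the consolidated error path the hunk above introduces; the kernel calls are quoted in comments, the function itself is illustrative:

    #include <stdbool.h>

    /* enqueue failure now funnels through one exit: speculative enqueues
     * always drop the object reference taken for the upcall, AGL (glimpse)
     * failures are swallowed, and whatever error remains cancels the lock
     * and wakes both the waiters and the IO anchor */
    static int demo_enqueue_out(int result, bool speculative, bool glimpse)
    {
        if (result == 0)
            return 0;

        if (speculative) {
            /* cl_object_put(env, osc2cl(osc)); */
            if (glimpse)
                result = 0;     /* hide error for AGL request */
        }

        if (result < 0) {
            /* oscl->ols_state = OLS_CANCELLED;
             * osc_lock_wake_waiters(env, osc, oscl);
             * if (anchor != NULL)
             *         cl_sync_io_note(env, anchor, result); */
        }
        return result;
    }
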
@@ -1017,13 +1074,15 @@ static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
 {
         struct ldlm_lock *dlmlock;
 
+        ENTRY;
+
         dlmlock = olck->ols_dlmlock;
         if (dlmlock == NULL)
-                return;
+                RETURN_EXIT;
 
         if (olck->ols_hold) {
                 olck->ols_hold = 0;
-                osc_cancel_base(&olck->ols_handle, olck->ols_einfo.ei_mode);
+                ldlm_lock_decref(&olck->ols_handle, olck->ols_einfo.ei_mode);
                 olck->ols_handle.cookie = 0ULL;
         }
@@ -1034,6 +1093,8 @@ static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
         lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
         LDLM_LOCK_RELEASE(dlmlock);
         olck->ols_has_ref = 0;
+
+        EXIT;
 }
 
 /**
@@ -1050,8 +1111,8 @@ static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
  *
  * - cancels ldlm lock (ldlm_cli_cancel()).
  */
-static void osc_lock_cancel(const struct lu_env *env,
-                            const struct cl_lock_slice *slice)
+void osc_lock_cancel(const struct lu_env *env,
+                     const struct cl_lock_slice *slice)
 {
         struct osc_object *obj = cl2osc(slice->cls_obj);
         struct osc_lock *oscl = cl2osc_lock(slice);
@@ -1067,18 +1128,20 @@ static void osc_lock_cancel(const struct lu_env *env,
         osc_lock_wake_waiters(env, obj, oscl);
         EXIT;
 }
+EXPORT_SYMBOL(osc_lock_cancel);
 
-static int osc_lock_print(const struct lu_env *env, void *cookie,
-                          lu_printer_t p, const struct cl_lock_slice *slice)
+int osc_lock_print(const struct lu_env *env, void *cookie,
+                   lu_printer_t p, const struct cl_lock_slice *slice)
 {
         struct osc_lock *lock = cl2osc_lock(slice);
 
-        (*p)(env, cookie, "%p "LPX64" "LPX64" %d %p ",
+        (*p)(env, cookie, "%p %#llx %#llx %d %p ",
              lock->ols_dlmlock, lock->ols_flags, lock->ols_handle.cookie,
              lock->ols_state, lock->ols_owner);
         osc_lvb_print(env, cookie, p, &lock->ols_lvb);
         return 0;
 }
+EXPORT_SYMBOL(osc_lock_print);
 
 static const struct cl_lock_operations osc_lock_ops = {
         .clo_fini    = osc_lock_fini,
@@ -1097,7 +1160,7 @@ static void osc_lock_lockless_cancel(const struct lu_env *env,
 
         LASSERT(ols->ols_dlmlock == NULL);
         result = osc_lock_flush(osc, descr->cld_start, descr->cld_end,
-                                descr->cld_mode, 0);
+                                descr->cld_mode, false);
         if (result)
                 CERROR("Pages for lockless lock %p were not purged(%d)\n",
                        ols, result);
@@ -1112,9 +1175,8 @@ static const struct cl_lock_operations osc_lock_lockless_ops = {
         .clo_print   = osc_lock_print
 };
 
-static void osc_lock_set_writer(const struct lu_env *env,
-                                const struct cl_io *io,
-                                struct cl_object *obj, struct osc_lock *oscl)
+void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
+                         struct cl_object *obj, struct osc_lock *oscl)
 {
         struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
         pgoff_t io_start;
@@ -1127,17 +1189,14 @@ static void osc_lock_set_writer(const struct lu_env *env,
                 io_start = cl_index(obj, io->u.ci_rw.crw_pos);
                 io_end = cl_index(obj, io->u.ci_rw.crw_pos +
                                   io->u.ci_rw.crw_count - 1);
-                if (cl_io_is_append(io)) {
-                        io_start = 0;
-                        io_end = CL_PAGE_EOF;
-                }
         } else {
                 LASSERT(cl_io_is_mkwrite(io));
                 io_start = io_end = io->u.ci_fault.ft_index;
         }
 
         if (descr->cld_mode >= CLM_WRITE &&
-            descr->cld_start <= io_start && descr->cld_end >= io_end) {
+            (cl_io_is_append(io) ||
+             (descr->cld_start <= io_start && descr->cld_end >= io_end))) {
                 struct osc_io *oio = osc_env_io(env);
 
                 /* There must be only one lock to match the write region */
@@ -1145,6 +1204,7 @@ static void osc_lock_set_writer(const struct lu_env *env,
                 oio->oi_write_osclock = oscl;
         }
 }
+EXPORT_SYMBOL(osc_lock_set_writer);
 
 int osc_lock_init(const struct lu_env *env,
                   struct cl_object *obj, struct cl_lock *lock,
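
The osc_lock_set_writer() change above replaces the old trick of widening an append IO to [0, CL_PAGE_EOF] with an explicit append test in the coverage check; condensed:

    #include <stdbool.h>

    typedef unsigned long pgoff_t;

    /* an append write has no meaningful [io_start, io_end] page range,
     * so any write-mode lock now qualifies; ordinary writes still demand
     * that the lock fully cover the IO region */
    static bool demo_lock_matches_write(bool mode_ge_write, bool is_append,
                                        pgoff_t cld_start, pgoff_t cld_end,
                                        pgoff_t io_start, pgoff_t io_end)
    {
        if (!mode_ge_write)
            return false;
        return is_append ||
               (cld_start <= io_start && cld_end >= io_end);
    }
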
@@ -1162,15 +1222,26 @@ int osc_lock_init(const struct lu_env *env,
         INIT_LIST_HEAD(&oscl->ols_waiting_list);
         INIT_LIST_HEAD(&oscl->ols_wait_entry);
         INIT_LIST_HEAD(&oscl->ols_nextlock_oscobj);
+        oscl->ols_lockless_ops = &osc_lock_lockless_ops;
+
+        /* Speculative lock requests must be either no_expand or glimpse
+         * request (CEF_GLIMPSE). non-glimpse no_expand speculative extent
+         * locks will break ofd_intent_cb. (see comment there)*/
+        LASSERT(ergo((enqflags & CEF_SPECULATIVE) != 0,
+                     (enqflags & (CEF_LOCK_NO_EXPAND | CEF_GLIMPSE)) != 0));
 
         oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
-        oscl->ols_agl = !!(enqflags & CEF_AGL);
-        if (oscl->ols_agl)
-                oscl->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
+        oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
+        if (lock->cll_descr.cld_mode == CLM_GROUP)
+                oscl->ols_flags |= LDLM_FL_ATOMIC_CB;
+
         if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
                 oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
                 oscl->ols_glimpse = 1;
         }
+        if (io->ci_ndelay && cl_object_same(io->ci_obj, obj))
+                oscl->ols_flags |= LDLM_FL_NDELAY;
         osc_lock_build_einfo(env, lock, cl2osc(obj), &oscl->ols_einfo);
 
         cl_lock_slice_add(lock, &oscl->ols_cl, obj, &osc_lock_ops);
@@ -1183,7 +1254,7 @@ int osc_lock_init(const struct lu_env *env,
         if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io))
                 osc_lock_set_writer(env, io, obj, oscl);
 
-        LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags "LPX64"\n",
+        LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %#llx",
                           lock, oscl, oscl->ols_flags);
 
         return 0;
@@ -1193,17 +1264,19 @@ int osc_lock_init(const struct lu_env *env,
  * Finds an existing lock covering given index and optionally different from a
  * given \a except lock.
  */
-struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
-                                       struct osc_object *obj, pgoff_t index,
-                                       int pending, int canceling)
+struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
+                                           struct osc_object *obj,
+                                           pgoff_t index,
+                                           enum osc_dap_flags dap_flags)
 {
         struct osc_thread_info *info = osc_env_info(env);
-        struct ldlm_res_id *resname = &info->oti_resname;
-        ldlm_policy_data_t *policy = &info->oti_policy;
-        struct lustre_handle lockh;
-        struct ldlm_lock *lock = NULL;
-        ldlm_mode_t mode;
-        __u64 flags;
+        struct ldlm_res_id *resname = &info->oti_resname;
+        union ldlm_policy_data *policy = &info->oti_policy;
+        struct lustre_handle lockh;
+        struct ldlm_lock *lock = NULL;
+        enum ldlm_mode mode;
+        __u64 flags;
+        enum ldlm_match_flags match_flags = 0;
 
         ENTRY;
 
@@ -1211,17 +1284,24 @@ struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
         osc_index2policy(policy, osc2cl(obj), index, index);
         policy->l_extent.gid = LDLM_GID_ANY;
 
-        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
-        if (pending)
-                flags |= LDLM_FL_CBPENDING;
+        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
+        if (dap_flags & OSC_DAP_FL_TEST_LOCK)
+                flags |= LDLM_FL_TEST_LOCK;
+
+        if (dap_flags & OSC_DAP_FL_AST)
+                match_flags |= LDLM_MATCH_AST;
+
+        if (dap_flags & OSC_DAP_FL_CANCELING)
+                match_flags |= LDLM_MATCH_UNREF;
+
         /*
          * It is fine to match any group lock since there could be only one
          * with a uniq gid and it conflicts with all other lock modes too
          */
 again:
-        mode = ldlm_lock_match(osc_export(obj)->exp_obd->obd_namespace,
-                               flags, resname, LDLM_EXTENT, policy,
-                               LCK_PR | LCK_PW | LCK_GROUP, &lockh, canceling);
+        mode = osc_match_base(env, osc_export(obj), resname, LDLM_EXTENT,
+                              policy, LCK_PR | LCK_PW | LCK_GROUP, &flags,
+                              obj, &lockh, match_flags);
         if (mode != 0) {
                 lock = ldlm_handle2lock(&lockh);
                 /* RACE: the lock is cancelled so let's try again */