X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_lock.c;h=688cc7b85bda6d4b1f698c7a3bb9adecb00b5364;hp=f26a5e7a5530533057667d7b4f35f8a5bf48d81c;hb=8ac020df4592fc6e85edd75d54cb3795a4e50f8e;hpb=2b0479c0c959e44a4a3e850d36979fdbdf370d3a diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index f26a5e7..688cc7b 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,7 +23,7 @@ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2015, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -41,32 +37,16 @@ #define DEBUG_SUBSYSTEM S_OSC -#include /* fid_build_reg_res_name() */ #include +#include -#include "osc_cl_internal.h" +#include "osc_internal.h" /** \addtogroup osc * @{ */ -/***************************************************************************** - * - * Type conversions. - * - */ - -static const struct cl_lock_operations osc_lock_ops; -static const struct cl_lock_operations osc_lock_lockless_ops; -static void osc_lock_to_lockless(const struct lu_env *env, - struct osc_lock *ols, int force); - -int osc_lock_is_lockless(const struct osc_lock *olck) -{ - return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops); -} - /** * Returns a weak pointer to the ldlm lock identified by a handle. Returned * pointer cannot be dereferenced, as lock is not protected from concurrent @@ -126,7 +106,7 @@ static int osc_lock_invariant(struct osc_lock *ols) if (! ergo(ols->ols_state == OLS_GRANTED, olock != NULL && - olock->l_req_mode == olock->l_granted_mode && + ldlm_is_granted(olock) && ols->ols_hold)) return 0; return 1; @@ -138,8 +118,7 @@ static int osc_lock_invariant(struct osc_lock *ols) * */ -static void osc_lock_fini(const struct lu_env *env, - struct cl_lock_slice *slice) +void osc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice) { struct osc_lock *ols = cl2osc_lock(slice); @@ -148,6 +127,7 @@ static void osc_lock_fini(const struct lu_env *env, OBD_SLAB_FREE_PTR(ols, osc_lock_kmem); } +EXPORT_SYMBOL(osc_lock_fini); static void osc_lock_build_policy(const struct lu_env *env, const struct cl_lock *lock, @@ -159,33 +139,11 @@ static void osc_lock_build_policy(const struct lu_env *env, policy->l_extent.gid = d->cld_gid; } -static __u64 osc_enq2ldlm_flags(__u32 enqflags) -{ - __u64 result = 0; - - LASSERT((enqflags & ~CEF_MASK) == 0); - - if (enqflags & CEF_NONBLOCK) - result |= LDLM_FL_BLOCK_NOWAIT; - if (enqflags & CEF_ASYNC) - result |= LDLM_FL_HAS_INTENT; - if (enqflags & CEF_DISCARD_DATA) - result |= LDLM_FL_AST_DISCARD_DATA; - if (enqflags & CEF_PEEK) - result |= LDLM_FL_TEST_LOCK; - if (enqflags & CEF_LOCK_MATCH) - result |= LDLM_FL_MATCH_LOCK; - return result; -} - /** * Updates object attributes from a lock value block (lvb) received together * with the DLM lock reply from the server. Copy of osc_update_enqueue() * logic. * - * This can be optimized to not update attributes when lock is a result of a - * local match. - * * Called under lock and resource spin-locks. */ static void osc_lock_lvb_update(const struct lu_env *env, @@ -221,13 +179,13 @@ static void osc_lock_lvb_update(const struct lu_env *env, if (size > dlmlock->l_policy_data.l_extent.end) size = dlmlock->l_policy_data.l_extent.end + 1; if (size >= oinfo->loi_kms) { - LDLM_DEBUG(dlmlock, "lock acquired, setting rss="LPU64 - ", kms="LPU64, lvb->lvb_size, size); + LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu" + ", kms=%llu", lvb->lvb_size, size); valid |= CAT_KMS; attr->cat_kms = size; } else { LDLM_DEBUG(dlmlock, "lock acquired, setting rss=" - LPU64"; leaving kms="LPU64", end="LPU64, + "%llu; leaving kms=%llu, end=%llu", lvb->lvb_size, oinfo->loi_kms, dlmlock->l_policy_data.l_extent.end); } @@ -241,7 +199,7 @@ static void osc_lock_lvb_update(const struct lu_env *env, } static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, - struct lustre_handle *lockh, bool lvb_update) + struct lustre_handle *lockh) { struct ldlm_lock *dlmlock; @@ -269,7 +227,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, /* Lock must have been granted. */ lock_res_and_lock(dlmlock); - if (dlmlock->l_granted_mode == dlmlock->l_req_mode) { + if (ldlm_is_granted(dlmlock)) { struct ldlm_extent *ext = &dlmlock->l_policy_data.l_extent; struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr; @@ -281,10 +239,11 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, descr->cld_gid = ext->gid; /* no lvb update for matched lock */ - if (lvb_update) { + if (!ldlm_is_lvb_cached(dlmlock)) { LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY); osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj), dlmlock, NULL); + ldlm_set_lvb_cached(dlmlock); } LINVRNT(osc_lock_invariant(oscl)); } @@ -305,12 +264,11 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh, struct osc_lock *oscl = cookie; struct cl_lock_slice *slice = &oscl->ols_cl; struct lu_env *env; - struct cl_env_nest nest; int rc; ENTRY; - env = cl_env_nested_get(&nest); + env = cl_env_percpu_get(); /* should never happen, similar to osc_ldlm_blocking_ast(). */ LASSERT(!IS_ERR(env)); @@ -325,7 +283,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh, } if (rc == 0) - osc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK); + osc_lock_granted(env, oscl, lockh); /* Error handling, some errors are tolerable. */ if (oscl->ols_locklessable && rc == -EUSERS) { @@ -333,7 +291,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh, * lockless lock. */ osc_object_set_contended(cl2osc(slice->cls_obj)); - LASSERT(slice->cls_ops == &osc_lock_ops); + LASSERT(slice->cls_ops != oscl->ols_lockless_ops); /* Change this lock to ldlmlock-less lock. */ osc_lock_to_lockless(env, oscl, 1); @@ -345,25 +303,28 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh, NULL, &oscl->ols_lvb); /* Hide the error. */ rc = 0; + } else if (rc < 0 && oscl->ols_flags & LDLM_FL_NDELAY) { + rc = -EWOULDBLOCK; } if (oscl->ols_owner != NULL) cl_sync_io_note(env, oscl->ols_owner, rc); - cl_env_nested_put(&nest, env); + cl_env_percpu_put(env); RETURN(rc); } -static int osc_lock_upcall_agl(void *cookie, struct lustre_handle *lockh, - int errcode) +static int osc_lock_upcall_speculative(void *cookie, + struct lustre_handle *lockh, + int errcode) { struct osc_object *osc = cookie; struct ldlm_lock *dlmlock; struct lu_env *env; - struct cl_env_nest nest; + __u16 refcheck; ENTRY; - env = cl_env_nested_get(&nest); + env = cl_env_get(&refcheck); LASSERT(!IS_ERR(env)); if (errcode == ELDLM_LOCK_MATCHED) @@ -376,9 +337,10 @@ static int osc_lock_upcall_agl(void *cookie, struct lustre_handle *lockh, LASSERT(dlmlock != NULL); lock_res_and_lock(dlmlock); - LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode); + LASSERT(ldlm_is_granted(dlmlock)); - /* there is no osc_lock associated with AGL lock */ + /* there is no osc_lock associated with speculative locks + * thus no need to set LDLM_FL_LVB_CACHED */ osc_lock_lvb_update(env, osc, dlmlock, NULL); unlock_res_and_lock(dlmlock); @@ -386,21 +348,21 @@ static int osc_lock_upcall_agl(void *cookie, struct lustre_handle *lockh, out: cl_object_put(env, osc2cl(osc)); - cl_env_nested_put(&nest, env); + cl_env_put(env, &refcheck); RETURN(ldlm_error2errno(errcode)); } static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end, - enum cl_lock_mode mode, int discard) + enum cl_lock_mode mode, bool discard) { struct lu_env *env; - struct cl_env_nest nest; + __u16 refcheck; int rc = 0; int rc2 = 0; ENTRY; - env = cl_env_nested_get(&nest); + env = cl_env_get(&refcheck); if (IS_ERR(env)) RETURN(PTR_ERR(env)); @@ -414,11 +376,16 @@ static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end, rc = 0; } - rc2 = osc_lock_discard_pages(env, obj, start, end, mode); + /* + * Do not try to match other locks with CLM_WRITE since we already + * know there're none + */ + rc2 = osc_lock_discard_pages(env, obj, start, end, + mode == CLM_WRITE || discard); if (rc == 0 && rc2 < 0) rc = rc2; - cl_env_nested_put(&nest, env); + cl_env_put(env, &refcheck); RETURN(rc); } @@ -432,14 +399,14 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env, { struct cl_object *obj = NULL; int result = 0; - int discard; + bool discard; enum cl_lock_mode mode = CLM_READ; ENTRY; LASSERT(flag == LDLM_CB_CANCELING); lock_res_and_lock(dlmlock); - if (dlmlock->l_granted_mode != dlmlock->l_req_mode) { + if (!ldlm_is_granted(dlmlock)) { dlmlock->l_ast_data = NULL; unlock_res_and_lock(dlmlock); RETURN(0); @@ -550,7 +517,7 @@ static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock, } case LDLM_CB_CANCELING: { struct lu_env *env; - struct cl_env_nest nest; + __u16 refcheck; /* * This can be called in the context of outer IO, e.g., @@ -563,14 +530,14 @@ static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock, * new environment has to be created to not corrupt outer * context. */ - env = cl_env_nested_get(&nest); + env = cl_env_get(&refcheck); if (IS_ERR(env)) { result = PTR_ERR(env); break; } result = osc_dlm_blocking_ast0(env, dlmlock, data, flag); - cl_env_nested_put(&nest, env); + cl_env_put(env, &refcheck); break; } default: @@ -579,76 +546,108 @@ static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock, RETURN(result); } -static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data) +int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data) { struct ptlrpc_request *req = data; - struct cl_env_nest nest; struct lu_env *env; struct ost_lvb *lvb; struct req_capsule *cap; + struct cl_object *obj = NULL; + struct ldlm_resource *res = dlmlock->l_resource; + struct ldlm_match_data matchdata = { 0 }; + union ldlm_policy_data policy; + enum ldlm_mode mode = LCK_PW | LCK_GROUP | LCK_PR; int result; + __u16 refcheck; ENTRY; LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK); - env = cl_env_nested_get(&nest); - if (!IS_ERR(env)) { - struct cl_object *obj = NULL; + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + GOTO(out, result = PTR_ERR(env)); + + policy.l_extent.start = 0; + policy.l_extent.end = LUSTRE_EOF; + + matchdata.lmd_mode = &mode; + matchdata.lmd_policy = &policy; + matchdata.lmd_flags = LDLM_FL_TEST_LOCK | LDLM_FL_CBPENDING; + matchdata.lmd_unref = 1; + matchdata.lmd_has_ast_data = true; + LDLM_LOCK_GET(dlmlock); + + /* If any dlmlock has l_ast_data set, we must find it or we risk + * missing a size update done under a different lock. + */ + while (dlmlock) { lock_res_and_lock(dlmlock); - if (dlmlock->l_ast_data != NULL) { + if (dlmlock->l_ast_data) { obj = osc2cl(dlmlock->l_ast_data); cl_object_get(obj); } unlock_res_and_lock(dlmlock); + LDLM_LOCK_PUT(dlmlock); - if (obj != NULL) { - /* Do not grab the mutex of cl_lock for glimpse. - * See LU-1274 for details. - * BTW, it's okay for cl_lock to be cancelled during - * this period because server can handle this race. - * See ldlm_server_glimpse_ast() for details. - * cl_lock_mutex_get(env, lock); */ - cap = &req->rq_pill; - req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK); - req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER, - sizeof *lvb); - result = req_capsule_server_pack(cap); - if (result == 0) { - lvb = req_capsule_server_get(cap, &RMF_DLM_LVB); - result = cl_object_glimpse(env, obj, lvb); - } - if (!exp_connect_lvb_type(req->rq_export)) - req_capsule_shrink(&req->rq_pill, - &RMF_DLM_LVB, - sizeof(struct ost_lvb_v1), - RCL_SERVER); - cl_object_put(env, obj); - } else { - /* - * These errors are normal races, so we don't want to - * fill the console with messages by calling - * ptlrpc_error() - */ - lustre_pack_reply(req, 1, NULL, NULL); - result = -ELDLM_NO_LOCK_DATA; - } - cl_env_nested_put(&nest, env); - } else - result = PTR_ERR(env); + dlmlock = NULL; + + if (obj == NULL && res->lr_type == LDLM_EXTENT) { + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA)) + break; + + lock_res(res); + dlmlock = search_itree(res, &matchdata); + unlock_res(res); + } + } + + if (obj != NULL) { + /* Do not grab the mutex of cl_lock for glimpse. + * See LU-1274 for details. + * BTW, it's okay for cl_lock to be cancelled during + * this period because server can handle this race. + * See ldlm_server_glimpse_ast() for details. + * cl_lock_mutex_get(env, lock); */ + cap = &req->rq_pill; + req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK); + req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER, + sizeof *lvb); + result = req_capsule_server_pack(cap); + if (result == 0) { + lvb = req_capsule_server_get(cap, &RMF_DLM_LVB); + result = cl_object_glimpse(env, obj, lvb); + } + if (!exp_connect_lvb_type(req->rq_export)) + req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, + sizeof(struct ost_lvb_v1), RCL_SERVER); + cl_object_put(env, obj); + } else { + /* + * These errors are normal races, so we don't want to + * fill the console with messages by calling + * ptlrpc_error() + */ + lustre_pack_reply(req, 1, NULL, NULL); + result = -ELDLM_NO_LOCK_DATA; + } + cl_env_put(env, &refcheck); + EXIT; + +out: req->rq_status = result; RETURN(result); } +EXPORT_SYMBOL(osc_ldlm_glimpse_ast); static int weigh_cb(const struct lu_env *env, struct cl_io *io, struct osc_page *ops, void *cbdata) { struct cl_page *page = ops->ops_cl.cpl_page; - if (cl_page_is_vmlocked(env, page) - || PageDirty(page->cp_vmpage) || PageWriteback(page->cp_vmpage) - ) + if (cl_page_is_vmlocked(env, page) || PageDirty(page->cp_vmpage) || + PageWriteback(page->cp_vmpage)) return CLP_GANG_ABORT; *(pgoff_t *)cbdata = osc_index(ops) + 1; @@ -657,12 +656,13 @@ static int weigh_cb(const struct lu_env *env, struct cl_io *io, static unsigned long osc_lock_weight(const struct lu_env *env, struct osc_object *oscobj, - struct ldlm_extent *extent) + loff_t start, loff_t end) { - struct cl_io *io = &osc_env_info(env)->oti_io; + struct cl_io *io = osc_env_thread_io(env); struct cl_object *obj = cl_object_top(&oscobj->oo_cl); - pgoff_t page_index; - int result; + pgoff_t page_index; + int result; + ENTRY; io->ci_obj = obj; @@ -671,11 +671,10 @@ static unsigned long osc_lock_weight(const struct lu_env *env, if (result != 0) RETURN(result); - page_index = cl_index(obj, extent->start); + page_index = cl_index(obj, start); do { result = osc_page_gang_lookup(env, io, oscobj, - page_index, - cl_index(obj, extent->end), + page_index, cl_index(obj, end), weigh_cb, (void *)&page_index); if (result == CLP_GANG_ABORT) break; @@ -692,12 +691,13 @@ static unsigned long osc_lock_weight(const struct lu_env *env, */ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock) { - struct cl_env_nest nest; - struct lu_env *env; - struct osc_object *obj; - struct osc_lock *oscl; - unsigned long weight; - bool found = false; + struct lu_env *env; + struct osc_object *obj; + struct osc_lock *oscl; + unsigned long weight; + bool found = false; + __u16 refcheck; + ENTRY; might_sleep(); @@ -708,21 +708,29 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock) * the upper context because cl_lock_put don't modify environment * variables. But just in case .. */ - env = cl_env_nested_get(&nest); + env = cl_env_get(&refcheck); if (IS_ERR(env)) /* Mostly because lack of memory, do not eliminate this lock */ RETURN(1); - LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT); + LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT || + dlmlock->l_resource->lr_type == LDLM_IBITS); + + lock_res_and_lock(dlmlock); obj = dlmlock->l_ast_data; + if (obj) + cl_object_get(osc2cl(obj)); + unlock_res_and_lock(dlmlock); + if (obj == NULL) GOTO(out, weight = 1); spin_lock(&obj->oo_ol_spin); list_for_each_entry(oscl, &obj->oo_ol_list, ols_nextlock_oscobj) { - if (oscl->ols_dlmlock != NULL && oscl->ols_dlmlock != dlmlock) - continue; - found = true; + if (oscl->ols_dlmlock == dlmlock) { + found = true; + break; + } } spin_unlock(&obj->oo_ol_spin); if (found) { @@ -732,13 +740,28 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock) GOTO(out, weight = 1); } - weight = osc_lock_weight(env, obj, &dlmlock->l_policy_data.l_extent); + if (dlmlock->l_resource->lr_type == LDLM_EXTENT) + weight = osc_lock_weight(env, obj, + dlmlock->l_policy_data.l_extent.start, + dlmlock->l_policy_data.l_extent.end); + else if (ldlm_has_dom(dlmlock)) + weight = osc_lock_weight(env, obj, 0, OBD_OBJECT_EOF); + /* The DOM bit can be cancelled at any time; in that case, we know + * there are no pages, so just return weight of 0 + */ + else + weight = 0; + EXIT; out: - cl_env_nested_put(&nest, env); + if (obj) + cl_object_put(env, osc2cl(obj)); + + cl_env_put(env, &refcheck); return weight; } +EXPORT_SYMBOL(osc_ldlm_weigh_ast); static void osc_lock_build_einfo(const struct lu_env *env, const struct cl_lock *lock, @@ -765,46 +788,46 @@ static void osc_lock_build_einfo(const struct lu_env *env, * Additional policy can be implemented here, e.g., never do lockless-io * for large extents. */ -static void osc_lock_to_lockless(const struct lu_env *env, - struct osc_lock *ols, int force) +void osc_lock_to_lockless(const struct lu_env *env, + struct osc_lock *ols, int force) { - struct cl_lock_slice *slice = &ols->ols_cl; - - LASSERT(ols->ols_state == OLS_NEW || - ols->ols_state == OLS_UPCALL_RECEIVED); - - if (force) { - ols->ols_locklessable = 1; - slice->cls_ops = &osc_lock_lockless_ops; - } else { - struct osc_io *oio = osc_env_io(env); - struct cl_io *io = oio->oi_cl.cis_io; - struct cl_object *obj = slice->cls_obj; - struct osc_object *oob = cl2osc(obj); - const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev); - struct obd_connect_data *ocd; - - LASSERT(io->ci_lockreq == CILR_MANDATORY || - io->ci_lockreq == CILR_MAYBE || - io->ci_lockreq == CILR_NEVER); - - ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data; - ols->ols_locklessable = (io->ci_type != CIT_SETATTR) && - (io->ci_lockreq == CILR_MAYBE) && - (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK); - if (io->ci_lockreq == CILR_NEVER || - /* lockless IO */ - (ols->ols_locklessable && osc_object_is_contended(oob)) || - /* lockless truncate */ - (cl_io_is_trunc(io) && - (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) && - osd->od_lockless_truncate)) { - ols->ols_locklessable = 1; - slice->cls_ops = &osc_lock_lockless_ops; - } - } - LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols))); + struct cl_lock_slice *slice = &ols->ols_cl; + struct osc_io *oio = osc_env_io(env); + struct cl_io *io = oio->oi_cl.cis_io; + struct cl_object *obj = slice->cls_obj; + struct osc_object *oob = cl2osc(obj); + const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev); + struct obd_connect_data *ocd; + + LASSERT(ols->ols_state == OLS_NEW || + ols->ols_state == OLS_UPCALL_RECEIVED); + + if (force) { + ols->ols_locklessable = 1; + slice->cls_ops = ols->ols_lockless_ops; + } else { + LASSERT(io->ci_lockreq == CILR_MANDATORY || + io->ci_lockreq == CILR_MAYBE || + io->ci_lockreq == CILR_NEVER); + + ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data; + ols->ols_locklessable = (io->ci_type != CIT_SETATTR) && + (io->ci_lockreq == CILR_MAYBE) && + (ocd->ocd_connect_flags & + OBD_CONNECT_SRVLOCK); + if (io->ci_lockreq == CILR_NEVER || + /* lockless IO */ + (ols->ols_locklessable && osc_object_is_contended(oob)) || + /* lockless truncate */ + (cl_io_is_trunc(io) && osd->od_lockless_truncate && + (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK))) { + ols->ols_locklessable = 1; + slice->cls_ops = ols->ols_lockless_ops; + } + } + LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols))); } +EXPORT_SYMBOL(osc_lock_to_lockless); static bool osc_lock_compatible(const struct osc_lock *qing, const struct osc_lock *qed) @@ -812,7 +835,7 @@ static bool osc_lock_compatible(const struct osc_lock *qing, struct cl_lock_descr *qed_descr = &qed->ols_cl.cls_lock->cll_descr; struct cl_lock_descr *qing_descr = &qing->ols_cl.cls_lock->cll_descr; - if (qed->ols_glimpse) + if (qed->ols_glimpse || qed->ols_speculative) return true; if (qing_descr->cld_mode == CLM_READ && qed_descr->cld_mode == CLM_READ) @@ -829,9 +852,8 @@ static bool osc_lock_compatible(const struct osc_lock *qing, return false; } -static void osc_lock_wake_waiters(const struct lu_env *env, - struct osc_object *osc, - struct osc_lock *oscl) +void osc_lock_wake_waiters(const struct lu_env *env, struct osc_object *osc, + struct osc_lock *oscl) { spin_lock(&osc->oo_ol_spin); list_del_init(&oscl->ols_nextlock_oscobj); @@ -849,14 +871,16 @@ static void osc_lock_wake_waiters(const struct lu_env *env, } spin_unlock(&oscl->ols_lock); } +EXPORT_SYMBOL(osc_lock_wake_waiters); -static int osc_lock_enqueue_wait(const struct lu_env *env, - struct osc_object *obj, struct osc_lock *oscl) +int osc_lock_enqueue_wait(const struct lu_env *env, struct osc_object *obj, + struct osc_lock *oscl) { struct osc_lock *tmp_oscl; struct cl_lock_descr *need = &oscl->ols_cl.cls_lock->cll_descr; struct cl_sync_io *waiter = &osc_env_info(env)->oti_anchor; int rc = 0; + ENTRY; spin_lock(&obj->oo_ol_spin); @@ -884,7 +908,7 @@ restart: continue; /* wait for conflicting lock to be canceled */ - cl_sync_io_init(waiter, 1, cl_sync_io_end); + cl_sync_io_init(waiter, 1); oscl->ols_owner = waiter; spin_lock(&tmp_oscl->ols_lock); @@ -907,6 +931,7 @@ restart: RETURN(rc); } +EXPORT_SYMBOL(osc_lock_enqueue_wait); /** * Implementation of cl_lock_operations::clo_enqueue() method for osc @@ -930,6 +955,7 @@ static int osc_lock_enqueue(const struct lu_env *env, struct osc_io *oio = osc_env_io(env); struct osc_object *osc = cl2osc(slice->cls_obj); struct osc_lock *oscl = cl2osc_lock(slice); + struct obd_export *exp = osc_export(osc); struct cl_lock *lock = slice->cls_lock; struct ldlm_res_id *resname = &info->oti_resname; union ldlm_policy_data *policy = &info->oti_policy; @@ -946,11 +972,22 @@ static int osc_lock_enqueue(const struct lu_env *env, if (oscl->ols_state == OLS_GRANTED) RETURN(0); + if ((oscl->ols_flags & LDLM_FL_NO_EXPANSION) && + !(exp_connect_lockahead_old(exp) || exp_connect_lockahead(exp))) { + result = -EOPNOTSUPP; + CERROR("%s: server does not support lockahead/locknoexpand:" + "rc = %d\n", exp->exp_obd->obd_name, result); + RETURN(result); + } + if (oscl->ols_flags & LDLM_FL_TEST_LOCK) GOTO(enqueue_base, 0); - if (oscl->ols_glimpse) { - LASSERT(equi(oscl->ols_agl, anchor == NULL)); + /* For glimpse and/or speculative locks, do not wait for reply from + * server on LDLM request */ + if (oscl->ols_glimpse || oscl->ols_speculative) { + /* Speculative and glimpse locks do not have an anchor */ + LASSERT(equi(oscl->ols_speculative, anchor == NULL)); async = true; GOTO(enqueue_base, 0); } @@ -976,25 +1013,30 @@ enqueue_base: /** * DLM lock's ast data must be osc_object; - * if glimpse or AGL lock, async of osc_enqueue_base() must be true, + * if glimpse or speculative lock, async of osc_enqueue_base() + * must be true + * + * For non-speculative locks: * DLM's enqueue callback set to osc_lock_upcall() with cookie as * osc_lock. + * For speculative locks: + * osc_lock_upcall_speculative & cookie is the osc object, since + * there is no osc_lock */ ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname); osc_lock_build_policy(env, lock, policy); - if (oscl->ols_agl) { + if (oscl->ols_speculative) { oscl->ols_einfo.ei_cbdata = NULL; /* hold a reference for callback */ cl_object_get(osc2cl(osc)); - upcall = osc_lock_upcall_agl; + upcall = osc_lock_upcall_speculative; cookie = osc; } - result = osc_enqueue_base(osc_export(osc), resname, &oscl->ols_flags, + result = osc_enqueue_base(exp, resname, &oscl->ols_flags, policy, &oscl->ols_lvb, - osc->oo_oinfo->loi_kms_valid, upcall, cookie, &oscl->ols_einfo, PTLRPCD_SET, async, - oscl->ols_agl); + oscl->ols_speculative); if (result == 0) { if (osc_lock_is_lockless(oscl)) { oio->oi_lockless = 1; @@ -1003,9 +1045,12 @@ enqueue_base: LASSERT(oscl->ols_hold); LASSERT(oscl->ols_dlmlock != NULL); } - } else if (oscl->ols_agl) { + } else if (oscl->ols_speculative) { cl_object_put(env, osc2cl(osc)); - result = 0; + if (oscl->ols_glimpse) { + /* hide error for AGL request */ + result = 0; + } } out: @@ -1063,8 +1108,8 @@ static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck) * * - cancels ldlm lock (ldlm_cli_cancel()). */ -static void osc_lock_cancel(const struct lu_env *env, - const struct cl_lock_slice *slice) +void osc_lock_cancel(const struct lu_env *env, + const struct cl_lock_slice *slice) { struct osc_object *obj = cl2osc(slice->cls_obj); struct osc_lock *oscl = cl2osc_lock(slice); @@ -1080,18 +1125,20 @@ static void osc_lock_cancel(const struct lu_env *env, osc_lock_wake_waiters(env, obj, oscl); EXIT; } +EXPORT_SYMBOL(osc_lock_cancel); -static int osc_lock_print(const struct lu_env *env, void *cookie, - lu_printer_t p, const struct cl_lock_slice *slice) +int osc_lock_print(const struct lu_env *env, void *cookie, + lu_printer_t p, const struct cl_lock_slice *slice) { struct osc_lock *lock = cl2osc_lock(slice); - (*p)(env, cookie, "%p "LPX64" "LPX64" %d %p ", + (*p)(env, cookie, "%p %#llx %#llx %d %p ", lock->ols_dlmlock, lock->ols_flags, lock->ols_handle.cookie, lock->ols_state, lock->ols_owner); osc_lvb_print(env, cookie, p, &lock->ols_lvb); return 0; } +EXPORT_SYMBOL(osc_lock_print); static const struct cl_lock_operations osc_lock_ops = { .clo_fini = osc_lock_fini, @@ -1110,7 +1157,7 @@ static void osc_lock_lockless_cancel(const struct lu_env *env, LASSERT(ols->ols_dlmlock == NULL); result = osc_lock_flush(osc, descr->cld_start, descr->cld_end, - descr->cld_mode, 0); + descr->cld_mode, false); if (result) CERROR("Pages for lockless lock %p were not purged(%d)\n", ols, result); @@ -1125,9 +1172,8 @@ static const struct cl_lock_operations osc_lock_lockless_ops = { .clo_print = osc_lock_print }; -static void osc_lock_set_writer(const struct lu_env *env, - const struct cl_io *io, - struct cl_object *obj, struct osc_lock *oscl) +void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io, + struct cl_object *obj, struct osc_lock *oscl) { struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr; pgoff_t io_start; @@ -1140,17 +1186,14 @@ static void osc_lock_set_writer(const struct lu_env *env, io_start = cl_index(obj, io->u.ci_rw.crw_pos); io_end = cl_index(obj, io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count - 1); - if (cl_io_is_append(io)) { - io_start = 0; - io_end = CL_PAGE_EOF; - } } else { LASSERT(cl_io_is_mkwrite(io)); io_start = io_end = io->u.ci_fault.ft_index; } if (descr->cld_mode >= CLM_WRITE && - descr->cld_start <= io_start && descr->cld_end >= io_end) { + (cl_io_is_append(io) || + (descr->cld_start <= io_start && descr->cld_end >= io_end))) { struct osc_io *oio = osc_env_io(env); /* There must be only one lock to match the write region */ @@ -1158,6 +1201,7 @@ static void osc_lock_set_writer(const struct lu_env *env, oio->oi_write_osclock = oscl; } } +EXPORT_SYMBOL(osc_lock_set_writer); int osc_lock_init(const struct lu_env *env, struct cl_object *obj, struct cl_lock *lock, @@ -1175,15 +1219,25 @@ int osc_lock_init(const struct lu_env *env, INIT_LIST_HEAD(&oscl->ols_waiting_list); INIT_LIST_HEAD(&oscl->ols_wait_entry); INIT_LIST_HEAD(&oscl->ols_nextlock_oscobj); + oscl->ols_lockless_ops = &osc_lock_lockless_ops; + + /* Speculative lock requests must be either no_expand or glimpse + * request (CEF_GLIMPSE). non-glimpse no_expand speculative extent + * locks will break ofd_intent_cb. (see comment there)*/ + LASSERT(ergo((enqflags & CEF_SPECULATIVE) != 0, + (enqflags & (CEF_LOCK_NO_EXPAND | CEF_GLIMPSE)) != 0)); oscl->ols_flags = osc_enq2ldlm_flags(enqflags); - oscl->ols_agl = !!(enqflags & CEF_AGL); - if (oscl->ols_agl) - oscl->ols_flags |= LDLM_FL_BLOCK_NOWAIT; + oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE); + if (lock->cll_descr.cld_mode == CLM_GROUP) + oscl->ols_flags |= LDLM_FL_ATOMIC_CB; + if (oscl->ols_flags & LDLM_FL_HAS_INTENT) { oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED; oscl->ols_glimpse = 1; } + if (io->ci_ndelay && cl_object_same(io->ci_obj, obj)) + oscl->ols_flags |= LDLM_FL_NDELAY; osc_lock_build_einfo(env, lock, cl2osc(obj), &oscl->ols_einfo); cl_lock_slice_add(lock, &oscl->ols_cl, obj, &osc_lock_ops); @@ -1197,7 +1251,7 @@ int osc_lock_init(const struct lu_env *env, if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io)) osc_lock_set_writer(env, io, obj, oscl); - LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags "LPX64"\n", + LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %#llx", lock, oscl, oscl->ols_flags); return 0; @@ -1207,9 +1261,10 @@ int osc_lock_init(const struct lu_env *env, * Finds an existing lock covering given index and optionally different from a * given \a except lock. */ -struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env, - struct osc_object *obj, pgoff_t index, - enum osc_dap_flags dap_flags) +struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env, + struct osc_object *obj, + pgoff_t index, + enum osc_dap_flags dap_flags) { struct osc_thread_info *info = osc_env_info(env); struct ldlm_res_id *resname = &info->oti_resname;