X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fosc%2Fosc_lock.c;h=6fd75daa92c0b1073ffb5e61607971dabff7fe18;hb=a8dcf372f430c308d3e96fb506563068d0a80c2d;hp=35d02cc077931dc4a50c822d4e34e9d1c5ae5fee;hpb=72057a3af19ee02d9a686bd7e7d074917e381310;p=fs%2Flustre-release.git diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index 35d02cc..6fd75da 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -23,7 +23,7 @@ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2015, Intel Corporation. + * Copyright (c) 2011, 2016, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -40,8 +40,9 @@ #include /* fid_build_reg_res_name() */ #include +#include -#include "osc_cl_internal.h" +#include "osc_internal.h" /** \addtogroup osc * @{ @@ -159,11 +160,13 @@ static __u64 osc_enq2ldlm_flags(__u32 enqflags) { __u64 result = 0; + CDEBUG(D_DLMTRACE, "flags: %x\n", enqflags); + LASSERT((enqflags & ~CEF_MASK) == 0); if (enqflags & CEF_NONBLOCK) result |= LDLM_FL_BLOCK_NOWAIT; - if (enqflags & CEF_ASYNC) + if (enqflags & CEF_GLIMPSE) result |= LDLM_FL_HAS_INTENT; if (enqflags & CEF_DISCARD_DATA) result |= LDLM_FL_AST_DISCARD_DATA; @@ -171,6 +174,10 @@ static __u64 osc_enq2ldlm_flags(__u32 enqflags) result |= LDLM_FL_TEST_LOCK; if (enqflags & CEF_LOCK_MATCH) result |= LDLM_FL_MATCH_LOCK; + if (enqflags & CEF_LOCK_NO_EXPAND) + result |= LDLM_FL_NO_EXPANSION; + if (enqflags & CEF_SPECULATIVE) + result |= LDLM_FL_SPECULATIVE; return result; } @@ -302,11 +309,10 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh, struct cl_lock_slice *slice = &oscl->ols_cl; struct lu_env *env; int rc; - __u16 refcheck; ENTRY; - env = cl_env_get(&refcheck); + env = cl_env_percpu_get(); /* should never happen, similar to osc_ldlm_blocking_ast(). */ LASSERT(!IS_ERR(env)); @@ -345,13 +351,14 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh, if (oscl->ols_owner != NULL) cl_sync_io_note(env, oscl->ols_owner, rc); - cl_env_put(env, &refcheck); + cl_env_percpu_put(env); RETURN(rc); } -static int osc_lock_upcall_agl(void *cookie, struct lustre_handle *lockh, - int errcode) +static int osc_lock_upcall_speculative(void *cookie, + struct lustre_handle *lockh, + int errcode) { struct osc_object *osc = cookie; struct ldlm_lock *dlmlock; @@ -374,7 +381,7 @@ static int osc_lock_upcall_agl(void *cookie, struct lustre_handle *lockh, lock_res_and_lock(dlmlock); LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode); - /* there is no osc_lock associated with AGL lock */ + /* there is no osc_lock associated with speculative locks */ osc_lock_lvb_update(env, osc, dlmlock, NULL); unlock_res_and_lock(dlmlock); @@ -387,7 +394,7 @@ out: } static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end, - enum cl_lock_mode mode, int discard) + enum cl_lock_mode mode, bool discard) { struct lu_env *env; __u16 refcheck; @@ -410,7 +417,7 @@ static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end, rc = 0; } - rc2 = osc_lock_discard_pages(env, obj, start, end, mode); + rc2 = osc_lock_discard_pages(env, obj, start, end, discard); if (rc == 0 && rc2 < 0) rc = rc2; @@ -428,7 +435,7 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env, { struct cl_object *obj = NULL; int result = 0; - int discard; + bool discard; enum cl_lock_mode mode = CLM_READ; ENTRY; @@ -711,7 +718,12 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock) RETURN(1); LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT); + lock_res_and_lock(dlmlock); obj = dlmlock->l_ast_data; + if (obj) + cl_object_get(osc2cl(obj)); + unlock_res_and_lock(dlmlock); + if (obj == NULL) GOTO(out, weight = 1); @@ -733,6 +745,9 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock) EXIT; out: + if (obj) + cl_object_put(env, osc2cl(obj)); + cl_env_put(env, &refcheck); return weight; } @@ -809,7 +824,7 @@ static bool osc_lock_compatible(const struct osc_lock *qing, struct cl_lock_descr *qed_descr = &qed->ols_cl.cls_lock->cll_descr; struct cl_lock_descr *qing_descr = &qing->ols_cl.cls_lock->cll_descr; - if (qed->ols_glimpse) + if (qed->ols_glimpse || qed->ols_speculative) return true; if (qing_descr->cld_mode == CLM_READ && qed_descr->cld_mode == CLM_READ) @@ -927,6 +942,7 @@ static int osc_lock_enqueue(const struct lu_env *env, struct osc_io *oio = osc_env_io(env); struct osc_object *osc = cl2osc(slice->cls_obj); struct osc_lock *oscl = cl2osc_lock(slice); + struct obd_export *exp = osc_export(osc); struct cl_lock *lock = slice->cls_lock; struct ldlm_res_id *resname = &info->oti_resname; union ldlm_policy_data *policy = &info->oti_policy; @@ -943,11 +959,22 @@ static int osc_lock_enqueue(const struct lu_env *env, if (oscl->ols_state == OLS_GRANTED) RETURN(0); + if ((oscl->ols_flags & LDLM_FL_NO_EXPANSION) && + !(exp_connect_lockahead_old(exp) || exp_connect_lockahead(exp))) { + result = -EOPNOTSUPP; + CERROR("%s: server does not support lockahead/locknoexpand:" + "rc = %d\n", exp->exp_obd->obd_name, result); + RETURN(result); + } + if (oscl->ols_flags & LDLM_FL_TEST_LOCK) GOTO(enqueue_base, 0); - if (oscl->ols_glimpse) { - LASSERT(equi(oscl->ols_agl, anchor == NULL)); + /* For glimpse and/or speculative locks, do not wait for reply from + * server on LDLM request */ + if (oscl->ols_glimpse || oscl->ols_speculative) { + /* Speculative and glimpse locks do not have an anchor */ + LASSERT(equi(oscl->ols_speculative, anchor == NULL)); async = true; GOTO(enqueue_base, 0); } @@ -973,25 +1000,31 @@ enqueue_base: /** * DLM lock's ast data must be osc_object; - * if glimpse or AGL lock, async of osc_enqueue_base() must be true, + * if glimpse or speculative lock, async of osc_enqueue_base() + * must be true + * + * For non-speculative locks: * DLM's enqueue callback set to osc_lock_upcall() with cookie as * osc_lock. + * For speculative locks: + * osc_lock_upcall_speculative & cookie is the osc object, since + * there is no osc_lock */ ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname); osc_lock_build_policy(env, lock, policy); - if (oscl->ols_agl) { + if (oscl->ols_speculative) { oscl->ols_einfo.ei_cbdata = NULL; /* hold a reference for callback */ cl_object_get(osc2cl(osc)); - upcall = osc_lock_upcall_agl; + upcall = osc_lock_upcall_speculative; cookie = osc; } - result = osc_enqueue_base(osc_export(osc), resname, &oscl->ols_flags, + result = osc_enqueue_base(exp, resname, &oscl->ols_flags, policy, &oscl->ols_lvb, osc->oo_oinfo->loi_kms_valid, upcall, cookie, &oscl->ols_einfo, PTLRPCD_SET, async, - oscl->ols_agl); + oscl->ols_speculative); if (result == 0) { if (osc_lock_is_lockless(oscl)) { oio->oi_lockless = 1; @@ -1000,9 +1033,12 @@ enqueue_base: LASSERT(oscl->ols_hold); LASSERT(oscl->ols_dlmlock != NULL); } - } else if (oscl->ols_agl) { + } else if (oscl->ols_speculative) { cl_object_put(env, osc2cl(osc)); - result = 0; + if (oscl->ols_glimpse) { + /* hide error for AGL request */ + result = 0; + } } out: @@ -1107,7 +1143,7 @@ static void osc_lock_lockless_cancel(const struct lu_env *env, LASSERT(ols->ols_dlmlock == NULL); result = osc_lock_flush(osc, descr->cld_start, descr->cld_end, - descr->cld_mode, 0); + descr->cld_mode, false); if (result) CERROR("Pages for lockless lock %p were not purged(%d)\n", ols, result); @@ -1134,20 +1170,17 @@ static void osc_lock_set_writer(const struct lu_env *env, return; if (likely(io->ci_type == CIT_WRITE)) { - io_start = cl_index(obj, io->u.ci_rw.crw_pos); - io_end = cl_index(obj, io->u.ci_rw.crw_pos + - io->u.ci_rw.crw_count - 1); - if (cl_io_is_append(io)) { - io_start = 0; - io_end = CL_PAGE_EOF; - } + io_start = cl_index(obj, io->u.ci_rw.rw_range.cir_pos); + io_end = cl_index(obj, io->u.ci_rw.rw_range.cir_pos + + io->u.ci_rw.rw_range.cir_count - 1); } else { LASSERT(cl_io_is_mkwrite(io)); io_start = io_end = io->u.ci_fault.ft_index; } if (descr->cld_mode >= CLM_WRITE && - descr->cld_start <= io_start && descr->cld_end >= io_end) { + (cl_io_is_append(io) || + (descr->cld_start <= io_start && descr->cld_end >= io_end))) { struct osc_io *oio = osc_env_io(env); /* There must be only one lock to match the write region */ @@ -1173,10 +1206,15 @@ int osc_lock_init(const struct lu_env *env, INIT_LIST_HEAD(&oscl->ols_wait_entry); INIT_LIST_HEAD(&oscl->ols_nextlock_oscobj); + /* Speculative lock requests must be either no_expand or glimpse + * request (CEF_GLIMPSE). non-glimpse no_expand speculative extent + * locks will break ofd_intent_cb. (see comment there)*/ + LASSERT(ergo((enqflags & CEF_SPECULATIVE) != 0, + (enqflags & (CEF_LOCK_NO_EXPAND | CEF_GLIMPSE)) != 0)); + oscl->ols_flags = osc_enq2ldlm_flags(enqflags); - oscl->ols_agl = !!(enqflags & CEF_AGL); - if (oscl->ols_agl) - oscl->ols_flags |= LDLM_FL_BLOCK_NOWAIT; + oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE); + if (oscl->ols_flags & LDLM_FL_HAS_INTENT) { oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED; oscl->ols_glimpse = 1;