diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index a0e3190..ef82a95 100644
--- a/lustre/osc/osc_lock.c
+++ b/lustre/osc/osc_lock.c
@@ -60,6 +60,8 @@
 static const struct cl_lock_operations osc_lock_ops;
 static const struct cl_lock_operations osc_lock_lockless_ops;
+static void osc_lock_to_lockless(const struct lu_env *env,
+                                 struct osc_lock *ols, int force);
 
 int osc_lock_is_lockless(const struct osc_lock *olck)
 {
@@ -247,7 +249,7 @@ static int osc_enq2ldlm_flags(__u32 enqflags)
 {
         int result = 0;
 
-        LASSERT((enqflags & ~(CEF_NONBLOCK|CEF_ASYNC|CEF_DISCARD_DATA)) == 0);
+        LASSERT((enqflags & ~CEF_MASK) == 0);
 
         if (enqflags & CEF_NONBLOCK)
                 result |= LDLM_FL_BLOCK_NOWAIT;
@@ -303,19 +305,6 @@ static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
         cl_lock_put(env, lock);
 }
 
-static void osc_lock_to_lockless(struct osc_lock *olck)
-{
-        struct cl_lock_slice *slice = &olck->ols_cl;
-        struct cl_lock *lock = slice->cls_lock;
-
-        /*
-         * TODO: Discover which locks we need to convert the lock
-         * to ldlmlockless.
-         */
-        LASSERT(cl_lock_is_mutexed(lock));
-        slice->cls_ops = &osc_lock_lockless_ops;
-}
-
 /**
  * Updates object attributes from a lock value block (lvb) received together
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
@@ -323,6 +312,8 @@ static void osc_lock_to_lockless(struct osc_lock *olck)
  *
  * This can be optimized to not update attributes when lock is a result of a
  * local match.
+ *
+ * Called under lock and resource spin-locks.
  */
 static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
                                 int rc)
@@ -355,6 +346,8 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
                 dlmlock = olck->ols_lock;
                 LASSERT(dlmlock != NULL);
 
+                /* re-grab LVB from a dlm lock under DLM spin-locks. */
+                *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
                 size = lvb->lvb_size;
                 /* Extend KMS up to the end of this lock and no further
                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
@@ -371,7 +364,7 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
                                lvb->lvb_size, oinfo->loi_kms,
                                dlmlock->l_policy_data.l_extent.end);
                 }
-                ldlm_lock_allow_match(dlmlock);
+                ldlm_lock_allow_match_locked(dlmlock);
         } else if (rc == -ENAVAIL && olck->ols_glimpse) {
                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                        " kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms);
         }
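A note on the KMS arithmetic in the hunk above: a DLM extent lock on bytes [x, y] lets the client assert a known minimum size (KMS) of at most y + 1, so the server-supplied lvb_size must be clamped to the lock's extent end plus one. Below is a minimal stand-alone sketch of that clamp; the names (kms_candidate, lvb_size, lock_end) are hypothetical, and the OBD_OBJECT_EOF overflow case the real code also has to consider is elided.

    #include <stdio.h>

    typedef unsigned long long u64;

    /* Illustrative clamp mirroring the KMS rule quoted above: a lock on
     * bytes [start, end] can vouch for a file size of at most end + 1. */
    static u64 kms_candidate(u64 lvb_size, u64 lock_end)
    {
            u64 size = lvb_size;

            if (size > lock_end + 1)
                    size = lock_end + 1;    /* no further than the extent */
            return size;
    }

    int main(void)
    {
            /* Lock on [0, 4095] with a server-reported size of 1 MiB:
             * KMS may only be raised to 4096. */
            printf("%llu\n", kms_candidate(1048576ULL, 4095ULL)); /* 4096 */
            /* A size below the extent end passes through unchanged. */
            printf("%llu\n", kms_candidate(100ULL, 4095ULL));     /* 100 */
            return 0;
    }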
@@ -386,6 +379,13 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
         EXIT;
 }
 
+/**
+ * Called when a lock is granted: from an upcall (when the server returned a
+ * granted lock) or from a completion AST (when it returned a blocked lock).
+ *
+ * Called under lock and resource spin-locks, which are released temporarily
+ * here.
+ */
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
                              struct ldlm_lock *dlmlock, int rc)
 {
@@ -410,11 +410,19 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
                  * tell upper layers the extent of the lock that was actually
                  * granted
                  */
-                cl_lock_modify(env, lock, descr);
-                LINVRNT(osc_lock_invariant(olck));
                 olck->ols_state = OLS_GRANTED;
                 osc_lock_lvb_update(env, olck, rc);
+
+                /* release DLM spin-locks to allow cl_lock_{modify,signal}()
+                 * to take a semaphore on a parent lock. This is safe, because
+                 * spin-locks are needed to protect consistency of
+                 * dlmlock->l_*_mode and LVB, and we have finished processing
+                 * them. */
+                unlock_res_and_lock(dlmlock);
+                cl_lock_modify(env, lock, descr);
                 cl_lock_signal(env, lock);
+                LINVRNT(osc_lock_invariant(olck));
+                lock_res_and_lock(dlmlock);
         }
         EXIT;
 }
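The unlock_res_and_lock()/lock_res_and_lock() pair added above encodes a standard ordering rule: a sleeping lock (the cl_lock mutex taken by cl_lock_modify()/cl_lock_signal()) must never be acquired while a spin-lock is held, so the spin-locks are dropped once the dlmlock state they protect has been consumed, and re-taken before returning to a caller that expects them held. A minimal userspace sketch of the same shape; the pthread names are stand-ins, not the Lustre primitives.

    #include <pthread.h>
    #include <stdio.h>

    /* res_lock stands in for the DLM resource/lock spin-locks; parent_mutex
     * stands in for a sleeping lock that must not be taken under them. */
    static pthread_spinlock_t res_lock;
    static pthread_mutex_t    parent_mutex = PTHREAD_MUTEX_INITIALIZER;

    static void granted_path(void)
    {
            pthread_spin_lock(&res_lock);
            /* ... read the granted mode and LVB into locals; after this
             * point the spin-lock protects nothing we still need ... */
            pthread_spin_unlock(&res_lock);     /* drop before sleeping */

            pthread_mutex_lock(&parent_mutex);  /* modify/signal analogue */
            pthread_mutex_unlock(&parent_mutex);

            pthread_spin_lock(&res_lock);       /* caller expects it held */
            pthread_spin_unlock(&res_lock);     /* kept balanced here */
    }

    int main(void)
    {
            pthread_spin_init(&res_lock, PTHREAD_PROCESS_PRIVATE);
            granted_path();
            puts("ok");
            return pthread_spin_destroy(&res_lock);
    }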
@@ -435,7 +443,6 @@ static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
         LASSERT(olck->ols_lock == NULL);
         olck->ols_lock = dlmlock;
         spin_unlock(&osc_ast_guard);
-        unlock_res_and_lock(dlmlock);
 
         /*
          * Lock might be not yet granted. In this case, completion ast
@@ -444,6 +451,8 @@ static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
          */
         if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
                 osc_lock_granted(env, olck, dlmlock, 0);
+        unlock_res_and_lock(dlmlock);
+
         /*
          * osc_enqueue_interpret() decrefs asynchronous locks, counter
          * this.
          */
@@ -519,7 +528,7 @@ static int osc_lock_upcall(void *cookie, int errcode)
                         LASSERT(slice->cls_ops == &osc_lock_ops);
 
                         /* Change this lock to ldlmlock-less lock. */
-                        osc_lock_to_lockless(olck);
+                        osc_lock_to_lockless(env, olck, 1);
                         olck->ols_state = OLS_GRANTED;
                         rc = 0;
                 } else if (olck->ols_glimpse && rc == -ENAVAIL) {
@@ -762,6 +771,7 @@ static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
                  * to lock->l_lvb_data, store it in osc_lock.
                  */
                 LASSERT(dlmlock->l_lvb_data != NULL);
+                lock_res_and_lock(dlmlock);
                 olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
                 if (olck->ols_lock == NULL)
                         /*
@@ -778,6 +788,7 @@ static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
                 osc_lock_granted(env, olck, dlmlock, dlmrc);
                 if (dlmrc != 0)
                         cl_lock_error(env, lock, dlmrc);
+                unlock_res_and_lock(dlmlock);
                 cl_lock_mutex_put(env, lock);
                 osc_ast_data_put(env, olck);
                 result = 0;
@@ -998,6 +1009,61 @@ static int osc_lock_cancel_wait(const struct lu_env *env, struct cl_lock *lock,
 }
 
 /**
+ * Determine if the lock should be converted into a lockless lock.
+ *
+ * Steps to check:
+ * - if the lock has an explicit requirement for a non-lockless lock;
+ * - check the io lock request type ci_lockreq;
+ * - send the enqueue RPC to the OST to make the final decision;
+ * - special treatment for a lockless truncate lock.
+ *
+ * Additional policy can be implemented here, e.g., never do lockless io
+ * for large extents.
+ */
+static void osc_lock_to_lockless(const struct lu_env *env,
+                                 struct osc_lock *ols, int force)
+{
+        struct cl_lock_slice *slice = &ols->ols_cl;
+        struct cl_lock *lock = slice->cls_lock;
+
+        LASSERT(ols->ols_state == OLS_NEW ||
+                ols->ols_state == OLS_UPCALL_RECEIVED);
+
+        if (force) {
+                ols->ols_locklessable = 1;
+                LASSERT(cl_lock_is_mutexed(lock));
+                slice->cls_ops = &osc_lock_lockless_ops;
+        } else {
+                struct osc_io *oio = osc_env_io(env);
+                struct cl_io *io = oio->oi_cl.cis_io;
+                struct cl_object *obj = slice->cls_obj;
+                struct osc_object *oob = cl2osc(obj);
+                const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
+                struct obd_connect_data *ocd;
+
+                LASSERT(io->ci_lockreq == CILR_MANDATORY ||
+                        io->ci_lockreq == CILR_MAYBE ||
+                        io->ci_lockreq == CILR_NEVER);
+
+                ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
+                ols->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
+                        (io->ci_lockreq == CILR_MAYBE) &&
+                        (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
+                if (io->ci_lockreq == CILR_NEVER ||
+                    /* lockless IO */
+                    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
+                    /* lockless truncate */
+                    (io->ci_type == CIT_TRUNC &&
+                     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
+                     osd->od_lockless_truncate)) {
+                        ols->ols_locklessable = 1;
+                        slice->cls_ops = &osc_lock_lockless_ops;
+                }
+        }
+        LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
+}
+
+/**
  * Cancel all conflicting locks and wait for them to be destroyed.
  *
  * This function is used for two purposes:
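The non-force branch added in osc_lock_to_lockless() is a pure predicate over the io type, the lock request mode, the negotiated connect flags, and object contention. A hedged restatement in flat C may make the three ways to go lockless easier to see; every name below is hypothetical shorthand for the fields tested above, not Lustre API.

    #include <stdbool.h>
    #include <stdio.h>

    /* Hypothetical, flattened restatement of the non-force branch above. */
    struct lockless_inputs {
            bool is_trunc;          /* io->ci_type == CIT_TRUNC */
            bool req_maybe;         /* io->ci_lockreq == CILR_MAYBE */
            bool req_never;         /* io->ci_lockreq == CILR_NEVER */
            bool srv_lock;          /* OBD_CONNECT_SRVLOCK negotiated */
            bool trunc_lock;        /* OBD_CONNECT_TRUNCLOCK negotiated */
            bool lockless_truncate; /* od_lockless_truncate tunable */
            bool contended;         /* osc_object_is_contended() */
    };

    static bool go_lockless(const struct lockless_inputs *in)
    {
            /* "locklessable": server-side locking is possible for this io */
            bool locklessable = !in->is_trunc && in->req_maybe && in->srv_lock;

            return in->req_never ||                   /* DLM lock forbidden */
                   (locklessable && in->contended) || /* lockless IO */
                   (in->is_trunc && in->trunc_lock &&
                    in->lockless_truncate);           /* lockless truncate */
    }

    int main(void)
    {
            struct lockless_inputs in = { .req_maybe = true, .srv_lock = true,
                                          .contended = true };

            printf("contended maybe-locked io -> lockless: %d\n",
                   go_lockless(&in));                 /* prints 1 */
            return 0;
    }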
@@ -1190,8 +1256,6 @@ static int osc_lock_enqueue(const struct lu_env *env,
         osc_lock_build_res(env, obj, resname);
         osc_lock_build_policy(env, lock, policy);
         ols->ols_flags = osc_enq2ldlm_flags(enqflags);
-        if (ols->ols_locklessable)
-                ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
         if (osc_deadlock_is_possible(env, lock))
                 ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
         if (ols->ols_flags & LDLM_FL_HAS_INTENT)
@@ -1199,29 +1263,40 @@ static int osc_lock_enqueue(const struct lu_env *env,
 
         result = osc_lock_enqueue_wait(env, ols);
         if (result == 0) {
-                /* a reference for lock, passed as an upcall cookie */
-                cl_lock_get(lock);
-                lu_ref_add(&lock->cll_reference, "upcall", lock);
-                ols->ols_state = OLS_ENQUEUED;
+                if (!(enqflags & CEF_MUST))
+                        /* try to convert this lock to a lockless lock */
+                        osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER));
+                if (!osc_lock_is_lockless(ols)) {
+                        if (ols->ols_locklessable)
+                                ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
+
+                        /* a reference for lock, passed as an upcall cookie */
+                        cl_lock_get(lock);
+                        lu_ref_add(&lock->cll_reference, "upcall", lock);
+                        ols->ols_state = OLS_ENQUEUED;
 
-                /*
-                 * XXX: this is possible blocking point as
-                 * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
-                 * LDLM_CP_CALLBACK.
-                 */
-                result = osc_enqueue_base(osc_export(obj), resname,
+                        /*
+                         * XXX: this is a possible blocking point, as
+                         * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
+                         * LDLM_CP_CALLBACK.
+                         */
+                        result = osc_enqueue_base(osc_export(obj), resname,
                                          &ols->ols_flags, policy, &ols->ols_lvb,
                                          obj->oo_oinfo->loi_kms_valid,
                                          osc_lock_upcall,
                                          ols, einfo, &ols->ols_handle,
                                          PTLRPCD_SET, 1);
-                if (result != 0) {
-                        lu_ref_del(&lock->cll_reference, "upcall", lock);
-                        cl_lock_put(env, lock);
+                        if (result != 0) {
+                                lu_ref_del(&lock->cll_reference,
+                                           "upcall", lock);
+                                cl_lock_put(env, lock);
+                        }
+                } else {
+                        ols->ols_state = OLS_GRANTED;
                 }
         }
-
+        LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
         RETURN(result);
 }
@@ -1473,18 +1548,8 @@ static int osc_lock_lockless_enqueue(const struct lu_env *env,
                                      const struct cl_lock_slice *slice,
                                      struct cl_io *_, __u32 enqflags)
 {
-        struct osc_lock *ols = cl2osc_lock(slice);
-        struct cl_lock *lock = ols->ols_cl.cls_lock;
-        int result;
-
-        LASSERT(cl_lock_is_mutexed(lock));
-        LASSERT(lock->cll_state == CLS_QUEUING);
-        LASSERT(ols->ols_state == OLS_NEW);
-
-        result = osc_lock_enqueue_wait(env, ols);
-        if (result == 0)
-                ols->ols_state = OLS_GRANTED;
-        return result;
+        LBUG();
+        return 0;
 }
 
 static int osc_lock_lockless_unuse(const struct lu_env *env,
@@ -1537,7 +1602,11 @@ static void osc_lock_lockless_state(const struct lu_env *env,
         if (state == CLS_HELD) {
                 LASSERT(lock->ols_owner == NULL);
                 lock->ols_owner = oio;
-                oio->oi_lockless = 1;
+
+                /* set the io to be lockless if this lock is for the io's
+                 * host object */
+                if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
+                        oio->oi_lockless = 1;
         } else
                 lock->ols_owner = NULL;
 }
@@ -1563,56 +1632,16 @@ static const struct cl_lock_operations osc_lock_lockless_ops = {
 
 int osc_lock_init(const struct lu_env *env,
                   struct cl_object *obj, struct cl_lock *lock,
-                  const struct cl_io *io)
+                  const struct cl_io *_)
 {
-        struct osc_lock *clk;
-        struct osc_io *oio = osc_env_io(env);
-        struct osc_object *oob = cl2osc(obj);
+        struct osc_lock *clk;
         int result;
 
-        OBD_SLAB_ALLOC_PTR(clk, osc_lock_kmem);
+        OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO);
         if (clk != NULL) {
-                const struct cl_lock_operations *ops;
-                const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
-                struct obd_connect_data *ocd;
-
                 osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
                 clk->ols_state = OLS_NEW;
-
-                /*
-                 * Check if we need to do lockless IO here.
-                 * Following conditions must be satisfied:
-                 * - the current IO must be locklessable;
-                 * - the stripe is in contention;
-                 * - requested lock is not a glimpse.
-                 *
-                 * if not, we have to inherit the locklessable flag to
-                 * osc_lock, and let ost make the decision.
-                 *
-                 * Additional policy can be implemented here, e.g., never do
-                 * lockless-io for large extents.
-                 */
-                LASSERT(io->ci_lockreq == CILR_MANDATORY ||
-                        io->ci_lockreq == CILR_MAYBE ||
-                        io->ci_lockreq == CILR_NEVER);
-                ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
-                clk->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
-                        (io->ci_lockreq == CILR_MAYBE) &&
-                        (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
-                ops = &osc_lock_ops;
-                if (io->ci_lockreq == CILR_NEVER ||
-                    /* lockless IO */
-                    (clk->ols_locklessable && osc_object_is_contended(oob)) ||
-                    /* lockless truncate */
-                    (io->ci_type == CIT_TRUNC &&
-                     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
-                     osd->od_lockless_truncate)) {
-                        ops = &osc_lock_lockless_ops;
-                        oio->oi_lockless = 1;
-                        clk->ols_locklessable = 1;
-                }
-
-                cl_lock_slice_add(lock, &clk->ols_cl, obj, ops);
+                cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
                 result = 0;
         } else
                 result = -ENOMEM;
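The reference juggling in the osc_lock_enqueue() hunk above follows a general pattern for asynchronous submission: the upcall (callback) owns one reference, taken with cl_lock_get()/lu_ref_add() before osc_enqueue_base() is called, and if submission fails the submitter must release that reference itself, because the upcall will never run to do it. A self-contained sketch of the same ownership rule, with purely hypothetical names:

    #include <stdio.h>
    #include <stdlib.h>

    /* Illustrative refcounted object. */
    struct obj {
            int refs;
    };

    static void obj_get(struct obj *o) { o->refs++; }

    static void obj_put(struct obj *o)
    {
            if (--o->refs == 0)
                    free(o);
    }

    /* Pretend async submit: on success the callback will eventually run
     * and call obj_put(); on failure the callback never runs. */
    static int submit_async(struct obj *o, int fail)
    {
            (void)o;
            return fail ? -1 : 0;
    }

    static int enqueue(struct obj *o, int fail)
    {
            int rc;

            obj_get(o);             /* reference owned by the callback */
            rc = submit_async(o, fail);
            if (rc != 0)
                    obj_put(o);     /* callback will never run: give it back */
            return rc;
    }

    int main(void)
    {
            struct obj *o = calloc(1, sizeof(*o));

            o->refs = 1;            /* caller's own reference */
            enqueue(o, 1);          /* failed submit must not leak a ref */
            printf("refs after failed submit: %d\n", o->refs); /* 1 */
            obj_put(o);
            return 0;
    }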