static const struct cl_lock_operations osc_lock_ops;
static const struct cl_lock_operations osc_lock_lockless_ops;
+static void osc_lock_to_lockless(const struct lu_env *env,
+ struct osc_lock *ols, int force);
int osc_lock_is_lockless(const struct osc_lock *olck)
{
{
int result = 0;
- LASSERT((enqflags & ~(CEF_NONBLOCK|CEF_ASYNC|CEF_DISCARD_DATA)) == 0);
+ LASSERT((enqflags & ~CEF_MASK) == 0);
if (enqflags & CEF_NONBLOCK)
result |= LDLM_FL_BLOCK_NOWAIT;
cl_lock_put(env, lock);
}
-static void osc_lock_to_lockless(struct osc_lock *olck)
-{
- struct cl_lock_slice *slice = &olck->ols_cl;
- struct cl_lock *lock = slice->cls_lock;
-
- /*
- * TODO: Discover which locks we need to convert the lock
- * to ldlmlockless.
- */
- LASSERT(cl_lock_is_mutexed(lock));
- slice->cls_ops = &osc_lock_lockless_ops;
-}
-
/**
* Updates object attributes from a lock value block (lvb) received together
* with the DLM lock reply from the server. Copy of osc_update_enqueue()
*
* This can be optimized to not update attributes when lock is a result of a
* local match.
+ *
+ * Called under lock and resource spin-locks.
*/
static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
int rc)
dlmlock = olck->ols_lock;
LASSERT(dlmlock != NULL);
+ /* re-grab LVB from a dlm lock under DLM spin-locks. */
+ *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
size = lvb->lvb_size;
/* Extend KMS up to the end of this lock and no further
* A lock on [x,y] means a KMS of up to y + 1 bytes! */
lvb->lvb_size, oinfo->loi_kms,
dlmlock->l_policy_data.l_extent.end);
}
- ldlm_lock_allow_match(dlmlock);
+ ldlm_lock_allow_match_locked(dlmlock);
} else if (rc == -ENAVAIL && olck->ols_glimpse) {
CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
" kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms);
EXIT;
}
+/**
+ * Called when a lock is granted, from an upcall (when server returned a
+ * granted lock), or from completion AST, when server returned a blocked lock.
+ *
+ * Called under lock and resource spin-locks, that are released temporarily
+ * here.
+ */
static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
struct ldlm_lock *dlmlock, int rc)
{
* tell upper layers the extent of the lock that was actually
* granted
*/
- cl_lock_modify(env, lock, descr);
- LINVRNT(osc_lock_invariant(olck));
olck->ols_state = OLS_GRANTED;
osc_lock_lvb_update(env, olck, rc);
+
+ /* release DLM spin-locks to allow cl_lock_{modify,signal}()
+ * to take a semaphore on a parent lock. This is safe, because
+ * spin-locks are needed to protect consistency of
+ * dlmlock->l_*_mode and LVB, and we have finished processing
+ * them. */
+ unlock_res_and_lock(dlmlock);
+ cl_lock_modify(env, lock, descr);
cl_lock_signal(env, lock);
+ LINVRNT(osc_lock_invariant(olck));
+ lock_res_and_lock(dlmlock);
}
EXIT;
}
LASSERT(olck->ols_lock == NULL);
olck->ols_lock = dlmlock;
spin_unlock(&osc_ast_guard);
- unlock_res_and_lock(dlmlock);
/*
* Lock might be not yet granted. In this case, completion ast
*/
if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
osc_lock_granted(env, olck, dlmlock, 0);
+ unlock_res_and_lock(dlmlock);
+
/*
* osc_enqueue_interpret() decrefs asynchronous locks, counter
* this.
LASSERT(slice->cls_ops == &osc_lock_ops);
/* Change this lock to ldlmlock-less lock. */
- osc_lock_to_lockless(olck);
+ osc_lock_to_lockless(env, olck, 1);
olck->ols_state = OLS_GRANTED;
rc = 0;
} else if (olck->ols_glimpse && rc == -ENAVAIL) {
* to lock->l_lvb_data, store it in osc_lock.
*/
LASSERT(dlmlock->l_lvb_data != NULL);
+ lock_res_and_lock(dlmlock);
olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
if (olck->ols_lock == NULL)
/*
osc_lock_granted(env, olck, dlmlock, dlmrc);
if (dlmrc != 0)
cl_lock_error(env, lock, dlmrc);
+ unlock_res_and_lock(dlmlock);
cl_lock_mutex_put(env, lock);
osc_ast_data_put(env, olck);
result = 0;
}
/**
+ * Determine if the lock should be converted into a lockless lock.
+ *
+ * Steps to check:
+ * - if the lock has an explicite requirment for a non-lockless lock;
+ * - if the io lock request type ci_lockreq;
+ * - send the enqueue rpc to ost to make the further decision;
+ * - special treat to truncate lockless lock
+ *
+ * Additional policy can be implemented here, e.g., never do lockless-io
+ * for large extents.
+ */
+static void osc_lock_to_lockless(const struct lu_env *env,
+ struct osc_lock *ols, int force)
+{
+ struct cl_lock_slice *slice = &ols->ols_cl;
+ struct cl_lock *lock = slice->cls_lock;
+
+ LASSERT(ols->ols_state == OLS_NEW ||
+ ols->ols_state == OLS_UPCALL_RECEIVED);
+
+ if (force) {
+ ols->ols_locklessable = 1;
+ LASSERT(cl_lock_is_mutexed(lock));
+ slice->cls_ops = &osc_lock_lockless_ops;
+ } else {
+ struct osc_io *oio = osc_env_io(env);
+ struct cl_io *io = oio->oi_cl.cis_io;
+ struct cl_object *obj = slice->cls_obj;
+ struct osc_object *oob = cl2osc(obj);
+ const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
+ struct obd_connect_data *ocd;
+
+ LASSERT(io->ci_lockreq == CILR_MANDATORY ||
+ io->ci_lockreq == CILR_MAYBE ||
+ io->ci_lockreq == CILR_NEVER);
+
+ ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
+ ols->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
+ (io->ci_lockreq == CILR_MAYBE) &&
+ (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
+ if (io->ci_lockreq == CILR_NEVER ||
+ /* lockless IO */
+ (ols->ols_locklessable && osc_object_is_contended(oob)) ||
+ /* lockless truncate */
+ (io->ci_type == CIT_TRUNC &&
+ (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
+ osd->od_lockless_truncate)) {
+ ols->ols_locklessable = 1;
+ slice->cls_ops = &osc_lock_lockless_ops;
+ }
+ }
+ LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
+}
+
+/**
* Cancel all conflicting locks and wait for them to be destroyed.
*
* This function is used for two purposes:
osc_lock_build_res(env, obj, resname);
osc_lock_build_policy(env, lock, policy);
ols->ols_flags = osc_enq2ldlm_flags(enqflags);
- if (ols->ols_locklessable)
- ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
if (osc_deadlock_is_possible(env, lock))
ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
if (ols->ols_flags & LDLM_FL_HAS_INTENT)
result = osc_lock_enqueue_wait(env, ols);
if (result == 0) {
- /* a reference for lock, passed as an upcall cookie */
- cl_lock_get(lock);
- lu_ref_add(&lock->cll_reference, "upcall", lock);
- ols->ols_state = OLS_ENQUEUED;
+ if (!(enqflags & CEF_MUST))
+ /* try to convert this lock to a lockless lock */
+ osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER));
+ if (!osc_lock_is_lockless(ols)) {
+ if (ols->ols_locklessable)
+ ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
+
+ /* a reference for lock, passed as an upcall cookie */
+ cl_lock_get(lock);
+ lu_ref_add(&lock->cll_reference, "upcall", lock);
+ ols->ols_state = OLS_ENQUEUED;
- /*
- * XXX: this is possible blocking point as
- * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
- * LDLM_CP_CALLBACK.
- */
- result = osc_enqueue_base(osc_export(obj), resname,
+ /*
+ * XXX: this is possible blocking point as
+ * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
+ * LDLM_CP_CALLBACK.
+ */
+ result = osc_enqueue_base(osc_export(obj), resname,
&ols->ols_flags, policy,
&ols->ols_lvb,
obj->oo_oinfo->loi_kms_valid,
osc_lock_upcall,
ols, einfo, &ols->ols_handle,
PTLRPCD_SET, 1);
- if (result != 0) {
- lu_ref_del(&lock->cll_reference, "upcall", lock);
- cl_lock_put(env, lock);
+ if (result != 0) {
+ lu_ref_del(&lock->cll_reference,
+ "upcall", lock);
+ cl_lock_put(env, lock);
+ }
+ } else {
+ ols->ols_state = OLS_GRANTED;
}
}
-
+ LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
RETURN(result);
}
const struct cl_lock_slice *slice,
struct cl_io *_, __u32 enqflags)
{
- struct osc_lock *ols = cl2osc_lock(slice);
- struct cl_lock *lock = ols->ols_cl.cls_lock;
- int result;
-
- LASSERT(cl_lock_is_mutexed(lock));
- LASSERT(lock->cll_state == CLS_QUEUING);
- LASSERT(ols->ols_state == OLS_NEW);
-
- result = osc_lock_enqueue_wait(env, ols);
- if (result == 0)
- ols->ols_state = OLS_GRANTED;
- return result;
+ LBUG();
+ return 0;
}
static int osc_lock_lockless_unuse(const struct lu_env *env,
if (state == CLS_HELD) {
LASSERT(lock->ols_owner == NULL);
lock->ols_owner = oio;
- oio->oi_lockless = 1;
+
+ /* set the io to be lockless if this lock is for io's
+ * host object */
+ if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
+ oio->oi_lockless = 1;
} else
lock->ols_owner = NULL;
}
int osc_lock_init(const struct lu_env *env,
struct cl_object *obj, struct cl_lock *lock,
- const struct cl_io *io)
+ const struct cl_io *_)
{
- struct osc_lock *clk;
- struct osc_io *oio = osc_env_io(env);
- struct osc_object *oob = cl2osc(obj);
+ struct osc_lock *clk;
int result;
- OBD_SLAB_ALLOC_PTR(clk, osc_lock_kmem);
+ OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO);
if (clk != NULL) {
- const struct cl_lock_operations *ops;
- const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
- struct obd_connect_data *ocd;
-
osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
clk->ols_state = OLS_NEW;
-
- /*
- * Check if we need to do lockless IO here.
- * Following conditions must be satisfied:
- * - the current IO must be locklessable;
- * - the stripe is in contention;
- * - requested lock is not a glimpse.
- *
- * if not, we have to inherit the locklessable flag to
- * osc_lock, and let ost make the decision.
- *
- * Additional policy can be implemented here, e.g., never do
- * lockless-io for large extents.
- */
- LASSERT(io->ci_lockreq == CILR_MANDATORY ||
- io->ci_lockreq == CILR_MAYBE ||
- io->ci_lockreq == CILR_NEVER);
- ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
- clk->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
- (io->ci_lockreq == CILR_MAYBE) &&
- (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
- ops = &osc_lock_ops;
- if (io->ci_lockreq == CILR_NEVER ||
- /* lockless IO */
- (clk->ols_locklessable && osc_object_is_contended(oob)) ||
- /* lockless truncate */
- (io->ci_type == CIT_TRUNC &&
- (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
- osd->od_lockless_truncate)) {
- ops = &osc_lock_lockless_ops;
- oio->oi_lockless = 1;
- clk->ols_locklessable = 1;
- }
-
- cl_lock_slice_add(lock, &clk->ols_cl, obj, ops);
+ cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
result = 0;
} else
result = -ENOMEM;