static const struct cl_lock_operations osc_lock_ops;
static const struct cl_lock_operations osc_lock_lockless_ops;
+static void osc_lock_to_lockless(const struct lu_env *env,
+ struct osc_lock *ols, int force);
+static int osc_lock_has_pages(struct osc_lock *olck);
int osc_lock_is_lockless(const struct osc_lock *olck)
{
*/
if (ols->ols_hold)
osc_lock_unuse(env, slice);
- if (ols->ols_lock != NULL)
- osc_lock_detach(env, ols);
+ LASSERT(ols->ols_lock == NULL);
OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
}
const struct cl_lock_descr *d = &lock->cll_descr;
osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
+ policy->l_extent.gid = d->cld_gid;
}
static int osc_enq2ldlm_flags(__u32 enqflags)
{
int result = 0;
- LASSERT((enqflags & ~(CEF_NONBLOCK|CEF_ASYNC|CEF_DISCARD_DATA)) == 0);
+ LASSERT((enqflags & ~CEF_MASK) == 0);
if (enqflags & CEF_NONBLOCK)
result |= LDLM_FL_BLOCK_NOWAIT;
cl_lock_put(env, lock);
}
-static void osc_lock_to_lockless(struct osc_lock *olck)
-{
- struct cl_lock_slice *slice = &olck->ols_cl;
- struct cl_lock *lock = slice->cls_lock;
-
- /*
- * TODO: Discover which locks we need to convert the lock
- * to ldlmlockless.
- */
- LASSERT(cl_lock_is_mutexed(lock));
- slice->cls_ops = &osc_lock_lockless_ops;
-}
-
/**
* Updates object attributes from a lock value block (lvb) received together
* with the DLM lock reply from the server. Copy of osc_update_enqueue()
*
* This can be optimized to not update attributes when lock is a result of a
* local match.
+ *
+ * Called under lock and resource spin-locks.
*/
static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
int rc)
dlmlock = olck->ols_lock;
LASSERT(dlmlock != NULL);
+ /* re-grab LVB from a dlm lock under DLM spin-locks. */
+ *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
size = lvb->lvb_size;
/* Extend KMS up to the end of this lock and no further
* A lock on [x,y] means a KMS of up to y + 1 bytes! */
lvb->lvb_size, oinfo->loi_kms,
dlmlock->l_policy_data.l_extent.end);
}
- ldlm_lock_allow_match(dlmlock);
+ ldlm_lock_allow_match_locked(dlmlock);
} else if (rc == -ENAVAIL && olck->ols_glimpse) {
CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
" kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms);
EXIT;
}
+/**
+ * Called when a lock is granted, from an upcall (when server returned a
+ * granted lock), or from completion AST, when server returned a blocked lock.
+ *
+ * Called under lock and resource spin-locks, that are released temporarily
+ * here.
+ */
static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
struct ldlm_lock *dlmlock, int rc)
{
descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
descr->cld_start = cl_index(descr->cld_obj, ext->start);
descr->cld_end = cl_index(descr->cld_obj, ext->end);
+ descr->cld_gid = ext->gid;
/*
* tell upper layers the extent of the lock that was actually
* granted
*/
- cl_lock_modify(env, lock, descr);
- LINVRNT(osc_lock_invariant(olck));
olck->ols_state = OLS_GRANTED;
osc_lock_lvb_update(env, olck, rc);
+
+ /* release DLM spin-locks to allow cl_lock_{modify,signal}()
+ * to take a semaphore on a parent lock. This is safe, because
+ * spin-locks are needed to protect consistency of
+ * dlmlock->l_*_mode and LVB, and we have finished processing
+ * them. */
+ unlock_res_and_lock(dlmlock);
+ cl_lock_modify(env, lock, descr);
cl_lock_signal(env, lock);
+ LINVRNT(osc_lock_invariant(olck));
+ lock_res_and_lock(dlmlock);
}
EXIT;
}
LASSERT(olck->ols_lock == NULL);
olck->ols_lock = dlmlock;
spin_unlock(&osc_ast_guard);
- unlock_res_and_lock(dlmlock);
/*
* Lock might be not yet granted. In this case, completion ast
*/
if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
osc_lock_granted(env, olck, dlmlock, 0);
+ unlock_res_and_lock(dlmlock);
+
/*
* osc_enqueue_interpret() decrefs asynchronous locks, counter
* this.
*/
static int osc_lock_upcall(void *cookie, int errcode)
{
- struct osc_lock *olck = cookie;
- struct cl_lock_slice *slice = &olck->ols_cl;
- struct cl_lock *lock = slice->cls_lock;
- struct lu_env *env;
-
- int refcheck;
+ struct osc_lock *olck = cookie;
+ struct cl_lock_slice *slice = &olck->ols_cl;
+ struct cl_lock *lock = slice->cls_lock;
+ struct lu_env *env;
+ struct cl_env_nest nest;
ENTRY;
- /*
- * XXX environment should be created in ptlrpcd.
- */
- env = cl_env_get(&refcheck);
+ env = cl_env_nested_get(&nest);
if (!IS_ERR(env)) {
int rc;
LASSERT(slice->cls_ops == &osc_lock_ops);
/* Change this lock to ldlmlock-less lock. */
- osc_lock_to_lockless(olck);
+ osc_lock_to_lockless(env, olck, 1);
olck->ols_state = OLS_GRANTED;
rc = 0;
} else if (olck->ols_glimpse && rc == -ENAVAIL) {
/* release cookie reference, acquired by osc_lock_enqueue() */
lu_ref_del(&lock->cll_reference, "upcall", lock);
cl_lock_put(env, lock);
- cl_env_put(env, &refcheck);
+ cl_env_nested_put(&nest, env);
} else
/* should never happen, similar to osc_ldlm_blocking_ast(). */
LBUG();
* new environment has to be created to not corrupt outer context.
*/
env = cl_env_nested_get(&nest);
- if (!IS_ERR(env))
+ if (!IS_ERR(env)) {
result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
- else {
+ cl_env_nested_put(&nest, env);
+ } else {
result = PTR_ERR(env);
/*
* XXX This should never happen, as cl_lock is
else
CERROR("BAST failed: %d\n", result);
}
- cl_env_nested_put(&nest, env);
return result;
}
static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
int flags, void *data)
{
- struct lu_env *env;
- void *env_cookie;
- struct osc_lock *olck;
- struct cl_lock *lock;
- int refcheck;
+ struct cl_env_nest nest;
+ struct lu_env *env;
+ struct osc_lock *olck;
+ struct cl_lock *lock;
int result;
int dlmrc;
/* first, do dlm part of the work */
dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);
/* then, notify cl_lock */
- env_cookie = cl_env_reenter();
- env = cl_env_get(&refcheck);
+ env = cl_env_nested_get(&nest);
if (!IS_ERR(env)) {
olck = osc_ast_data_get(dlmlock);
if (olck != NULL) {
* to lock->l_lvb_data, store it in osc_lock.
*/
LASSERT(dlmlock->l_lvb_data != NULL);
+ lock_res_and_lock(dlmlock);
olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
if (olck->ols_lock == NULL)
/*
;
else if (dlmlock->l_granted_mode != LCK_MINMODE)
osc_lock_granted(env, olck, dlmlock, dlmrc);
+ unlock_res_and_lock(dlmlock);
if (dlmrc != 0)
cl_lock_error(env, lock, dlmrc);
cl_lock_mutex_put(env, lock);
result = 0;
} else
result = -ELDLM_NO_LOCK_DATA;
- cl_env_put(env, &refcheck);
+ cl_env_nested_put(&nest, env);
} else
result = PTR_ERR(env);
- cl_env_reexit(env_cookie);
return dlmrc ?: result;
}
struct osc_lock *olck;
struct cl_lock *lock;
struct cl_object *obj;
+ struct cl_env_nest nest;
struct lu_env *env;
struct ost_lvb *lvb;
struct req_capsule *cap;
int result;
- int refcheck;
LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
- env = cl_env_get(&refcheck);
+ env = cl_env_nested_get(&nest);
if (!IS_ERR(env)) {
/*
* osc_ast_data_get() has to go after environment is
lustre_pack_reply(req, 1, NULL, NULL);
result = -ELDLM_NO_LOCK_DATA;
}
- cl_env_put(env, &refcheck);
+ cl_env_nested_put(&nest, env);
} else
result = PTR_ERR(env);
req->rq_status = result;
*/
static unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
{
+ struct cl_env_nest nest;
struct lu_env *env;
- int refcheck;
- void *cookie;
struct osc_lock *lock;
struct cl_lock *cll;
unsigned long weight;
ENTRY;
might_sleep();
- cookie = cl_env_reenter();
/*
* osc_ldlm_weigh_ast has a complex context since it might be called
* because of lock canceling, or from user's input. We have to make
* the upper context because cl_lock_put don't modify environment
* variables. But in case of ..
*/
- env = cl_env_get(&refcheck);
- if (IS_ERR(env)) {
+ env = cl_env_nested_get(&nest);
+ if (IS_ERR(env))
/* Mostly because lack of memory, tend to eliminate this lock*/
- cl_env_reexit(cookie);
RETURN(0);
- }
LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
lock = osc_ast_data_get(dlmlock);
EXIT;
out:
- cl_env_put(env, &refcheck);
- cl_env_reexit(cookie);
+ cl_env_nested_put(&nest, env);
return weight;
}
einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
}
+static int osc_lock_delete0(struct cl_lock *conflict)
+{
+ struct cl_env_nest nest;
+ struct lu_env *env;
+ int rc = 0;
+
+ env = cl_env_nested_get(&nest);
+ if (!IS_ERR(env)) {
+ cl_lock_delete(env, conflict);
+ cl_env_nested_put(&nest, env);
+ } else
+ rc = PTR_ERR(env);
+ return rc;
+}
/**
* Cancels \a conflict lock and waits until it reached CLS_FREEING state. This
* is called as a part of enqueuing to cancel conflicting locks early.
rc = 0;
if (conflict->cll_state != CLS_FREEING) {
cl_lock_cancel(env, conflict);
- cl_lock_delete(env, conflict);
+ rc = osc_lock_delete0(conflict);
+ if (rc)
+ return rc;
if (conflict->cll_flags & (CLF_CANCELPEND|CLF_DOOMED)) {
rc = -EWOULDBLOCK;
if (cl_lock_nr_mutexed(env) > 2)
}
/**
+ * Determine if the lock should be converted into a lockless lock.
+ *
+ * Steps to check:
+ * - if the lock has an explicite requirment for a non-lockless lock;
+ * - if the io lock request type ci_lockreq;
+ * - send the enqueue rpc to ost to make the further decision;
+ * - special treat to truncate lockless lock
+ *
+ * Additional policy can be implemented here, e.g., never do lockless-io
+ * for large extents.
+ */
+static void osc_lock_to_lockless(const struct lu_env *env,
+ struct osc_lock *ols, int force)
+{
+ struct cl_lock_slice *slice = &ols->ols_cl;
+ struct cl_lock *lock = slice->cls_lock;
+
+ LASSERT(ols->ols_state == OLS_NEW ||
+ ols->ols_state == OLS_UPCALL_RECEIVED);
+
+ if (force) {
+ ols->ols_locklessable = 1;
+ LASSERT(cl_lock_is_mutexed(lock));
+ slice->cls_ops = &osc_lock_lockless_ops;
+ } else {
+ struct osc_io *oio = osc_env_io(env);
+ struct cl_io *io = oio->oi_cl.cis_io;
+ struct cl_object *obj = slice->cls_obj;
+ struct osc_object *oob = cl2osc(obj);
+ const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
+ struct obd_connect_data *ocd;
+
+ LASSERT(io->ci_lockreq == CILR_MANDATORY ||
+ io->ci_lockreq == CILR_MAYBE ||
+ io->ci_lockreq == CILR_NEVER);
+
+ ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
+ ols->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
+ (io->ci_lockreq == CILR_MAYBE) &&
+ (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
+ if (io->ci_lockreq == CILR_NEVER ||
+ /* lockless IO */
+ (ols->ols_locklessable && osc_object_is_contended(oob)) ||
+ /* lockless truncate */
+ (io->ci_type == CIT_TRUNC &&
+ (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
+ osd->od_lockless_truncate)) {
+ ols->ols_locklessable = 1;
+ slice->cls_ops = &osc_lock_lockless_ops;
+ }
+ }
+ LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
+}
+
+/**
* Cancel all conflicting locks and wait for them to be destroyed.
*
* This function is used for two purposes:
continue;
/* overlapped and living locks. */
+
+ /* We're not supposed to give up group lock. */
+ if (scan->cll_descr.cld_mode == CLM_GROUP) {
+ LASSERT(descr->cld_mode != CLM_GROUP ||
+ descr->cld_gid != scan->cll_descr.cld_gid);
+ continue;
+ }
+
/* A tricky case for lockless pages:
* We need to cancel the compatible locks if we're enqueuing
* a lockless lock, for example:
*/
static int osc_lock_enqueue(const struct lu_env *env,
const struct cl_lock_slice *slice,
- struct cl_io *_, __u32 enqflags)
+ struct cl_io *unused, __u32 enqflags)
{
struct osc_lock *ols = cl2osc_lock(slice);
struct cl_lock *lock = ols->ols_cl.cls_lock;
osc_lock_build_res(env, obj, resname);
osc_lock_build_policy(env, lock, policy);
ols->ols_flags = osc_enq2ldlm_flags(enqflags);
- if (ols->ols_locklessable)
- ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
if (osc_deadlock_is_possible(env, lock))
ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
if (ols->ols_flags & LDLM_FL_HAS_INTENT)
result = osc_lock_enqueue_wait(env, ols);
if (result == 0) {
- /* a reference for lock, passed as an upcall cookie */
- cl_lock_get(lock);
- lu_ref_add(&lock->cll_reference, "upcall", lock);
- ols->ols_state = OLS_ENQUEUED;
+ if (!(enqflags & CEF_MUST))
+ /* try to convert this lock to a lockless lock */
+ osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER));
+ if (!osc_lock_is_lockless(ols)) {
+ if (ols->ols_locklessable)
+ ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
+
+ /* a reference for lock, passed as an upcall cookie */
+ cl_lock_get(lock);
+ lu_ref_add(&lock->cll_reference, "upcall", lock);
+ ols->ols_state = OLS_ENQUEUED;
- /*
- * XXX: this is possible blocking point as
- * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
- * LDLM_CP_CALLBACK.
- */
- result = osc_enqueue_base(osc_export(obj), resname,
+ /*
+ * XXX: this is possible blocking point as
+ * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
+ * LDLM_CP_CALLBACK.
+ */
+ result = osc_enqueue_base(osc_export(obj), resname,
&ols->ols_flags, policy,
&ols->ols_lvb,
obj->oo_oinfo->loi_kms_valid,
osc_lock_upcall,
ols, einfo, &ols->ols_handle,
PTLRPCD_SET, 1);
- if (result != 0) {
- lu_ref_del(&lock->cll_reference, "upcall", lock);
- cl_lock_put(env, lock);
+ if (result != 0) {
+ lu_ref_del(&lock->cll_reference,
+ "upcall", lock);
+ cl_lock_put(env, lock);
+ }
+ } else {
+ ols->ols_state = OLS_GRANTED;
}
}
-
+ LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
RETURN(result);
}
lock = slice->cls_lock;
LASSERT(lock->cll_state == CLS_CACHED);
LASSERT(lock->cll_users > 0);
- LASSERT(olck->ols_lock->l_flags & LDLM_FL_CBPENDING);
/* set a flag for osc_dlm_blocking_ast0() to signal the
* lock.*/
olck->ols_ast_wait = 1;
cl_env_nested_put(&nest, env);
} else
result = PTR_ERR(env);
- if (result == 0)
+ if (result == 0) {
ols->ols_flush = 1;
+ LINVRNT(!osc_lock_has_pages(ols));
+ }
return result;
}
struct cl_lock *lock = slice->cls_lock;
struct osc_lock *olck = cl2osc_lock(slice);
struct ldlm_lock *dlmlock = olck->ols_lock;
- int result;
+ int result = 0;
int discard;
LASSERT(cl_lock_is_mutexed(lock));
LINVRNT(osc_lock_invariant(olck));
if (dlmlock != NULL) {
+ int do_cancel;
+
discard = dlmlock->l_flags & LDLM_FL_DISCARD_DATA;
result = osc_lock_flush(olck, discard);
if (olck->ols_hold)
osc_lock_unuse(env, slice);
- LASSERT(dlmlock->l_readers == 0 && dlmlock->l_writers == 0);
- result = ldlm_cli_cancel(&olck->ols_handle);
+
+ lock_res_and_lock(dlmlock);
+ /* Now that we're the only user of dlm read/write reference,
+ * mostly the ->l_readers + ->l_writers should be zero.
+ * However, there is a corner case.
+ * See bug 18829 for details.*/
+ do_cancel = (dlmlock->l_readers == 0 &&
+ dlmlock->l_writers == 0);
+ dlmlock->l_flags |= LDLM_FL_CBPENDING;
+ unlock_res_and_lock(dlmlock);
+ if (do_cancel)
+ result = ldlm_cli_cancel(&olck->ols_handle);
if (result < 0)
CL_LOCK_DEBUG(D_ERROR, env, lock,
"lock %p cancel failure with error(%d)\n",
return result;
}
#else
-# define osc_lock_has_pages(olck) (0)
+static int osc_lock_has_pages(struct osc_lock *olck)
+{
+ return 0;
+}
#endif /* INVARIANT_CHECK */
static void osc_lock_delete(const struct lu_env *env,
struct osc_lock *olck;
olck = cl2osc_lock(slice);
+ if (olck->ols_glimpse) {
+ LASSERT(!olck->ols_hold);
+ LASSERT(!olck->ols_lock);
+ return;
+ }
+
LINVRNT(osc_lock_invariant(olck));
LINVRNT(!osc_lock_has_pages(olck));
static int osc_lock_lockless_enqueue(const struct lu_env *env,
const struct cl_lock_slice *slice,
- struct cl_io *_, __u32 enqflags)
+ struct cl_io *unused, __u32 enqflags)
{
- struct osc_lock *ols = cl2osc_lock(slice);
- struct cl_lock *lock = ols->ols_cl.cls_lock;
- int result;
-
- LASSERT(cl_lock_is_mutexed(lock));
- LASSERT(lock->cll_state == CLS_QUEUING);
- LASSERT(ols->ols_state == OLS_NEW);
-
- result = osc_lock_enqueue_wait(env, ols);
- if (result == 0)
- ols->ols_state = OLS_GRANTED;
- return result;
+ LBUG();
+ return 0;
}
static int osc_lock_lockless_unuse(const struct lu_env *env,
if (state == CLS_HELD) {
LASSERT(lock->ols_owner == NULL);
lock->ols_owner = oio;
- oio->oi_lockless = 1;
+
+ /* set the io to be lockless if this lock is for io's
+ * host object */
+ if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
+ oio->oi_lockless = 1;
} else
lock->ols_owner = NULL;
}
int osc_lock_init(const struct lu_env *env,
struct cl_object *obj, struct cl_lock *lock,
- const struct cl_io *io)
+ const struct cl_io *unused)
{
- struct osc_lock *clk;
- struct osc_io *oio = osc_env_io(env);
- struct osc_object *oob = cl2osc(obj);
+ struct osc_lock *clk;
int result;
- OBD_SLAB_ALLOC_PTR(clk, osc_lock_kmem);
+ OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO);
if (clk != NULL) {
- const struct cl_lock_operations *ops;
- const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
- struct obd_connect_data *ocd;
-
osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
clk->ols_state = OLS_NEW;
-
- /*
- * Check if we need to do lockless IO here.
- * Following conditions must be satisfied:
- * - the current IO must be locklessable;
- * - the stripe is in contention;
- * - requested lock is not a glimpse.
- *
- * if not, we have to inherit the locklessable flag to
- * osc_lock, and let ost make the decision.
- *
- * Additional policy can be implemented here, e.g., never do
- * lockless-io for large extents.
- */
- LASSERT(io->ci_lockreq == CILR_MANDATORY ||
- io->ci_lockreq == CILR_MAYBE ||
- io->ci_lockreq == CILR_NEVER);
- ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
- clk->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
- (io->ci_lockreq == CILR_MAYBE) &&
- (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
- ops = &osc_lock_ops;
- if (io->ci_lockreq == CILR_NEVER ||
- /* lockless IO */
- (clk->ols_locklessable && osc_object_is_contended(oob)) ||
- /* lockless truncate */
- (io->ci_type == CIT_TRUNC &&
- (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
- osd->od_lockless_truncate)) {
- ops = &osc_lock_lockless_ops;
- oio->oi_lockless = 1;
- clk->ols_locklessable = 1;
- }
-
- cl_lock_slice_add(lock, &clk->ols_cl, obj, ops);
+ cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
result = 0;
} else
result = -ENOMEM;