static const struct cl_lock_operations osc_lock_lockless_ops;
static void osc_lock_to_lockless(const struct lu_env *env,
struct osc_lock *ols, int force);
+static int osc_lock_has_pages(struct osc_lock *olck);
int osc_lock_is_lockless(const struct osc_lock *olck)
{
*/
if (ols->ols_hold)
osc_lock_unuse(env, slice);
- if (ols->ols_lock != NULL)
- osc_lock_detach(env, ols);
+ LASSERT(ols->ols_lock == NULL);
OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
}
const struct cl_lock_descr *d = &lock->cll_descr;
osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
+ policy->l_extent.gid = d->cld_gid;
}
static int osc_enq2ldlm_flags(__u32 enqflags)
descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
descr->cld_start = cl_index(descr->cld_obj, ext->start);
descr->cld_end = cl_index(descr->cld_obj, ext->end);
+ descr->cld_gid = ext->gid;
/*
* tell upper layers the extent of the lock that was actually
* granted
*/
static int osc_lock_upcall(void *cookie, int errcode)
{
- struct osc_lock *olck = cookie;
- struct cl_lock_slice *slice = &olck->ols_cl;
- struct cl_lock *lock = slice->cls_lock;
- struct lu_env *env;
-
- int refcheck;
+ struct osc_lock *olck = cookie;
+ struct cl_lock_slice *slice = &olck->ols_cl;
+ struct cl_lock *lock = slice->cls_lock;
+ struct lu_env *env;
+ struct cl_env_nest nest;
ENTRY;
- /*
- * XXX environment should be created in ptlrpcd.
- */
- env = cl_env_get(&refcheck);
+ env = cl_env_nested_get(&nest);
if (!IS_ERR(env)) {
int rc;
/* release cookie reference, acquired by osc_lock_enqueue() */
lu_ref_del(&lock->cll_reference, "upcall", lock);
cl_lock_put(env, lock);
- cl_env_put(env, &refcheck);
+ cl_env_nested_put(&nest, env);
} else
/* should never happen, similar to osc_ldlm_blocking_ast(). */
LBUG();
* new environment has to be created to not corrupt outer context.
*/
env = cl_env_nested_get(&nest);
- if (!IS_ERR(env))
+ if (!IS_ERR(env)) {
result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
- else {
+ cl_env_nested_put(&nest, env);
+ } else {
result = PTR_ERR(env);
/*
* XXX This should never happen, as cl_lock is
else
CERROR("BAST failed: %d\n", result);
}
- cl_env_nested_put(&nest, env);
return result;
}
static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
int flags, void *data)
{
- struct lu_env *env;
- void *env_cookie;
- struct osc_lock *olck;
- struct cl_lock *lock;
- int refcheck;
+ struct cl_env_nest nest;
+ struct lu_env *env;
+ struct osc_lock *olck;
+ struct cl_lock *lock;
int result;
int dlmrc;
/* first, do dlm part of the work */
dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);
/* then, notify cl_lock */
- env_cookie = cl_env_reenter();
- env = cl_env_get(&refcheck);
+ env = cl_env_nested_get(&nest);
if (!IS_ERR(env)) {
olck = osc_ast_data_get(dlmlock);
if (olck != NULL) {
result = 0;
} else
result = -ELDLM_NO_LOCK_DATA;
- cl_env_put(env, &refcheck);
+ cl_env_nested_put(&nest, env);
} else
result = PTR_ERR(env);
- cl_env_reexit(env_cookie);
return dlmrc ?: result;
}
struct osc_lock *olck;
struct cl_lock *lock;
struct cl_object *obj;
+ struct cl_env_nest nest;
struct lu_env *env;
struct ost_lvb *lvb;
struct req_capsule *cap;
int result;
- int refcheck;
LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
- env = cl_env_get(&refcheck);
+ env = cl_env_nested_get(&nest);
if (!IS_ERR(env)) {
/*
* osc_ast_data_get() has to go after environment is
lustre_pack_reply(req, 1, NULL, NULL);
result = -ELDLM_NO_LOCK_DATA;
}
- cl_env_put(env, &refcheck);
+ cl_env_nested_put(&nest, env);
} else
result = PTR_ERR(env);
req->rq_status = result;
*/
static unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
{
+ struct cl_env_nest nest;
struct lu_env *env;
- int refcheck;
- void *cookie;
struct osc_lock *lock;
struct cl_lock *cll;
unsigned long weight;
ENTRY;
might_sleep();
- cookie = cl_env_reenter();
/*
* osc_ldlm_weigh_ast has a complex context since it might be called
* because of lock canceling, or from user's input. We have to make
* the upper context because cl_lock_put don't modify environment
* variables. But in case of ..
*/
- env = cl_env_get(&refcheck);
- if (IS_ERR(env)) {
+ env = cl_env_nested_get(&nest);
+ if (IS_ERR(env))
/* Mostly because lack of memory, tend to eliminate this lock*/
- cl_env_reexit(cookie);
RETURN(0);
- }
LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
lock = osc_ast_data_get(dlmlock);
EXIT;
out:
- cl_env_put(env, &refcheck);
- cl_env_reexit(cookie);
+ cl_env_nested_put(&nest, env);
return weight;
}
einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
}
+static int osc_lock_delete0(struct cl_lock *conflict)
+{
+ struct cl_env_nest nest;
+ struct lu_env *env;
+ int rc = 0;
+
+ env = cl_env_nested_get(&nest);
+ if (!IS_ERR(env)) {
+ cl_lock_delete(env, conflict);
+ cl_env_nested_put(&nest, env);
+ } else
+ rc = PTR_ERR(env);
+ return rc;
+}
/**
* Cancels \a conflict lock and waits until it reached CLS_FREEING state. This
* is called as a part of enqueuing to cancel conflicting locks early.
rc = 0;
if (conflict->cll_state != CLS_FREEING) {
cl_lock_cancel(env, conflict);
- cl_lock_delete(env, conflict);
+ rc = osc_lock_delete0(conflict);
+ if (rc)
+ return rc;
if (conflict->cll_flags & (CLF_CANCELPEND|CLF_DOOMED)) {
rc = -EWOULDBLOCK;
if (cl_lock_nr_mutexed(env) > 2)
continue;
/* overlapped and living locks. */
+
+ /* We're not supposed to give up group lock. */
+ if (scan->cll_descr.cld_mode == CLM_GROUP) {
+ LASSERT(descr->cld_mode != CLM_GROUP ||
+ descr->cld_gid != scan->cll_descr.cld_gid);
+ continue;
+ }
+
/* A tricky case for lockless pages:
* We need to cancel the compatible locks if we're enqueuing
* a lockless lock, for example:
*/
static int osc_lock_enqueue(const struct lu_env *env,
const struct cl_lock_slice *slice,
- struct cl_io *_, __u32 enqflags)
+ struct cl_io *unused, __u32 enqflags)
{
struct osc_lock *ols = cl2osc_lock(slice);
struct cl_lock *lock = ols->ols_cl.cls_lock;
lock = slice->cls_lock;
LASSERT(lock->cll_state == CLS_CACHED);
LASSERT(lock->cll_users > 0);
- LASSERT(olck->ols_lock->l_flags & LDLM_FL_CBPENDING);
/* set a flag for osc_dlm_blocking_ast0() to signal the
* lock.*/
olck->ols_ast_wait = 1;
cl_env_nested_put(&nest, env);
} else
result = PTR_ERR(env);
- if (result == 0)
+ if (result == 0) {
ols->ols_flush = 1;
+ LINVRNT(!osc_lock_has_pages(ols));
+ }
return result;
}
return result;
}
#else
-# define osc_lock_has_pages(olck) (0)
+static int osc_lock_has_pages(struct osc_lock *olck)
+{
+ return 0;
+}
#endif /* INVARIANT_CHECK */
static void osc_lock_delete(const struct lu_env *env,
struct osc_lock *olck;
olck = cl2osc_lock(slice);
+ if (olck->ols_glimpse) {
+ LASSERT(!olck->ols_hold);
+ LASSERT(!olck->ols_lock);
+ return;
+ }
+
LINVRNT(osc_lock_invariant(olck));
LINVRNT(!osc_lock_has_pages(olck));
static int osc_lock_lockless_enqueue(const struct lu_env *env,
const struct cl_lock_slice *slice,
- struct cl_io *_, __u32 enqflags)
+ struct cl_io *unused, __u32 enqflags)
{
LBUG();
return 0;
int osc_lock_init(const struct lu_env *env,
struct cl_object *obj, struct cl_lock *lock,
- const struct cl_io *_)
+ const struct cl_io *unused)
{
struct osc_lock *clk;
int result;