*
*/
+static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
+ struct cl_lock *parent,
+ struct lov_lock_sub *lls)
+{
+ struct lov_sublock_env *subenv;
+ struct lov_io *lio = lov_env_io(env);
+ struct cl_io *io = lio->lis_cl.cis_io;
+ struct lov_io_sub *sub;
+
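+ /* the sublock environment lives in the per-thread lov session,
+ * so no allocation is needed here */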
+ subenv = &lov_env_session(env)->ls_subenv;
+
+ /*
+ * FIXME: we prefer the sub-io's env and io when calling sublock
+ * operations, because an osc lock sometimes stores control
+ * variables in the thread's IO information (currently only the
+ * lockless-IO state). However, if the lock's host object differs
+ * from the object of the current IO, the subenv and sub-io were
+ * never initialized, so there is no way to reach them. As a
+ * temporary fix, we borrow the parent's env to call sublock
+ * operations in that case.
+ */
+ LASSERT(io != NULL);
+ if (!cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
+ subenv->lse_env = env;
+ subenv->lse_io = io;
+ subenv->lse_sub = NULL;
+ } else {
+ sub = lov_sub_get(env, lio, lls->sub_stripe);
+ if (!IS_ERR(sub)) {
+ subenv->lse_env = sub->sub_env;
+ subenv->lse_io = sub->sub_io;
+ subenv->lse_sub = sub;
+ } else {
+ subenv = (void *)sub;
+ }
+ }
+ return subenv;
+}
+
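+/*
+ * Drop the lov_io_sub reference taken by lov_sublock_env_get(). This is
+ * a no-op when the environment was borrowed from the parent, in which
+ * case lse_sub is NULL.
+ */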
+static void lov_sublock_env_put(struct lov_sublock_env *subenv)
+{
+ if (subenv && subenv->lse_sub)
+ lov_sub_put(subenv->lse_sub);
+}
+
static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
struct cl_lock *sublock, int idx,
struct lov_lock_link *link)
OBD_SLAB_ALLOC_PTR(link, lov_lock_link_kmem);
if (link != NULL) {
- struct lov_lock_sub *sub;
+ struct lov_sublock_env *subenv;
+ struct lov_lock_sub *lls;
struct cl_lock_descr *descr;
parent = lck->lls_cl.cls_lock;
- sub = &lck->lls_sub[idx];
- descr = &sub->sub_descr;
+ lls = &lck->lls_sub[idx];
+ descr = &lls->sub_descr;
+
+ subenv = lov_sublock_env_get(env, parent, lls);
+ if (!IS_ERR(subenv)) {
+ /* CAVEAT: do not add a field to lov_lock_sub to remember
+ * the sub-io. A lock can be cached, but an IO cannot; a
+ * sublock may therefore be referenced from several
+ * different IO contexts. -jay */
+
+ sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
+ descr, "lov-parent", parent);
+ lov_sublock_env_put(subenv);
+ } else {
+ /* propagate the error from lov_sublock_env_get(). */
+ sublock = (void *)subenv;
+ }
- /* XXX maybe sub-io? */
- sublock = cl_lock_hold(env, io, descr, "lov-parent", parent);
if (!IS_ERR(sublock))
*out = link;
else
static void lov_sublock_unlock(const struct lu_env *env,
struct lovsub_lock *lsl,
- struct cl_lock_closure *closure)
+ struct cl_lock_closure *closure,
+ struct lov_sublock_env *subenv)
{
ENTRY;
+ lov_sublock_env_put(subenv);
lsl->lss_active = NULL;
cl_lock_disclosure(env, closure);
EXIT;
}
-static int lov_sublock_lock(const struct lu_env *env, struct lovsub_lock *lsl,
- struct cl_lock_closure *closure)
+static int lov_sublock_lock(const struct lu_env *env,
+ struct lov_lock_sub *lls,
+ struct cl_lock_closure *closure,
+ struct lov_sublock_env **lsep)
{
struct cl_lock *child;
- int result;
+ int result = 0;
+ ENTRY;
LASSERT(list_empty(&closure->clc_list));
- ENTRY;
- child = lsl->lss_cl.cls_lock;
+ child = lls->sub_lock->lss_cl.cls_lock;
result = cl_lock_closure_build(env, child, closure);
if (result == 0) {
+ struct cl_lock *parent = closure->clc_origin;
+
LASSERT(cl_lock_is_mutexed(child));
- lsl->lss_active = closure->clc_origin;
+ lls->sub_lock->lss_active = parent;
+
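+ /* the caller may also need the sub-io environment; acquire it
+ * while the closure is still held so that a failure can be
+ * unwound cleanly */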
+ if (lsep) {
+ struct lov_sublock_env *subenv;
+ subenv = lov_sublock_env_get(env, parent, lls);
+ if (IS_ERR(subenv)) {
+ lov_sublock_unlock(env, lls->sub_lock,
+ closure, NULL);
+ result = PTR_ERR(subenv);
+ } else {
+ *lsep = subenv;
+ }
+ }
}
RETURN(result);
}
ENTRY;
if (lck->lls_sub[i].sub_flags & LSF_HELD) {
- struct cl_lock *sublock;
+ struct cl_lock *sublock;
int dying;
LASSERT(lck->lls_sub[i].sub_lock != NULL);
struct cl_io *io, __u32 enqflags, int last)
{
int result;
-
ENTRY;
+
/* first, try to enqueue a sub-lock ... */
result = cl_enqueue_try(env, sublock, io, enqflags);
if (sublock->cll_state == CLS_ENQUEUED)
for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
int rc;
- struct lovsub_lock *sub;
- struct cl_lock *sublock;
+ struct lovsub_lock *sub;
+ struct lov_lock_sub *lls;
+ struct cl_lock *sublock;
+ struct lov_sublock_env *subenv;
if (lock->cll_state != CLS_QUEUING) {
/*
break;
}
- sub = lck->lls_sub[i].sub_lock;
+ lls = &lck->lls_sub[i];
+ sub = lls->sub_lock;
/*
* Sub-lock might have been canceled, while top-lock was
* cached.
break;
}
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, sub, closure);
+ rc = lov_sublock_lock(env, lls, closure, &subenv);
if (rc == 0) {
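+ /* enqueue through the sub-io environment so that per-thread
+ * osc state (see the FIXME in lov_sublock_env_get()) is
+ * visible to the sublock */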
lov_sublock_hold(env, lck, i);
- rc = lov_lock_enqueue_one(env, lck, sublock, io,
- enqflags,
+ rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
+ subenv->lse_io, enqflags,
i == lck->lls_nr - 1);
minstate = min(minstate, sublock->cll_state);
/*
*/
if (sublock->cll_state > CLS_HELD)
rc = lov_sublock_release(env, lck, i, 1, rc);
- lov_sublock_unlock(env, sub, closure);
+ lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
if (result < 0)
for (result = 0, i = 0; i < lck->lls_nr; ++i) {
int rc;
- struct lovsub_lock *sub;
- struct cl_lock *sublock;
+ struct lovsub_lock *sub;
+ struct cl_lock *sublock;
+ struct lov_lock_sub *lls;
+ struct lov_sublock_env *subenv;
/* top-lock state cannot change concurrently, because single
* thread (one that released the last hold) carries unlocking
* to the completion. */
LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING);
- sub = lck->lls_sub[i].sub_lock;
+ lls = &lck->lls_sub[i];
+ sub = lls->sub_lock;
if (sub == NULL)
continue;
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, sub, closure);
+ rc = lov_sublock_lock(env, lls, closure, &subenv);
if (rc == 0) {
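+ /* unuse through the sub-io environment, mirroring the
+ * enqueue path */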
if (lck->lls_sub[i].sub_flags & LSF_HELD) {
LASSERT(sublock->cll_state == CLS_HELD);
- rc = cl_unuse_try(env, sublock);
+ rc = cl_unuse_try(subenv->lse_env, sublock);
if (rc != CLO_WAIT)
rc = lov_sublock_release(env, lck,
i, 0, rc);
}
- lov_sublock_unlock(env, sub, closure);
+ lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
if (result < 0)
for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
int rc;
- struct lovsub_lock *sub;
- struct cl_lock *sublock;
+ struct lovsub_lock *sub;
+ struct cl_lock *sublock;
+ struct lov_lock_sub *lls;
+ struct lov_sublock_env *subenv;
- sub = lck->lls_sub[i].sub_lock;
+ lls = &lck->lls_sub[i];
+ sub = lls->sub_lock;
LASSERT(sub != NULL);
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, sub, closure);
+ rc = lov_sublock_lock(env, lls, closure, &subenv);
if (rc == 0) {
LASSERT(sublock->cll_state >= CLS_ENQUEUED);
if (sublock->cll_state < CLS_HELD)
rc = cl_wait_try(env, sublock);
+
minstate = min(minstate, sublock->cll_state);
- lov_sublock_unlock(env, sub, closure);
+ lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
if (result < 0)
for (result = 0, i = 0; i < lck->lls_nr; ++i) {
int rc;
- struct lovsub_lock *sub;
- struct cl_lock *sublock;
+ struct lovsub_lock *sub;
+ struct cl_lock *sublock;
+ struct lov_lock_sub *lls;
+ struct lov_sublock_env *subenv;
if (slice->cls_lock->cll_state != CLS_CACHED) {
/* see comment in lov_lock_enqueue(). */
* CLS_CACHED state, top-lock would have been moved into
* CLS_NEW state, so all sub-locks have to be in place.
*/
- sub = lck->lls_sub[i].sub_lock;
+ lls = &lck->lls_sub[i];
+ sub = lls->sub_lock;
LASSERT(sub != NULL);
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, sub, closure);
+ rc = lov_sublock_lock(env, lls, closure, &subenv);
if (rc == 0) {
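+ /* a cached sublock is re-used through its own sub-io
+ * environment */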
LASSERT(sublock->cll_state != CLS_FREEING);
lov_sublock_hold(env, lck, i);
if (sublock->cll_state == CLS_CACHED) {
- rc = cl_use_try(env, sublock);
+ rc = cl_use_try(subenv->lse_env, sublock);
if (rc != 0)
rc = lov_sublock_release(env, lck,
i, 1, rc);
} else
rc = 0;
- lov_sublock_unlock(env, sub, closure);
+ lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
if (result < 0)
ENTRY;
for (i = 0; i < lck->lls_nr; ++i) {
- struct lovsub_lock *lsl;
- struct cl_lock *sublock;
+ struct lov_lock_sub *lls;
+ struct lovsub_lock *lsl;
+ struct cl_lock *sublock;
int rc;
- lsl = lck->lls_sub[i].sub_lock;
+ lls = &lck->lls_sub[i];
+ lsl = lls->sub_lock;
if (lsl == NULL)
continue;
sublock = lsl->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, lsl, closure);
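+ /* no sub-io environment is needed here: only lov-level
+ * book-keeping is done under the closure */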
+ rc = lov_sublock_lock(env, lls, closure, NULL);
if (rc == 0) {
if (lck->lls_sub[i].sub_flags & LSF_HELD)
lov_sublock_release(env, lck, i, 1, 0);
lov_lock_unlink(env, link, lsl);
LASSERT(lck->lls_sub[i].sub_lock == NULL);
}
- lov_sublock_unlock(env, lsl, closure);
+ lov_sublock_unlock(env, lsl, closure, NULL);
} else if (rc == CLO_REPEAT) {
--i; /* repeat with this lock */
} else {