#include "lov_cl_internal.h"
-/** \addtogroup lov lov @{ */
+/** \addtogroup lov
+ * @{
+ */
static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
struct cl_lock *parent);
subenv->lse_io = io;
subenv->lse_sub = NULL;
} else {
- LASSERT(io != NULL);
sub = lov_sub_get(env, lio, lls->sub_stripe);
if (!IS_ERR(sub)) {
subenv->lse_env = sub->sub_env;
LASSERT(idx < lck->lls_nr);
ENTRY;
- OBD_SLAB_ALLOC_PTR(link, lov_lock_link_kmem);
+ OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, CFS_ALLOC_IO);
if (link != NULL) {
struct lov_sublock_env *subenv;
struct lov_lock_sub *lls;
}
static int lov_sublock_lock(const struct lu_env *env,
+ struct lov_lock *lck,
struct lov_lock_sub *lls,
struct cl_lock_closure *closure,
struct lov_sublock_env **lsep)
{
- struct cl_lock *child;
- int result = 0;
+ struct lovsub_lock *sublock;
+ struct cl_lock *child;
+ int result = 0;
ENTRY;
LASSERT(list_empty(&closure->clc_list));
- child = lls->sub_lock->lss_cl.cls_lock;
+ sublock = lls->sub_lock;
+ child = sublock->lss_cl.cls_lock;
result = cl_lock_closure_build(env, child, closure);
if (result == 0) {
struct cl_lock *parent = closure->clc_origin;
LASSERT(cl_lock_is_mutexed(child));
- lls->sub_lock->lss_active = parent;
+ sublock->lss_active = parent;
- if (lsep) {
+ if (unlikely(child->cll_state == CLS_FREEING)) {
+ struct lov_lock_link *link;
+ /*
+ * we could race with lock deletion which temporarily
+ * put the lock in freeing state, bug 19080.
+ */
+ LASSERT(!(lls->sub_flags & LSF_HELD));
+
+ link = lov_lock_link_find(env, lck, sublock);
+ LASSERT(link != NULL);
+ lov_lock_unlink(env, link, sublock);
+ lov_sublock_unlock(env, sublock, closure, NULL);
+ result = CLO_REPEAT;
+ } else if (lsep) {
struct lov_sublock_env *subenv;
subenv = lov_sublock_env_get(env, parent, lls);
if (IS_ERR(subenv)) {
- lov_sublock_unlock(env, lls->sub_lock,
+ lov_sublock_unlock(env, sublock,
closure, NULL);
result = PTR_ERR(subenv);
} else {
{
int result = 0;
int i;
- int j;
int nr;
- int stripe;
- int start_stripe;
obd_off start;
obd_off end;
obd_off file_start;
file_start = cl_offset(lov2cl(loo), parent->cll_descr.cld_start);
file_end = cl_offset(lov2cl(loo), parent->cll_descr.cld_end + 1) - 1;
- start_stripe = lov_stripe_number(r0->lo_lsm, file_start);
for (i = 0, nr = 0; i < r0->lo_nr; i++) {
/*
* XXX for wide striping smarter algorithm is desirable,
* breaking out of the loop, early.
*/
- stripe = (start_stripe + i) % r0->lo_nr;
- if (lov_stripe_intersects(r0->lo_lsm, stripe,
+ if (lov_stripe_intersects(r0->lo_lsm, i,
file_start, file_end, &start, &end))
nr++;
}
* create sub-locks. At this moment, no other thread can access
* top-lock.
*/
- for (j = 0, nr = 0; j < i; ++j) {
- stripe = (start_stripe + j) % r0->lo_nr;
- if (lov_stripe_intersects(r0->lo_lsm, stripe,
+ for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
+ if (lov_stripe_intersects(r0->lo_lsm, i,
file_start, file_end, &start, &end)) {
struct cl_lock_descr *descr;
descr = &lck->lls_sub[nr].sub_descr;
LASSERT(descr->cld_obj == NULL);
- descr->cld_obj = lovsub2cl(r0->lo_sub[stripe]);
+ descr->cld_obj = lovsub2cl(r0->lo_sub[i]);
descr->cld_start = cl_index(descr->cld_obj, start);
descr->cld_end = cl_index(descr->cld_obj, end);
descr->cld_mode = parent->cll_descr.cld_mode;
+ descr->cld_gid = parent->cll_descr.cld_gid;
+ /* XXX has no effect */
lck->lls_sub[nr].sub_got = *descr;
- lck->lls_sub[nr].sub_stripe = stripe;
+ lck->lls_sub[nr].sub_stripe = i;
nr++;
}
}
lov_sublock_adopt(env, lck, sublock, i, link);
cl_lock_mutex_put(env, parent);
} else {
+ OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
cl_lock_mutex_put(env, parent);
cl_lock_unhold(env, sublock,
"lov-parent", parent);
* while sub-lock is being paged out.
*/
dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
+ sublock->cll_descr.cld_mode == CLM_GROUP ||
(sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
sublock->cll_holds == 1;
if (dying)
lck->lls_sub[idx].sub_lock == NULL)
lov_sublock_adopt(env, lck, sublock, idx, link);
else {
+ OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
/* other thread allocated sub-lock, or enqueue is no
* longer going on */
cl_lock_mutex_put(env, parent);
break;
}
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, lls, closure, &subenv);
+ rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
if (rc == 0) {
lov_sublock_hold(env, lck, i);
rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
- if (result < 0)
+ if (result != 0)
break;
}
cl_lock_closure_fini(closure);
continue;
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, lls, closure, &subenv);
+ rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
if (rc == 0) {
if (lck->lls_sub[i].sub_flags & LSF_HELD) {
LASSERT(sublock->cll_state == CLS_HELD);
sub = lls->sub_lock;
LASSERT(sub != NULL);
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, lls, closure, &subenv);
+ rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
if (rc == 0) {
LASSERT(sublock->cll_state >= CLS_ENQUEUED);
if (sublock->cll_state < CLS_HELD)
lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
- if (result < 0)
+ if (result != 0)
break;
}
cl_lock_closure_fini(closure);
sub = lls->sub_lock;
LASSERT(sub != NULL);
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, lls, closure, &subenv);
+ rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
if (rc == 0) {
LASSERT(sublock->cll_state != CLS_FREEING);
lov_sublock_hold(env, lck, i);
lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
- if (result < 0)
+ if (result != 0)
break;
}
cl_lock_closure_fini(closure);
subd->cld_obj = NULL; /* don't need sub object at all */
subd->cld_mode = descr->cld_mode;
+ subd->cld_gid = descr->cld_gid;
result = lov_stripe_intersects(lsm, stripe, start, end,
&sub_start, &sub_end);
LASSERT(result);
ENTRY;
- if (lov->lls_nr == 1) {
+ if (need->cld_mode == CLM_GROUP)
+ /*
+ * always allow to match group lock.
+ */
+ result = cl_lock_ext_match(&lov->lls_orig, need);
+ else if (lov->lls_nr == 1) {
struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;
result = lov_lock_stripe_is_matching(env,
cl2lov(slice->cls_obj),
continue;
sublock = lsl->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, lls, closure, NULL);
+ rc = lov_sublock_lock(env, lck, lls, closure, NULL);
if (rc == 0) {
if (lck->lls_sub[i].sub_flags & LSF_HELD)
lov_sublock_release(env, lck, i, 1, 0);
int result;
ENTRY;
- OBD_SLAB_ALLOC_PTR(lck, lov_lock_kmem);
+ OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, CFS_ALLOC_IO);
if (lck != NULL) {
cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
result = lov_lock_sub_init(env, lck, io);
struct cl_lock_closure *closure;
closure = &lov_env_info(env)->lti_closure;
- LINVRNT(list_empty(&closure->clc_list));
+ LASSERT(list_empty(&closure->clc_list));
cl_lock_closure_init(env, closure, parent, 1);
return closure;
}