X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flov%2Flov_lock.c;h=79a69420c0794d16cb2142a94ac7ad11cb5094b4;hp=14ecd6850290ccf59e608bf74176cf1437f096fb;hb=15385c3b934b511a1452327c701fbb6adad71416;hpb=fbf5870b9848929d352460f1f005b79c0b5ccc5a diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index 14ecd68..79a6942 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -53,6 +53,50 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env, * */ +static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env, + struct cl_lock *parent, + struct lov_lock_sub *lls) +{ + struct lov_sublock_env *subenv; + struct lov_io *lio = lov_env_io(env); + struct cl_io *io = lio->lis_cl.cis_io; + struct lov_io_sub *sub; + + subenv = &lov_env_session(env)->ls_subenv; + + /* + * FIXME: We tend to use the subio's env & io to call the sublock + * lock operations because osc lock sometimes stores some control + * variables in thread's IO infomation(Now only lockless information). + * However, if the lock's host(object) is different from the object + * for current IO, we have no way to get the subenv and subio because + * they are not initialized at all. As a temp fix, in this case, + * we still borrow the parent's env to call sublock operations. + */ + if (!cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) { + subenv->lse_env = env; + subenv->lse_io = io; + subenv->lse_sub = NULL; + } else { + LASSERT(io != NULL); + sub = lov_sub_get(env, lio, lls->sub_stripe); + if (!IS_ERR(sub)) { + subenv->lse_env = sub->sub_env; + subenv->lse_io = sub->sub_io; + subenv->lse_sub = sub; + } else { + subenv = (void*)sub; + } + } + return subenv; +} + +static void lov_sublock_env_put(struct lov_sublock_env *subenv) +{ + if (subenv && subenv->lse_sub) + lov_sub_put(subenv->lse_sub); +} + static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck, struct cl_lock *sublock, int idx, struct lov_lock_link *link) @@ -100,17 +144,32 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env, LASSERT(idx < lck->lls_nr); ENTRY; - OBD_SLAB_ALLOC_PTR(link, lov_lock_link_kmem); + OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, CFS_ALLOC_IO); if (link != NULL) { - struct lov_lock_sub *sub; + struct lov_sublock_env *subenv; + struct lov_lock_sub *lls; struct cl_lock_descr *descr; parent = lck->lls_cl.cls_lock; - sub = &lck->lls_sub[idx]; - descr = &sub->sub_descr; + lls = &lck->lls_sub[idx]; + descr = &lls->sub_descr; + + subenv = lov_sublock_env_get(env, parent, lls); + if (!IS_ERR(subenv)) { + /* CAVEAT: Don't try to add a field in lov_lock_sub + * to remember the subio. This is because lock is able + * to be cached, but this is not true for IO. This + * further means a sublock might be referenced in + * different io context. -jay */ + + sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io, + descr, "lov-parent", parent); + lov_sublock_env_put(subenv); + } else { + /* error occurs. */ + sublock = (void*)subenv; + } - /* XXX maybe sub-io? */ - sublock = cl_lock_hold(env, io, descr, "lov-parent", parent); if (!IS_ERR(sublock)) *out = link; else @@ -122,28 +181,46 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env, static void lov_sublock_unlock(const struct lu_env *env, struct lovsub_lock *lsl, - struct cl_lock_closure *closure) + struct cl_lock_closure *closure, + struct lov_sublock_env *subenv) { ENTRY; + lov_sublock_env_put(subenv); lsl->lss_active = NULL; cl_lock_disclosure(env, closure); EXIT; } -static int lov_sublock_lock(const struct lu_env *env, struct lovsub_lock *lsl, - struct cl_lock_closure *closure) +static int lov_sublock_lock(const struct lu_env *env, + struct lov_lock_sub *lls, + struct cl_lock_closure *closure, + struct lov_sublock_env **lsep) { struct cl_lock *child; - int result; + int result = 0; + ENTRY; LASSERT(list_empty(&closure->clc_list)); - ENTRY; - child = lsl->lss_cl.cls_lock; + child = lls->sub_lock->lss_cl.cls_lock; result = cl_lock_closure_build(env, child, closure); if (result == 0) { + struct cl_lock *parent = closure->clc_origin; + LASSERT(cl_lock_is_mutexed(child)); - lsl->lss_active = closure->clc_origin; + lls->sub_lock->lss_active = parent; + + if (lsep) { + struct lov_sublock_env *subenv; + subenv = lov_sublock_env_get(env, parent, lls); + if (IS_ERR(subenv)) { + lov_sublock_unlock(env, lls->sub_lock, + closure, NULL); + result = PTR_ERR(subenv); + } else { + *lsep = subenv; + } + } } RETURN(result); } @@ -253,6 +330,8 @@ static int lov_lock_sub_init(const struct lu_env *env, descr->cld_start = cl_index(descr->cld_obj, start); descr->cld_end = cl_index(descr->cld_obj, end); descr->cld_mode = parent->cll_descr.cld_mode; + descr->cld_gid = parent->cll_descr.cld_gid; + /* XXX has no effect */ lck->lls_sub[nr].sub_got = *descr; lck->lls_sub[nr].sub_stripe = stripe; nr++; @@ -308,7 +387,7 @@ static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck, ENTRY; if (lck->lls_sub[i].sub_flags & LSF_HELD) { - struct cl_lock *sublock; + struct cl_lock *sublock; int dying; LASSERT(lck->lls_sub[i].sub_lock != NULL); @@ -324,6 +403,7 @@ static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck, * while sub-lock is being paged out. */ dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM || + sublock->cll_descr.cld_mode == CLM_GROUP || (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) && sublock->cll_holds == 1; if (dying) @@ -404,8 +484,8 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck, struct cl_io *io, __u32 enqflags, int last) { int result; - ENTRY; + /* first, try to enqueue a sub-lock ... */ result = cl_enqueue_try(env, sublock, io, enqflags); if (sublock->cll_state == CLS_ENQUEUED) @@ -480,8 +560,10 @@ static int lov_lock_enqueue(const struct lu_env *env, for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) { int rc; - struct lovsub_lock *sub; - struct cl_lock *sublock; + struct lovsub_lock *sub; + struct lov_lock_sub *lls; + struct cl_lock *sublock; + struct lov_sublock_env *subenv; if (lock->cll_state != CLS_QUEUING) { /* @@ -493,7 +575,8 @@ static int lov_lock_enqueue(const struct lu_env *env, break; } - sub = lck->lls_sub[i].sub_lock; + lls = &lck->lls_sub[i]; + sub = lls->sub_lock; /* * Sub-lock might have been canceled, while top-lock was * cached. @@ -505,11 +588,11 @@ static int lov_lock_enqueue(const struct lu_env *env, break; } sublock = sub->lss_cl.cls_lock; - rc = lov_sublock_lock(env, sub, closure); + rc = lov_sublock_lock(env, lls, closure, &subenv); if (rc == 0) { lov_sublock_hold(env, lck, i); - rc = lov_lock_enqueue_one(env, lck, sublock, io, - enqflags, + rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock, + subenv->lse_io, enqflags, i == lck->lls_nr - 1); minstate = min(minstate, sublock->cll_state); /* @@ -518,7 +601,7 @@ static int lov_lock_enqueue(const struct lu_env *env, */ if (sublock->cll_state > CLS_HELD) rc = lov_sublock_release(env, lck, i, 1, rc); - lov_sublock_unlock(env, sub, closure); + lov_sublock_unlock(env, sub, closure, subenv); } result = lov_subresult(result, rc); if (result < 0) @@ -540,28 +623,31 @@ static int lov_lock_unuse(const struct lu_env *env, for (result = 0, i = 0; i < lck->lls_nr; ++i) { int rc; - struct lovsub_lock *sub; - struct cl_lock *sublock; + struct lovsub_lock *sub; + struct cl_lock *sublock; + struct lov_lock_sub *lls; + struct lov_sublock_env *subenv; /* top-lock state cannot change concurrently, because single * thread (one that released the last hold) carries unlocking * to the completion. */ LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING); - sub = lck->lls_sub[i].sub_lock; + lls = &lck->lls_sub[i]; + sub = lls->sub_lock; if (sub == NULL) continue; sublock = sub->lss_cl.cls_lock; - rc = lov_sublock_lock(env, sub, closure); + rc = lov_sublock_lock(env, lls, closure, &subenv); if (rc == 0) { if (lck->lls_sub[i].sub_flags & LSF_HELD) { LASSERT(sublock->cll_state == CLS_HELD); - rc = cl_unuse_try(env, sublock); + rc = cl_unuse_try(subenv->lse_env, sublock); if (rc != CLO_WAIT) rc = lov_sublock_release(env, lck, i, 0, rc); } - lov_sublock_unlock(env, sub, closure); + lov_sublock_unlock(env, sub, closure, subenv); } result = lov_subresult(result, rc); if (result < 0) @@ -588,19 +674,23 @@ static int lov_lock_wait(const struct lu_env *env, for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) { int rc; - struct lovsub_lock *sub; - struct cl_lock *sublock; + struct lovsub_lock *sub; + struct cl_lock *sublock; + struct lov_lock_sub *lls; + struct lov_sublock_env *subenv; - sub = lck->lls_sub[i].sub_lock; + lls = &lck->lls_sub[i]; + sub = lls->sub_lock; LASSERT(sub != NULL); sublock = sub->lss_cl.cls_lock; - rc = lov_sublock_lock(env, sub, closure); + rc = lov_sublock_lock(env, lls, closure, &subenv); if (rc == 0) { LASSERT(sublock->cll_state >= CLS_ENQUEUED); if (sublock->cll_state < CLS_HELD) rc = cl_wait_try(env, sublock); + minstate = min(minstate, sublock->cll_state); - lov_sublock_unlock(env, sub, closure); + lov_sublock_unlock(env, sub, closure, subenv); } result = lov_subresult(result, rc); if (result < 0) @@ -623,8 +713,10 @@ static int lov_lock_use(const struct lu_env *env, for (result = 0, i = 0; i < lck->lls_nr; ++i) { int rc; - struct lovsub_lock *sub; - struct cl_lock *sublock; + struct lovsub_lock *sub; + struct cl_lock *sublock; + struct lov_lock_sub *lls; + struct lov_sublock_env *subenv; if (slice->cls_lock->cll_state != CLS_CACHED) { /* see comment in lov_lock_enqueue(). */ @@ -636,21 +728,22 @@ static int lov_lock_use(const struct lu_env *env, * CLS_CACHED state, top-lock would have been moved into * CLS_NEW state, so all sub-locks have to be in place. */ - sub = lck->lls_sub[i].sub_lock; + lls = &lck->lls_sub[i]; + sub = lls->sub_lock; LASSERT(sub != NULL); sublock = sub->lss_cl.cls_lock; - rc = lov_sublock_lock(env, sub, closure); + rc = lov_sublock_lock(env, lls, closure, &subenv); if (rc == 0) { LASSERT(sublock->cll_state != CLS_FREEING); lov_sublock_hold(env, lck, i); if (sublock->cll_state == CLS_CACHED) { - rc = cl_use_try(env, sublock); + rc = cl_use_try(subenv->lse_env, sublock); if (rc != 0) rc = lov_sublock_release(env, lck, i, 1, rc); } else rc = 0; - lov_sublock_unlock(env, sub, closure); + lov_sublock_unlock(env, sub, closure, subenv); } result = lov_subresult(result, rc); if (result < 0) @@ -699,19 +792,49 @@ static int lock_lock_multi_match() } #endif -static int lov_is_same_stripe(struct lov_object *lov, int stripe, - const struct cl_lock_descr *descr) +/** + * Check if the extent region \a descr is covered by \a child against the + * specific \a stripe. + */ +static int lov_lock_stripe_is_matching(const struct lu_env *env, + struct lov_object *lov, int stripe, + const struct cl_lock_descr *child, + const struct cl_lock_descr *descr) { struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm; obd_off start; obd_off end; + int result; + if (lov_r0(lov)->lo_nr == 1) + return cl_lock_ext_match(child, descr); + + /* + * For a multi-stripes object: + * - make sure the descr only covers child's stripe, and + * - check if extent is matching. + */ start = cl_offset(&lov->lo_cl, descr->cld_start); end = cl_offset(&lov->lo_cl, descr->cld_end + 1) - 1; - return - end - start <= lsm->lsm_stripe_size && - stripe == lov_stripe_number(lsm, start) && - stripe == lov_stripe_number(lsm, end); + result = end - start <= lsm->lsm_stripe_size && + stripe == lov_stripe_number(lsm, start) && + stripe == lov_stripe_number(lsm, end); + if (result) { + struct cl_lock_descr *subd = &lov_env_info(env)->lti_ldescr; + obd_off sub_start; + obd_off sub_end; + + subd->cld_obj = NULL; /* don't need sub object at all */ + subd->cld_mode = descr->cld_mode; + subd->cld_gid = descr->cld_gid; + result = lov_stripe_intersects(lsm, stripe, start, end, + &sub_start, &sub_end); + LASSERT(result); + subd->cld_start = cl_index(child->cld_obj, sub_start); + subd->cld_end = cl_index(child->cld_obj, sub_end); + result = cl_lock_ext_match(child, subd); + } + return result; } /** @@ -737,20 +860,17 @@ static int lov_lock_fits_into(const struct lu_env *env, ENTRY; - if (lov->lls_nr == 1) { + if (need->cld_mode == CLM_GROUP) /* - * If a lock is on a single stripe, it's enough to check that - * @need lock matches actually granted stripe lock, and... + * always allow to match group lock. */ - result = cl_lock_ext_match(&lov->lls_sub[0].sub_got, need); - if (result && lov_r0(obj)->lo_nr > 1) - /* - * ... @need is on the same stripe, if multiple - * stripes are possible at all for this object. - */ - result = lov_is_same_stripe(cl2lov(slice->cls_obj), - lov->lls_sub[0].sub_stripe, - need); + result = cl_lock_ext_match(&lov->lls_orig, need); + else if (lov->lls_nr == 1) { + struct cl_lock_descr *got = &lov->lls_sub[0].sub_got; + result = lov_lock_stripe_is_matching(env, + cl2lov(slice->cls_obj), + lov->lls_sub[0].sub_stripe, + got, need); } else if (io->ci_type != CIT_TRUNC && io->ci_type != CIT_MISC && !cl_io_is_append(io) && need->cld_mode != CLM_PHANTOM) /* @@ -838,16 +958,18 @@ static void lov_lock_delete(const struct lu_env *env, ENTRY; for (i = 0; i < lck->lls_nr; ++i) { - struct lovsub_lock *lsl; - struct cl_lock *sublock; + struct lov_lock_sub *lls; + struct lovsub_lock *lsl; + struct cl_lock *sublock; int rc; - lsl = lck->lls_sub[i].sub_lock; + lls = &lck->lls_sub[i]; + lsl = lls->sub_lock; if (lsl == NULL) continue; sublock = lsl->lss_cl.cls_lock; - rc = lov_sublock_lock(env, lsl, closure); + rc = lov_sublock_lock(env, lls, closure, NULL); if (rc == 0) { if (lck->lls_sub[i].sub_flags & LSF_HELD) lov_sublock_release(env, lck, i, 1, 0); @@ -859,7 +981,7 @@ static void lov_lock_delete(const struct lu_env *env, lov_lock_unlink(env, link, lsl); LASSERT(lck->lls_sub[i].sub_lock == NULL); } - lov_sublock_unlock(env, lsl, closure); + lov_sublock_unlock(env, lsl, closure, NULL); } else if (rc == CLO_REPEAT) { --i; /* repeat with this lock */ } else { @@ -911,7 +1033,7 @@ int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj, int result; ENTRY; - OBD_SLAB_ALLOC_PTR(lck, lov_lock_kmem); + OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, CFS_ALLOC_IO); if (lck != NULL) { cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops); result = lov_lock_sub_init(env, lck, io); @@ -926,7 +1048,7 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env, struct cl_lock_closure *closure; closure = &lov_env_info(env)->lti_closure; - LINVRNT(list_empty(&closure->clc_list)); + LASSERT(list_empty(&closure->clc_list)); cl_lock_closure_init(env, closure, parent, 1); return closure; }