X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_lock.c;h=62da056e2876e9b6f2e403d42f4761670ee58c76;hp=b2042a47d92e17a4ac093e4d96ffea8f3e0991ba;hb=72617588ac8cb2e3e5a7b8e5ebc201cab524d938;hpb=b44b1ff8c7fc50d5b0438926c89c69eb85817168

diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index b2042a4..62da056 100644
--- a/lustre/osc/osc_lock.c
+++ b/lustre/osc/osc_lock.c
@@ -106,7 +106,7 @@ static int osc_lock_invariant(struct osc_lock *ols)
 		if (! ergo(ols->ols_state == OLS_GRANTED,
 			   olock != NULL &&
-			   olock->l_req_mode == olock->l_granted_mode &&
+			   ldlm_is_granted(olock) &&
 			   ols->ols_hold))
 			return 0;
 	return 1;
@@ -144,20 +144,17 @@ static void osc_lock_build_policy(const struct lu_env *env,
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
  * logic.
  *
- * This can be optimized to not update attributes when lock is a result of a
- * local match.
- *
 * Called under lock and resource spin-locks.
 */
-static void osc_lock_lvb_update(const struct lu_env *env,
-                                struct osc_object *osc,
-                                struct ldlm_lock *dlmlock,
-                                struct ost_lvb *lvb)
+void osc_lock_lvb_update(const struct lu_env *env,
+			 struct osc_object *osc,
+			 struct ldlm_lock *dlmlock,
+			 struct ost_lvb *lvb)
 {
-        struct cl_object *obj = osc2cl(osc);
-        struct lov_oinfo *oinfo = osc->oo_oinfo;
-        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
-        unsigned valid;
+	struct cl_object *obj = osc2cl(osc);
+	struct lov_oinfo *oinfo = osc->oo_oinfo;
+	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+	unsigned valid, setkms = 0;
 
 	ENTRY;
 
@@ -182,19 +179,23 @@ static void osc_lock_lvb_update(const struct lu_env *env,
 		if (size > dlmlock->l_policy_data.l_extent.end)
 			size = dlmlock->l_policy_data.l_extent.end + 1;
 		if (size >= oinfo->loi_kms) {
-			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu"
-				   ", kms=%llu", lvb->lvb_size, size);
 			valid |= CAT_KMS;
 			attr->cat_kms = size;
-		} else {
-			LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
-				   "%llu; leaving kms=%llu, end=%llu",
-				   lvb->lvb_size, oinfo->loi_kms,
-				   dlmlock->l_policy_data.l_extent.end);
+			setkms = 1;
 		}
 		ldlm_lock_allow_match_locked(dlmlock);
 	}
 
+	/* The size should not be less than the kms */
+	if (attr->cat_size < oinfo->loi_kms)
+		attr->cat_size = oinfo->loi_kms;
+
+	LDLM_DEBUG(dlmlock, "acquired size %llu, setting rss=%llu;%s "
+		   "kms=%llu, end=%llu", lvb->lvb_size, attr->cat_size,
+		   setkms ? "" : " leaving",
+		   setkms ? attr->cat_kms : oinfo->loi_kms,
+		   dlmlock ? dlmlock->l_policy_data.l_extent.end : -1ull);
+
 	cl_object_attr_update(env, obj, attr, valid);
 	cl_object_attr_unlock(obj);
 
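The reworked osc_lock_lvb_update() above folds the two LDLM_DEBUG branches into a single message, advances the known minimum size (kms) only when the size reported in the LVB, capped to the end of the granted extent, is at least the current kms, and then clamps the cached size so it never falls below the kms. A minimal standalone sketch of that rule follows; the struct and field names are simplified stand-ins, not the real Lustre types, and the clamp is shown against the updated kms, which under these rules gives the same result as clamping against the old one.

#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>

struct mock_lvb    { uint64_t lvb_size; };		/* mock, not ost_lvb */
struct mock_extent { uint64_t start, end; };		/* mock, not ldlm_extent */

/* Returns the new kms and clamps *size so it never drops below kms. */
static uint64_t update_kms(const struct mock_lvb *lvb,
			   const struct mock_extent *ext,
			   uint64_t kms, uint64_t *size)
{
	uint64_t s = lvb->lvb_size;

	*size = s;
	/* only trust the reported size up to the end of the granted extent */
	if (s > ext->end)
		s = ext->end + 1;
	if (s >= kms)
		kms = s;	/* the lock covers the size: kms may grow */
	if (*size < kms)	/* the size must never be below the kms */
		*size = kms;
	return kms;
}

int main(void)
{
	struct mock_lvb lvb = { .lvb_size = 4096 };
	struct mock_extent ext = { .start = 0, .end = UINT64_MAX };
	uint64_t size, kms = update_kms(&lvb, &ext, 1024, &size);

	printf("rss=%" PRIu64 " kms=%" PRIu64 "\n", size, kms);
	return 0;
}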
@@ -202,8 +203,9 @@ static void osc_lock_lvb_update(const struct lu_env *env,
 }
 
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
-			     struct lustre_handle *lockh, bool lvb_update)
+			     struct lustre_handle *lockh)
 {
+	struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj);
 	struct ldlm_lock *dlmlock;
 
 	dlmlock = ldlm_handle2lock_long(lockh, 0);
@@ -212,7 +214,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 	/* lock reference taken by ldlm_handle2lock_long() is
 	 * owned by osc_lock and released in osc_lock_detach()
 	 */
-	lu_ref_add(&dlmlock->l_reference, "osc_lock", oscl);
+	lu_ref_add_atomic(&dlmlock->l_reference, "osc_lock", oscl);
 	oscl->ols_has_ref = 1;
 
 	LASSERT(oscl->ols_dlmlock == NULL);
@@ -230,7 +232,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 	/* Lock must have been granted. */
 	lock_res_and_lock(dlmlock);
-	if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
+	if (ldlm_is_granted(dlmlock)) {
 		struct ldlm_extent *ext = &dlmlock->l_policy_data.l_extent;
 		struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
@@ -242,10 +244,11 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 		descr->cld_gid = ext->gid;
 		/* no lvb update for matched lock */
-		if (lvb_update) {
+		if (!ldlm_is_lvb_cached(dlmlock)) {
 			LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
-			osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
-					    dlmlock, NULL);
+			LASSERT(osc == dlmlock->l_ast_data);
+			osc_lock_lvb_update(env, osc, dlmlock, NULL);
+			ldlm_set_lvb_cached(dlmlock);
 		}
 		LINVRNT(osc_lock_invariant(oscl));
 	}
@@ -285,7 +288,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
 	}
 
 	if (rc == 0)
-		osc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);
+		osc_lock_granted(env, oscl, lockh);
 
 	/* Error handling, some errors are tolerable. */
 	if (oscl->ols_locklessable && rc == -EUSERS) {
@@ -339,9 +342,10 @@ static int osc_lock_upcall_speculative(void *cookie,
 	LASSERT(dlmlock != NULL);
 
 	lock_res_and_lock(dlmlock);
-	LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);
+	LASSERT(ldlm_is_granted(dlmlock));
 
-	/* there is no osc_lock associated with speculative locks */
+	/* there is no osc_lock associated with speculative locks
+	 * thus no need to set LDLM_FL_LVB_CACHED */
 	osc_lock_lvb_update(env, osc, dlmlock, NULL);
 
 	unlock_res_and_lock(dlmlock);
@@ -377,7 +381,12 @@ static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end,
 		rc = 0;
 	}
 
-	rc2 = osc_lock_discard_pages(env, obj, start, end, discard);
+	/*
+	 * Do not try to match other locks with CLM_WRITE since we already
+	 * know there are none
+	 */
+	rc2 = osc_lock_discard_pages(env, obj, start, end,
+				     mode == CLM_WRITE || discard);
 	if (rc == 0 && rc2 < 0)
 		rc = rc2;
 
@@ -402,7 +411,7 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 	LASSERT(flag == LDLM_CB_CANCELING);
 
 	lock_res_and_lock(dlmlock);
-	if (dlmlock->l_granted_mode != dlmlock->l_req_mode) {
+	if (!ldlm_is_granted(dlmlock)) {
 		dlmlock->l_ast_data = NULL;
 		unlock_res_and_lock(dlmlock);
 		RETURN(0);
@@ -414,13 +423,13 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 
 	if (dlmlock->l_ast_data != NULL) {
 		obj = osc2cl(dlmlock->l_ast_data);
-		dlmlock->l_ast_data = NULL;
-
 		cl_object_get(obj);
 	}
 
 	unlock_res_and_lock(dlmlock);
 
+	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_CANCEL, 5);
+
 	/* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
 	 * the object has been destroyed. */
 	if (obj != NULL) {
@@ -436,6 +445,9 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 
 		/* losing a lock, update kms */
 		lock_res_and_lock(dlmlock);
+		/* clearing l_ast_data after flushing data,
+		 * to let glimpse ast find the lock and the object */
+		dlmlock->l_ast_data = NULL;
 		cl_object_attr_lock(obj);
 		/* Must get the value under the lock to avoid race. */
 		old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
@@ -549,6 +561,10 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
 	struct ost_lvb *lvb;
 	struct req_capsule *cap;
 	struct cl_object *obj = NULL;
+	struct ldlm_resource *res = dlmlock->l_resource;
+	struct ldlm_match_data matchdata = { 0 };
+	union ldlm_policy_data policy;
+	enum ldlm_mode mode = LCK_PW | LCK_GROUP | LCK_PR;
 	int result;
 	__u16 refcheck;
 
@@ -560,13 +576,39 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
 	if (IS_ERR(env))
 		GOTO(out, result = PTR_ERR(env));
 
+	policy.l_extent.start = 0;
+	policy.l_extent.end = LUSTRE_EOF;
-	lock_res_and_lock(dlmlock);
-	if (dlmlock->l_ast_data != NULL) {
-		obj = osc2cl(dlmlock->l_ast_data);
-		cl_object_get(obj);
+	matchdata.lmd_mode = &mode;
+	matchdata.lmd_policy = &policy;
+	matchdata.lmd_flags = LDLM_FL_TEST_LOCK | LDLM_FL_CBPENDING;
+	matchdata.lmd_match = LDLM_MATCH_UNREF | LDLM_MATCH_AST_ANY;
+
+	LDLM_LOCK_GET(dlmlock);
+
+	/* If any dlmlock has l_ast_data set, we must find it or we risk
+	 * missing a size update done under a different lock.
+	 */
+	while (dlmlock) {
+		lock_res_and_lock(dlmlock);
+		if (dlmlock->l_ast_data) {
+			obj = osc2cl(dlmlock->l_ast_data);
+			cl_object_get(obj);
+		}
+		unlock_res_and_lock(dlmlock);
+		LDLM_LOCK_RELEASE(dlmlock);
+
+		dlmlock = NULL;
+
+		if (obj == NULL && res->lr_type == LDLM_EXTENT) {
+			if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA))
+				break;
+
+			lock_res(res);
+			dlmlock = search_itree(res, &matchdata);
+			unlock_res(res);
+		}
 	}
-	unlock_res_and_lock(dlmlock);
 
 	if (obj != NULL) {
 		/* Do not grab the mutex of cl_lock for glimpse.
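The glimpse AST above no longer gives up when the glimpsed lock carries no l_ast_data: it now searches the resource's interval tree (search_itree() with LDLM_MATCH_AST_ANY) for any other granted extent lock that still points at the object, so a size update done under a different lock is not missed. A rough userspace sketch of that fallback, with mock types standing in for the LDLM structures:

#include <stddef.h>
#include <stdio.h>

struct mock_lock { void *l_ast_data; };			/* mock, not ldlm_lock */
struct mock_res  { struct mock_lock *locks[4]; size_t nlocks; };

static void *find_ast_data(struct mock_res *res, struct mock_lock *glimpsed)
{
	if (glimpsed->l_ast_data)
		return glimpsed->l_ast_data;
	/* stand-in for the interval-tree search done under the resource lock */
	for (size_t i = 0; i < res->nlocks; i++)
		if (res->locks[i]->l_ast_data)
			return res->locks[i]->l_ast_data;
	return NULL;	/* no lock on this resource knows the object */
}

int main(void)
{
	int obj = 42;
	struct mock_lock a = { NULL }, b = { &obj };
	struct mock_res res = { .locks = { &a, &b }, .nlocks = 2 };

	/* glimpsed lock "a" has no data, but "b" on the same resource does */
	printf("found: %s\n", find_ast_data(&res, &a) ? "yes" : "no");
	return 0;
}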
@@ -606,17 +648,17 @@ out:
 }
 EXPORT_SYMBOL(osc_ldlm_glimpse_ast);
 
-static int weigh_cb(const struct lu_env *env, struct cl_io *io,
-		    struct osc_page *ops, void *cbdata)
+static bool weigh_cb(const struct lu_env *env, struct cl_io *io,
+		     struct osc_page *ops, void *cbdata)
 {
 	struct cl_page *page = ops->ops_cl.cpl_page;
 
 	if (cl_page_is_vmlocked(env, page) ||
 	    PageDirty(page->cp_vmpage) ||
 	    PageWriteback(page->cp_vmpage))
-		return CLP_GANG_ABORT;
+		return false;
 
 	*(pgoff_t *)cbdata = osc_index(ops) + 1;
-	return CLP_GANG_OKAY;
+	return true;
 }
 
@@ -634,21 +676,17 @@ static unsigned long osc_lock_weight(const struct lu_env *env,
 	io->ci_ignore_layout = 1;
 	result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
 	if (result != 0)
-		RETURN(result);
+		RETURN(1);
 
 	page_index = cl_index(obj, start);
-	do {
-		result = osc_page_gang_lookup(env, io, oscobj,
-					      page_index, cl_index(obj, end),
-					      weigh_cb, (void *)&page_index);
-		if (result == CLP_GANG_ABORT)
-			break;
-		if (result == CLP_GANG_RESCHED)
-			cond_resched();
-	} while (result != CLP_GANG_OKAY);
+
+	if (!osc_page_gang_lookup(env, io, oscobj,
+				  page_index, cl_index(obj, end),
+				  weigh_cb, (void *)&page_index))
+		result = 1;
+
 	cl_io_fini(env, io);
 
-	return result == CLP_GANG_ABORT ? 1 : 0;
+	return result;
 }
 
@@ -679,7 +717,8 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
 		RETURN(1);
 
 	LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT ||
-		ldlm_has_dom(dlmlock));
+		dlmlock->l_resource->lr_type == LDLM_IBITS);
+
 	lock_res_and_lock(dlmlock);
 	obj = dlmlock->l_ast_data;
 	if (obj)
@@ -687,13 +726,14 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
 	unlock_res_and_lock(dlmlock);
 
 	if (obj == NULL)
-		GOTO(out, weight = 1);
+		GOTO(out, weight = 0);
 
 	spin_lock(&obj->oo_ol_spin);
 	list_for_each_entry(oscl, &obj->oo_ol_list, ols_nextlock_oscobj) {
-		if (oscl->ols_dlmlock != NULL && oscl->ols_dlmlock != dlmlock)
-			continue;
-		found = true;
+		if (oscl->ols_dlmlock == dlmlock) {
+			found = true;
+			break;
+		}
 	}
 	spin_unlock(&obj->oo_ol_spin);
 	if (found) {
@@ -703,12 +743,17 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
 		GOTO(out, weight = 1);
 	}
 
-	if (ldlm_has_dom(dlmlock))
-		weight = osc_lock_weight(env, obj, 0, OBD_OBJECT_EOF);
-	else
+	if (dlmlock->l_resource->lr_type == LDLM_EXTENT)
 		weight = osc_lock_weight(env, obj,
 					 dlmlock->l_policy_data.l_extent.start,
 					 dlmlock->l_policy_data.l_extent.end);
+	else if (ldlm_has_dom(dlmlock))
+		weight = osc_lock_weight(env, obj, 0, OBD_OBJECT_EOF);
+	/* The DOM bit can be cancelled at any time; in that case, we know
+	 * there are no pages, so just return weight of 0
+	 */
+	else
+		weight = 0;
 
 	EXIT;
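The weighing path above switches weigh_cb() and its caller from the CLP_GANG_* return codes to plain bool: the page walk continues while the callback returns true, and the lock's weight is 1 only if the walk was aborted on a page still in use. A small self-contained sketch of that contract, assuming mock page types rather than the real cl_page API:

#include <stdbool.h>
#include <stdio.h>

struct mock_page { bool busy; };	/* mock, not cl_page */

typedef bool (*gang_cb)(struct mock_page *pg, void *cbdata);

/* returns false as soon as a callback asks to abort the walk */
static bool gang_lookup(struct mock_page *pages, int n, gang_cb cb, void *data)
{
	for (int i = 0; i < n; i++)
		if (!cb(&pages[i], data))
			return false;
	return true;
}

static bool weigh_page(struct mock_page *pg, void *cbdata)
{
	if (pg->busy)		/* dirty/locked page: the lock still has weight */
		return false;
	++*(int *)cbdata;	/* advance the resume index */
	return true;
}

int main(void)
{
	struct mock_page pages[3] = { { false }, { true }, { false } };
	int index = 0;

	/* weight is 1 when the walk aborted on a busy page, else 0 */
	printf("weight=%d index=%d\n",
	       gang_lookup(pages, 3, weigh_page, &index) ? 0 : 1, index);
	return 0;
}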
@@ -866,7 +911,7 @@ restart:
 			continue;
 
 		/* wait for conflicting lock to be canceled */
-		cl_sync_io_init(waiter, 1, cl_sync_io_end);
+		cl_sync_io_init(waiter, 1);
 		oscl->ols_owner = waiter;
 
 		spin_lock(&tmp_oscl->ols_lock);
@@ -931,10 +976,10 @@ static int osc_lock_enqueue(const struct lu_env *env,
 		RETURN(0);
 
 	if ((oscl->ols_flags & LDLM_FL_NO_EXPANSION) &&
-	    !(exp_connect_lockahead_old(exp) || exp_connect_lockahead(exp))) {
+	    !exp_connect_lockahead(exp)) {
 		result = -EOPNOTSUPP;
-		CERROR("%s: server does not support lockahead/locknoexpand:"
-		       "rc = %d\n", exp->exp_obd->obd_name, result);
+		CERROR("%s: server does not support lockahead/locknoexpand: rc = %d\n",
+		       exp->exp_obd->obd_name, result);
 		RETURN(result);
 	}
 
@@ -992,7 +1037,6 @@ enqueue_base:
 	}
 	result = osc_enqueue_base(exp, resname, &oscl->ols_flags,
 				  policy, &oscl->ols_lvb,
-				  osc->oo_oinfo->loi_kms_valid,
 				  upcall, cookie, &oscl->ols_einfo,
 				  PTLRPCD_SET, async, oscl->ols_speculative);
 
@@ -1142,9 +1186,9 @@ void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
 		return;
 
 	if (likely(io->ci_type == CIT_WRITE)) {
-		io_start = cl_index(obj, io->u.ci_rw.rw_range.cir_pos);
-		io_end = cl_index(obj, io->u.ci_rw.rw_range.cir_pos +
-				  io->u.ci_rw.rw_range.cir_count - 1);
+		io_start = cl_index(obj, io->u.ci_rw.crw_pos);
+		io_end = cl_index(obj, io->u.ci_rw.crw_pos +
+				  io->u.ci_rw.crw_count - 1);
 	} else {
 		LASSERT(cl_io_is_mkwrite(io));
 		io_start = io_end = io->u.ci_fault.ft_index;
@@ -1188,6 +1232,8 @@ int osc_lock_init(const struct lu_env *env,
 
 	oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
 	oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
+	if (lock->cll_descr.cld_mode == CLM_GROUP)
+		oscl->ols_flags |= LDLM_FL_ATOMIC_CB;
 
 	if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
 		oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
@@ -1230,6 +1276,7 @@ struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
 	struct ldlm_lock *lock = NULL;
 	enum ldlm_mode mode;
 	__u64 flags;
+	enum ldlm_match_flags match_flags = 0;
 
 	ENTRY;
 
@@ -1240,14 +1287,24 @@ struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
 	flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
 	if (dap_flags & OSC_DAP_FL_TEST_LOCK)
 		flags |= LDLM_FL_TEST_LOCK;
+
+	if (dap_flags & OSC_DAP_FL_AST)
+		match_flags |= LDLM_MATCH_AST;
+
+	if (dap_flags & OSC_DAP_FL_CANCELING)
+		match_flags |= LDLM_MATCH_UNREF;
+
+	if (dap_flags & OSC_DAP_FL_RIGHT)
+		match_flags |= LDLM_MATCH_RIGHT;
+
 	/*
 	 * It is fine to match any group lock since there could be only one
 	 * with a unique gid and it conflicts with all other lock modes too
 	 */
 again:
-	mode = osc_match_base(osc_export(obj), resname, LDLM_EXTENT, policy,
-			      LCK_PR | LCK_PW | LCK_GROUP, &flags, obj, &lockh,
-			      dap_flags & OSC_DAP_FL_CANCELING);
+	mode = osc_match_base(env, osc_export(obj), resname, LDLM_EXTENT,
+			      policy, LCK_PR | LCK_PW | LCK_GROUP, &flags,
+			      obj, &lockh, match_flags);
 	if (mode != 0) {
 		lock = ldlm_handle2lock(&lockh);
 		/* RACE: the lock is cancelled so let's try again */
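Several hunks in this patch replace the open-coded l_req_mode == l_granted_mode test with ldlm_is_granted() and gate the LVB copy with ldlm_is_lvb_cached()/ldlm_set_lvb_cached(). The sketch below mocks what those helpers are assumed to do: one equality test plus a test/set wrapper pair around an LVB-cached lock flag; the types, names, and flag value are illustrative, not Lustre's.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define MOCK_FL_LVB_CACHED (1ULL << 0)	/* stand-in for LDLM_FL_LVB_CACHED */

struct mock_lock {			/* mock, not ldlm_lock */
	int      l_req_mode;
	int      l_granted_mode;
	uint64_t l_flags;
};

static bool mock_is_granted(const struct mock_lock *lk)
{
	return lk->l_req_mode == lk->l_granted_mode;
}

static bool mock_is_lvb_cached(const struct mock_lock *lk)
{
	return lk->l_flags & MOCK_FL_LVB_CACHED;
}

static void mock_set_lvb_cached(struct mock_lock *lk)
{
	lk->l_flags |= MOCK_FL_LVB_CACHED;
}

int main(void)
{
	struct mock_lock lk = { .l_req_mode = 1, .l_granted_mode = 1 };

	/* mirrors osc_lock_granted(): copy the LVB at most once per lock */
	if (mock_is_granted(&lk) && !mock_is_lvb_cached(&lk))
		mock_set_lvb_cached(&lk);	/* the lvb update would go here */
	printf("granted=%d cached=%d\n",
	       mock_is_granted(&lk), mock_is_lvb_cached(&lk));
	return 0;
}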