diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index 10849d0..3502f70 100644
--- a/lustre/osc/osc_lock.c
+++ b/lustre/osc/osc_lock.c
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * Implementation of cl_lock for OSC layer.
  *
@@ -65,7 +64,7 @@ static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
 /**
  * Invariant that has to be true all of the time.
  */
-static int osc_lock_invariant(struct osc_lock *ols)
+static inline int osc_lock_invariant(struct osc_lock *ols)
 {
 	struct ldlm_lock *lock = osc_handle_ptr(&ols->ols_handle);
 	struct ldlm_lock *olock = ols->ols_dlmlock;
@@ -144,20 +143,17 @@ static void osc_lock_build_policy(const struct lu_env *env,
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
  * logic.
  *
- * This can be optimized to not update attributes when lock is a result of a
- * local match.
- *
  * Called under lock and resource spin-locks.
  */
-static void osc_lock_lvb_update(const struct lu_env *env,
-				struct osc_object *osc,
-				struct ldlm_lock *dlmlock,
-				struct ost_lvb *lvb)
+void osc_lock_lvb_update(const struct lu_env *env,
+			 struct osc_object *osc,
+			 struct ldlm_lock *dlmlock,
+			 struct ost_lvb *lvb)
 {
-	struct cl_object *obj = osc2cl(osc);
-	struct lov_oinfo *oinfo = osc->oo_oinfo;
-	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
-	unsigned valid;
+	struct cl_object *obj = osc2cl(osc);
+	struct lov_oinfo *oinfo = osc->oo_oinfo;
+	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+	unsigned valid, setkms = 0;
 
 	ENTRY;
 
@@ -182,19 +178,23 @@ static void osc_lock_lvb_update(const struct lu_env *env,
 		if (size > dlmlock->l_policy_data.l_extent.end)
 			size = dlmlock->l_policy_data.l_extent.end + 1;
 		if (size >= oinfo->loi_kms) {
-			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu"
-				   ", kms=%llu", lvb->lvb_size, size);
 			valid |= CAT_KMS;
 			attr->cat_kms = size;
-		} else {
-			LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
-				   "%llu; leaving kms=%llu, end=%llu",
-				   lvb->lvb_size, oinfo->loi_kms,
-				   dlmlock->l_policy_data.l_extent.end);
+			setkms = 1;
 		}
 		ldlm_lock_allow_match_locked(dlmlock);
 	}
+
+	/* The size should not be less than the kms */
+	if (attr->cat_size < oinfo->loi_kms)
+		attr->cat_size = oinfo->loi_kms;
+
+	LDLM_DEBUG(dlmlock, "acquired size %llu, setting rss=%llu;%s "
+		   "kms=%llu, end=%llu", lvb->lvb_size, attr->cat_size,
+		   setkms ? "" : " leaving",
+		   setkms ? attr->cat_kms : oinfo->loi_kms,
+		   dlmlock ? dlmlock->l_policy_data.l_extent.end : -1ull);
+
 	cl_object_attr_update(env, obj, attr, valid);
 	cl_object_attr_unlock(obj);
 
@@ -202,8 +202,9 @@ static void osc_lock_lvb_update(const struct lu_env *env,
 }
 
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
-			     struct lustre_handle *lockh, bool lvb_update)
+			     struct lustre_handle *lockh)
 {
+	struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj);
 	struct ldlm_lock *dlmlock;
 
 	dlmlock = ldlm_handle2lock_long(lockh, 0);
@@ -212,7 +213,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 	/* lock reference taken by ldlm_handle2lock_long() is
 	 * owned by osc_lock and released in osc_lock_detach()
 	 */
-	lu_ref_add(&dlmlock->l_reference, "osc_lock", oscl);
+	lu_ref_add_atomic(&dlmlock->l_reference, "osc_lock", oscl);
 	oscl->ols_has_ref = 1;
 
 	LASSERT(oscl->ols_dlmlock == NULL);
@@ -237,15 +238,16 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 		/* extend the lock extent, otherwise it will have problem when
 		 * we decide whether to grant a lockless lock. */
 		descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
-		descr->cld_start = cl_index(descr->cld_obj, ext->start);
-		descr->cld_end = cl_index(descr->cld_obj, ext->end);
+		descr->cld_start = ext->start >> PAGE_SHIFT;
+		descr->cld_end = ext->end >> PAGE_SHIFT;
 		descr->cld_gid = ext->gid;
 		/* no lvb update for matched lock */
-		if (lvb_update) {
+		if (!ldlm_is_lvb_cached(dlmlock)) {
 			LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
-			osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
-					    dlmlock, NULL);
+			LASSERT(osc == dlmlock->l_ast_data);
+			osc_lock_lvb_update(env, osc, dlmlock, NULL);
+			ldlm_set_lvb_cached(dlmlock);
 		}
 		LINVRNT(osc_lock_invariant(oscl));
 	}
@@ -285,28 +287,17 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
 	}
 
 	if (rc == 0)
-		osc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);
+		osc_lock_granted(env, oscl, lockh);
 
 	/* Error handling, some errors are tolerable. */
-	if (oscl->ols_locklessable && rc == -EUSERS) {
-		/* This is a tolerable error, turn this lock into
-		 * lockless lock.
-		 */
-		osc_object_set_contended(cl2osc(slice->cls_obj));
-		LASSERT(slice->cls_ops != oscl->ols_lockless_ops);
-
-		/* Change this lock to ldlmlock-less lock. */
-		osc_lock_to_lockless(env, oscl, 1);
-		oscl->ols_state = OLS_GRANTED;
-		rc = 0;
-	} else if (oscl->ols_glimpse && rc == -ENAVAIL) {
+	if (oscl->ols_glimpse && rc == -ENAVAIL) {
 		LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
 		osc_lock_lvb_update(env, cl2osc(slice->cls_obj),
 				    NULL, &oscl->ols_lvb);
 		/* Hide the error. */
 		rc = 0;
 	} else if (rc < 0 && oscl->ols_flags & LDLM_FL_NDELAY) {
-		rc = -EWOULDBLOCK;
+		rc = -EAGAIN;
 	}
 
 	if (oscl->ols_owner != NULL)
@@ -341,7 +332,8 @@ static int osc_lock_upcall_speculative(void *cookie,
 	lock_res_and_lock(dlmlock);
 	LASSERT(ldlm_is_granted(dlmlock));
 
-	/* there is no osc_lock associated with speculative locks */
+	/* there is no osc_lock associated with speculative locks,
+	 * thus no need to set LDLM_FL_LVB_CACHED */
 	osc_lock_lvb_update(env, osc, dlmlock, NULL);
 
 	unlock_res_and_lock(dlmlock);
@@ -419,13 +411,13 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 
 	if (dlmlock->l_ast_data != NULL) {
 		obj = osc2cl(dlmlock->l_ast_data);
-		dlmlock->l_ast_data = NULL;
-
 		cl_object_get(obj);
 	}
 	unlock_res_and_lock(dlmlock);
 
+	CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_CANCEL, 5);
+
 	/* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
 	 * the object has been destroyed.
*/ if (obj != NULL) { @@ -435,12 +427,15 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env, /* Destroy pages covered by the extent of the DLM lock */ result = osc_lock_flush(cl2osc(obj), - cl_index(obj, extent->start), - cl_index(obj, extent->end), + extent->start >> PAGE_SHIFT, + extent->end >> PAGE_SHIFT, mode, discard); /* losing a lock, update kms */ lock_res_and_lock(dlmlock); + /* clearing l_ast_data after flushing data, + * to let glimpse ast find the lock and the object */ + dlmlock->l_ast_data = NULL; cl_object_attr_lock(obj); /* Must get the value under the lock to avoid race. */ old_kms = cl2osc(obj)->oo_oinfo->loi_kms; @@ -554,6 +549,10 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data) struct ost_lvb *lvb; struct req_capsule *cap; struct cl_object *obj = NULL; + struct ldlm_resource *res = dlmlock->l_resource; + struct ldlm_match_data matchdata = { 0 }; + union ldlm_policy_data policy; + enum ldlm_mode mode = LCK_PW | LCK_GROUP | LCK_PR; int result; __u16 refcheck; @@ -565,13 +564,39 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data) if (IS_ERR(env)) GOTO(out, result = PTR_ERR(env)); + policy.l_extent.start = 0; + policy.l_extent.end = LUSTRE_EOF; - lock_res_and_lock(dlmlock); - if (dlmlock->l_ast_data != NULL) { - obj = osc2cl(dlmlock->l_ast_data); - cl_object_get(obj); + matchdata.lmd_mode = &mode; + matchdata.lmd_policy = &policy; + matchdata.lmd_flags = LDLM_FL_TEST_LOCK | LDLM_FL_CBPENDING; + matchdata.lmd_match = LDLM_MATCH_UNREF | LDLM_MATCH_AST_ANY; + + LDLM_LOCK_GET(dlmlock); + + /* If any dlmlock has l_ast_data set, we must find it or we risk + * missing a size update done under a different lock. + */ + while (dlmlock) { + lock_res_and_lock(dlmlock); + if (dlmlock->l_ast_data) { + obj = osc2cl(dlmlock->l_ast_data); + cl_object_get(obj); + } + unlock_res_and_lock(dlmlock); + LDLM_LOCK_RELEASE(dlmlock); + + dlmlock = NULL; + + if (obj == NULL && res->lr_type == LDLM_EXTENT) { + if (CFS_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA)) + break; + + lock_res(res); + dlmlock = search_itree(res, &matchdata); + unlock_res(res); + } } - unlock_res_and_lock(dlmlock); if (obj != NULL) { /* Do not grab the mutex of cl_lock for glimpse. 
@@ -611,17 +636,23 @@ out: } EXPORT_SYMBOL(osc_ldlm_glimpse_ast); -static int weigh_cb(const struct lu_env *env, struct cl_io *io, - struct osc_page *ops, void *cbdata) +static bool weigh_cb(const struct lu_env *env, struct cl_io *io, + void **pvec, int count, void *cbdata) { - struct cl_page *page = ops->ops_cl.cpl_page; + int i; - if (cl_page_is_vmlocked(env, page) || PageDirty(page->cp_vmpage) || - PageWriteback(page->cp_vmpage)) - return CLP_GANG_ABORT; + for (i = 0; i < count; i++) { + struct osc_page *ops = pvec[i]; + struct cl_page *page = ops->ops_cl.cpl_page; - *(pgoff_t *)cbdata = osc_index(ops) + 1; - return CLP_GANG_OKAY; + if (PageLocked(page->cp_vmpage) || + PageDirty(page->cp_vmpage) || + PageWriteback(page->cp_vmpage)) + return false; + + *(pgoff_t *)cbdata = osc_index(ops) + 1; + } + return true; } static unsigned long osc_lock_weight(const struct lu_env *env, @@ -639,21 +670,17 @@ static unsigned long osc_lock_weight(const struct lu_env *env, io->ci_ignore_layout = 1; result = cl_io_init(env, io, CIT_MISC, io->ci_obj); if (result != 0) - RETURN(result); + RETURN(1); - page_index = cl_index(obj, start); - do { - result = osc_page_gang_lookup(env, io, oscobj, - page_index, cl_index(obj, end), - weigh_cb, (void *)&page_index); - if (result == CLP_GANG_ABORT) - break; - if (result == CLP_GANG_RESCHED) - cond_resched(); - } while (result != CLP_GANG_OKAY); + page_index = start >> PAGE_SHIFT; + + if (!osc_page_gang_lookup(env, io, oscobj, + page_index, end >> PAGE_SHIFT, + weigh_cb, (void *)&page_index)) + result = 1; cl_io_fini(env, io); - return result == CLP_GANG_ABORT ? 1 : 0; + return result; } /** @@ -693,7 +720,7 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock) unlock_res_and_lock(dlmlock); if (obj == NULL) - GOTO(out, weight = 1); + GOTO(out, weight = 0); spin_lock(&obj->oo_ol_spin); list_for_each_entry(oscl, &obj->oo_ol_list, ols_nextlock_oscobj) { @@ -766,7 +793,6 @@ void osc_lock_to_lockless(const struct lu_env *env, struct cl_io *io = oio->oi_cl.cis_io; struct cl_object *obj = slice->cls_obj; struct osc_object *oob = cl2osc(obj); - const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev); struct obd_connect_data *ocd; LASSERT(ols->ols_state == OLS_NEW || @@ -785,12 +811,7 @@ void osc_lock_to_lockless(const struct lu_env *env, (io->ci_lockreq == CILR_MAYBE) && (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK); - if (io->ci_lockreq == CILR_NEVER || - /* lockless IO */ - (ols->ols_locklessable && osc_object_is_contended(oob)) || - /* lockless truncate */ - (cl_io_is_trunc(io) && osd->od_lockless_truncate && - (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK))) { + if (io->ci_lockreq == CILR_NEVER) { ols->ols_locklessable = 1; slice->cls_ops = ols->ols_lockless_ops; } @@ -825,16 +846,16 @@ static bool osc_lock_compatible(const struct osc_lock *qing, void osc_lock_wake_waiters(const struct lu_env *env, struct osc_object *osc, struct osc_lock *oscl) { + struct osc_lock *scan; + spin_lock(&osc->oo_ol_spin); list_del_init(&oscl->ols_nextlock_oscobj); spin_unlock(&osc->oo_ol_spin); spin_lock(&oscl->ols_lock); - while (!list_empty(&oscl->ols_waiting_list)) { - struct osc_lock *scan; - - scan = list_entry(oscl->ols_waiting_list.next, struct osc_lock, - ols_wait_entry); + while ((scan = list_first_entry_or_null(&oscl->ols_waiting_list, + struct osc_lock, + ols_wait_entry)) != NULL) { list_del_init(&scan->ols_wait_entry); cl_sync_io_note(env, scan->ols_owner, 0); @@ -937,16 +958,16 @@ static int osc_lock_enqueue(const struct lu_env *env, ENTRY; 
 	LASSERTF(ergo(oscl->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
-		 "lock = %p, ols = %p\n", lock, oscl);
+		 "lock = %px, ols = %px\n", lock, oscl);
 
 	if (oscl->ols_state == OLS_GRANTED)
 		RETURN(0);
 
 	if ((oscl->ols_flags & LDLM_FL_NO_EXPANSION) &&
-	    !(exp_connect_lockahead_old(exp) || exp_connect_lockahead(exp))) {
+	    !exp_connect_lockahead(exp)) {
 		result = -EOPNOTSUPP;
-		CERROR("%s: server does not support lockahead/locknoexpand:"
-		       "rc = %d\n", exp->exp_obd->obd_name, result);
+		CERROR("%s: server does not support lockahead/locknoexpand: rc = %d\n",
+		       exp->exp_obd->obd_name, result);
 		RETURN(result);
 	}
 
@@ -1004,7 +1025,6 @@ enqueue_base:
 	}
 	result = osc_enqueue_base(exp, resname, &oscl->ols_flags,
 				  policy, &oscl->ols_lvb,
-				  osc->oo_oinfo->loi_kms_valid,
 				  upcall, cookie, &oscl->ols_einfo,
 				  PTLRPCD_SET, async, oscl->ols_speculative);
@@ -1012,6 +1032,11 @@ enqueue_base:
 	if (osc_lock_is_lockless(oscl)) {
 		oio->oi_lockless = 1;
 	} else if (!async) {
+		if (CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_IDLE_RACE)) {
+			CFS_RACE(OBD_FAIL_PTLRPC_IDLE_RACE);
+			set_current_state(TASK_UNINTERRUPTIBLE);
+			schedule_timeout(cfs_time_seconds(1) / 2);
+		}
 		LASSERT(oscl->ols_state == OLS_GRANTED);
 		LASSERT(oscl->ols_hold);
 		LASSERT(oscl->ols_dlmlock != NULL);
@@ -1123,16 +1148,8 @@ static void osc_lock_lockless_cancel(const struct lu_env *env,
 {
 	struct osc_lock *ols = cl2osc_lock(slice);
 	struct osc_object *osc = cl2osc(slice->cls_obj);
-	struct cl_lock_descr *descr = &slice->cls_lock->cll_descr;
-	int result;
 
 	LASSERT(ols->ols_dlmlock == NULL);
-	result = osc_lock_flush(osc, descr->cld_start, descr->cld_end,
-				descr->cld_mode, false);
-	if (result)
-		CERROR("Pages for lockless lock %p were not purged(%d)\n",
-		       ols, result);
-
 	osc_lock_wake_waiters(env, osc, ols);
 }
 
@@ -1154,9 +1171,9 @@ void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
 		return;
 
 	if (likely(io->ci_type == CIT_WRITE)) {
-		io_start = cl_index(obj, io->u.ci_rw.crw_pos);
-		io_end = cl_index(obj, io->u.ci_rw.crw_pos +
-				  io->u.ci_rw.crw_count - 1);
+		io_start = io->u.ci_rw.crw_pos >> PAGE_SHIFT;
+		io_end = (io->u.ci_rw.crw_pos +
+			  io->u.ci_rw.crw_bytes - 1) >> PAGE_SHIFT;
 	} else {
 		LASSERT(cl_io_is_mkwrite(io));
 		io_start = io_end = io->u.ci_fault.ft_index;
@@ -1174,6 +1191,22 @@ void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
 }
 EXPORT_SYMBOL(osc_lock_set_writer);
 
+void osc_lock_set_reader(const struct lu_env *env, const struct cl_io *io,
+			 struct cl_object *obj, struct osc_lock *oscl)
+{
+	struct osc_io *oio = osc_env_io(env);
+
+	if (!cl_object_same(io->ci_obj, obj))
+		return;
+
+	if (oscl->ols_glimpse || osc_lock_is_lockless(oscl))
+		return;
+
+	if (oio->oi_read_osclock == NULL)
+		oio->oi_read_osclock = oscl;
+}
+EXPORT_SYMBOL(osc_lock_set_reader);
+
 int osc_lock_init(const struct lu_env *env,
 		  struct cl_object *obj, struct cl_lock *lock,
 		  const struct cl_io *io)
@@ -1214,11 +1247,12 @@ int osc_lock_init(const struct lu_env *env,
 	if (!(enqflags & CEF_MUST))
 		/* try to convert this lock to a lockless lock */
 		osc_lock_to_lockless(env, oscl, (enqflags & CEF_NEVER));
-	if (oscl->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
-		oscl->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
 
 	if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io))
 		osc_lock_set_writer(env, io, obj, oscl);
+	else if (io->ci_type == CIT_READ ||
+		 (io->ci_type == CIT_FAULT && !io->u.ci_fault.ft_mkwrite))
+		osc_lock_set_reader(env, io, obj, oscl);
 
 	LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %#llx",
 			  lock, oscl, oscl->ols_flags);
 
@@ -1242,6 +1276,7 @@ struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
 	struct ldlm_lock *lock = NULL;
 	enum ldlm_mode mode;
 	__u64 flags;
+	enum ldlm_match_flags match_flags = 0;
 
 	ENTRY;
 
@@ -1252,14 +1287,24 @@ struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
 	flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
 	if (dap_flags & OSC_DAP_FL_TEST_LOCK)
 		flags |= LDLM_FL_TEST_LOCK;
+
+	if (dap_flags & OSC_DAP_FL_AST)
+		match_flags |= LDLM_MATCH_AST;
+
+	if (dap_flags & OSC_DAP_FL_CANCELING)
+		match_flags |= LDLM_MATCH_UNREF;
+
+	if (dap_flags & OSC_DAP_FL_RIGHT)
+		match_flags |= LDLM_MATCH_RIGHT;
+
 	/*
 	 * It is fine to match any group lock since there could be only one
 	 * with a unique gid and it conflicts with all other lock modes too
 	 */
 again:
-	mode = osc_match_base(osc_export(obj), resname, LDLM_EXTENT, policy,
-			      LCK_PR | LCK_PW | LCK_GROUP, &flags, obj, &lockh,
-			      dap_flags & OSC_DAP_FL_CANCELING);
+	mode = osc_match_base(env, osc_export(obj), resname, LDLM_EXTENT,
+			      policy, LCK_PR | LCK_PW | LCK_GROUP, &flags,
+			      obj, &lockh, match_flags);
 	if (mode != 0) {
 		lock = ldlm_handle2lock(&lockh);
 		/* RACE: the lock is cancelled so let's try again */
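
A note on the recurring cl_index() removals in this patch: cl_index(obj, off)
mapped a byte offset to the page index covering it, and the patch open-codes
that as "off >> PAGE_SHIFT" in osc_lock_granted(), osc_dlm_blocking_ast0(),
osc_lock_weight() and osc_lock_set_writer(). The following is a minimal
userspace sketch of that conversion, not Lustre code: byte_to_page_index is a
hypothetical stand-in for what cl_index() computed, and PAGE_SHIFT is assumed
to be 12 (4 KiB pages) so the example can compile and run outside the kernel.

/*
 * Standalone illustration of the byte-offset to page-index conversion
 * used throughout this patch.  Assumption: 4 KiB pages (PAGE_SHIFT 12).
 */
#include <stdio.h>

#define PAGE_SHIFT 12			/* assumed page size: 4 KiB */

typedef unsigned long long u64;
typedef unsigned long pgoff_t;		/* page index, as in the kernel */

/* hypothetical helper mirroring what cl_index() computed */
static pgoff_t byte_to_page_index(u64 off)
{
	return (pgoff_t)(off >> PAGE_SHIFT);
}

int main(void)
{
	/* a DLM extent [start, end] in bytes, end inclusive */
	u64 start = 4096, end = 65535;

	/* same values the patch assigns to descr->cld_start/cld_end */
	printf("cld_start=%lu cld_end=%lu\n",
	       byte_to_page_index(start), byte_to_page_index(end));
	return 0;
}

With 4 KiB pages, byte 4096 is the first byte of page 1 and byte 65535 is the
last byte of page 15, so the sketch prints cld_start=1 cld_end=15 -- the same
page range the patch derives from a DLM extent with ">> PAGE_SHIFT".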