X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosc%2Fosc_cache.c;h=e6b111bab6e59178f9bc645536f4ac0b2645b715;hb=2b23ad0d183141dc25377f2d37de6e6e36ba1169;hp=1b35afd9f0a97250cbd9756faaa475d870b783fc;hpb=07ee41661fec73addd3c73bea3d516a99d92d5c8;p=fs%2Flustre-release.git diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c index 1b35afd..e6b111b 100644 --- a/lustre/osc/osc_cache.c +++ b/lustre/osc/osc_cache.c @@ -241,7 +241,7 @@ static int osc_extent_sanity_check0(struct osc_extent *ext, page_count = 0; cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { - pgoff_t index = oap2cl_page(oap)->cp_index; + pgoff_t index = osc_index(oap2osc(oap)); ++page_count; if (index > ext->oe_end || index < ext->oe_start) GOTO(out, rc = 110); @@ -811,6 +811,8 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext, ext->oe_rc = rc ?: ext->oe_nr_pages; EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext); + + osc_lru_add_batch(cli, &ext->oe_pages); cfs_list_for_each_entry_safe(oap, tmp, &ext->oe_pages, oap_pending_item) { cfs_list_del_init(&oap->oap_rpc_item); @@ -832,9 +834,9 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext, /* For short writes we shouldn't count parts of pages that * span a whole chunk on the OST side, or our accounting goes * wrong. Should match the code in filter_grant_check. */ - int offset = oap->oap_page_off & ~CFS_PAGE_MASK; - int count = oap->oap_count + (offset & (blocksize - 1)); - int end = (offset + oap->oap_count) & (blocksize - 1); + int offset = last_off & ~CFS_PAGE_MASK; + int count = last_count + (offset & (blocksize - 1)); + int end = (offset + last_count) & (blocksize - 1); if (end) count += blocksize - end; @@ -945,18 +947,18 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index, /* discard all pages with index greater then trunc_index */ cfs_list_for_each_entry_safe(oap, tmp, &ext->oe_pages, oap_pending_item) { - struct cl_page *sub = oap2cl_page(oap); - struct cl_page *page = cl_page_top(sub); + pgoff_t index = osc_index(oap2osc(oap)); + struct cl_page *page = oap2cl_page(oap); LASSERT(cfs_list_empty(&oap->oap_rpc_item)); /* only discard the pages with their index greater than * trunc_index, and ... */ - if (sub->cp_index < trunc_index || - (sub->cp_index == trunc_index && partial)) { + if (index < trunc_index || + (index == trunc_index && partial)) { /* accounting how many pages remaining in the chunk * so that we can calculate grants correctly. */ - if (sub->cp_index >> ppc_bits == trunc_chunk) + if (index >> ppc_bits == trunc_chunk) ++pages_in_chunk; continue; } @@ -967,7 +969,6 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index, lu_ref_add(&page->cp_reference, "truncate", current); if (cl_page_own(env, io, page) == 0) { - cl_page_unmap(env, io, page); cl_page_discard(env, io, page); cl_page_disown(env, io, page); } else { @@ -1207,7 +1208,7 @@ static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap, int cmd) { struct osc_page *opg = oap2osc_page(oap); - struct cl_page *page = cl_page_top(oap2cl_page(oap)); + struct cl_page *page = oap2cl_page(oap); int result; LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */ @@ -1223,7 +1224,7 @@ static int osc_refresh_count(const struct lu_env *env, struct osc_async_page *oap, int cmd) { struct osc_page *opg = oap2osc_page(oap); - struct cl_page *page = oap2cl_page(oap); + pgoff_t index = osc_index(oap2osc(oap)); struct cl_object *obj; struct cl_attr *attr = &osc_env_info(env)->oti_attr; @@ -1241,10 +1242,10 @@ static int osc_refresh_count(const struct lu_env *env, if (result < 0) return result; kms = attr->cat_kms; - if (cl_offset(obj, page->cp_index) >= kms) + if (cl_offset(obj, index) >= kms) /* catch race with truncate */ return 0; - else if (cl_offset(obj, page->cp_index + 1) > kms) + else if (cl_offset(obj, index + 1) > kms) /* catch sub-page write at end of file */ return kms % PAGE_CACHE_SIZE; else @@ -1255,7 +1256,7 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, int cmd, int rc) { struct osc_page *opg = oap2osc_page(oap); - struct cl_page *page = cl_page_top(oap2cl_page(oap)); + struct cl_page *page = oap2cl_page(oap); struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); enum cl_req_type crt; int srvlock; @@ -2195,8 +2196,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli, cl_object_get(obj); client_obd_list_unlock(&cli->cl_loi_list_lock); - lu_object_ref_add_at(&obj->co_lu, &link, "check", - current); + lu_object_ref_add_at(&obj->co_lu, &link, "check", current); /* attempt some read/write balancing by alternating between * reads and writes in an object. The makes_rpc checks here @@ -2237,8 +2237,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli, osc_object_unlock(osc); osc_list_maint(cli, osc); - lu_object_ref_del_at(&obj->co_lu, &link, "check", - current); + lu_object_ref_del_at(&obj->co_lu, &link, "check", current); cl_object_put(env, obj); client_obd_list_lock(&cli->cl_loi_list_lock); @@ -2367,6 +2366,9 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, RETURN(rc); } + if (osc_over_unstable_soft_limit(cli)) + brw_flags |= OBD_BRW_SOFT_SYNC; + oap->oap_cmd = cmd; oap->oap_page_off = ops->ops_from; oap->oap_count = ops->ops_to - ops->ops_from; @@ -2378,7 +2380,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n", oap, oap->oap_page, oap->oap_cmd & OBD_BRW_RWMASK); - index = oap2cl_page(oap)->cp_index; + index = osc_index(oap2osc(oap)); /* Add this page into extent by the following steps: * 1. if there exists an active extent for this IO, mostly this page @@ -2488,20 +2490,20 @@ int osc_teardown_async_page(const struct lu_env *env, LASSERT(oap->oap_magic == OAP_MAGIC); CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n", - oap, ops, oap2cl_page(oap)->cp_index); + oap, ops, osc_index(oap2osc(oap))); osc_object_lock(obj); if (!cfs_list_empty(&oap->oap_rpc_item)) { CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap); rc = -EBUSY; } else if (!cfs_list_empty(&oap->oap_pending_item)) { - ext = osc_extent_lookup(obj, oap2cl_page(oap)->cp_index); + ext = osc_extent_lookup(obj, osc_index(oap2osc(oap))); /* only truncated pages are allowed to be taken out. * See osc_extent_truncate() and osc_cache_truncate_start() * for details. */ if (ext != NULL && ext->oe_state != OES_TRUNC) { OSC_EXTENT_DUMP(D_ERROR, ext, "trunc at %lu.\n", - oap2cl_page(oap)->cp_index); + osc_index(oap2osc(oap))); rc = -EBUSY; } } @@ -2524,7 +2526,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io, struct osc_extent *ext = NULL; struct osc_object *obj = cl2osc(ops->ops_cl.cpl_obj); struct cl_page *cp = ops->ops_cl.cpl_page; - pgoff_t index = cp->cp_index; + pgoff_t index = osc_index(ops); struct osc_async_page *oap = &ops->ops_oap; bool unplug = false; int rc = 0; @@ -2540,8 +2542,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io, switch (ext->oe_state) { case OES_RPC: case OES_LOCK_DONE: - CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(cp), - "flush an in-rpc page?\n"); + CL_PAGE_DEBUG(D_ERROR, env, cp, "flush an in-rpc page?\n"); LASSERT(0); break; case OES_LOCKING: @@ -2553,12 +2554,18 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io, * really sending the RPC. */ case OES_TRUNC: /* race with truncate, page will be redirtied */ + case OES_ACTIVE: + /* The extent is active so we need to abort and let the caller + * re-dirty the page. If we continued on here, and we were the + * one making the extent active, we could deadlock waiting for + * the page writeback to clear but it won't because the extent + * is active and won't be written out. */ GOTO(out, rc = -EAGAIN); default: break; } - rc = cl_page_prep(env, io, cl_page_top(cp), CRT_WRITE); + rc = cl_page_prep(env, io, cp, CRT_WRITE); if (rc) GOTO(out, rc); @@ -2603,7 +2610,7 @@ int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops) struct osc_extent *ext; struct osc_extent *found = NULL; cfs_list_t *plist; - pgoff_t index = oap2cl_page(oap)->cp_index; + pgoff_t index = osc_index(ops); int rc = -EBUSY; int cmd; ENTRY; @@ -2666,11 +2673,11 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, ENTRY; cfs_list_for_each_entry(oap, list, oap_pending_item) { - struct cl_page *cp = oap2cl_page(oap); - if (cp->cp_index > end) - end = cp->cp_index; - if (cp->cp_index < start) - start = cp->cp_index; + pgoff_t index = osc_index(oap2osc(oap)); + if (index > end) + end = index; + if (index < start) + start = index; ++page_count; mppr <<= (page_count > mppr); } @@ -3037,4 +3044,206 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj, RETURN(result); } +/** + * Returns a list of pages by a given [start, end] of \a obj. + * + * \param resched If not NULL, then we give up before hogging CPU for too + * long and set *resched = 1, in that case caller should implement a retry + * logic. + * + * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely + * crucial in the face of [offset, EOF] locks. + * + * Return at least one page in @queue unless there is no covered page. + */ +int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io, + struct osc_object *osc, pgoff_t start, pgoff_t end, + osc_page_gang_cbt cb, void *cbdata) +{ + struct osc_page *ops; + void **pvec; + pgoff_t idx; + unsigned int nr; + unsigned int i; + unsigned int j; + int res = CLP_GANG_OKAY; + bool tree_lock = true; + ENTRY; + + idx = start; + pvec = osc_env_info(env)->oti_pvec; + spin_lock(&osc->oo_tree_lock); + while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec, + idx, OTI_PVEC_SIZE)) > 0) { + struct cl_page *page; + bool end_of_region = false; + + for (i = 0, j = 0; i < nr; ++i) { + ops = pvec[i]; + pvec[i] = NULL; + + idx = osc_index(ops); + if (idx > end) { + end_of_region = true; + break; + } + + page = ops->ops_cl.cpl_page; + LASSERT(page->cp_type == CPT_CACHEABLE); + if (page->cp_state == CPS_FREEING) + continue; + + cl_page_get(page); + lu_ref_add_atomic(&page->cp_reference, + "gang_lookup", current); + pvec[j++] = ops; + } + ++idx; + + /* + * Here a delicate locking dance is performed. Current thread + * holds a reference to a page, but has to own it before it + * can be placed into queue. Owning implies waiting, so + * radix-tree lock is to be released. After a wait one has to + * check that pages weren't truncated (cl_page_own() returns + * error in the latter case). + */ + spin_unlock(&osc->oo_tree_lock); + tree_lock = false; + + for (i = 0; i < j; ++i) { + ops = pvec[i]; + if (res == CLP_GANG_OKAY) + res = (*cb)(env, io, ops, cbdata); + + page = ops->ops_cl.cpl_page; + lu_ref_del(&page->cp_reference, "gang_lookup", current); + cl_page_put(env, page); + } + if (nr < OTI_PVEC_SIZE || end_of_region) + break; + + if (res == CLP_GANG_OKAY && need_resched()) + res = CLP_GANG_RESCHED; + if (res != CLP_GANG_OKAY) + break; + + spin_lock(&osc->oo_tree_lock); + tree_lock = true; + } + if (tree_lock) + spin_unlock(&osc->oo_tree_lock); + RETURN(res); +} + +/** + * Check if page @page is covered by an extra lock or discard it. + */ +static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops, void *cbdata) +{ + struct osc_thread_info *info = osc_env_info(env); + struct cl_lock *lock = cbdata; + pgoff_t index; + + index = osc_index(ops); + if (index >= info->oti_fn_index) { + struct cl_lock *tmp; + struct cl_page *page = ops->ops_cl.cpl_page; + + /* refresh non-overlapped index */ + tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index, + lock, 1, 0); + if (tmp != NULL) { + /* Cache the first-non-overlapped index so as to skip + * all pages within [index, oti_fn_index). This + * is safe because if tmp lock is canceled, it will + * discard these pages. */ + info->oti_fn_index = tmp->cll_descr.cld_end + 1; + if (tmp->cll_descr.cld_end == CL_PAGE_EOF) + info->oti_fn_index = CL_PAGE_EOF; + cl_lock_put(env, tmp); + } else if (cl_page_own(env, io, page) == 0) { + /* discard the page */ + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + } else { + LASSERT(page->cp_state == CPS_FREEING); + } + } + + info->oti_next_index = index + 1; + return CLP_GANG_OKAY; +} + +static int discard_cb(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops, void *cbdata) +{ + struct osc_thread_info *info = osc_env_info(env); + struct cl_lock *lock = cbdata; + struct cl_page *page = ops->ops_cl.cpl_page; + + LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE); + + /* page is top page. */ + info->oti_next_index = osc_index(ops) + 1; + if (cl_page_own(env, io, page) == 0) { + KLASSERT(ergo(page->cp_type == CPT_CACHEABLE, + !PageDirty(cl_page_vmpage(page)))); + + /* discard the page */ + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + } else { + LASSERT(page->cp_state == CPS_FREEING); + } + + return CLP_GANG_OKAY; +} + +/** + * Discard pages protected by the given lock. This function traverses radix + * tree to find all covering pages and discard them. If a page is being covered + * by other locks, it should remain in cache. + * + * If error happens on any step, the process continues anyway (the reasoning + * behind this being that lock cancellation cannot be delayed indefinitely). + */ +int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *ols) +{ + struct osc_thread_info *info = osc_env_info(env); + struct cl_io *io = &info->oti_io; + struct cl_object *osc = ols->ols_cl.cls_obj; + struct cl_lock *lock = ols->ols_cl.cls_lock; + struct cl_lock_descr *descr = &lock->cll_descr; + osc_page_gang_cbt cb; + int res; + int result; + + ENTRY; + + io->ci_obj = cl_object_top(osc); + io->ci_ignore_layout = 1; + result = cl_io_init(env, io, CIT_MISC, io->ci_obj); + if (result != 0) + GOTO(out, result); + + cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb; + info->oti_fn_index = info->oti_next_index = descr->cld_start; + do { + res = osc_page_gang_lookup(env, io, cl2osc(osc), + info->oti_next_index, descr->cld_end, + cb, (void *)lock); + if (info->oti_next_index > descr->cld_end) + break; + + if (res == CLP_GANG_RESCHED) + cond_resched(); + } while (res != CLP_GANG_OKAY); +out: + cl_io_fini(env, io); + RETURN(result); +} + + /** @} osc */