X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_cache.c;h=658e8f419fbc56f352e77b833510eebca2ac74ce;hp=364a71f378ee592b20bcc9f4481eea0bb32304fd;hb=a4fbe7341b;hpb=0f48cd0b9856fe1ea920b8abab3579ded0b9511e diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c index 364a71f..658e8f4 100644 --- a/lustre/osc/osc_cache.c +++ b/lustre/osc/osc_cache.c @@ -28,7 +28,6 @@ */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. * * osc cache management. * @@ -97,7 +96,7 @@ static inline char *ext_flags(struct osc_extent *ext, char *flags) #define EXTSTR "[%lu -> %lu/%lu]" #define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end -static const char *oes_strings[] = { +static const char *const oes_strings[] = { "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL }; #define OSC_EXTENT_DUMP_WITH_LOC(file, func, line, mask, extent, fmt, ...) do {\ @@ -531,7 +530,8 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur, if (victim == NULL) return -EINVAL; - if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait) + if (victim->oe_state != OES_INV && + (victim->oe_state != OES_CACHE || victim->oe_fsync_wait)) return -EBUSY; if (cur->oe_max_end != victim->oe_max_end) @@ -583,11 +583,10 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur, /** * Drop user count of osc_extent, and unplug IO asynchronously. */ -int osc_extent_release(const struct lu_env *env, struct osc_extent *ext) +void osc_extent_release(const struct lu_env *env, struct osc_extent *ext) { struct osc_object *obj = ext->oe_obj; struct client_obd *cli = osc_cli(obj); - int rc = 0; ENTRY; LASSERT(atomic_read(&ext->oe_users) > 0); @@ -634,7 +633,8 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext) osc_io_unplug_async(env, cli, obj); } osc_extent_put(env, ext); - RETURN(rc); + + RETURN_EXIT; } /** @@ -768,7 +768,6 @@ restart: if (osc_extent_merge(env, ext, cur) == 0) { LASSERT(*grants >= chunksize); *grants -= chunksize; - found = osc_extent_hold(ext); /* * Try to merge with the next one too because we @@ -778,6 +777,7 @@ restart: /* we can save extent tax from next extent */ *grants += cli->cl_grant_extent_tax; + found = osc_extent_hold(ext); break; } } @@ -1110,7 +1110,8 @@ static int osc_extent_make_ready(const struct lu_env *env, * the size of file. */ if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) { int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE); - LASSERT(last_oap_count > 0); + LASSERTF(last_oap_count > 0, + "last_oap_count %d\n", last_oap_count); LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE); last->oap_count = last_oap_count; spin_lock(&last->oap_lock); @@ -1369,7 +1370,6 @@ static void osc_consume_write_grant(struct client_obd *cli, pga->flag |= OBD_BRW_FROM_GRANT; CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n", PAGE_SIZE, pga, pga->pg); - osc_update_next_shrink(cli); } /* the companion to osc_consume_write_grant, called when a brw has completed. @@ -2237,10 +2237,11 @@ int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli, EXPORT_SYMBOL(osc_io_unplug0); int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, - struct page *page, loff_t offset) + struct cl_page *page, loff_t offset) { struct obd_export *exp = osc_export(osc); struct osc_async_page *oap = &ops->ops_oap; + struct page *vmpage = page->cp_vmpage; ENTRY; if (!page) @@ -2250,16 +2251,24 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, oap->oap_cli = &exp->exp_obd->u.cli; oap->oap_obj = osc; - oap->oap_page = page; + oap->oap_page = vmpage; oap->oap_obj_off = offset; LASSERT(!(offset & ~PAGE_MASK)); + /* Count of transient (direct i/o) pages is always stable by the time + * they're submitted. Setting this here lets us avoid calling + * cl_page_clip later to set this. + */ + if (page->cp_type == CPT_TRANSIENT) + oap->oap_async_flags |= ASYNC_COUNT_STABLE|ASYNC_URGENT| + ASYNC_READY; + INIT_LIST_HEAD(&oap->oap_pending_item); INIT_LIST_HEAD(&oap->oap_rpc_item); spin_lock_init(&oap->oap_lock); - CDEBUG(D_INFO, "oap %p page %p obj off %llu\n", - oap, page, oap->oap_obj_off); + CDEBUG(D_INFO, "oap %p vmpage %p obj off %llu\n", + oap, vmpage, oap->oap_obj_off); RETURN(0); } EXPORT_SYMBOL(osc_prep_async_page); @@ -2300,7 +2309,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, } /* check if the file's owner/group is over quota */ - if (!(cmd & OBD_BRW_NOQUOTA)) { + if (!io->ci_noquota) { struct cl_object *obj; struct cl_attr *attr; unsigned int qid[LL_MAXQUOTAS]; @@ -2559,7 +2568,7 @@ out: return rc; } -int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io, +int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io, struct osc_object *obj, struct list_head *list, int brw_flags) { @@ -2619,6 +2628,7 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io, grants += (1 << cli->cl_chunkbits) * ((page_count + ppc - 1) / ppc); + CDEBUG(D_CACHE, "requesting %d bytes grant\n", grants); spin_lock(&cli->cl_loi_list_lock); if (osc_reserve_grant(cli, grants) == 0) { list_for_each_entry(oap, list, oap_pending_item) { @@ -2628,6 +2638,15 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io, } osc_unreserve_grant_nolock(cli, grants, 0); ext->oe_grants = grants; + } else { + /* We cannot report ENOSPC correctly if we do parallel + * DIO (async RPC submission), so turn off parallel dio + * if there is not sufficient grant available. This + * makes individual RPCs synchronous. + */ + io->ci_parallel_dio = false; + CDEBUG(D_CACHE, + "not enough grant available, switching to sync for this i/o\n"); } spin_unlock(&cli->cl_loi_list_lock); } @@ -3081,11 +3100,10 @@ bool osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io, spin_unlock(&osc->oo_tree_lock); tree_lock = false; + res = (*cb)(env, io, pvec, j, cbdata); + for (i = 0; i < j; ++i) { ops = pvec[i]; - if (res) - res = (*cb)(env, io, ops, cbdata); - page = ops->ops_cl.cpl_page; lu_ref_del(&page->cp_reference, "gang_lookup", current); cl_pagevec_put(env, page, pagevec); @@ -3097,6 +3115,15 @@ bool osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io, if (!res) break; + + if (io->ci_type == CIT_MISC && + io->u.ci_misc.lm_next_rpc_time && + ktime_get_seconds() > io->u.ci_misc.lm_next_rpc_time) { + osc_send_empty_rpc(osc, idx << PAGE_SHIFT); + io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() + + 5 * obd_timeout / 16; + } + if (need_resched()) cond_resched(); @@ -3113,85 +3140,101 @@ EXPORT_SYMBOL(osc_page_gang_lookup); * Check if page @page is covered by an extra lock or discard it. */ static bool check_and_discard_cb(const struct lu_env *env, struct cl_io *io, - struct osc_page *ops, void *cbdata) + void **pvec, int count, void *cbdata) { struct osc_thread_info *info = osc_env_info(env); struct osc_object *osc = cbdata; - struct cl_page *page = ops->ops_cl.cpl_page; - pgoff_t index; - bool discard = false; - - index = osc_index(ops); - - /* negative lock caching */ - if (index < info->oti_ng_index) { - discard = true; - } else if (index >= info->oti_fn_index) { - struct ldlm_lock *tmp; - /* refresh non-overlapped index */ - tmp = osc_dlmlock_at_pgoff(env, osc, index, - OSC_DAP_FL_TEST_LOCK | - OSC_DAP_FL_AST | OSC_DAP_FL_RIGHT); - if (tmp != NULL) { - __u64 end = tmp->l_policy_data.l_extent.end; - __u64 start = tmp->l_policy_data.l_extent.start; - - /* no lock covering this page */ - if (index < cl_index(osc2cl(osc), start)) { - /* no lock at @index, first lock at @start */ - info->oti_ng_index = cl_index(osc2cl(osc), - start); - discard = true; + int i; + + for (i = 0; i < count; i++) { + struct osc_page *ops = pvec[i]; + struct cl_page *page = ops->ops_cl.cpl_page; + pgoff_t index = osc_index(ops); + bool discard = false; + + /* negative lock caching */ + if (index < info->oti_ng_index) { + discard = true; + } else if (index >= info->oti_fn_index) { + struct ldlm_lock *tmp; + /* refresh non-overlapped index */ + tmp = osc_dlmlock_at_pgoff(env, osc, index, + OSC_DAP_FL_TEST_LOCK | + OSC_DAP_FL_AST | + OSC_DAP_FL_RIGHT); + if (tmp != NULL) { + __u64 end = + tmp->l_policy_data.l_extent.end; + __u64 start = + tmp->l_policy_data.l_extent.start; + + /* no lock covering this page */ + if (index < cl_index(osc2cl(osc), start)) { + /* no lock at @index, + * first lock at @start + */ + info->oti_ng_index = + cl_index(osc2cl(osc), start); + discard = true; + } else { + /* Cache the first-non-overlapped + * index so as to skip all pages + * within [index, oti_fn_index). + * This is safe because if tmp lock + * is canceled, it will discard these + * pages. + */ + info->oti_fn_index = + cl_index(osc2cl(osc), end + 1); + if (end == OBD_OBJECT_EOF) + info->oti_fn_index = + CL_PAGE_EOF; + } + LDLM_LOCK_PUT(tmp); } else { - /* Cache the first-non-overlapped index so as to - * skip all pages within [index, oti_fn_index). - * This is safe because if tmp lock is canceled, - * it will discard these pages. - */ - info->oti_fn_index = cl_index(osc2cl(osc), - end + 1); - if (end == OBD_OBJECT_EOF) - info->oti_fn_index = CL_PAGE_EOF; + info->oti_ng_index = CL_PAGE_EOF; + discard = true; } - LDLM_LOCK_PUT(tmp); - } else { - info->oti_ng_index = CL_PAGE_EOF; - discard = true; } - } - if (discard) { - if (cl_page_own(env, io, page) == 0) { - cl_page_discard(env, io, page); - cl_page_disown(env, io, page); - } else { - LASSERT(page->cp_state == CPS_FREEING); + if (discard) { + if (cl_page_own(env, io, page) == 0) { + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + } else { + LASSERT(page->cp_state == CPS_FREEING); + } } - } - info->oti_next_index = index + 1; + info->oti_next_index = index + 1; + } return true; } bool osc_discard_cb(const struct lu_env *env, struct cl_io *io, - struct osc_page *ops, void *cbdata) + void **pvec, int count, void *cbdata) { struct osc_thread_info *info = osc_env_info(env); - struct cl_page *page = ops->ops_cl.cpl_page; - - /* page is top page. */ - info->oti_next_index = osc_index(ops) + 1; - if (cl_page_own(env, io, page) == 0) { - if (!ergo(page->cp_type == CPT_CACHEABLE, - !PageDirty(cl_page_vmpage(page)))) - CL_PAGE_DEBUG(D_ERROR, env, page, - "discard dirty page?\n"); - - /* discard the page */ - cl_page_discard(env, io, page); - cl_page_disown(env, io, page); - } else { - LASSERT(page->cp_state == CPS_FREEING); + int i; + + for (i = 0; i < count; i++) { + struct osc_page *ops = pvec[i]; + struct cl_page *page = ops->ops_cl.cpl_page; + + /* page is top page. */ + info->oti_next_index = osc_index(ops) + 1; + if (cl_page_own(env, io, page) == 0) { + if (!ergo(page->cp_type == CPT_CACHEABLE, + !PageDirty(cl_page_vmpage(page)))) + CL_PAGE_DEBUG(D_ERROR, env, page, + "discard dirty page?\n"); + + /* discard the page */ + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + } else { + LASSERT(page->cp_state == CPS_FREEING); + } } return true; @@ -3218,6 +3261,8 @@ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc, io->ci_obj = cl_object_top(osc2cl(osc)); io->ci_ignore_layout = 1; + io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() + + 5 * obd_timeout / 16; result = cl_io_init(env, io, CIT_MISC, io->ci_obj); if (result != 0) GOTO(out, result);