X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_cache.c;h=891d5608733cfcf10dc153498b411971679c18ca;hp=2fd076bc0e8d10c673872551d9177f145a7941f5;hb=546993d587c5fc380e9745eae98f863e02e68575;hpb=ecb6712a19fa836ecdba41ccda80de0a10b1336a diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c index 2fd076b..891d560 100644 --- a/lustre/osc/osc_cache.c +++ b/lustre/osc/osc_cache.c @@ -23,7 +23,7 @@ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2015, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. * */ /* @@ -37,7 +37,8 @@ #define DEBUG_SUBSYSTEM S_OSC -#include "osc_cl_internal.h" +#include + #include "osc_internal.h" static int extent_debug; /* set it to be true for more debug */ @@ -56,10 +57,10 @@ static int osc_io_unplug_async(const struct lu_env *env, static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages, unsigned int lost_grant, unsigned int dirty_grant); -static void osc_extent_tree_dump0(int level, struct osc_object *obj, +static void osc_extent_tree_dump0(int mask, struct osc_object *obj, const char *func, int line); -#define osc_extent_tree_dump(lvl, obj) \ - osc_extent_tree_dump0(lvl, obj, __func__, __LINE__) +#define osc_extent_tree_dump(mask, obj) \ + osc_extent_tree_dump0(mask, obj, __func__, __LINE__) static void osc_unreserve_grant(struct client_obd *cli, unsigned int reserved, unsigned int unused); @@ -103,18 +104,19 @@ static inline char list_empty_marker(struct list_head *list) static const char *oes_strings[] = { "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL }; -#define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do { \ +#define OSC_EXTENT_DUMP_WITH_LOC(file, func, line, mask, extent, fmt, ...) do {\ + static struct cfs_debug_limit_state cdls; \ struct osc_extent *__ext = (extent); \ char __buf[16]; \ \ - CDEBUG(lvl, \ + __CDEBUG_WITH_LOC(file, func, line, mask, &cdls, \ "extent %p@{" EXTSTR ", " \ "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt, \ /* ----- extent part 0 ----- */ \ __ext, EXTPARA(__ext), \ /* ----- part 1 ----- */ \ - atomic_read(&__ext->oe_refc), \ - atomic_read(&__ext->oe_users), \ + atomic_read(&__ext->oe_refc), \ + atomic_read(&__ext->oe_users), \ list_empty_marker(&__ext->oe_link), \ oes_strings[__ext->oe_state], ext_flags(__ext, __buf), \ __ext->oe_obj, \ @@ -125,12 +127,16 @@ static const char *oes_strings[] = { __ext->oe_dlmlock, __ext->oe_mppr, __ext->oe_owner, \ /* ----- part 4 ----- */ \ ## __VA_ARGS__); \ - if (lvl == D_ERROR && __ext->oe_dlmlock != NULL) \ + if (mask == D_ERROR && __ext->oe_dlmlock != NULL) \ LDLM_ERROR(__ext->oe_dlmlock, "extent: %p", __ext); \ else \ LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p", __ext); \ } while (0) +#define OSC_EXTENT_DUMP(mask, ext, fmt, ...) \ + OSC_EXTENT_DUMP_WITH_LOC(__FILE__, __func__, __LINE__, \ + mask, ext, fmt, ## __VA_ARGS__) + #undef EASSERTF #define EASSERTF(expr, ext, fmt, args...) do { \ if (!(expr)) { \ @@ -214,6 +220,7 @@ static int osc_extent_sanity_check0(struct osc_extent *ext, GOTO(out, rc = 60); if (ext->oe_fsync_wait && !ext->oe_urgent && !ext->oe_hp) GOTO(out, rc = 65); + /* fallthrough */ default: if (atomic_read(&ext->oe_users) > 0) GOTO(out, rc = 70); @@ -225,7 +232,9 @@ static int osc_extent_sanity_check0(struct osc_extent *ext, if (ext->oe_sync && ext->oe_grants > 0) GOTO(out, rc = 90); - if (ext->oe_dlmlock != NULL && !ldlm_is_failed(ext->oe_dlmlock)) { + if (ext->oe_dlmlock != NULL && + ext->oe_dlmlock->l_resource->lr_type == LDLM_EXTENT && + !ldlm_is_failed(ext->oe_dlmlock)) { struct ldlm_extent *extent; extent = &ext->oe_dlmlock->l_policy_data.l_extent; @@ -260,9 +269,9 @@ static int osc_extent_sanity_check0(struct osc_extent *ext, out: if (rc != 0) - OSC_EXTENT_DUMP(D_ERROR, ext, - "%s:%d sanity check %p failed with rc = %d\n", - func, line, ext, rc); + OSC_EXTENT_DUMP_WITH_LOC(__FILE__, func, line, D_ERROR, ext, + "sanity check %p failed: rc = %d\n", + ext, rc); return rc; } @@ -576,6 +585,7 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext) * osc_cache_truncate_start(). */ osc_extent_state_set(ext, OES_TRUNC); ext->oe_trunc_pending = 0; + osc_object_unlock(obj); } else { int grant = 0; @@ -588,8 +598,6 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext) grant += cli->cl_grant_extent_tax; if (osc_extent_merge(env, ext, next_extent(ext)) == 0) grant += cli->cl_grant_extent_tax; - if (grant > 0) - osc_unreserve_grant(cli, 0, grant); if (ext->oe_urgent) list_move_tail(&ext->oe_link, @@ -598,8 +606,10 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext) list_move_tail(&ext->oe_link, &obj->oo_full_exts); } + osc_object_unlock(obj); + if (grant > 0) + osc_unreserve_grant(cli, 0, grant); } - osc_object_unlock(obj); osc_io_unplug_async(env, cli, obj); } @@ -696,7 +706,7 @@ restart: pgoff_t ext_chk_end = ext->oe_end >> ppc_bits; LASSERT(sanity_check_nolock(ext) == 0); - if (chunk > ext_chk_end + 1) + if (chunk > ext_chk_end + 1 || chunk < ext_chk_start) break; /* if covering by different locks, no chance to match */ @@ -776,6 +786,7 @@ restart: /* pull ext's start back to cover cur */ ext->oe_start = cur->oe_start; ext->oe_grants += chunksize; + LASSERT(*grants >= chunksize); *grants -= chunksize; found = osc_extent_hold(ext); @@ -783,6 +794,7 @@ restart: /* rear merge */ ext->oe_end = cur->oe_end; ext->oe_grants += chunksize; + LASSERT(*grants >= chunksize); *grants -= chunksize; /* try to merge with the next one because we just fill @@ -811,8 +823,8 @@ restart: /* create a new extent */ EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur); cur->oe_grants = chunksize + cli->cl_grant_extent_tax; + LASSERT(*grants >= cur->oe_grants); *grants -= cur->oe_grants; - LASSERT(*grants >= 0); cur->oe_state = OES_CACHE; found = osc_extent_hold(cur); @@ -839,7 +851,6 @@ restart: out: osc_extent_put(env, cur); - LASSERT(*grants >= 0); return found; } @@ -950,9 +961,8 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext, "%s: wait ext to %u timedout, recovery in progress?\n", cli_name(osc_cli(obj)), state); - lwi = LWI_INTR(NULL, NULL); - rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), - &lwi); + wait_event_idle(ext->oe_waitq, extent_wait_cb(ext, state)); + rc = 0; } if (rc == 0 && ext->oe_rc < 0) rc = ext->oe_rc; @@ -972,6 +982,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index, struct client_obd *cli = osc_cli(obj); struct osc_async_page *oap; struct osc_async_page *tmp; + struct pagevec *pvec; int pages_in_chunk = 0; int ppc_bits = cli->cl_chunkbits - PAGE_SHIFT; @@ -990,9 +1001,14 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index, * We can't use that env from osc_cache_truncate_start() because * it's from lov_io_sub and not fully initialized. */ env = cl_env_get(&refcheck); - io = &osc_env_info(env)->oti_io; + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); + + io = osc_env_thread_io(env); io->ci_obj = cl_object_top(osc2cl(obj)); io->ci_ignore_layout = 1; + pvec = &osc_env_info(env)->oti_pagevec; + ll_pagevec_init(pvec, 0); rc = cl_io_init(env, io, CIT_MISC, io->ci_obj); if (rc < 0) GOTO(out, rc); @@ -1030,11 +1046,13 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index, } lu_ref_del(&page->cp_reference, "truncate", current); - cl_page_put(env, page); + cl_pagevec_put(env, page, pvec); --ext->oe_nr_pages; ++nr_pages; } + pagevec_release(pvec); + EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial, ext->oe_nr_pages == 0), ext, "trunc_index %lu, partial %d\n", trunc_index, partial); @@ -1205,8 +1223,8 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, ext->oe_end = end_index; ext->oe_grants += chunksize; + LASSERT(*grants >= chunksize); *grants -= chunksize; - LASSERT(*grants >= 0); EASSERTF(osc_extent_is_overlapped(obj, ext) == 0, ext, "overlapped after expanding for %lu.\n", index); EXIT; @@ -1216,31 +1234,34 @@ out: RETURN(rc); } -static void osc_extent_tree_dump0(int level, struct osc_object *obj, +static void osc_extent_tree_dump0(int mask, struct osc_object *obj, const char *func, int line) { struct osc_extent *ext; int cnt; - CDEBUG(level, "Dump object %p extents at %s:%d, mppr: %u.\n", + if (!cfs_cdebug_show(mask, DEBUG_SUBSYSTEM)) + return; + + CDEBUG(mask, "Dump object %p extents at %s:%d, mppr: %u.\n", obj, func, line, osc_cli(obj)->cl_max_pages_per_rpc); /* osc_object_lock(obj); */ cnt = 1; for (ext = first_extent(obj); ext != NULL; ext = next_extent(ext)) - OSC_EXTENT_DUMP(level, ext, "in tree %d.\n", cnt++); + OSC_EXTENT_DUMP(mask, ext, "in tree %d.\n", cnt++); cnt = 1; list_for_each_entry(ext, &obj->oo_hp_exts, oe_link) - OSC_EXTENT_DUMP(level, ext, "hp %d.\n", cnt++); + OSC_EXTENT_DUMP(mask, ext, "hp %d.\n", cnt++); cnt = 1; list_for_each_entry(ext, &obj->oo_urgent_exts, oe_link) - OSC_EXTENT_DUMP(level, ext, "urgent %d.\n", cnt++); + OSC_EXTENT_DUMP(mask, ext, "urgent %d.\n", cnt++); cnt = 1; list_for_each_entry(ext, &obj->oo_reading_exts, oe_link) - OSC_EXTENT_DUMP(level, ext, "reading %d.\n", cnt++); + OSC_EXTENT_DUMP(mask, ext, "reading %d.\n", cnt++); /* osc_object_unlock(obj); */ } @@ -1276,7 +1297,7 @@ static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap, ENTRY; result = cl_page_make_ready(env, page, CRT_WRITE); if (result == 0) - opg->ops_submit_time = cfs_time_current(); + opg->ops_submit_time = ktime_get(); RETURN(result); } @@ -1287,7 +1308,6 @@ static int osc_refresh_count(const struct lu_env *env, pgoff_t index = osc_index(oap2osc(oap)); struct cl_object *obj; struct cl_attr *attr = &osc_env_info(env)->oti_attr; - int result; loff_t kms; @@ -1333,7 +1353,7 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, /* Clear opg->ops_transfer_pinned before VM lock is released. */ opg->ops_transfer_pinned = 0; - opg->ops_submit_time = 0; + opg->ops_submit_time = ktime_set(0, 0); srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK; /* statistic */ @@ -1361,9 +1381,9 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, RETURN(0); } -#define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do { \ +#define OSC_DUMP_GRANT(mask, cli, fmt, args...) do { \ struct client_obd *__tmp = (cli); \ - CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %ld/%lu " \ + CDEBUG(mask, "%s: grant { dirty: %ld/%ld dirty_pages: %ld/%lu " \ "dropped: %ld avail: %ld, dirty_grant: %ld, " \ "reserved: %ld, flight: %d } lru {in list: %ld, " \ "left: %ld, waiters: %d }" fmt "\n", \ @@ -1384,7 +1404,6 @@ static void osc_consume_write_grant(struct client_obd *cli, { assert_spin_locked(&cli->cl_loi_list_lock); LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT)); - atomic_long_inc(&obd_dirty_pages); cli->cl_dirty_pages++; pga->flag |= OBD_BRW_FROM_GRANT; CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n", @@ -1452,13 +1471,20 @@ static void __osc_unreserve_grant(struct client_obd *cli, } } -static void osc_unreserve_grant(struct client_obd *cli, - unsigned int reserved, unsigned int unused) +static void osc_unreserve_grant_nolock(struct client_obd *cli, + unsigned int reserved, + unsigned int unused) { - spin_lock(&cli->cl_loi_list_lock); __osc_unreserve_grant(cli, reserved, unused); if (unused > 0) osc_wake_cache_waiters(cli); +} + +static void osc_unreserve_grant(struct client_obd *cli, + unsigned int reserved, unsigned int unused) +{ + spin_lock(&cli->cl_loi_list_lock); + osc_unreserve_grant_nolock(cli, reserved, unused); spin_unlock(&cli->cl_loi_list_lock); } @@ -1528,19 +1554,23 @@ static int osc_enter_cache_try(struct client_obd *cli, if (rc < 0) return 0; - if (cli->cl_dirty_pages < cli->cl_dirty_max_pages && - 1 + atomic_long_read(&obd_dirty_pages) <= obd_max_dirty_pages) { - osc_consume_write_grant(cli, &oap->oap_brw_page); - if (transient) { - cli->cl_dirty_transit++; - atomic_long_inc(&obd_dirty_transit_pages); - oap->oap_brw_flags |= OBD_BRW_NOCACHE; - } - rc = 1; - } else { - __osc_unreserve_grant(cli, bytes, bytes); - rc = 0; + if (cli->cl_dirty_pages < cli->cl_dirty_max_pages) { + if (atomic_long_add_return(1, &obd_dirty_pages) <= + obd_max_dirty_pages) { + osc_consume_write_grant(cli, &oap->oap_brw_page); + if (transient) { + cli->cl_dirty_transit++; + atomic_long_inc(&obd_dirty_transit_pages); + oap->oap_brw_flags |= OBD_BRW_NOCACHE; + } + rc = 1; + goto out; + } else + atomic_long_dec(&obd_dirty_pages); } + __osc_unreserve_grant(cli, bytes, bytes); + +out: return rc; } @@ -1587,7 +1617,8 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, } /* Hopefully normal case - cache space and write credits available */ - if (osc_enter_cache_try(cli, oap, bytes, 0)) { + if (list_empty(&cli->cl_cache_waiters) && + osc_enter_cache_try(cli, oap, bytes, 0)) { OSC_DUMP_GRANT(D_CACHE, cli, "granted from cache\n"); GOTO(out, rc = 0); } @@ -1672,31 +1703,26 @@ void osc_wake_cache_waiters(struct client_obd *cli) ENTRY; list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { ocw = list_entry(l, struct osc_cache_waiter, ocw_entry); - list_del_init(&ocw->ocw_entry); ocw->ocw_rc = -EDQUOT; - /* we can't dirty more */ - if ((cli->cl_dirty_pages >= cli->cl_dirty_max_pages) || - (1 + atomic_long_read(&obd_dirty_pages) > - obd_max_dirty_pages)) { - CDEBUG(D_CACHE, "no dirty room: dirty: %ld " - "osc max %ld, sys max %ld\n", - cli->cl_dirty_pages, cli->cl_dirty_max_pages, - obd_max_dirty_pages); - goto wakeup; - } if (osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0)) ocw->ocw_rc = 0; -wakeup: - CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n", - ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc); - wake_up(&ocw->ocw_waitq); + if (ocw->ocw_rc == 0 || + !(cli->cl_dirty_pages > 0 || cli->cl_w_in_flight > 0)) { + list_del_init(&ocw->ocw_entry); + CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant " + "%ld, %d\n", ocw, ocw->ocw_oap, + cli->cl_avail_grant, ocw->ocw_rc); + + wake_up(&ocw->ocw_waitq); + } } EXIT; } +EXPORT_SYMBOL(osc_wake_cache_waiters); static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc) { @@ -1960,6 +1986,7 @@ static int try_to_add_extent_for_io(struct client_obd *cli, if (tmp->oe_srvlock != ext->oe_srvlock || !tmp->oe_grants != !ext->oe_grants || + tmp->oe_ndelay != ext->oe_ndelay || tmp->oe_no_merge || ext->oe_no_merge) RETURN(0); @@ -1975,36 +2002,6 @@ static int try_to_add_extent_for_io(struct client_obd *cli, RETURN(1); } -static inline unsigned osc_max_write_chunks(const struct client_obd *cli) -{ - /* - * LU-8135: - * - * The maximum size of a single transaction is about 64MB in ZFS. - * #define DMU_MAX_ACCESS (64 * 1024 * 1024) - * - * Since ZFS is a copy-on-write file system, a single dirty page in - * a chunk will result in the rewrite of the whole chunk, therefore - * an RPC shouldn't be allowed to contain too many chunks otherwise - * it will make transaction size much bigger than 64MB, especially - * with big block size for ZFS. - * - * This piece of code is to make sure that OSC won't send write RPCs - * with too many chunks. The maximum chunk size that an RPC can cover - * is set to PTLRPC_MAX_BRW_SIZE, which is defined to 16MB. Ideally - * OST should tell the client what the biggest transaction size is, - * but it's good enough for now. - * - * This limitation doesn't apply to ldiskfs, which allows as many - * chunks in one RPC as we want. However, it won't have any benefits - * to have too many discontiguous pages in one RPC. - * - * An osc_extent won't cover over a RPC size, so the chunks in an - * osc_extent won't bigger than PTLRPC_MAX_BRW_SIZE >> chunkbits. - */ - return PTLRPC_MAX_BRW_SIZE >> cli->cl_chunkbits; -} - /** * In order to prevent multiple ptlrpcd from breaking contiguous extents, * get_write_extent() takes all appropriate extents in atomic. @@ -2088,7 +2085,7 @@ osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli, struct osc_object *osc) __must_hold(osc) { - struct list_head rpclist = LIST_HEAD_INIT(rpclist); + LIST_HEAD(rpclist); struct osc_extent *ext; struct osc_extent *tmp; struct osc_extent *first = NULL; @@ -2164,7 +2161,7 @@ __must_hold(osc) { struct osc_extent *ext; struct osc_extent *next; - struct list_head rpclist = LIST_HEAD_INIT(rpclist); + LIST_HEAD(rpclist); struct extent_rpc_data data = { .erd_rpc_list = &rpclist, .erd_page_count = 0, @@ -2308,8 +2305,8 @@ __must_hold(&cli->cl_loi_list_lock) } } -static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli, - struct osc_object *osc, int async) +int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli, + struct osc_object *osc, int async) { int rc = 0; @@ -2327,18 +2324,7 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli, } return rc; } - -static int osc_io_unplug_async(const struct lu_env *env, - struct client_obd *cli, struct osc_object *osc) -{ - return osc_io_unplug0(env, cli, osc, 1); -} - -void osc_io_unplug(const struct lu_env *env, struct client_obd *cli, - struct osc_object *osc) -{ - (void)osc_io_unplug0(env, cli, osc, 0); -} +EXPORT_SYMBOL(osc_io_unplug0); int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, struct page *page, loff_t offset) @@ -2358,9 +2344,6 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, oap->oap_obj_off = offset; LASSERT(!(offset & ~PAGE_MASK)); - if (cfs_capable(CFS_CAP_SYS_RESOURCE)) - oap->oap_brw_flags = OBD_BRW_NOQUOTA; - INIT_LIST_HEAD(&oap->oap_pending_item); INIT_LIST_HEAD(&oap->oap_rpc_item); @@ -2369,15 +2352,17 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, oap, page, oap->oap_obj_off); RETURN(0); } +EXPORT_SYMBOL(osc_prep_async_page); int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, - struct osc_page *ops) + struct osc_page *ops, cl_commit_cbt cb) { struct osc_io *oio = osc_env_io(env); struct osc_extent *ext = NULL; struct osc_async_page *oap = &ops->ops_oap; struct client_obd *cli = oap->oap_cli; struct osc_object *osc = oap->oap_obj; + struct pagevec *pvec = &osc_env_info(env)->oti_pagevec; pgoff_t index; unsigned int tmp; unsigned int grants = 0; @@ -2399,7 +2384,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, /* Set the OBD_BRW_SRVLOCK before the page is queued. */ brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0; - if (cfs_capable(CFS_CAP_SYS_RESOURCE)) { + if (oio->oi_cap_sys_resource) { brw_flags |= OBD_BRW_NOQUOTA; cmd |= OBD_BRW_NOQUOTA; } @@ -2419,6 +2404,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, qid[USRQUOTA] = attr->cat_uid; qid[GRPQUOTA] = attr->cat_gid; + qid[PRJQUOTA] = attr->cat_projid; if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA) rc = -EDQUOT; if (rc) @@ -2455,7 +2441,6 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, /* it doesn't need any grant to dirty this page */ spin_lock(&cli->cl_loi_list_lock); rc = osc_enter_cache_try(cli, oap, grants, 0); - spin_unlock(&cli->cl_loi_list_lock); if (rc == 0) { /* try failed */ grants = 0; need_release = 1; @@ -2469,10 +2454,11 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, } else { OSC_EXTENT_DUMP(D_CACHE, ext, "expanded for %lu.\n", index); - osc_unreserve_grant(cli, grants, tmp); + osc_unreserve_grant_nolock(cli, grants, tmp); grants = 0; } } + spin_unlock(&cli->cl_loi_list_lock); rc = 0; } else if (ext != NULL) { /* index is located outside of active extent */ @@ -2495,7 +2481,14 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, rc = 0; if (grants == 0) { - /* we haven't allocated grant for this page. */ + /* We haven't allocated grant for this page, and we + * must not hold a page lock while we do enter_cache, + * so we must mark dirty & unlock any pages in the + * write commit pagevec. */ + if (pagevec_count(pvec)) { + cb(env, io, pvec); + pagevec_reinit(pvec); + } rc = osc_enter_cache(env, cli, oap, tmp); if (rc == 0) grants = tmp; @@ -2531,7 +2524,11 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, ++ext->oe_nr_pages; list_add_tail(&oap->oap_pending_item, &ext->oe_pages); osc_object_unlock(osc); + + if (!ext->oe_layout_version) + ext->oe_layout_version = io->ci_layout_version; } + RETURN(rc); } @@ -2717,8 +2714,9 @@ int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops) RETURN(rc); } -int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, - struct list_head *list, int cmd, int brw_flags) +int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io, + struct osc_object *obj, struct list_head *list, + int brw_flags) { struct client_obd *cli = osc_cli(obj); struct osc_extent *ext; @@ -2756,7 +2754,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, RETURN(-ENOMEM); } - ext->oe_rw = !!(cmd & OBD_BRW_READ); + ext->oe_rw = !!(brw_flags & OBD_BRW_READ); ext->oe_sync = 1; ext->oe_no_merge = !can_merge; ext->oe_urgent = 1; @@ -2764,14 +2762,16 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, ext->oe_end = ext->oe_max_end = end; ext->oe_obj = obj; ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK); + ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY); ext->oe_nr_pages = page_count; ext->oe_mppr = mppr; list_splice_init(list, &ext->oe_pages); + ext->oe_layout_version = io->ci_layout_version; osc_object_lock(obj); /* Reuse the initial refcount for RPC, don't drop it */ osc_extent_state_set(ext, OES_LOCK_DONE); - if (cmd & OBD_BRW_WRITE) { + if (!ext->oe_rw) { /* write */ list_add_tail(&ext->oe_link, &obj->oo_urgent_exts); osc_update_pending(obj, OBD_BRW_WRITE, page_count); } else { @@ -2794,7 +2794,7 @@ int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj, struct osc_extent *ext; struct osc_extent *waiting = NULL; pgoff_t index; - struct list_head list = LIST_HEAD_INIT(list); + LIST_HEAD(list); int result = 0; bool partial; ENTRY; @@ -2910,6 +2910,7 @@ again: } RETURN(result); } +EXPORT_SYMBOL(osc_cache_truncate_start); /** * Called after osc_io_setattr_end to add oio->oi_trunc back to cache. @@ -2996,6 +2997,7 @@ again: OSC_IO_DEBUG(obj, "sync file range.\n"); RETURN(result); } +EXPORT_SYMBOL(osc_cache_wait_range); /** * Called to write out a range of osc object. @@ -3010,7 +3012,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj, pgoff_t start, pgoff_t end, int hp, int discard) { struct osc_extent *ext; - struct list_head discard_list = LIST_HEAD_INIT(discard_list); + LIST_HEAD(discard_list); bool unplug = false; int result = 0; ENTRY; @@ -3043,10 +3045,25 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj, list_move_tail(&ext->oe_link, list); unplug = true; } else { + struct client_obd *cli = osc_cli(obj); + int pcc_bits = cli->cl_chunkbits - PAGE_SHIFT; + pgoff_t align_by = (1 << pcc_bits); + pgoff_t a_start = round_down(start, align_by); + pgoff_t a_end = round_up(end, align_by); + + /* overflow case */ + if (end && !a_end) + a_end = CL_PAGE_EOF; /* the only discarder is lock cancelling, so - * [start, end] must contain this extent */ - EASSERT(ext->oe_start >= start && - ext->oe_max_end <= end, ext); + * [start, end], aligned by chunk size, must + * contain this extent */ + LASSERTF(ext->oe_start >= a_start && + ext->oe_end <= a_end, + "ext [%lu, %lu] reg [%lu, %lu] " + "orig [%lu %lu] align %lu bits " + "%d\n", ext->oe_start, ext->oe_end, + a_start, a_end, start, end, + align_by, pcc_bits); osc_extent_state_set(ext, OES_LOCKING); ext->oe_owner = current; list_move_tail(&ext->oe_link, @@ -3112,6 +3129,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj, OSC_IO_DEBUG(obj, "pageout [%lu, %lu], %d.\n", start, end, result); RETURN(result); } +EXPORT_SYMBOL(osc_cache_writeback_range); /** * Returns a list of pages by a given [start, end] of \a obj. @@ -3130,6 +3148,7 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io, osc_page_gang_cbt cb, void *cbdata) { struct osc_page *ops; + struct pagevec *pagevec; void **pvec; pgoff_t idx; unsigned int nr; @@ -3141,6 +3160,8 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io, idx = start; pvec = osc_env_info(env)->oti_pvec; + pagevec = &osc_env_info(env)->oti_pagevec; + ll_pagevec_init(pagevec, 0); spin_lock(&osc->oo_tree_lock); while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec, idx, OTI_PVEC_SIZE)) > 0) { @@ -3187,8 +3208,10 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io, page = ops->ops_cl.cpl_page; lu_ref_del(&page->cp_reference, "gang_lookup", current); - cl_page_put(env, page); + cl_pagevec_put(env, page, pagevec); } + pagevec_release(pagevec); + if (nr < OTI_PVEC_SIZE || end_of_region) break; @@ -3204,6 +3227,7 @@ int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io, spin_unlock(&osc->oo_tree_lock); RETURN(res); } +EXPORT_SYMBOL(osc_page_gang_lookup); /** * Check if page @page is covered by an extra lock or discard it. @@ -3246,8 +3270,8 @@ static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io, return CLP_GANG_OKAY; } -static int discard_cb(const struct lu_env *env, struct cl_io *io, - struct osc_page *ops, void *cbdata) +int osc_discard_cb(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops, void *cbdata) { struct osc_thread_info *info = osc_env_info(env); struct cl_page *page = ops->ops_cl.cpl_page; @@ -3269,6 +3293,7 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io, return CLP_GANG_OKAY; } +EXPORT_SYMBOL(osc_discard_cb); /** * Discard pages protected by the given lock. This function traverses radix @@ -3279,10 +3304,10 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io, * behind this being that lock cancellation cannot be delayed indefinitely). */ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc, - pgoff_t start, pgoff_t end, enum cl_lock_mode mode) + pgoff_t start, pgoff_t end, bool discard) { struct osc_thread_info *info = osc_env_info(env); - struct cl_io *io = &info->oti_io; + struct cl_io *io = osc_env_thread_io(env); osc_page_gang_cbt cb; int res; int result; @@ -3295,7 +3320,7 @@ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc, if (result != 0) GOTO(out, result); - cb = mode == CLM_READ ? check_and_discard_cb : discard_cb; + cb = discard ? osc_discard_cb : check_and_discard_cb; info->oti_fn_index = info->oti_next_index = start; do { res = osc_page_gang_lookup(env, io, osc,