diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index 52870d5..27a034e 100644
--- a/lustre/osc/osc_cache.c
+++ b/lustre/osc/osc_cache.c
@@ -27,7 +27,7 @@
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
 *
 */
/*
@@ -58,13 +58,16 @@ static int osc_refresh_count(const struct lu_env *env,
static int osc_io_unplug_async(const struct lu_env *env,
 struct client_obd *cli, struct osc_object *osc);
static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
- unsigned int lost_grant);
+ unsigned int lost_grant, unsigned int dirty_grant);
static void osc_extent_tree_dump0(int level, struct osc_object *obj,
 const char *func, int line);
#define osc_extent_tree_dump(lvl, obj) \
 osc_extent_tree_dump0(lvl, obj, __func__, __LINE__)

+static void osc_unreserve_grant(struct client_obd *cli, unsigned int reserved,
+ unsigned int unused);
+
/** \addtogroup osc
 * @{
 */
@@ -127,9 +130,9 @@ static const char *oes_strings[] = {
	/* ----- part 4 ----- */ \
		## __VA_ARGS__); \
	if (lvl == D_ERROR && __ext->oe_dlmlock != NULL) \
-		LDLM_ERROR(__ext->oe_dlmlock, "extent: %p\n", __ext); \
+		LDLM_ERROR(__ext->oe_dlmlock, "extent: %p", __ext); \
	else \
-		LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p\n", __ext); \
+		LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p", __ext); \
} while (0)

#undef EASSERTF
@@ -226,7 +229,7 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
	if (ext->oe_sync && ext->oe_grants > 0)
		GOTO(out, rc = 90);

-	if (ext->oe_dlmlock != NULL) {
+	if (ext->oe_dlmlock != NULL && !ldlm_is_failed(ext->oe_dlmlock)) {
		struct ldlm_extent *extent;

		extent = &ext->oe_dlmlock->l_policy_data.l_extent;
@@ -319,12 +322,13 @@ static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
{
	struct osc_extent *ext;

-	OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, GFP_IOFS);
+	OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, GFP_NOFS);
	if (ext == NULL)
		return NULL;

	RB_CLEAR_NODE(&ext->oe_node);
	ext->oe_obj = obj;
+	cl_object_get(osc2cl(obj));
	atomic_set(&ext->oe_refc, 1);
	atomic_set(&ext->oe_users, 0);
	INIT_LIST_HEAD(&ext->oe_link);
@@ -363,6 +367,7 @@ static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
			LDLM_LOCK_PUT(ext->oe_dlmlock);
			ext->oe_dlmlock = NULL;
		}
+		cl_object_put(env, osc2cl(ext->oe_obj));
		osc_extent_free(ext);
	}
}
@@ -495,15 +500,16 @@ static void osc_extent_remove(struct osc_extent *ext)

/**
 * This function is used to merge extents to get better performance. It checks
- * if @cur and @victim are contiguous at chunk level.
+ * if @cur and @victim are contiguous at block level.
*/ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur, struct osc_extent *victim) { - struct osc_object *obj = cur->oe_obj; - pgoff_t chunk_start; - pgoff_t chunk_end; - int ppc_bits; + struct osc_object *obj = cur->oe_obj; + struct client_obd *cli = osc_cli(obj); + pgoff_t chunk_start; + pgoff_t chunk_end; + int ppc_bits; LASSERT(cur->oe_state == OES_CACHE); LASSERT(osc_object_is_locked(obj)); @@ -524,11 +530,18 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur, chunk_end + 1 != victim->oe_start >> ppc_bits) return -ERANGE; + /* overall extent size should not exceed the max supported limit + * reported by the server */ + if (cur->oe_end - cur->oe_start + 1 + + victim->oe_end - victim->oe_start + 1 > cli->cl_max_extent_pages) + return -ERANGE; + OSC_EXTENT_DUMP(D_CACHE, victim, "will be merged by %p.\n", cur); cur->oe_start = min(cur->oe_start, victim->oe_start); cur->oe_end = max(cur->oe_end, victim->oe_end); - cur->oe_grants += victim->oe_grants; + /* per-extent tax should be accounted only once for the whole extent */ + cur->oe_grants += victim->oe_grants - cli->cl_grant_extent_tax; cur->oe_nr_pages += victim->oe_nr_pages; /* only the following bits are needed to merge */ cur->oe_urgent |= victim->oe_urgent; @@ -551,6 +564,7 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur, int osc_extent_release(const struct lu_env *env, struct osc_extent *ext) { struct osc_object *obj = ext->oe_obj; + struct client_obd *cli = osc_cli(obj); int rc = 0; ENTRY; @@ -567,13 +581,19 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext) osc_extent_state_set(ext, OES_TRUNC); ext->oe_trunc_pending = 0; } else { + int grant = 0; + osc_extent_state_set(ext, OES_CACHE); osc_update_pending(obj, OBD_BRW_WRITE, ext->oe_nr_pages); /* try to merge the previous and next extent. */ - osc_extent_merge(env, ext, prev_extent(ext)); - osc_extent_merge(env, ext, next_extent(ext)); + if (osc_extent_merge(env, ext, prev_extent(ext)) == 0) + grant += cli->cl_grant_extent_tax; + if (osc_extent_merge(env, ext, next_extent(ext)) == 0) + grant += cli->cl_grant_extent_tax; + if (grant > 0) + osc_unreserve_grant(cli, 0, grant); if (ext->oe_urgent) list_move_tail(&ext->oe_link, @@ -581,7 +601,7 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext) } osc_object_unlock(obj); - osc_io_unplug_async(env, osc_cli(obj), obj); + osc_io_unplug_async(env, cli, obj); } osc_extent_put(env, ext); RETURN(rc); @@ -627,15 +647,20 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env, descr = &olck->ols_cl.cls_lock->cll_descr; LASSERT(descr->cld_mode >= CLM_WRITE); - LASSERT(cli->cl_chunkbits >= PAGE_CACHE_SHIFT); - ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT; + LASSERTF(cli->cl_chunkbits >= PAGE_SHIFT, + "chunkbits: %u\n", cli->cl_chunkbits); + ppc_bits = cli->cl_chunkbits - PAGE_SHIFT; chunk_mask = ~((1 << ppc_bits) - 1); chunksize = 1 << cli->cl_chunkbits; chunk = index >> ppc_bits; - /* align end to rpc edge, rpc size may not be a power 2 integer. */ + /* align end to RPC edge. 
*/ max_pages = cli->cl_max_pages_per_rpc; - LASSERT((max_pages & ~chunk_mask) == 0); + if ((max_pages & ~chunk_mask) != 0) { + CERROR("max_pages: %#x chunkbits: %u chunk_mask: %#lx\n", + max_pages, cli->cl_chunkbits, chunk_mask); + RETURN(ERR_PTR(-EINVAL)); + } max_end = index - (index % max_pages) + max_pages - 1; max_end = min_t(pgoff_t, max_end, descr->cld_end); @@ -656,8 +681,8 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env, } /* grants has been allocated by caller */ - LASSERTF(*grants >= chunksize + cli->cl_extent_tax, - "%u/%u/%u.\n", *grants, chunksize, cli->cl_extent_tax); + LASSERTF(*grants >= chunksize + cli->cl_grant_extent_tax, + "%u/%u/%u.\n", *grants, chunksize, cli->cl_grant_extent_tax); LASSERTF((max_end - cur->oe_start) < max_pages, EXTSTR"\n", EXTPARA(cur)); @@ -730,6 +755,13 @@ restart: continue; } + /* check whether maximum extent size will be hit */ + if ((ext_chk_end - ext_chk_start + 1 + 1) << ppc_bits > + cli->cl_max_extent_pages) { + ext = next_extent(ext); + continue; + } + /* it's required that an extent must be contiguous at chunk * level so that we know the whole extent is covered by grant * (the pages in the extent are NOT required to be contiguous). @@ -757,7 +789,7 @@ restart: * in a gap */ if (osc_extent_merge(env, ext, next_extent(ext)) == 0) /* we can save extent tax from next extent */ - *grants += cli->cl_extent_tax; + *grants += cli->cl_grant_extent_tax; found = osc_extent_hold(ext); } @@ -778,7 +810,7 @@ restart: } else if (conflict == NULL) { /* create a new extent */ EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur); - cur->oe_grants = chunksize + cli->cl_extent_tax; + cur->oe_grants = chunksize + cli->cl_grant_extent_tax; *grants -= cur->oe_grants; LASSERT(*grants >= 0); @@ -823,7 +855,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext, int nr_pages = ext->oe_nr_pages; int lost_grant = 0; int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096; - __u64 last_off = 0; + loff_t last_off = 0; int last_count = -1; ENTRY; @@ -854,7 +886,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext, /* For short writes we shouldn't count parts of pages that * span a whole chunk on the OST side, or our accounting goes * wrong. Should match the code in filter_grant_check. 
*/ - int offset = last_off & ~CFS_PAGE_MASK; + int offset = last_off & ~PAGE_MASK; int count = last_count + (offset & (blocksize - 1)); int end = (offset + last_count) & (blocksize - 1); if (end) @@ -863,7 +895,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext, lost_grant = PAGE_CACHE_SIZE - count; } if (ext->oe_grants > 0) - osc_free_grant(cli, nr_pages, lost_grant); + osc_free_grant(cli, nr_pages, lost_grant, ext->oe_grants); osc_extent_remove(ext); /* put the refcount for RPC */ @@ -916,7 +948,7 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext, if (rc == -ETIMEDOUT) { OSC_EXTENT_DUMP(D_ERROR, ext, "%s: wait ext to %u timedout, recovery in progress?\n", - osc_export(obj)->exp_obd->obd_name, state); + cli_name(osc_cli(obj)), state); lwi = LWI_INTR(NULL, NULL); rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), @@ -934,7 +966,6 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext, static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index, bool partial) { - struct cl_env_nest nest; struct lu_env *env; struct cl_io *io; struct osc_object *obj = ext->oe_obj; @@ -948,6 +979,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index, int grants = 0; int nr_pages = 0; int rc = 0; + __u16 refcheck; ENTRY; LASSERT(sanity_check(ext) == 0); @@ -957,14 +989,15 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index, /* Request new lu_env. * We can't use that env from osc_cache_truncate_start() because * it's from lov_io_sub and not fully initialized. */ - env = cl_env_nested_get(&nest); + env = cl_env_get(&refcheck); io = &osc_env_info(env)->oti_io; io->ci_obj = cl_object_top(osc2cl(obj)); + io->ci_ignore_layout = 1; rc = cl_io_init(env, io, CIT_MISC, io->ci_obj); if (rc < 0) GOTO(out, rc); - /* discard all pages with index greater then trunc_index */ + /* discard all pages with index greater than trunc_index */ list_for_each_entry_safe(oap, tmp, &ext->oe_pages, oap_pending_item) { pgoff_t index = osc_index(oap2osc(oap)); @@ -1038,11 +1071,11 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index, osc_object_unlock(obj); if (grants > 0 || nr_pages > 0) - osc_free_grant(cli, nr_pages, grants); + osc_free_grant(cli, nr_pages, grants, grants); out: cl_io_fini(env, io); - cl_env_nested_put(&nest, env); + cl_env_put(env, &refcheck); RETURN(rc); } @@ -1156,9 +1189,14 @@ static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, GOTO(out, rc = 0); LASSERT(end_chunk + 1 == chunk); + /* try to expand this extent to cover @index */ end_index = min(ext->oe_max_end, ((chunk + 1) << ppc_bits) - 1); + /* don't go over the maximum extent size reported by server */ + if (end_index - ext->oe_start + 1 > cli->cl_max_extent_pages) + GOTO(out, rc = -ERANGE); + next = next_extent(ext); if (next != NULL && next->oe_start <= end_index) /* complex mode - overlapped with the next extent, @@ -1279,7 +1317,6 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, { struct osc_page *opg = oap2osc_page(oap); struct cl_page *page = oap2cl_page(oap); - struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); enum cl_req_type crt; int srvlock; @@ -1292,25 +1329,10 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, "cp_state:%u, cmd:%d\n", page->cp_state, cmd); LASSERT(opg->ops_transfer_pinned); - /* - * page->cp_req can be NULL if io submission failed before - * cl_req was allocated. 
- */ - if (page->cp_req != NULL) - cl_req_page_done(env, page); - LASSERT(page->cp_req == NULL); - crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE; /* Clear opg->ops_transfer_pinned before VM lock is released. */ opg->ops_transfer_pinned = 0; - spin_lock(&obj->oo_seatbelt); - LASSERT(opg->ops_submitter != NULL); - LASSERT(!list_empty(&opg->ops_inflight)); - list_del_init(&opg->ops_inflight); - opg->ops_submitter = NULL; - spin_unlock(&obj->oo_seatbelt); - opg->ops_submit_time = 0; srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK; @@ -1334,6 +1356,7 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, lu_ref_del(&page->cp_reference, "transfer", page); cl_page_completion(env, page, crt, rc); + cl_page_put(env, page); RETURN(0); } @@ -1341,12 +1364,14 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do { \ struct client_obd *__tmp = (cli); \ CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %ld/%lu " \ - "dropped: %ld avail: %ld, reserved: %ld, flight: %d }" \ - "lru {in list: %ld, left: %ld, waiters: %d }"fmt"\n", \ - __tmp->cl_import->imp_obd->obd_name, \ + "dropped: %ld avail: %ld, dirty_grant: %ld, " \ + "reserved: %ld, flight: %d } lru {in list: %ld, " \ + "left: %ld, waiters: %d }" fmt "\n", \ + cli_name(__tmp), \ __tmp->cl_dirty_pages, __tmp->cl_dirty_max_pages, \ atomic_long_read(&obd_dirty_pages), obd_max_dirty_pages, \ __tmp->cl_lost_grant, __tmp->cl_avail_grant, \ + __tmp->cl_dirty_grant, \ __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, \ atomic_long_read(&__tmp->cl_lru_in_list), \ atomic_long_read(&__tmp->cl_lru_busy), \ @@ -1420,8 +1445,10 @@ static void __osc_unreserve_grant(struct client_obd *cli, if (unused > reserved) { cli->cl_avail_grant += reserved; cli->cl_lost_grant += unused - reserved; + cli->cl_dirty_grant -= unused - reserved; } else { cli->cl_avail_grant += unused; + cli->cl_dirty_grant += reserved - unused; } } @@ -1449,14 +1476,17 @@ static void osc_unreserve_grant(struct client_obd *cli, * See filter_grant_check() for details. 
*/ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages, - unsigned int lost_grant) + unsigned int lost_grant, unsigned int dirty_grant) { - unsigned long grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax; + unsigned long grant; + + grant = (1 << cli->cl_chunkbits) + cli->cl_grant_extent_tax; spin_lock(&cli->cl_loi_list_lock); atomic_long_sub(nr_pages, &obd_dirty_pages); cli->cl_dirty_pages -= nr_pages; cli->cl_lost_grant += lost_grant; + cli->cl_dirty_grant -= dirty_grant; if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) { /* borrow some grant from truncate to avoid the case that * truncate uses up all avail grant */ @@ -1465,9 +1495,10 @@ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages, } osc_wake_cache_waiters(cli); spin_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu\n", + CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu/%lu\n", lost_grant, cli->cl_lost_grant, - cli->cl_avail_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT); + cli->cl_avail_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT, + cli->cl_dirty_grant); } /** @@ -1491,7 +1522,7 @@ static int osc_enter_cache_try(struct client_obd *cli, { int rc; - OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes); + OSC_DUMP_GRANT(D_CACHE, cli, "need:%d\n", bytes); rc = osc_reserve_grant(cli, bytes); if (rc < 0) @@ -1532,15 +1563,17 @@ static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw) static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, struct osc_async_page *oap, int bytes) { - struct osc_object *osc = oap->oap_obj; - struct lov_oinfo *loi = osc->oo_oinfo; - struct osc_cache_waiter ocw; - struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL, - LWI_ON_SIGNAL_NOOP, NULL); - int rc = -EDQUOT; + struct osc_object *osc = oap->oap_obj; + struct lov_oinfo *loi = osc->oo_oinfo; + struct osc_cache_waiter ocw; + struct l_wait_info lwi; + int rc = -EDQUOT; ENTRY; - OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes); + lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(AT_OFF ? obd_timeout : at_max), + NULL, LWI_ON_SIGNAL_NOOP, NULL); + + OSC_DUMP_GRANT(D_CACHE, cli, "need:%d\n", bytes); spin_lock(&cli->cl_loi_list_lock); @@ -1548,12 +1581,16 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, * of queued writes and create a discontiguous rpc stream */ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) || cli->cl_dirty_max_pages == 0 || - cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) + cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) { + OSC_DUMP_GRANT(D_CACHE, cli, "forced sync i/o\n"); GOTO(out, rc = -EDQUOT); + } /* Hopefully normal case - cache space and write credits available */ - if (osc_enter_cache_try(cli, oap, bytes, 0)) + if (osc_enter_cache_try(cli, oap, bytes, 0)) { + OSC_DUMP_GRANT(D_CACHE, cli, "granted from cache\n"); GOTO(out, rc = 0); + } /* We can get here for two reasons: too many dirty pages in cache, or * run out of grants. In both cases we should write dirty pages out. 
@@ -1572,48 +1609,57 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, osc_io_unplug_async(env, cli, NULL); CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n", - cli->cl_import->imp_obd->obd_name, &ocw, oap); + cli_name(cli), &ocw, oap); rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi); spin_lock(&cli->cl_loi_list_lock); - /* l_wait_event is interrupted by signal, or timed out */ if (rc < 0) { - switch (rc) { - case -ETIMEDOUT: - OSC_DUMP_GRANT(D_ERROR, cli, - "try to reserve %d.\n", bytes); - osc_extent_tree_dump(D_ERROR, osc); - rc = -EDQUOT; - break; - case -EINTR: - /* Ensures restartability - LU-3581 */ - rc = -ERESTARTSYS; - break; - default: - CDEBUG(D_CACHE, "%s: event for cache space @" - " %p never arrived due to %d\n", - cli->cl_import->imp_obd->obd_name, - &ocw, rc); - break; - } + /* l_wait_event is interrupted by signal or timed out */ list_del_init(&ocw.ocw_entry); - GOTO(out, rc); + break; } - LASSERT(list_empty(&ocw.ocw_entry)); rc = ocw.ocw_rc; if (rc != -EDQUOT) - GOTO(out, rc); - if (osc_enter_cache_try(cli, oap, bytes, 0)) - GOTO(out, rc = 0); + break; + if (osc_enter_cache_try(cli, oap, bytes, 0)) { + rc = 0; + break; + } + } + + switch (rc) { + case 0: + OSC_DUMP_GRANT(D_CACHE, cli, "finally got grant space\n"); + break; + case -ETIMEDOUT: + OSC_DUMP_GRANT(D_CACHE, cli, + "timeout, fall back to sync i/o\n"); + osc_extent_tree_dump(D_CACHE, osc); + /* fall back to synchronous I/O */ + rc = -EDQUOT; + break; + case -EINTR: + /* Ensures restartability - LU-3581 */ + OSC_DUMP_GRANT(D_CACHE, cli, "interrupted\n"); + rc = -ERESTARTSYS; + break; + case -EDQUOT: + OSC_DUMP_GRANT(D_CACHE, cli, + "no grant space, fall back to sync i/o\n"); + break; + default: + CDEBUG(D_CACHE, "%s: event for cache space @ %p never arrived " + "due to %d, fall back to sync i/o\n", + cli_name(cli), &ocw, rc); + break; } EXIT; out: spin_unlock(&cli->cl_loi_list_lock); - OSC_DUMP_GRANT(D_CACHE, cli, "returned %d.\n", rc); RETURN(rc); } @@ -1640,10 +1686,8 @@ void osc_wake_cache_waiters(struct client_obd *cli) goto wakeup; } - ocw->ocw_rc = 0; - if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0)) - ocw->ocw_rc = -EDQUOT; - + if (osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0)) + ocw->ocw_rc = 0; wakeup: CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n", ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc); @@ -1880,7 +1924,8 @@ static int try_to_add_extent_for_io(struct client_obd *cli, } if (tmp->oe_srvlock != ext->oe_srvlock || - !tmp->oe_grants != !ext->oe_grants) + !tmp->oe_grants != !ext->oe_grants || + tmp->oe_no_merge || ext->oe_no_merge) RETURN(0); /* remove break for strict check */ @@ -1971,7 +2016,7 @@ static unsigned int get_write_extents(struct osc_object *obj, static int osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli, - struct osc_object *osc, pdl_policy_t pol) + struct osc_object *osc) __must_hold(osc) { struct list_head rpclist = LIST_HEAD_INIT(rpclist); @@ -2025,7 +2070,7 @@ __must_hold(osc) if (!list_empty(&rpclist)) { LASSERT(page_count > 0); - rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE, pol); + rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE); LASSERT(list_empty(&rpclist)); } @@ -2045,7 +2090,7 @@ __must_hold(osc) */ static int osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli, - struct osc_object *osc, pdl_policy_t pol) + struct osc_object *osc) __must_hold(osc) { struct osc_extent *ext; @@ -2074,7 +2119,7 @@ __must_hold(osc) 
osc_object_unlock(osc); LASSERT(page_count > 0); - rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ, pol); + rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ); LASSERT(list_empty(&rpclist)); osc_object_lock(osc); @@ -2124,8 +2169,7 @@ static struct osc_object *osc_next_obj(struct client_obd *cli) } /* called with the loi list lock held */ -static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli, - pdl_policy_t pol) +static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) __must_hold(&cli->cl_loi_list_lock) { struct osc_object *osc; @@ -2155,7 +2199,7 @@ __must_hold(&cli->cl_loi_list_lock) * do io on writes while there are cache waiters */ osc_object_lock(osc); if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) { - rc = osc_send_write_rpc(env, cli, osc, pol); + rc = osc_send_write_rpc(env, cli, osc); if (rc < 0) { CERROR("Write request failed with %d\n", rc); @@ -2179,7 +2223,7 @@ __must_hold(&cli->cl_loi_list_lock) } } if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) { - rc = osc_send_read_rpc(env, cli, osc, pol); + rc = osc_send_read_rpc(env, cli, osc); if (rc < 0) CERROR("Read request failed with %d\n", rc); } @@ -2194,7 +2238,7 @@ __must_hold(&cli->cl_loi_list_lock) } static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli, - struct osc_object *osc, pdl_policy_t pol, int async) + struct osc_object *osc, int async) { int rc = 0; @@ -2202,13 +2246,9 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli, return 0; if (!async) { - /* disable osc_lru_shrink() temporarily to avoid - * potential stack overrun problem. LU-2859 */ - atomic_inc(&cli->cl_lru_shrinkers); spin_lock(&cli->cl_loi_list_lock); - osc_check_rpcs(env, cli, pol); + osc_check_rpcs(env, cli); spin_unlock(&cli->cl_loi_list_lock); - atomic_dec(&cli->cl_lru_shrinkers); } else { CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli); LASSERT(cli->cl_writeback_work != NULL); @@ -2220,14 +2260,13 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli, static int osc_io_unplug_async(const struct lu_env *env, struct client_obd *cli, struct osc_object *osc) { - /* XXX: policy is no use actually. */ - return osc_io_unplug0(env, cli, osc, PDL_POLICY_ROUND, 1); + return osc_io_unplug0(env, cli, osc, 1); } void osc_io_unplug(const struct lu_env *env, struct client_obd *cli, - struct osc_object *osc, pdl_policy_t pol) + struct osc_object *osc) { - (void)osc_io_unplug0(env, cli, osc, pol, 0); + (void)osc_io_unplug0(env, cli, osc, 0); } int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, @@ -2246,9 +2285,9 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, oap->oap_page = page; oap->oap_obj_off = offset; - LASSERT(!(offset & ~CFS_PAGE_MASK)); + LASSERT(!(offset & ~PAGE_MASK)); - if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE)) + if (cfs_capable(CFS_CAP_SYS_RESOURCE)) oap->oap_brw_flags = OBD_BRW_NOQUOTA; INIT_LIST_HEAD(&oap->oap_pending_item); @@ -2271,7 +2310,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, pgoff_t index; unsigned int tmp; unsigned int grants = 0; - int brw_flags = OBD_BRW_ASYNC; + u32 brw_flags = OBD_BRW_ASYNC; int cmd = OBD_BRW_WRITE; int need_release = 0; int rc = 0; @@ -2289,8 +2328,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, /* Set the OBD_BRW_SRVLOCK before the page is queued. */ brw_flags |= ops->ops_srvlock ? 
OBD_BRW_SRVLOCK : 0; - if (!client_is_remote(osc_export(osc)) && - cfs_capable(CFS_CAP_SYS_RESOURCE)) { + if (cfs_capable(CFS_CAP_SYS_RESOURCE)) { brw_flags |= OBD_BRW_NOQUOTA; cmd |= OBD_BRW_NOQUOTA; } @@ -2299,7 +2337,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, if (!(cmd & OBD_BRW_NOQUOTA)) { struct cl_object *obj; struct cl_attr *attr; - unsigned int qid[MAXQUOTAS]; + unsigned int qid[LL_MAXQUOTAS]; obj = cl_object_top(&osc->oo_cl); attr = &osc_env_info(env)->oti_attr; @@ -2339,7 +2377,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, if (ext != NULL && ext->oe_start <= index && ext->oe_max_end >= index) { /* one chunk plus extent overhead must be enough to write this * page */ - grants = (1 << cli->cl_chunkbits) + cli->cl_extent_tax; + grants = (1 << cli->cl_chunkbits) + cli->cl_grant_extent_tax; if (ext->oe_end >= index) grants = 0; @@ -2376,7 +2414,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, } if (ext == NULL) { - tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax; + tmp = (1 << cli->cl_chunkbits) + cli->cl_grant_extent_tax; /* try to find new extent to cover this page */ LASSERT(oio->oi_active == NULL); @@ -2430,7 +2468,6 @@ int osc_teardown_async_page(const struct lu_env *env, struct osc_object *obj, struct osc_page *ops) { struct osc_async_page *oap = &ops->ops_oap; - struct osc_extent *ext = NULL; int rc = 0; ENTRY; @@ -2439,12 +2476,15 @@ int osc_teardown_async_page(const struct lu_env *env, CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n", oap, ops, osc_index(oap2osc(oap))); - osc_object_lock(obj); if (!list_empty(&oap->oap_rpc_item)) { CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap); rc = -EBUSY; } else if (!list_empty(&oap->oap_pending_item)) { + struct osc_extent *ext = NULL; + + osc_object_lock(obj); ext = osc_extent_lookup(obj, osc_index(oap2osc(oap))); + osc_object_unlock(obj); /* only truncated pages are allowed to be taken out. * See osc_extent_truncate() and osc_cache_truncate_start() * for details. 
*/ @@ -2453,10 +2493,9 @@ int osc_teardown_async_page(const struct lu_env *env, osc_index(oap2osc(oap))); rc = -EBUSY; } + if (ext != NULL) + osc_extent_put(env, ext); } - osc_object_unlock(obj); - if (ext != NULL) - osc_extent_put(env, ext); RETURN(rc); } @@ -2615,23 +2654,31 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, struct osc_async_page *oap; int page_count = 0; int mppr = cli->cl_max_pages_per_rpc; + bool can_merge = true; pgoff_t start = CL_PAGE_EOF; pgoff_t end = 0; ENTRY; list_for_each_entry(oap, list, oap_pending_item) { - pgoff_t index = osc_index(oap2osc(oap)); + struct osc_page *opg = oap2osc_page(oap); + pgoff_t index = osc_index(opg); + if (index > end) end = index; if (index < start) start = index; ++page_count; mppr <<= (page_count > mppr); + + if (unlikely(opg->ops_from > 0 || opg->ops_to < PAGE_SIZE)) + can_merge = false; } ext = osc_extent_alloc(obj); if (ext == NULL) { - list_for_each_entry(oap, list, oap_pending_item) { + struct osc_async_page *tmp; + + list_for_each_entry_safe(oap, tmp, list, oap_pending_item) { list_del_init(&oap->oap_pending_item); osc_ap_completion(env, cli, oap, 0, -ENOMEM); } @@ -2640,6 +2687,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, ext->oe_rw = !!(cmd & OBD_BRW_READ); ext->oe_sync = 1; + ext->oe_no_merge = !can_merge; ext->oe_urgent = 1; ext->oe_start = start; ext->oe_end = ext->oe_max_end = end; @@ -2668,8 +2716,8 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, /** * Called by osc_io_setattr_start() to freeze and destroy covering extents. */ -int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio, - struct osc_object *obj, __u64 size) +int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj, + __u64 size, struct osc_extent **extp) { struct client_obd *cli = osc_cli(obj); struct osc_extent *ext; @@ -2699,7 +2747,6 @@ again: * a page already having been flushed by write_page(). * We have to wait for this extent because we can't * truncate that page. */ - LASSERT(!ext->oe_hp); OSC_EXTENT_DUMP(D_CACHE, ext, "waiting for busy extent\n"); waiting = osc_extent_get(ext); @@ -2763,9 +2810,11 @@ again: /* we need to hold this extent in OES_TRUNC state so * that no writeback will happen. This is to avoid - * BUG 17397. */ - LASSERT(oio->oi_trunc == NULL); - oio->oi_trunc = osc_extent_get(ext); + * BUG 17397. + * Only partial truncate can reach here, if @size is + * not zero, the caller should provide a valid @extp. */ + LASSERT(*extp == NULL); + *extp = osc_extent_get(ext); OSC_EXTENT_DUMP(D_CACHE, ext, "trunc at "LPU64"\n", size); } @@ -2790,13 +2839,10 @@ again: /** * Called after osc_io_setattr_end to add oio->oi_trunc back to cache. 
*/ -void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio, - struct osc_object *obj) +void osc_cache_truncate_end(const struct lu_env *env, struct osc_extent *ext) { - struct osc_extent *ext = oio->oi_trunc; - - oio->oi_trunc = NULL; if (ext != NULL) { + struct osc_object *obj = ext->oe_obj; bool unplug = false; EASSERT(ext->oe_nr_pages > 0, ext); @@ -2979,7 +3025,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj, } if (unplug) - osc_io_unplug(env, osc_cli(obj), obj, PDL_POLICY_ROUND); + osc_io_unplug(env, osc_cli(obj), obj); if (hp || discard) { int rc; @@ -3100,7 +3146,8 @@ static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io, struct cl_page *page = ops->ops_cl.cpl_page; /* refresh non-overlapped index */ - tmp = osc_dlmlock_at_pgoff(env, osc, index, 0, 0); + tmp = osc_dlmlock_at_pgoff(env, osc, index, + OSC_DAP_FL_TEST_LOCK); if (tmp != NULL) { __u64 end = tmp->l_policy_data.l_extent.end; /* Cache the first-non-overlapped index so as to skip @@ -3133,8 +3180,10 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io, /* page is top page. */ info->oti_next_index = osc_index(ops) + 1; if (cl_page_own(env, io, page) == 0) { - KLASSERT(ergo(page->cp_type == CPT_CACHEABLE, - !PageDirty(cl_page_vmpage(page)))); + if (!ergo(page->cp_type == CPT_CACHEABLE, + !PageDirty(cl_page_vmpage(page)))) + CL_PAGE_DEBUG(D_ERROR, env, page, + "discard dirty page?\n"); /* discard the page */ cl_page_discard(env, io, page);
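
---

Editorial notes on this diff (reviewer commentary, not part of the patch):

1. Grant accounting. The patch renames cl_extent_tax to cl_grant_extent_tax,
   introduces cl_dirty_grant, and threads a dirty_grant argument through
   osc_free_grant() so that grant consumed by cached dirty pages is tracked
   separately from the cl_dirty_pages count. Extent merging now charges the
   per-extent tax only once per merged extent: osc_extent_merge() deducts the
   duplicate tax from the merged extent's oe_grants, osc_extent_release()
   hands the saved tax back through osc_unreserve_grant(), and a merge is
   refused with -ERANGE when the result would exceed cl_max_extent_pages,
   the maximum extent size reported by the server.

   Below is a minimal, self-contained C sketch of that arithmetic. The
   toy_* names and the tax/limit values are hypothetical stand-ins for
   cl_grant_extent_tax and cl_max_extent_pages, and the chunk-contiguity
   and extent-state checks of the real osc_extent_merge() are omitted:

	#include <stdio.h>

	struct toy_extent {
		unsigned int start;	/* first page index, inclusive */
		unsigned int end;	/* last page index, inclusive */
		unsigned int grants;	/* reserved grant bytes, incl. one tax */
	};

	static const unsigned int toy_extent_tax = 4096;	/* per-extent tax */
	static const unsigned int toy_max_extent_pages = 1024;	/* server limit */

	/* Merge @victim into @cur; return the tax bytes the caller should
	 * unreserve, or -1 when the merged extent would be too large. */
	static int toy_merge(struct toy_extent *cur,
			     const struct toy_extent *victim)
	{
		unsigned int pages = (cur->end - cur->start + 1) +
				     (victim->end - victim->start + 1);

		/* merged extent must not exceed the server-reported maximum */
		if (pages > toy_max_extent_pages)
			return -1;

		cur->start = cur->start < victim->start ? cur->start : victim->start;
		cur->end = cur->end > victim->end ? cur->end : victim->end;
		/* both extents carried one tax; the merged extent needs only
		 * one, so the duplicate is deducted and handed back */
		cur->grants += victim->grants - toy_extent_tax;
		return (int)toy_extent_tax;
	}

	int main(void)
	{
		struct toy_extent a = {  0, 15, 65536 + 4096 };
		struct toy_extent b = { 16, 31, 65536 + 4096 };
		int freed = toy_merge(&a, &b);

		/* prints: merged 0..31 grants 135168 freed 4096 */
		printf("merged %u..%u grants %u freed %d\n",
		       a.start, a.end, a.grants, freed);
		return 0;
	}

2. Short-write rounding in osc_extent_finish(). The lost_grant computation
   rounds the final short write out to OST block boundaries before deciding
   how much of the last page's grant the server will account for. As a
   worked example with an assumed blocksize of 512 and PAGE_SIZE of 4096:
   a final write of 200 bytes at offset 100 within its page gives
   count = 200 + 100 = 300 and end = 300, so count is rounded up to 512
   (one whole block) and lost_grant = 4096 - 512 = 3584 bytes, mirroring
   the server-side filter_grant_check() accounting that the comment cites.

3. Cache-entry waits. osc_enter_cache() now derives its wait timeout from
   adaptive timeouts (at_max, or obd_timeout when AT is off) instead of a
   hard-coded 600 seconds, and the exit paths are consolidated into a single
   switch that logs, via OSC_DUMP_GRANT, whether the caller got grant or is
   falling back to synchronous I/O.

4. Other changes visible in this diff: the unused pdl_policy_t plumbing is
   dropped from the RPC-sending paths; osc_extent_truncate() switches from
   cl_env_nested_get() to cl_env_get() with a __u16 refcheck and sets
   ci_ignore_layout; extents now hold a cl_object reference for their
   lifetime; sync-page extents set oe_no_merge when any page in the list is
   only partially covered, and try_to_add_extent_for_io() refuses to merge
   such extents; and osc_cache_truncate_start()/_end() pass the held extent
   through an explicit **extp instead of oio->oi_trunc.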