diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index 061fdea..0d61628 100644
--- a/lustre/osc/osc_cache.c
+++ b/lustre/osc/osc_cache.c
@@ -27,7 +27,7 @@
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
- * Copyright (c) 2011 Whamcloud, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
 *
 */
/*
@@ -799,10 +799,11 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
 struct client_obd *cli = osc_cli(ext->oe_obj);
 struct osc_async_page *oap;
 struct osc_async_page *tmp;
- struct osc_async_page *last = NULL;
 int nr_pages = ext->oe_nr_pages;
 int lost_grant = 0;
 int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
+ __u64 last_off = 0;
+ int last_count = -1;
 ENTRY;

 OSC_EXTENT_DUMP(D_CACHE, ext, "extent finished.\n");
@@ -813,8 +814,10 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
 oap_pending_item) {
 cfs_list_del_init(&oap->oap_rpc_item);
 cfs_list_del_init(&oap->oap_pending_item);
- if (last == NULL || last->oap_obj_off < oap->oap_obj_off)
- last = oap;
+ if (last_off <= oap->oap_obj_off) {
+ last_off = oap->oap_obj_off;
+ last_count = oap->oap_count;
+ }

 --ext->oe_nr_pages;
 osc_ap_completion(env, cli, oap, sent, rc);
@@ -824,7 +827,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
 if (!sent) {
 lost_grant = ext->oe_grants;
 } else if (blocksize < CFS_PAGE_SIZE &&
- last->oap_count != CFS_PAGE_SIZE) {
+ last_count != CFS_PAGE_SIZE) {
 /* For short writes we shouldn't count parts of pages that
 * span a whole chunk on the OST side, or our accounting goes
 * wrong. Should match the code in filter_grant_check. */
@@ -1054,9 +1057,9 @@ static int osc_extent_make_ready(const struct lu_env *env,
 rc = osc_make_ready(env, oap, OBD_BRW_WRITE);
 switch (rc) {
 case 0:
- cfs_spin_lock(&oap->oap_lock);
+ spin_lock(&oap->oap_lock);
 oap->oap_async_flags |= ASYNC_READY;
- cfs_spin_unlock(&oap->oap_lock);
+ spin_unlock(&oap->oap_lock);
 break;
 case -EALREADY:
 LASSERT((oap->oap_async_flags & ASYNC_READY) != 0);
@@ -1270,18 +1273,16 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
 /* Clear opg->ops_transfer_pinned before VM lock is released. */
 opg->ops_transfer_pinned = 0;

- cfs_spin_lock(&obj->oo_seatbelt);
+ spin_lock(&obj->oo_seatbelt);
 LASSERT(opg->ops_submitter != NULL);
 LASSERT(!cfs_list_empty(&opg->ops_inflight));
 cfs_list_del_init(&opg->ops_inflight);
 opg->ops_submitter = NULL;
- cfs_spin_unlock(&obj->oo_seatbelt);
+ spin_unlock(&obj->oo_seatbelt);

 opg->ops_submit_time = 0;
 srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;

- cl_page_completion(env, page, crt, rc);
-
 /* statistic */
 if (rc == 0 && srvlock) {
 struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
@@ -1300,12 +1301,9 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
 * reference counter protects page from concurrent reclaim.
 */
 lu_ref_del(&page->cp_reference, "transfer", page);
- /*
- * As page->cp_obj is pinned by a reference from page->cp_req, it is
- * safe to call cl_page_put() without risking object destruction in a
- * non-blocking context.
- */
- cl_page_put(env, page);
+
+ cl_page_completion(env, page, crt, rc);
+
 RETURN(0);
 }

@@ -1390,8 +1388,6 @@ static void __osc_unreserve_grant(struct client_obd *cli,
 } else {
 cli->cl_avail_grant += unused;
 }
- if (unused > 0)
- osc_wake_cache_waiters(cli);
 }

 void osc_unreserve_grant(struct client_obd *cli,
@@ -1399,6 +1395,8 @@
 {
 client_obd_list_lock(&cli->cl_loi_list_lock);
 __osc_unreserve_grant(cli, reserved, unused);
+ if (unused > 0)
+ osc_wake_cache_waiters(cli);
 client_obd_list_unlock(&cli->cl_loi_list_lock);
 }

@@ -1437,12 +1435,15 @@ static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
 cli->cl_avail_grant, cli->cl_dirty);
 }

-/* The companion to osc_enter_cache(), called when @oap is no longer part of
- * the dirty accounting. Writeback completes or truncate happens before
- * writing starts. Must be called with the loi lock held. */
+/**
+ * The companion to osc_enter_cache(), called when @oap is no longer part of
+ * the dirty accounting due to an error.
+ */
 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
 {
+ client_obd_list_lock(&cli->cl_loi_list_lock);
 osc_release_write_grant(cli, &oap->oap_brw_page);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
 }

 /**
@@ -1477,8 +1478,22 @@ static int osc_enter_cache_try(struct client_obd *cli,
 return rc;
 }

-/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
- * grant or cache space. */
+static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
+{
+ int rc;
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ rc = cfs_list_empty(&ocw->ocw_entry);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ return rc;
+}
+
+/**
+ * The main entry point to reserve dirty page accounting. Usually the grant
+ * reserved in this function is freed in bulk in osc_free_grant(), unless the
+ * page fails to enter the osc cache, in which case osc_exit_cache() frees it.
+ *
+ * The process will be put to sleep if it has already run out of grant.
+ */
 static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
 struct osc_async_page *oap, int bytes)
 {
@@ -1518,29 +1533,30 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
 ocw.ocw_rc = 0;
 client_obd_list_unlock(&cli->cl_loi_list_lock);

- osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND);
+ osc_io_unplug_async(env, cli, NULL);

 CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
 cli->cl_import->imp_obd->obd_name, &ocw, oap);

- rc = l_wait_event(ocw.ocw_waitq,
- cfs_list_empty(&ocw.ocw_entry), &lwi);
+ rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

 client_obd_list_lock(&cli->cl_loi_list_lock);
- cfs_list_del_init(&ocw.ocw_entry);
- if (rc < 0)
- break;

+ /* l_wait_event was interrupted by a signal */
+ if (rc < 0) {
+ cfs_list_del_init(&ocw.ocw_entry);
+ GOTO(out, rc);
+ }
+
+ LASSERT(cfs_list_empty(&ocw.ocw_entry));
 rc = ocw.ocw_rc;
+
 if (rc != -EDQUOT)
- break;
- if (osc_enter_cache_try(cli, oap, bytes, 0)) {
- rc = 0;
- break;
- }
+ GOTO(out, rc);
+ if (osc_enter_cache_try(cli, oap, bytes, 0))
+ GOTO(out, rc = 0);
 }
 EXIT;
-
 out:
 client_obd_list_unlock(&cli->cl_loi_list_lock);
 OSC_DUMP_GRANT(cli, "returned %d.\n", rc);
@@ -1555,31 +1571,25 @@ void osc_wake_cache_waiters(struct client_obd *cli)
 ENTRY;

 cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
- /* if we can't dirty more, we must wait until some is written */
+ ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
+ cfs_list_del_init(&ocw->ocw_entry);
+
+ ocw->ocw_rc = -EDQUOT;
+ /* we can't dirty more */
 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
 (cfs_atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
 CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
 "osc max %ld, sys max %d\n", cli->cl_dirty,
 cli->cl_dirty_max, obd_max_dirty_pages);
- return;
+ goto wakeup;
 }

- /* if still dirty cache but no grant wait for pending RPCs that
- * may yet return us some grant before doing sync writes */
- if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
- CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
- cli->cl_w_in_flight);
- return;
- }
-
- ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
- cfs_list_del_init(&ocw->ocw_entry);
-
 ocw->ocw_rc = 0;
 if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
 ocw->ocw_rc = -EDQUOT;

+wakeup:
 CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
 ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);

@@ -1753,9 +1763,9 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
 }

 /* As the transfer for this page is being done, clear the flags */
- cfs_spin_lock(&oap->oap_lock);
+ spin_lock(&oap->oap_lock);
 oap->oap_async_flags = 0;
- cfs_spin_unlock(&oap->oap_lock);
+ spin_unlock(&oap->oap_lock);
 oap->oap_interrupted = 0;

 if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
@@ -2122,7 +2132,11 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
 has_rpcs = __osc_list_maint(cli, osc);
 if (has_rpcs) {
 if (!async) {
+ /* disable osc_lru_shrink() temporarily to avoid a
+ * potential stack overrun problem. LU-2859 */
+ cfs_atomic_inc(&cli->cl_lru_shrinkers);
 osc_check_rpcs(env, cli, pol);
+ cfs_atomic_dec(&cli->cl_lru_shrinkers);
 } else {
 CDEBUG(D_CACHE, "Queue writeback work for client %p.\n",
 cli);
@@ -2171,7 +2185,7 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
 CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
 CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);

- cfs_spin_lock_init(&oap->oap_lock);
+ spin_lock_init(&oap->oap_lock);
 CDEBUG(D_INFO, "oap %p page %p obj off "LPU64"\n", oap, page,
 oap->oap_obj_off);
 RETURN(0);
@@ -2389,7 +2403,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
 struct cl_page *cp = ops->ops_cl.cpl_page;
 pgoff_t index = cp->cp_index;
 struct osc_async_page *oap = &ops->ops_oap;
- int unplug = 0;
+ bool unplug = false;
 int rc = 0;
 ENTRY;

@@ -2425,19 +2439,20 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
 if (rc)
 GOTO(out, rc);

- cfs_spin_lock(&oap->oap_lock);
+ spin_lock(&oap->oap_lock);
 oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
- cfs_spin_unlock(&oap->oap_lock);
+ spin_unlock(&oap->oap_lock);

 if (cfs_memory_pressure_get())
 ext->oe_memalloc = 1;

 ext->oe_urgent = 1;
- if (ext->oe_state == OES_CACHE && cfs_list_empty(&ext->oe_link)) {
+ if (ext->oe_state == OES_CACHE) {
 OSC_EXTENT_DUMP(D_CACHE, ext,
 "flush page %p make it urgent.\n", oap);
- cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
- unplug = 1;
+ if (cfs_list_empty(&ext->oe_link))
+ cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ unplug = true;
 }

 rc = 0;
 EXIT;
@@ -2568,7 +2583,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
 }

 osc_object_unlock(obj);
- osc_io_unplug(env, cli, obj, PDL_POLICY_ROUND);
+ osc_io_unplug_async(env, cli, obj);
 RETURN(0);
 }

@@ -2613,6 +2628,8 @@
again:
 break;
 }

+ OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:"LPU64".\n", size);
+
 osc_extent_get(ext);
 if (ext->oe_state == OES_ACTIVE) {
 /* though we grab inode mutex for write path, but we
@@ -2677,13 +2694,17 @@ again:
 osc_extent_put(env, ext);
 }
 if (waiting != NULL) {
- if (result == 0)
- result = osc_extent_wait(env, waiting, OES_INV);
+ int rc;
+
+ /* ignore the result of osc_extent_wait; the write initiator
+ * should take care of it. */
+ rc = osc_extent_wait(env, waiting, OES_INV);
+ if (rc < 0)
+ OSC_EXTENT_DUMP(D_CACHE, waiting, "wait error: %d.\n", rc);

 osc_extent_put(env, waiting);
 waiting = NULL;
- if (result == 0)
- goto again;
+ goto again;
 }
 RETURN(result);
 }
@@ -2698,6 +2719,8 @@ void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
 oio->oi_trunc = NULL;
 if (ext != NULL) {
+ bool unplug = false;
+
 EASSERT(ext->oe_nr_pages > 0, ext);
 EASSERT(ext->oe_state == OES_TRUNC, ext);
 EASSERT(!ext->oe_urgent, ext);
@@ -2708,12 +2731,14 @@ void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
 if (ext->oe_fsync_wait && !ext->oe_urgent) {
 ext->oe_urgent = 1;
 cfs_list_move_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ unplug = true;
 }

 osc_update_pending(obj, OBD_BRW_WRITE, ext->oe_nr_pages);
 osc_object_unlock(obj);
 osc_extent_put(env, ext);
- osc_list_maint(osc_cli(obj), obj);
+ if (unplug)
+ osc_io_unplug_async(env, osc_cli(obj), obj);
 }
 }

@@ -2722,9 +2747,9 @@
 * The caller must have called osc_cache_writeback_range() to issue IO
 * otherwise it will take a long time for this function to finish.
 *
- * Caller must hold inode_mutex and i_alloc_sem, or cancel exclusive
- * dlm lock so that nobody else can dirty this range of file while we're
- * waiting for extents to be written.
+ * Caller must hold inode_mutex, or cancel the exclusive dlm lock so that
+ * nobody else can dirty this range of the file while we're waiting for
+ * extents to be written.
 */
 int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
 pgoff_t start, pgoff_t end)
@@ -2787,7 +2812,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
 {
 struct osc_extent *ext;
 CFS_LIST_HEAD(discard_list);
- int unplug = 0;
+ bool unplug = false;
 int result = 0;
 ENTRY;

@@ -2815,10 +2840,9 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
 ext->oe_urgent = 1;
 list = &obj->oo_urgent_exts;
 }
- if (list != NULL) {
+ if (list != NULL)
 cfs_list_move_tail(&ext->oe_link, list);
- unplug = 1;
- }
+ unplug = true;
 } else {
 /* the only discarder is lock cancelling, so
 * [start, end] must contain this extent */
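
Two standalone sketches follow; neither is part of the patch nor of Lustre
itself, but each models a pattern the patch adopts.

First, the osc_extent_finish() hunk replaces the saved `last` oap pointer
with copied last_off/last_count values. osc_ap_completion() may free the
oap inside the loop, so anything the post-loop grant accounting needs has
to be copied out before the completion call. Below is a minimal userspace
model of that copy-before-free pattern; the node/finish_one names are
hypothetical, not the Lustre structures.

/* copy_before_free.c -- build: cc copy_before_free.c */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

struct node {			/* stand-in for osc_async_page */
	struct node *next;
	uint64_t off;		/* like oap->oap_obj_off */
	int count;		/* like oap->oap_count */
};

static void finish_one(struct node *n)
{
	free(n);		/* like osc_ap_completion(): n is gone after this */
}

int main(void)
{
	struct node *head = NULL;
	uint64_t last_off = 0;
	int last_count = -1;

	for (int i = 0; i < 4; i++) {
		struct node *n = malloc(sizeof(*n));
		n->off = (uint64_t)i * 4096;
		n->count = 4096 - i;	/* arbitrary demo sizes */
		n->next = head;
		head = n;
	}

	while (head != NULL) {
		struct node *n = head;

		head = n->next;
		/* copy the fields needed later *before* completion ... */
		if (last_off <= n->off) {
			last_off = n->off;
			last_count = n->count;
		}
		/* ... because n must not be touched after this call */
		finish_one(n);
	}

	printf("last_off=%" PRIu64 " last_count=%d\n", last_off, last_count);
	return 0;
}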
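
Second, the osc_enter_cache()/osc_wake_cache_waiters() rework fixes a
missed-wakeup race: the waiter used to test cfs_list_empty(&ocw.ocw_entry)
without holding cl_loi_list_lock while the waker dequeued entries under it,
so the only wakeup could slip in between the test and the sleep. The new
ocw_granted() helper evaluates the predicate under the lock, and the waker
now always dequeues the entry and sets ocw_rc before waking. The sketch
below models that handshake with pthreads standing in for l_wait_event()
and the wait queue; cache_waiter, waker() and the file name are hypothetical.

/* waiter_sketch.c -- build: cc -pthread waiter_sketch.c */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

struct cache_waiter {		/* rough analogue of osc_cache_waiter */
	bool cw_queued;		/* still linked on the waiter list */
	int  cw_rc;		/* verdict set by the waker (0 or -EDQUOT) */
};

static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  list_cond = PTHREAD_COND_INITIALIZER;

/* waker side, shaped like the reworked osc_wake_cache_waiters():
 * dequeue and record the result under the lock, then wake. */
static void *waker(void *arg)
{
	struct cache_waiter *cw = arg;

	sleep(1);			/* pretend a write RPC freed some grant */
	pthread_mutex_lock(&list_lock);
	cw->cw_rc = 0;			/* or -EDQUOT when still out of room */
	cw->cw_queued = false;		/* dequeued unconditionally */
	pthread_mutex_unlock(&list_lock);
	pthread_cond_broadcast(&list_cond);
	return NULL;
}

int main(void)
{
	struct cache_waiter cw = { .cw_queued = true, .cw_rc = 0 };
	pthread_t t;

	pthread_create(&t, NULL, waker, &cw);

	/* waiter side: like ocw_granted(), the "am I still queued?" test
	 * runs under list_lock, so the wakeup cannot be missed. */
	pthread_mutex_lock(&list_lock);
	while (cw.cw_queued)
		pthread_cond_wait(&list_cond, &list_lock);
	pthread_mutex_unlock(&list_lock);

	printf("woken, rc = %d\n", cw.cw_rc);
	pthread_join(t, NULL);
	return 0;
}

Because the waker dequeues under the lock, the woken task can simply assert
that it is no longer queued -- the LASSERT(cfs_list_empty(&ocw.ocw_entry))
in the patch -- instead of racing to remove itself.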