From: Jinshan Xiong
Date: Thu, 15 May 2014 16:47:11 +0000 (-0700)
Subject: LU-4841 osc: revise unstable pages accounting
X-Git-Tag: 2.5.60~2
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=48eceeec98da3db2c93d980bb5f7e1c1f7c333cc

LU-4841 osc: revise unstable pages accounting

A few changes are made in this patch for unstable pages tracking:
1. Remove kernel NFS unstable pages tracking because it killed
   performance
2. Track unstable pages as part of the LRU cache. Otherwise Lustre
   can use much more memory than max_cached_mb
3. Remove obd_unstable_pages tracking to avoid using a global atomic
   counter
4. Make unstable pages tracking optional. Tracking unstable pages is
   turned off by default, and can be controlled by
   llite.*.unstable_stats (see the usage sketch after the diff)

Signed-off-by: Jinshan Xiong
Change-Id: I3bfae9376aac2e2c38539577d9db29a375bc95af
Reviewed-on: http://review.whamcloud.com/10003
Tested-by: Jenkins
Tested-by: Maloo
Reviewed-by: Andreas Dilger
Reviewed-by: Lai Siyao
Reviewed-by: Oleg Drokin
---
diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h
index 5ad0067..a037f3d 100644
--- a/lustre/include/cl_object.h
+++ b/lustre/include/cl_object.h
@@ -1058,22 +1058,32 @@ do {                                                 \
        }                                                                    \
 } while (0)
-static inline int __page_in_use(const struct cl_page *page, int refc)
+static inline struct page *cl_page_vmpage(const struct cl_page *page)
 {
-        if (page->cp_type == CPT_CACHEABLE)
-                ++refc;
-        LASSERT(atomic_read(&page->cp_ref) > 0);
-        return (atomic_read(&page->cp_ref) > refc);
+        LASSERT(page->cp_vmpage != NULL);
+        return page->cp_vmpage;
 }
-#define cl_page_in_use(pg) __page_in_use(pg, 1)
-#define cl_page_in_use_noref(pg) __page_in_use(pg, 0)
-static inline struct page *cl_page_vmpage(struct cl_page *page)
+/**
+ * Check if a cl_page is in use.
+ *
+ * Client cache holds a refcount, this refcount will be dropped when
+ * the page is taken out of cache, see vvp_page_delete().
+ */
+static inline bool __page_in_use(const struct cl_page *page, int refc)
 {
-        LASSERT(page->cp_vmpage != NULL);
-        return page->cp_vmpage;
+        return (atomic_read(&page->cp_ref) > refc + 1);
 }
+/**
+ * Caller itself holds a refcount of cl_page.
+ */
+#define cl_page_in_use(pg) __page_in_use(pg, 1)
+/**
+ * Caller doesn't hold a refcount.
+ */
+#define cl_page_in_use_noref(pg) __page_in_use(pg, 0)
+
 /** @} cl_page */
 /** \addtogroup cl_lock cl_lock
diff --git a/lustre/include/lclient.h b/lustre/include/lclient.h
index cf852f5..3016c08 100644
--- a/lustre/include/lclient.h
+++ b/lustre/include/lclient.h
@@ -457,14 +457,43 @@ void ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm);
  * layer for recovery purposes.
  */
 struct cl_client_cache {
-        atomic_t          ccc_users;          /* # of users (OSCs) */
-        cfs_list_t        ccc_lru;            /* LRU of cached clean pages */
-        spinlock_t        ccc_lru_lock;       /* lock for list */
-        atomic_t          ccc_lru_left;       /* # of LRU entries available */
-        unsigned long     ccc_lru_max;        /* Max # of LRU entries */
-        unsigned int      ccc_lru_shrinkers;  /* # of threads shrinking */
-        atomic_t          ccc_unstable_nr;    /* # of pages pinned */
-        wait_queue_head_t ccc_unstable_waitq; /* Signaled on BRW commit */
+        /**
+         * # of users (OSCs)
+         */
+        atomic_t          ccc_users;
+        /**
+         * # of LRU entries available
+         */
+        atomic_t          ccc_lru_left;
+        /**
+         * List of entities(OSCs) for this LRU cache
+         */
+        cfs_list_t        ccc_lru;
+        /**
+         * Max # of LRU entries
+         */
+        unsigned long     ccc_lru_max;
+        /**
+         * Lock to protect ccc_lru list
+         */
+        spinlock_t        ccc_lru_lock;
+        /**
+         * # of threads are doing shrinking
+         */
+        unsigned int      ccc_lru_shrinkers;
+        /**
+         * Set if unstable check is enabled
+         */
+        unsigned int      ccc_unstable_check:1;
+        /**
+         * Waitq for awaiting unstable pages to reach zero.
+         * Used at umounting time and signaled on BRW commit
+         */
+        wait_queue_head_t ccc_unstable_waitq;
+        /**
+         * # of unstable pages for this mount point
+         */
+        atomic_t          ccc_unstable_nr;
 };
 enum {
diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h
index bb156f2..e99c1f5 100644
--- a/lustre/include/obd_support.h
+++ b/lustre/include/obd_support.h
@@ -73,7 +73,6 @@ extern unsigned int at_history;
 extern int at_early_margin;
 extern int at_extra;
 extern unsigned int obd_max_dirty_pages;
-extern atomic_t obd_unstable_pages;
 extern atomic_t obd_dirty_pages;
 extern atomic_t obd_dirty_transit_pages;
 extern unsigned int obd_alloc_fail_rate;
diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c
index c5b6210..1c59285 100644
--- a/lustre/llite/llite_lib.c
+++ b/lustre/llite/llite_lib.c
@@ -97,6 +97,8 @@ static struct ll_sb_info *ll_init_sbi(void)
         spin_lock_init(&sbi->ll_cache.ccc_lru_lock);
         INIT_LIST_HEAD(&sbi->ll_cache.ccc_lru);
+        /* turn unstable check off by default as it impacts performance */
+        sbi->ll_cache.ccc_unstable_check = 0;
         atomic_set(&sbi->ll_cache.ccc_unstable_nr, 0);
         init_waitqueue_head(&sbi->ll_cache.ccc_unstable_waitq);
diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c
index 8595822..152e4a7 100644
--- a/lustre/llite/lproc_llite.c
+++ b/lustre/llite/lproc_llite.c
@@ -826,10 +826,44 @@ static int ll_unstable_stats_seq_show(struct seq_file *m, void *v)
         pages = atomic_read(&cache->ccc_unstable_nr);
         mb = (pages * PAGE_CACHE_SIZE) >> 20;
-        return seq_printf(m, "unstable_pages: %8d\n"
-                             "unstable_mb: %8d\n", pages, mb);
+        return seq_printf(m, "unstable_check: %8d\n"
+                             "unstable_pages: %8d\n"
+                             "unstable_mb: %8d\n",
+                          cache->ccc_unstable_check, pages, mb);
 }
-LPROC_SEQ_FOPS_RO(ll_unstable_stats);
+
+static ssize_t ll_unstable_stats_seq_write(struct file *file,
+                                           const char __user *buffer,
+                                           size_t count, loff_t *unused)
+{
+        struct seq_file *seq = file->private_data;
+        struct ll_sb_info *sbi = ll_s2sbi((struct super_block *)seq->private);
+        char kernbuf[128];
+        int val, rc;
+
+        if (count == 0)
+                return 0;
+        if (count < 0 || count >= sizeof(kernbuf))
+                return -EINVAL;
+
+        if (copy_from_user(kernbuf, buffer, count))
+                return -EFAULT;
+        kernbuf[count] = 0;
+
+        buffer += lprocfs_find_named_value(kernbuf, "unstable_check:", &count) -
+                  kernbuf;
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc < 0)
+                return rc;
+
+        /* borrow lru lock to set the value */
+        spin_lock(&sbi->ll_cache.ccc_lru_lock);
+        sbi->ll_cache.ccc_unstable_check = !!val;
+        spin_unlock(&sbi->ll_cache.ccc_lru_lock);
+
+        return count;
+}
+LPROC_SEQ_FOPS(ll_unstable_stats);
 static int ll_root_squash_seq_show(struct seq_file *m, void *v)
 {
diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c
index f08fe35..ed02f4a 100644
--- a/lustre/obdclass/class_obd.c
+++ b/lustre/obdclass/class_obd.c
@@ -85,8 +85,6 @@ unsigned int obd_dump_on_eviction;
 EXPORT_SYMBOL(obd_dump_on_eviction);
 unsigned int obd_max_dirty_pages = 256;
 EXPORT_SYMBOL(obd_max_dirty_pages);
-atomic_t obd_unstable_pages;
-EXPORT_SYMBOL(obd_unstable_pages);
 atomic_t obd_dirty_pages;
 EXPORT_SYMBOL(obd_dirty_pages);
 unsigned int obd_timeout = OBD_TIMEOUT_DEFAULT; /* seconds */
diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index 5b94f37..545e0a3 100644
--- a/lustre/osc/osc_cache.c
+++ b/lustre/osc/osc_cache.c
@@ -1319,13 +1319,11 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
 #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do {                         \
         struct client_obd *__tmp = (cli);                                   \
         CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d "        \
-               "unstable_pages: %d/%d dropped: %ld avail: %ld, "            \
-               "reserved: %ld, flight: %d } lru {in list: %d, "             \
-               "left: %d, waiters: %d }" fmt,                               \
+               "dropped: %ld avail: %ld, reserved: %ld, flight: %d }"       \
+               "lru {in list: %d, left: %d, waiters: %d }" fmt,             \
               __tmp->cl_import->imp_obd->obd_name,                          \
               __tmp->cl_dirty, __tmp->cl_dirty_max,                         \
               atomic_read(&obd_dirty_pages), obd_max_dirty_pages,           \
-              atomic_read(&obd_unstable_pages), obd_max_dirty_pages,        \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,                  \
               __tmp->cl_reserved_grant, __tmp->cl_w_in_flight,              \
               atomic_read(&__tmp->cl_lru_in_list),                          \
@@ -1478,8 +1476,7 @@ static int osc_enter_cache_try(struct client_obd *cli,
                 return 0;
         if (cli->cl_dirty + PAGE_CACHE_SIZE <= cli->cl_dirty_max &&
-            atomic_read(&obd_unstable_pages) + 1 +
-            atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
+            1 + atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
                 osc_consume_write_grant(cli, &oap->oap_brw_page);
                 if (transient) {
                         cli->cl_dirty_transit += PAGE_CACHE_SIZE;
@@ -1612,8 +1609,7 @@ void osc_wake_cache_waiters(struct client_obd *cli)
                 ocw->ocw_rc = -EDQUOT;
                 /* we can't dirty more */
                 if ((cli->cl_dirty + PAGE_CACHE_SIZE > cli->cl_dirty_max) ||
-                    (atomic_read(&obd_unstable_pages) + 1 +
-                     atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) {
+                    (1 + atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) {
                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                                "osc max %ld, sys max %d\n", cli->cl_dirty,
                                cli->cl_dirty_max, obd_max_dirty_pages);
@@ -1782,80 +1778,6 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
         ar->ar_force_sync = 0;
 }
-/* Performs "unstable" page accounting. This function balances the
- * increment operations performed in osc_inc_unstable_pages. It is
- * registered as the RPC request callback, and is executed when the
- * bulk RPC is committed on the server. Thus at this point, the pages
- * involved in the bulk transfer are no longer considered unstable. */
-void osc_dec_unstable_pages(struct ptlrpc_request *req)
-{
-        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
-        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
-        int page_count = desc->bd_iov_count;
-        int i;
-
-        /* No unstable page tracking */
-        if (cli->cl_cache == NULL)
-                return;
-
-        LASSERT(page_count >= 0);
-
-        for (i = 0; i < page_count; i++)
-                dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
-
-        atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
-        LASSERT(atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
-
-        atomic_sub(page_count, &cli->cl_unstable_count);
-        LASSERT(atomic_read(&cli->cl_unstable_count) >= 0);
-
-        atomic_sub(page_count, &obd_unstable_pages);
-        LASSERT(atomic_read(&obd_unstable_pages) >= 0);
-
-        wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
-}
-
-/* "unstable" page accounting. See: osc_dec_unstable_pages. */
-void osc_inc_unstable_pages(struct ptlrpc_request *req)
-{
-        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
-        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
-        int page_count = desc->bd_iov_count;
-        int i;
-
-        /* No unstable page tracking */
-        if (cli->cl_cache == NULL)
-                return;
-
-        LASSERT(page_count >= 0);
-
-        for (i = 0; i < page_count; i++)
-                inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
-
-        LASSERT(atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
-        atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
-
-        LASSERT(atomic_read(&cli->cl_unstable_count) >= 0);
-        atomic_add(page_count, &cli->cl_unstable_count);
-
-        LASSERT(atomic_read(&obd_unstable_pages) >= 0);
-        atomic_add(page_count, &obd_unstable_pages);
-
-        /* If the request has already been committed (i.e. brw_commit
-         * called via rq_commit_cb), we need to undo the unstable page
-         * increments we just performed because rq_commit_cb wont be
-         * called again. */
-        spin_lock(&req->rq_lock);
-        if (unlikely(req->rq_committed)) {
-                spin_unlock(&req->rq_lock);
-
-                osc_dec_unstable_pages(req);
-        } else {
-                req->rq_unstable = 1;
-                spin_unlock(&req->rq_lock);
-        }
-}
-
 /* this must be called holding the loi list lock to give coverage to exit_cache,
  * async_flag maintenance, and oap_request */
 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
@@ -1867,9 +1789,6 @@ static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
         ENTRY;
         if (oap->oap_request != NULL) {
-                if (rc == 0)
-                        osc_inc_unstable_pages(oap->oap_request);
-
                 xid = ptlrpc_req_xid(oap->oap_request);
                 ptlrpc_req_finished(oap->oap_request);
                 oap->oap_request = NULL;
@@ -2357,9 +2276,6 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                 RETURN(rc);
         }
-        if (osc_over_unstable_soft_limit(cli))
-                brw_flags |= OBD_BRW_SOFT_SYNC;
-
         oap->oap_cmd = cmd;
         oap->oap_page_off = ops->ops_from;
         oap->oap_count = ops->ops_to - ops->ops_from;
diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h
index 483ac5f..347371f 100644
--- a/lustre/osc/osc_internal.h
+++ b/lustre/osc/osc_internal.h
@@ -209,5 +209,5 @@ int osc_quota_poll_check(struct obd_export *exp, struct if_quotacheck *qchk);
 void osc_inc_unstable_pages(struct ptlrpc_request *req);
 void osc_dec_unstable_pages(struct ptlrpc_request *req);
-int osc_over_unstable_soft_limit(struct client_obd *cli);
+bool osc_over_unstable_soft_limit(struct client_obd *cli);
 #endif /* OSC_INTERNAL_H */
diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c
index eb054a9..8a77710 100644
--- a/lustre/osc/osc_page.c
+++ b/lustre/osc/osc_page.c
@@ -457,30 +457,6 @@ int osc_page_init(const struct lu_env *env, struct cl_object *obj,
         return result;
 }
-int osc_over_unstable_soft_limit(struct client_obd *cli)
-{
-        long obd_upages, obd_dpages, osc_upages;
-
-        /* Can't check cli->cl_unstable_count, therefore, no soft limit */
-        if (cli == NULL)
-                return 0;
-
-        obd_upages = atomic_read(&obd_unstable_pages);
-        obd_dpages = atomic_read(&obd_dirty_pages);
-
-        osc_upages = atomic_read(&cli->cl_unstable_count);
-
-        /* obd_max_dirty_pages is the max number of (dirty + unstable)
-         * pages allowed at any given time. To simulate an unstable page
-         * only limit, we subtract the current number of dirty pages
-         * from this max. This difference is roughly the amount of pages
-         * currently available for unstable pages. Thus, the soft limit
-         * is half of that difference. Check osc_upages to ensure we don't
-         * set SOFT_SYNC for OSCs without any outstanding unstable pages. */
-        return osc_upages != 0 &&
-               obd_upages >= (obd_max_dirty_pages - obd_dpages) / 2;
-}
-
 /**
  * Helper function called by osc_io_submit() for every page in an immediate
  * transfer (i.e., transferred synchronously).
@@ -504,9 +480,6 @@ void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
         oap->oap_count = opg->ops_to - opg->ops_from;
         oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags;
-        if (osc_over_unstable_soft_limit(oap->oap_cli))
-                oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
-
         if (!client_is_remote(osc_export(obj)) &&
             cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                 oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
@@ -671,6 +644,28 @@ static void discard_pagevec(const struct lu_env *env, struct cl_io *io,
 }
 /**
+ * Check if a cl_page can be released, i.e, it's not being used.
+ *
+ * If unstable account is turned on, bulk transfer may hold one refcount
+ * for recovery so we need to check vmpage refcount as well; otherwise,
+ * even we can destroy cl_page but the corresponding vmpage can't be reused.
+ */
+static inline bool lru_page_busy(struct client_obd *cli, struct cl_page *page)
+{
+        if (cl_page_in_use_noref(page))
+                return true;
+
+        if (cli->cl_cache->ccc_unstable_check) {
+                struct page *vmpage = cl_page_vmpage(page);
+
+                /* vmpage have two known users: cl_page and VM page cache */
+                if (page_count(vmpage) - page_mapcount(vmpage) > 2)
+                        return true;
+        }
+        return false;
+}
+
+/**
+ * Drop @target of pages from LRU at most.
  */
 int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
@@ -717,7 +712,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
                 opg = list_entry(cli->cl_lru_list.next, struct osc_page, ops_lru);
                 page = opg->ops_cl.cpl_page;
-                if (cl_page_in_use_noref(page)) {
+                if (lru_page_busy(cli, page)) {
                         list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
                         continue;
                 }
@@ -753,7 +748,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
                 }
                 if (cl_page_own_try(env, io, page) == 0) {
-                        if (!cl_page_in_use_noref(page)) {
+                        if (!lru_page_busy(cli, page)) {
                                 /* remove it from lru list earlier to avoid
                                  * lock contention */
                                 __osc_lru_del(cli, opg);
@@ -873,6 +868,13 @@ out:
         return rc;
 }
+/**
+ * osc_lru_reserve() is called to reserve an LRU slot for a cl_page.
+ *
+ * Usually the LRU slots are reserved in osc_io_iter_rw_init().
+ * Only in the case that the LRU slots are in extreme shortage, it should
+ * have reserved enough slots for an IO.
+ */
 static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
                            struct osc_page *opg)
 {
@@ -918,4 +920,146 @@ out:
         RETURN(rc);
 }
+/**
+ * Atomic operations are expensive. We accumulate the accounting for the
+ * same page zone to get better performance.
+ * In practice this can work pretty good because the pages in the same RPC
+ * are likely from the same page zone.
+ */
+static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+                                            int factor)
+{
+        obd_count page_count = desc->bd_iov_count;
+        void *zone = NULL;
+        int count = 0;
+        int i;
+
+        for (i = 0; i < page_count; i++) {
+                void *pz = page_zone(desc->bd_iov[i].kiov_page);
+
+                if (likely(pz == zone)) {
+                        ++count;
+                        continue;
+                }
+
+                if (count > 0) {
+                        mod_zone_page_state(zone, NR_UNSTABLE_NFS,
+                                            factor * count);
+                        count = 0;
+                }
+                zone = pz;
+                ++count;
+        }
+        if (count > 0)
+                mod_zone_page_state(zone, NR_UNSTABLE_NFS, factor * count);
+}
+
+static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
+{
+        unstable_page_accounting(desc, 1);
+}
+
+static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
+{
+        unstable_page_accounting(desc, -1);
+}
+
+/**
+ * Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable.
+ *
+ * If this function is called, the request should have been committed
+ * or req:rq_unstable must have been set; it implies that the unstable
+ * statistic have been added.
+ */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+        obd_count page_count = desc->bd_iov_count;
+        int unstable_count;
+
+        LASSERT(page_count >= 0);
+        dec_unstable_page_accounting(desc);
+
+        unstable_count = atomic_sub_return(page_count, &cli->cl_unstable_count);
+        LASSERT(unstable_count >= 0);
+
+        unstable_count = atomic_sub_return(page_count,
+                                           &cli->cl_cache->ccc_unstable_nr);
+        LASSERT(unstable_count >= 0);
+        if (unstable_count == 0)
+                wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
+
+        if (osc_cache_too_much(cli))
+                (void)ptlrpcd_queue_work(cli->cl_lru_work);
+}
+
+/**
+ * "unstable" page accounting. See: osc_dec_unstable_pages.
+ */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+        obd_count page_count = desc->bd_iov_count;
+
+        /* No unstable page tracking */
+        if (cli->cl_cache == NULL || !cli->cl_cache->ccc_unstable_check)
+                return;
+
+        add_unstable_page_accounting(desc);
+        atomic_add(page_count, &cli->cl_unstable_count);
+        atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+        /* If the request has already been committed (i.e. brw_commit
+         * called via rq_commit_cb), we need to undo the unstable page
+         * increments we just performed because rq_commit_cb wont be
+         * called again. */
+        spin_lock(&req->rq_lock);
+        if (unlikely(req->rq_committed)) {
+                spin_unlock(&req->rq_lock);
+
+                osc_dec_unstable_pages(req);
+        } else {
+                req->rq_unstable = 1;
+                spin_unlock(&req->rq_lock);
+        }
+}
+
+/**
+ * Check if it piggybacks SOFT_SYNC flag to OST from this OSC.
+ * This function will be called by every BRW RPC so it's critical
+ * to make this function fast.
+ */
+bool osc_over_unstable_soft_limit(struct client_obd *cli)
+{
+        long unstable_nr, osc_unstable_count;
+
+        /* Can't check cli->cl_unstable_count, therefore, no soft limit */
+        if (cli->cl_cache == NULL || !cli->cl_cache->ccc_unstable_check)
+                return false;
+
+        osc_unstable_count = atomic_read(&cli->cl_unstable_count);
+        unstable_nr = atomic_read(&cli->cl_cache->ccc_unstable_nr);
+
+        CDEBUG(D_CACHE,
+               "%s: cli: %p unstable pages: %lu, osc unstable pages: %lu\n",
+               cli->cl_import->imp_obd->obd_name, cli,
+               unstable_nr, osc_unstable_count);
+
+        /* If the LRU slots are in shortage - 25% remaining AND this OSC
+         * has one full RPC window of unstable pages, it's a good chance
+         * to piggyback a SOFT_SYNC flag.
+         * Please notice that the OST won't take immediate response for the
+         * SOFT_SYNC request so active OSCs will have more chance to carry
+         * the flag, this is reasonable. */
+        return unstable_nr > cli->cl_cache->ccc_lru_max >> 2 &&
+               osc_unstable_count > cli->cl_max_pages_per_rpc *
+                                    cli->cl_max_rpcs_in_flight;
+}
+
 /** @} osc */
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index b93af0a..10f9328 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -818,16 +818,14 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                 oa->o_undirty = 0;
-        } else if (unlikely(atomic_read(&obd_unstable_pages) +
-                            atomic_read(&obd_dirty_pages) -
+        } else if (unlikely(atomic_read(&obd_dirty_pages) -
                             atomic_read(&obd_dirty_transit_pages) >
                             (long)(obd_max_dirty_pages + 1))) {
                 /* The atomic_read() allowing the atomic_inc() are
                  * not covered by a lock thus they may safely race and trip
                  * this CERROR() unless we add in a small fudge factor (+1). */
-                CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
+                CERROR("%s: dirty %d - %d > system dirty_max %d\n",
                        cli->cl_import->imp_obd->obd_name,
-                       atomic_read(&obd_unstable_pages),
                        atomic_read(&obd_dirty_pages),
                        atomic_read(&obd_dirty_transit_pages),
                        obd_max_dirty_pages);
@@ -1834,6 +1832,9 @@ static int brw_interpret(const struct lu_env *env,
         }
         OBDO_FREE(aa->aa_oa);
+        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
+                osc_inc_unstable_pages(req);
+
         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                 list_del_init(&ext->oe_link);
                 osc_extent_finish(env, ext, 1, rc);
@@ -1903,6 +1904,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
         int mpflag = 0;
         int mem_tight = 0;
         int page_count = 0;
+        bool soft_sync = false;
         int i;
         int rc;
         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
@@ -1930,6 +1932,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                 }
         }
+        soft_sync = osc_over_unstable_soft_limit(cli);
         if (mem_tight)
                 mpflag = cfs_memory_pressure_get_and_set();
@@ -1956,6 +1959,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
         }
         if (mem_tight)
                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
+        if (soft_sync)
+                oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
         pga[i] = &oap->oap_brw_page;
         pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
         CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
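
Usage sketch (not part of the commit above): with this patch the unstable page
check is off by default and is toggled through llite.*.unstable_stats, backed
by ll_unstable_stats_seq_show() and ll_unstable_stats_seq_write() in the diff.
The small user-space C program below shows one way that interface might be
exercised. The proc path, and in particular the "lustre-ffff88007c56d000"
instance directory, is a made-up example; on a real client the directory under
/proc/fs/lustre/llite/ is named after the mounted filesystem instance, and
lctl get_param/set_param on llite.*.unstable_stats would be the usual front
end.

#include <stdio.h>

int main(void)
{
        /* Hypothetical instance name; substitute the real
         * /proc/fs/lustre/llite/<fsname>-<instance>/ directory. */
        const char *path =
                "/proc/fs/lustre/llite/lustre-ffff88007c56d000/unstable_stats";
        char line[128];
        FILE *filp;

        /* Enable the per-mount unstable page check (off by default).
         * ll_unstable_stats_seq_write() looks for the "unstable_check:"
         * keyword and treats any non-zero value as "on". */
        filp = fopen(path, "w");
        if (filp == NULL) {
                perror("open for write");
                return 1;
        }
        fputs("unstable_check: 1\n", filp);
        fclose(filp);

        /* Read back unstable_check, unstable_pages and unstable_mb as
         * printed by ll_unstable_stats_seq_show(). */
        filp = fopen(path, "r");
        if (filp == NULL) {
                perror("open for read");
                return 1;
        }
        while (fgets(line, sizeof(line), filp) != NULL)
                fputs(line, stdout);
        fclose(filp);

        return 0;
}

The write path borrows ccc_lru_lock rather than introducing a new lock, so
flipping the flag is cheap. Whether a bare integer without the
"unstable_check:" keyword is also accepted depends on how
lprocfs_find_named_value() handles a missing keyword, so the tagged form used
above is the safer one.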