From 302c5bfebe61e988dbd27063becc4ef90befc6df Mon Sep 17 00:00:00 2001 From: Emoly Liu Date: Fri, 13 Feb 2015 10:15:48 +0800 Subject: [PATCH] LU-6173 llite: allocate and free client cache asynchronously Since the inflight request holds import refcount as well as export, sometimes obd_disconnect() in client_common_put_super() can't put the last refcount of OSC import (e.g. due to network disconnection), this will cause cl_cache being accessed after free. To fix this issue, ccc_users is used as cl_cache refcount, and lov/llite/osc all hold one cl_cache refcount respectively, to avoid the race that a new OST is being added into the system when the client is mounted. The following cl_cache functions are added: - cl_cache_init(): allocate and initialize cl_cache - cl_cache_incref(): increase cl_cache refcount - cl_cache_decref(): decrease cl_cache refcount and free the cache if refcount=0. Also, the fix of LU-2543 is not needed anymore, so reverted. Signed-off-by: Emoly Liu Change-Id: I22ff10b683b683d49d603e5dc2de3397746a79bb Reviewed-on: http://review.whamcloud.com/13746 Reviewed-by: Niu Yawei Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/cl_object.h | 9 +++++++- lustre/include/obd.h | 2 +- lustre/llite/llite_internal.h | 2 +- lustre/llite/llite_lib.c | 34 +++++++++++++----------------- lustre/llite/lproc_llite.c | 12 +++++------ lustre/lov/lov_obd.c | 7 +++++++ lustre/obdclass/cl_page.c | 49 +++++++++++++++++++++++++++++++++++++++++++ lustre/osc/osc_page.c | 5 +++-- lustre/osc/osc_request.c | 4 ++-- 9 files changed, 91 insertions(+), 33 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index a629a9b..09828e1 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -2327,7 +2327,8 @@ void cl_lock_descr_print(const struct lu_env *env, void *cookie, */ struct cl_client_cache { /** - * # of users (OSCs) + * # of client cache refcount + * # of users (OSCs) + 2 (held by llite and lov) */ atomic_t ccc_users; /** @@ -2364,6 +2365,12 @@ struct cl_client_cache { */ wait_queue_head_t ccc_unstable_waitq; }; +/** + * cl_cache functions + */ +struct cl_client_cache *cl_cache_init(unsigned long lru_page_max); +void cl_cache_incref(struct cl_client_cache *cache); +void cl_cache_decref(struct cl_client_cache *cache); /** @} cl_page */ diff --git a/lustre/include/obd.h b/lustre/include/obd.h index de554b8..15c64f4 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -428,7 +428,7 @@ struct lov_obd { enum lustre_sec_part lov_sp_me; /* Cached LRU and unstable data from upper layer */ - void *lov_cache; + struct cl_client_cache *lov_cache; struct rw_semaphore lov_notify_lock; }; diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 327cb36..551dc3e 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -522,7 +522,7 @@ struct ll_sb_info { * LRU list of clean pages. An "unstable" page is defined as * any page which is sent to a server as part of a bulk request, * but is uncommitted to stable storage. */ - struct cl_client_cache ll_cache; + struct cl_client_cache *ll_cache; struct lprocfs_stats *ll_ra_stats; diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index d5d96e5..a5dde6d 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -91,16 +91,11 @@ static struct ll_sb_info *ll_init_sbi(void) lru_page_max = pages / 2; /* initialize ll_cache data */ - atomic_set(&sbi->ll_cache.ccc_users, 0); - sbi->ll_cache.ccc_lru_max = lru_page_max; - atomic_long_set(&sbi->ll_cache.ccc_lru_left, lru_page_max); - spin_lock_init(&sbi->ll_cache.ccc_lru_lock); - INIT_LIST_HEAD(&sbi->ll_cache.ccc_lru); - - /* turn unstable check off by default as it impacts performance */ - sbi->ll_cache.ccc_unstable_check = 0; - atomic_long_set(&sbi->ll_cache.ccc_unstable_nr, 0); - init_waitqueue_head(&sbi->ll_cache.ccc_unstable_waitq); + sbi->ll_cache = cl_cache_init(lru_page_max); + if (sbi->ll_cache == NULL) { + OBD_FREE(sbi, sizeof(*sbi)); + RETURN(NULL); + } sbi->ll_ra_info.ra_max_pages_per_file = min(pages / 32, SBI_DEFAULT_READAHEAD_MAX); @@ -162,6 +157,10 @@ static void ll_free_sbi(struct super_block *sb) spin_unlock(&ll_sb_lock); if (!list_empty(&sbi->ll_squash.rsi_nosquash_nids)) cfs_free_nidlist(&sbi->ll_squash.rsi_nosquash_nids); + if (sbi->ll_cache != NULL) { + cl_cache_decref(sbi->ll_cache); + sbi->ll_cache = NULL; + } OBD_FREE(sbi, sizeof(*sbi)); } EXIT; @@ -577,8 +576,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, cl_sb_init(sb); err = obd_set_info_async(NULL, sbi->ll_dt_exp, sizeof(KEY_CACHE_SET), - KEY_CACHE_SET, sizeof(sbi->ll_cache), - &sbi->ll_cache, NULL); + KEY_CACHE_SET, sizeof(*sbi->ll_cache), + sbi->ll_cache, NULL); sb->s_root = d_make_root(root); if (sb->s_root == NULL) { @@ -624,8 +623,6 @@ out_lock_cn_cb: out_dt: obd_disconnect(sbi->ll_dt_exp); sbi->ll_dt_exp = NULL; - /* Make sure all OScs are gone, since cl_cache is accessing sbi. */ - obd_zombie_barrier(); out_md_fid: obd_fid_fini(sbi->ll_md_exp->exp_obd); out_md: @@ -795,9 +792,6 @@ static void client_common_put_super(struct super_block *sb) obd_fid_fini(sbi->ll_dt_exp->exp_obd); obd_disconnect(sbi->ll_dt_exp); sbi->ll_dt_exp = NULL; - /* wait till all OSCs are gone, since cl_cache is accessing sbi. - * see LU-2543. */ - obd_zombie_barrier(); lprocfs_unregister_mountpoint(sbi); @@ -1159,12 +1153,12 @@ void ll_put_super(struct super_block *sb) /* Wait for unstable pages to be committed to stable storage */ if (force == 0) { struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); - rc = l_wait_event(sbi->ll_cache.ccc_unstable_waitq, - atomic_long_read(&sbi->ll_cache.ccc_unstable_nr) == 0, + rc = l_wait_event(sbi->ll_cache->ccc_unstable_waitq, + atomic_long_read(&sbi->ll_cache->ccc_unstable_nr) == 0, &lwi); } - ccc_count = atomic_long_read(&sbi->ll_cache.ccc_unstable_nr); + ccc_count = atomic_long_read(&sbi->ll_cache->ccc_unstable_nr); if (force == 0 && rc != -EINTR) LASSERTF(ccc_count == 0, "count: %li\n", ccc_count); diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index e4241e4..6034164 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -396,7 +396,7 @@ static int ll_max_cached_mb_seq_show(struct seq_file *m, void *v) { struct super_block *sb = m->private; struct ll_sb_info *sbi = ll_s2sbi(sb); - struct cl_client_cache *cache = &sbi->ll_cache; + struct cl_client_cache *cache = sbi->ll_cache; int shift = 20 - PAGE_CACHE_SHIFT; long max_cached_mb; long unused_mb; @@ -423,7 +423,7 @@ ll_max_cached_mb_seq_write(struct file *file, const char __user *buffer, struct seq_file *m = file->private_data; struct super_block *sb = m->private; struct ll_sb_info *sbi = ll_s2sbi(sb); - struct cl_client_cache *cache = &sbi->ll_cache; + struct cl_client_cache *cache = sbi->ll_cache; struct lu_env *env; __u64 val; long diff = 0; @@ -907,7 +907,7 @@ static int ll_unstable_stats_seq_show(struct seq_file *m, void *v) { struct super_block *sb = m->private; struct ll_sb_info *sbi = ll_s2sbi(sb); - struct cl_client_cache *cache = &sbi->ll_cache; + struct cl_client_cache *cache = sbi->ll_cache; long pages; int mb; @@ -945,9 +945,9 @@ static ssize_t ll_unstable_stats_seq_write(struct file *file, return rc; /* borrow lru lock to set the value */ - spin_lock(&sbi->ll_cache.ccc_lru_lock); - sbi->ll_cache.ccc_unstable_check = !!val; - spin_unlock(&sbi->ll_cache.ccc_lru_lock); + spin_lock(&sbi->ll_cache->ccc_lru_lock); + sbi->ll_cache->ccc_unstable_check = !!val; + spin_unlock(&sbi->ll_cache->ccc_lru_lock); return count; } diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index ea39c92..0b62a00 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -951,6 +951,12 @@ static int lov_cleanup(struct obd_device *obd) lov->lov_tgt_size); lov->lov_tgt_size = 0; } + + if (lov->lov_cache != NULL) { + cl_cache_decref(lov->lov_cache); + lov->lov_cache = NULL; + } + RETURN(0); } @@ -1919,6 +1925,7 @@ static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp, LASSERT(lov->lov_cache == NULL); lov->lov_cache = val; do_inactive = 1; + cl_cache_incref(lov->lov_cache); } for (i = 0; i < count; i++) { diff --git a/lustre/obdclass/cl_page.c b/lustre/obdclass/cl_page.c index b8639fd..b361c23 100644 --- a/lustre/obdclass/cl_page.c +++ b/lustre/obdclass/cl_page.c @@ -1152,3 +1152,52 @@ void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice, EXIT; } EXPORT_SYMBOL(cl_page_slice_add); + +/** + * Allocate and initialize cl_cache, called by ll_init_sbi(). + */ +struct cl_client_cache *cl_cache_init(unsigned long lru_page_max) +{ + struct cl_client_cache *cache = NULL; + + ENTRY; + OBD_ALLOC(cache, sizeof(*cache)); + if (cache == NULL) + RETURN(NULL); + + /* Initialize cache data */ + atomic_set(&cache->ccc_users, 1); + cache->ccc_lru_max = lru_page_max; + atomic_long_set(&cache->ccc_lru_left, lru_page_max); + spin_lock_init(&cache->ccc_lru_lock); + INIT_LIST_HEAD(&cache->ccc_lru); + + /* turn unstable check off by default as it impacts performance */ + cache->ccc_unstable_check = 0; + atomic_long_set(&cache->ccc_unstable_nr, 0); + init_waitqueue_head(&cache->ccc_unstable_waitq); + + RETURN(cache); +} +EXPORT_SYMBOL(cl_cache_init); + +/** + * Increase cl_cache refcount + */ +void cl_cache_incref(struct cl_client_cache *cache) +{ + atomic_inc(&cache->ccc_users); +} +EXPORT_SYMBOL(cl_cache_incref); + +/** + * Decrease cl_cache refcount and free the cache if refcount=0. + * Since llite, lov and osc all hold cl_cache refcount, + * the free will not cause race. (LU-6173) + */ +void cl_cache_decref(struct cl_client_cache *cache) +{ + if (atomic_dec_and_test(&cache->ccc_users)) + OBD_FREE(cache, sizeof(*cache)); +} +EXPORT_SYMBOL(cl_cache_decref); diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c index a1e90e2..9f9f2d0 100644 --- a/lustre/osc/osc_page.c +++ b/lustre/osc/osc_page.c @@ -521,7 +521,8 @@ static int osc_cache_too_much(struct client_obd *cli) long pages = atomic_long_read(&cli->cl_lru_in_list); unsigned long budget; - budget = cache->ccc_lru_max / atomic_read(&cache->ccc_users); + LASSERT(cache != NULL); + budget = cache->ccc_lru_max / (atomic_read(&cache->ccc_users) - 2); /* if it's going to run out LRU slots, we should free some, but not * too much to maintain faireness among OSCs. */ @@ -834,7 +835,7 @@ long osc_lru_reclaim(struct client_obd *cli) cache->ccc_lru_shrinkers++; list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru); - max_scans = atomic_read(&cache->ccc_users); + max_scans = atomic_read(&cache->ccc_users) - 2; while (--max_scans > 0 && !list_empty(&cache->ccc_lru)) { cli = list_entry(cache->ccc_lru.next, struct client_obd, cl_lru_osc); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index e60a1b0..7ed4225 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2636,7 +2636,7 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, LASSERT(cli->cl_cache == NULL); /* only once */ cli->cl_cache = (struct cl_client_cache *)val; - atomic_inc(&cli->cl_cache->ccc_users); + cl_cache_incref(cli->cl_cache); cli->cl_lru_left = &cli->cl_cache->ccc_lru_left; /* add this osc into entity list */ @@ -3030,7 +3030,7 @@ int osc_cleanup(struct obd_device *obd) list_del_init(&cli->cl_lru_osc); spin_unlock(&cli->cl_cache->ccc_lru_lock); cli->cl_lru_left = NULL; - atomic_dec(&cli->cl_cache->ccc_users); + cl_cache_decref(cli->cl_cache); cli->cl_cache = NULL; } -- 1.8.3.1