*/
struct cl_client_cache {
/**
- * # of users (OSCs)
+ * Reference count of this client cache:
+ * # of users (OSCs) + 2 (held by llite and lov)
*/
atomic_t ccc_users;
/**
 * Waitq for awaiting the commit of unstable pages; woken when
 * ccc_unstable_nr drops to zero.
 */
wait_queue_head_t ccc_unstable_waitq;
};
+/**
+ * cl_cache functions
+ */
+struct cl_client_cache *cl_cache_init(unsigned long lru_page_max);
+void cl_cache_incref(struct cl_client_cache *cache);
+void cl_cache_decref(struct cl_client_cache *cache);
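+
+/*
+ * llite allocates the cache with cl_cache_init() and owns the initial
+ * reference; lov and each OSC take an extra reference via cl_cache_incref()
+ * when the cache is passed down through KEY_CACHE_SET, and release it with
+ * cl_cache_decref().  The cache is freed when the last reference is dropped.
+ */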
/** @} cl_page */
enum lustre_sec_part lov_sp_me;
/* Cached LRU and unstable data from upper layer */
- void *lov_cache;
+ struct cl_client_cache *lov_cache;
struct rw_semaphore lov_notify_lock;
};
* LRU list of clean pages. An "unstable" page is defined as
* any page which is sent to a server as part of a bulk request,
* but is uncommitted to stable storage. */
- struct cl_client_cache ll_cache;
+ struct cl_client_cache *ll_cache;
struct lprocfs_stats *ll_ra_stats;
lru_page_max = pages / 2;
/* initialize ll_cache data */
- atomic_set(&sbi->ll_cache.ccc_users, 0);
- sbi->ll_cache.ccc_lru_max = lru_page_max;
- atomic_long_set(&sbi->ll_cache.ccc_lru_left, lru_page_max);
- spin_lock_init(&sbi->ll_cache.ccc_lru_lock);
- INIT_LIST_HEAD(&sbi->ll_cache.ccc_lru);
-
- /* turn unstable check off by default as it impacts performance */
- sbi->ll_cache.ccc_unstable_check = 0;
- atomic_long_set(&sbi->ll_cache.ccc_unstable_nr, 0);
- init_waitqueue_head(&sbi->ll_cache.ccc_unstable_waitq);
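+ /* allocate the shared client cache; llite owns the initial reference */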
+ sbi->ll_cache = cl_cache_init(lru_page_max);
+ if (sbi->ll_cache == NULL) {
+ OBD_FREE(sbi, sizeof(*sbi));
+ RETURN(NULL);
+ }
sbi->ll_ra_info.ra_max_pages_per_file = min(pages / 32,
SBI_DEFAULT_READAHEAD_MAX);
spin_unlock(&ll_sb_lock);
if (!list_empty(&sbi->ll_squash.rsi_nosquash_nids))
cfs_free_nidlist(&sbi->ll_squash.rsi_nosquash_nids);
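+ /* drop llite's reference; the cache is freed once lov and the OSCs
+  * have dropped theirs as well */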
+ if (sbi->ll_cache != NULL) {
+ cl_cache_decref(sbi->ll_cache);
+ sbi->ll_cache = NULL;
+ }
OBD_FREE(sbi, sizeof(*sbi));
}
EXIT;
cl_sb_init(sb);
err = obd_set_info_async(NULL, sbi->ll_dt_exp, sizeof(KEY_CACHE_SET),
- KEY_CACHE_SET, sizeof(sbi->ll_cache),
- &sbi->ll_cache, NULL);
+ KEY_CACHE_SET, sizeof(*sbi->ll_cache),
+ sbi->ll_cache, NULL);
sb->s_root = d_make_root(root);
if (sb->s_root == NULL) {
out_dt:
obd_disconnect(sbi->ll_dt_exp);
sbi->ll_dt_exp = NULL;
- /* Make sure all OScs are gone, since cl_cache is accessing sbi. */
- obd_zombie_barrier();
out_md_fid:
obd_fid_fini(sbi->ll_md_exp->exp_obd);
out_md:
obd_fid_fini(sbi->ll_dt_exp->exp_obd);
obd_disconnect(sbi->ll_dt_exp);
sbi->ll_dt_exp = NULL;
- /* wait till all OSCs are gone, since cl_cache is accessing sbi.
- * see LU-2543. */
- obd_zombie_barrier();
lprocfs_unregister_mountpoint(sbi);
/* Wait for unstable pages to be committed to stable storage */
if (force == 0) {
struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
- rc = l_wait_event(sbi->ll_cache.ccc_unstable_waitq,
- atomic_long_read(&sbi->ll_cache.ccc_unstable_nr) == 0,
+ rc = l_wait_event(sbi->ll_cache->ccc_unstable_waitq,
+ atomic_long_read(&sbi->ll_cache->ccc_unstable_nr) == 0,
&lwi);
}
- ccc_count = atomic_long_read(&sbi->ll_cache.ccc_unstable_nr);
+ ccc_count = atomic_long_read(&sbi->ll_cache->ccc_unstable_nr);
if (force == 0 && rc != -EINTR)
LASSERTF(ccc_count == 0, "count: %li\n", ccc_count);
{
struct super_block *sb = m->private;
struct ll_sb_info *sbi = ll_s2sbi(sb);
- struct cl_client_cache *cache = &sbi->ll_cache;
+ struct cl_client_cache *cache = sbi->ll_cache;
int shift = 20 - PAGE_CACHE_SHIFT;
long max_cached_mb;
long unused_mb;
struct seq_file *m = file->private_data;
struct super_block *sb = m->private;
struct ll_sb_info *sbi = ll_s2sbi(sb);
- struct cl_client_cache *cache = &sbi->ll_cache;
+ struct cl_client_cache *cache = sbi->ll_cache;
struct lu_env *env;
__u64 val;
long diff = 0;
{
struct super_block *sb = m->private;
struct ll_sb_info *sbi = ll_s2sbi(sb);
- struct cl_client_cache *cache = &sbi->ll_cache;
+ struct cl_client_cache *cache = sbi->ll_cache;
long pages;
int mb;
return rc;
/* borrow lru lock to set the value */
- spin_lock(&sbi->ll_cache.ccc_lru_lock);
- sbi->ll_cache.ccc_unstable_check = !!val;
- spin_unlock(&sbi->ll_cache.ccc_lru_lock);
+ spin_lock(&sbi->ll_cache->ccc_lru_lock);
+ sbi->ll_cache->ccc_unstable_check = !!val;
+ spin_unlock(&sbi->ll_cache->ccc_lru_lock);
return count;
}
lov->lov_tgt_size);
lov->lov_tgt_size = 0;
}
+
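+ /* release the reference taken when the cache was handed to lov
+  * via KEY_CACHE_SET */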
+ if (lov->lov_cache != NULL) {
+ cl_cache_decref(lov->lov_cache);
+ lov->lov_cache = NULL;
+ }
+
RETURN(0);
}
LASSERT(lov->lov_cache == NULL);
lov->lov_cache = val;
do_inactive = 1;
+ cl_cache_incref(lov->lov_cache);
}
for (i = 0; i < count; i++) {
EXIT;
}
EXPORT_SYMBOL(cl_page_slice_add);
+
+/**
+ * Allocate and initialize cl_cache; called by ll_init_sbi().
+ * The returned cache holds one reference, owned by the caller.
+ */
+struct cl_client_cache *cl_cache_init(unsigned long lru_page_max)
+{
+ struct cl_client_cache *cache = NULL;
+
+ ENTRY;
+ OBD_ALLOC(cache, sizeof(*cache));
+ if (cache == NULL)
+ RETURN(NULL);
+
+ /* Initialize cache data */
+ atomic_set(&cache->ccc_users, 1);
+ cache->ccc_lru_max = lru_page_max;
+ atomic_long_set(&cache->ccc_lru_left, lru_page_max);
+ spin_lock_init(&cache->ccc_lru_lock);
+ INIT_LIST_HEAD(&cache->ccc_lru);
+
+ /* turn unstable check off by default as it impacts performance */
+ cache->ccc_unstable_check = 0;
+ atomic_long_set(&cache->ccc_unstable_nr, 0);
+ init_waitqueue_head(&cache->ccc_unstable_waitq);
+
+ RETURN(cache);
+}
+EXPORT_SYMBOL(cl_cache_init);
+
+/**
+ * Increase cl_cache refcount
+ */
+void cl_cache_incref(struct cl_client_cache *cache)
+{
+ atomic_inc(&cache->ccc_users);
+}
+EXPORT_SYMBOL(cl_cache_incref);
+
+/**
+ * Decrease the cl_cache refcount and free the cache when it drops to 0.
+ * Since llite, lov and osc each hold their own reference on cl_cache,
+ * freeing it here cannot race with any remaining user (LU-6173).
+ */
+void cl_cache_decref(struct cl_client_cache *cache)
+{
+ if (atomic_dec_and_test(&cache->ccc_users))
+ OBD_FREE(cache, sizeof(*cache));
+}
+EXPORT_SYMBOL(cl_cache_decref);
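+
+/*
+ * Illustrative usage sketch (not taken from any caller); it mirrors how
+ * llite, lov and the OSCs are expected to share a single cache:
+ *
+ *	cache = cl_cache_init(lru_page_max);	refcount == 1 (allocator)
+ *	cl_cache_incref(cache);			one extra ref per holder
+ *	...
+ *	cl_cache_decref(cache);			every holder drops its ref;
+ *						the last decref frees the cache
+ */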
long pages = atomic_long_read(&cli->cl_lru_in_list);
unsigned long budget;
- budget = cache->ccc_lru_max / atomic_read(&cache->ccc_users);
+ LASSERT(cache != NULL);
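+ /* ccc_users counts the OSCs plus the llite and lov references, so
+  * subtract 2 to split the LRU budget among the OSCs only */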
+ budget = cache->ccc_lru_max / (atomic_read(&cache->ccc_users) - 2);
/* if it's going to run out of LRU slots, we should free some, but not
* too much to maintain fairness among OSCs. */
cache->ccc_lru_shrinkers++;
list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
- max_scans = atomic_read(&cache->ccc_users);
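+ /* again exclude the llite and lov references to scan only the OSCs */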
+ max_scans = atomic_read(&cache->ccc_users) - 2;
while (--max_scans > 0 && !list_empty(&cache->ccc_lru)) {
cli = list_entry(cache->ccc_lru.next, struct client_obd,
cl_lru_osc);
LASSERT(cli->cl_cache == NULL); /* only once */
cli->cl_cache = (struct cl_client_cache *)val;
- atomic_inc(&cli->cl_cache->ccc_users);
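+ /* each OSC holds its own reference on the shared cache */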
+ cl_cache_incref(cli->cl_cache);
cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
/* add this osc into entity list */
list_del_init(&cli->cl_lru_osc);
spin_unlock(&cli->cl_cache->ccc_lru_lock);
cli->cl_lru_left = NULL;
- atomic_dec(&cli->cl_cache->ccc_users);
+ cl_cache_decref(cli->cl_cache);
cli->cl_cache = NULL;
}