#include "ptlrpc_internal.h"
-#define PPOOL_MIN_CHUNK_BITS 16 /* 2^16 bytes = 64KiB */
-#define PPOOL_MAX_CHUNK_BITS PTLRPC_MAX_BRW_BITS
-#define POOLS_COUNT (PPOOL_MAX_CHUNK_BITS - PPOOL_MIN_CHUNK_BITS + 1)
-#define PPOOL_ORDER_TO_INDEX(bits) ((bits) - PPOOL_MIN_CHUNK_BITS + 1)
-#define POOL_BITS(pool) ((pool) + PPOOL_MIN_CHUNK_BITS - 1)
-#define ELEMENT_SIZE(pool) (1 << (PPOOL_MIN_CHUNK_BITS + (pool) - 1))
+/* we have a pool for every power of 2 number of pages <= MAX_BRW_BITS.
+ * most pools will be unused, but that's OK - unused pools are very cheap
+ */
+#define POOLS_COUNT (PTLRPC_MAX_BRW_BITS + 1)
#define PAGES_TO_MiB(pages) ((pages) >> (20 - PAGE_SHIFT))
#define MiB_TO_PAGES(mb) ((mb) << (20 - PAGE_SHIFT))
/* deprecated - see pool_max_memory_mb below */
unsigned int ppp_waitqlen; /* wait queue length */
unsigned long ppp_pages_short; /* # of pages wanted of in-q users */
unsigned int ppp_growing:1; /* during adding pages */
+ unsigned int ppp_order; /* page pool order and index in pools
+ * array (element size is 2^order pages),
+ */
/*
* indicating how idle the pools are, from 0 to MAX_IDLE_IDX
struct mutex add_pages_mutex;
} **page_pools;
+static int element_size(struct ptlrpc_page_pool *pool)
+{
+ return 1 << pool->ppp_order;
+}
+
/*
* Keep old name (encrypt_page_pool vs page_pool) for compatibility with user
* tools pulling stats
*/
int encrypt_page_pools_seq_show(struct seq_file *m, void *v)
{
- struct ptlrpc_page_pool *pool = page_pools[PAGES_POOL];
+ struct ptlrpc_page_pool *pool = page_pools[0];
spin_lock(&pool->ppp_lock);
seq_printf(m, "physical pages: %lu\n"
*/
int page_pools_seq_show(struct seq_file *m, void *v)
{
- int pool_index;
+ int pool_order;
struct ptlrpc_page_pool *pool;
seq_printf(m, "physical_pages: %lu\n"
"pools:\n",
cfs_totalram_pages(), PAGES_PER_POOL);
- for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
- pool = page_pools[pool_index];
+ for (pool_order = 0; pool_order < POOLS_COUNT; pool_order++) {
+ pool = page_pools[pool_order];
if (!pool->ppp_st_access)
continue;
spin_lock(&pool->ppp_lock);
- seq_printf(m, " pool_%luk:\n"
+ seq_printf(m, " pool_%dk:\n"
" max_pages: %lu\n"
" max_pools: %u\n"
" total_pages: %lu\n"
" max_waitqueue_depth: %u\n"
" max_wait_time_ms: %lld\n"
" out_of_mem: %lu\n",
- (pool_index ? ELEMENT_SIZE(pool_index - 10) :
- PAGE_SIZE >> 10),
+ /* convert from bytes to KiB */
+ element_size(pool) >> 10,
pool->ppp_max_pages,
pool->ppp_max_pools,
pool->ppp_total_pages,
return 0;
}
-static void pool_release_free_pages(long npages, unsigned int pool_idx)
+static void pool_release_free_pages(long npages, struct ptlrpc_page_pool *pool)
{
int p_idx, g_idx;
int p_idx_max1, p_idx_max2;
- struct ptlrpc_page_pool *pool = page_pools[pool_idx];
LASSERT(npages > 0);
LASSERT(npages <= pool->ppp_free_pages);
LASSERT(pool->ppp_pools[p_idx]);
LASSERT(pool->ppp_pools[p_idx][g_idx] != NULL);
- if (pool_idx == PAGES_POOL)
+ if (pool->ppp_order == 0)
__free_page(pool->ppp_pools[p_idx][g_idx]);
else
OBD_FREE_LARGE(pool->ppp_pools[p_idx][g_idx],
- ELEMENT_SIZE(pool_idx));
+ element_size(pool));
pool->ppp_pools[p_idx][g_idx] = NULL;
if (++g_idx == PAGES_PER_POOL) {
}
}
-#define SEEKS_TO_INDEX(s) (((s)->seeks >> 8) & 0xff)
-#define INDEX_TO_SEEKS(i) (DEFAULT_SEEKS | (i << 8))
+#define SEEKS_TO_ORDER(s) (((s)->seeks >> 8) & 0xff)
+#define ORDER_TO_SEEKS(i) (DEFAULT_SEEKS | (i << 8))
/*
* we try to keep at least PTLRPC_MAX_BRW_PAGES pages in the pool.
*/
static unsigned long pool_shrink_count(struct shrinker *s,
struct shrink_control *sc)
{
- unsigned int pool_index = SEEKS_TO_INDEX(s);
- struct ptlrpc_page_pool *pool = page_pools[pool_index];
+ unsigned int pool_order = SEEKS_TO_ORDER(s);
+ struct ptlrpc_page_pool *pool = page_pools[pool_order];
/*
* if no pool access for a long time, we consider it's fully
* idle. A little race here is fine.
struct shrink_control *sc)
{
/* Get pool number passed as part of pool_shrinker_seeks value */
- unsigned int pool_index = SEEKS_TO_INDEX(s);
- struct ptlrpc_page_pool *pool = page_pools[pool_index];
+ unsigned int pool_order = SEEKS_TO_ORDER(s);
+ struct ptlrpc_page_pool *pool = page_pools[pool_order];
spin_lock(&pool->ppp_lock);
if (pool->ppp_free_pages <= PTLRPC_MAX_BRW_PAGES)
sc->nr_to_scan = min_t(unsigned long, sc->nr_to_scan,
pool->ppp_free_pages - PTLRPC_MAX_BRW_PAGES);
if (sc->nr_to_scan > 0) {
- pool_release_free_pages(sc->nr_to_scan, pool_index);
+ pool_release_free_pages(sc->nr_to_scan, pool);
CDEBUG(D_SEC, "released %ld pages, %ld left\n",
(long)sc->nr_to_scan, pool->ppp_free_pages);
/*
* return how many pages cleaned up.
*/
-static unsigned long pool_cleanup(void ***pools, int npools, int pool_idx)
+static unsigned long pool_cleanup(void ***pools, int npools,
+ struct ptlrpc_page_pool *pool)
{
unsigned long cleaned = 0;
int i, j;
if (pools[i]) {
for (j = 0; j < PAGES_PER_POOL; j++) {
if (pools[i][j]) {
- if (pool_idx == PAGES_POOL) {
+ if (pool->ppp_order == 0) {
__free_page(pools[i][j]);
} else {
OBD_FREE_LARGE(pools[i][j],
- ELEMENT_SIZE(pool_idx));
+ element_size(pool));
}
cleaned++;
}
* the simplest way to avoid complexity. It's not frequently called.
*/
static void pool_insert(void ***pools, int npools, int npages,
- unsigned int pool_idx)
+ struct ptlrpc_page_pool *page_pool)
{
int freeslot;
int op_idx, np_idx, og_idx, ng_idx;
int cur_npools, end_npools;
- struct ptlrpc_page_pool *page_pool = page_pools[pool_idx];
LASSERT(npages > 0);
LASSERT(page_pool->ppp_total_pages+npages <= page_pool->ppp_max_pages);
}
#define POOL_INIT_SIZE (PTLRPC_MAX_BRW_SIZE / 4)
-static int pool_add_pages(int npages, int pool_index)
+static int pool_add_pages(int npages, struct ptlrpc_page_pool *page_pool)
{
void ***pools;
int npools, alloced = 0;
int i, j, rc = -ENOMEM;
- struct ptlrpc_page_pool *page_pool = page_pools[pool_index];
+ unsigned int pool_order = page_pool->ppp_order;
- if (pool_index == PAGES_POOL) {
- if (npages < POOL_INIT_SIZE >> PAGE_SHIFT)
- npages = POOL_INIT_SIZE >> PAGE_SHIFT;
- } else {
- if (npages < POOL_INIT_SIZE / ELEMENT_SIZE(pool_index))
- npages = POOL_INIT_SIZE / ELEMENT_SIZE(pool_index);
- }
+ if (npages < POOL_INIT_SIZE / element_size(page_pool))
+ npages = POOL_INIT_SIZE / element_size(page_pool);
mutex_lock(&page_pool->add_pages_mutex);
goto out_pools;
for (j = 0; j < PAGES_PER_POOL && alloced < npages; j++) {
- if (pool_index == PAGES_POOL)
+ if (pool_order == 0)
pools[i][j] = alloc_page(GFP_NOFS |
__GFP_HIGHMEM);
else {
OBD_ALLOC_LARGE(pools[i][j],
- ELEMENT_SIZE(pool_index));
+ element_size(page_pool));
}
if (pools[i][j] == NULL)
goto out_pools;
}
LASSERT(alloced == npages);
- pool_insert(pools, npools, npages, pool_index);
+ pool_insert(pools, npools, npages, page_pool);
CDEBUG(D_SEC, "added %d pages into pools\n", npages);
OBD_FREE_PTR_ARRAY(pools, npools);
rc = 0;
out_pools:
if (rc) {
- pool_cleanup(pools, npools, pool_index);
+ pool_cleanup(pools, npools, page_pool);
}
out:
if (rc) {
return rc;
}
-static inline void pool_wakeup(unsigned int pool)
+static inline void pool_wakeup(struct ptlrpc_page_pool *pool)
{
- assert_spin_locked(&page_pools[pool]->ppp_lock);
+ assert_spin_locked(&pool->ppp_lock);
/* waitqueue_active */
- if (unlikely(waitqueue_active(&page_pools[pool]->ppp_waitq)))
- wake_up_all(&page_pools[pool]->ppp_waitq);
+ if (unlikely(waitqueue_active(&pool->ppp_waitq)))
+ wake_up_all(&pool->ppp_waitq);
}
-static int pool_should_grow(int needed, unsigned int pool_index)
+static int pool_should_grow(int needed, struct ptlrpc_page_pool *pool)
{
- struct ptlrpc_page_pool *pool = page_pools[pool_index];
-
/*
* don't grow if someone else is growing the pools right now,
* or the pools has reached its full capacity
}
/*
- * Export the number of free pages in the pool
+ * Export the number of free pages in the pool of 'order'
*/
-int sptlrpc_pool_get_free_pages(unsigned int pool)
+int sptlrpc_pool_get_free_pages(unsigned int order)
{
- return page_pools[pool]->ppp_free_pages;
+ return page_pools[order]->ppp_free_pages;
}
EXPORT_SYMBOL(sptlrpc_pool_get_free_pages);
/*
* Let outside world know if pool full capacity is reached
*/
-int __pool_is_at_full_capacity(unsigned int pool)
-{
- return (page_pools[pool]->ppp_total_pages ==
- page_pools[pool]->ppp_max_pages);
-}
-
-/*
- * Let outside world know if pool full capacity is reached
- */
-int pool_is_at_full_capacity(void)
+int pool_is_at_full_capacity(int order)
{
- return __pool_is_at_full_capacity(PAGES_POOL);
+ return (page_pools[order]->ppp_total_pages ==
+ page_pools[order]->ppp_max_pages);
}
EXPORT_SYMBOL(pool_is_at_full_capacity);
return (void **)array;
}
+static bool __grow_pool_try(int needed, struct ptlrpc_page_pool *pool);
+
/*
* we allocate the requested pages atomically.
*/
static inline int __sptlrpc_pool_get_pages(void *array, unsigned int count,
- unsigned int pool_idx,
+ unsigned int order,
void **(*page_from)(void *, int))
{
- struct ptlrpc_page_pool *page_pool = page_pools[pool_idx];
+ struct ptlrpc_page_pool *page_pool = page_pools[order];
wait_queue_entry_t waitlink;
unsigned long this_idle = -1;
u64 tick_ns = 0;
page_pool->ppp_st_missings++;
page_pool->ppp_pages_short += count;
- if (pool_should_grow(count, pool_idx)) {
- page_pool->ppp_growing = 1;
-
- spin_unlock(&page_pool->ppp_lock);
- CDEBUG(D_SEC, "ppp_pages_short: %lu\n",
- page_pool->ppp_pages_short);
- pool_add_pages(8, pool_idx);
- spin_lock(&page_pool->ppp_lock);
-
- page_pool->ppp_growing = 0;
-
- pool_wakeup(pool_idx);
- } else {
+ /* if we aren't able to add pages, check if someone else is
+ * growing the pool and sleep if so, otherwise we return
+ * ENOMEM because we can't sleep here waiting for other ops to
+ * complete (main user is ptlrpcd, which must not sleep waiting
+ * for other ops... technically sleeping for pool growth is
+ * also questionable but it's very unlikely in practice to get
+ * stuck from this)
+ *
+ * if ENOMEM is returned here, the RPC will go back in the queue
+ */
+ if (!__grow_pool_try(count, page_pool)) {
if (page_pool->ppp_growing) {
if (++page_pool->ppp_waitqlen >
page_pool->ppp_st_max_wqlen)
int rc;
LASSERT(desc->bd_iov_count > 0);
- LASSERT(desc->bd_iov_count <= page_pools[PAGES_POOL]->ppp_max_pages);
+ LASSERT(desc->bd_iov_count <= page_pools[0]->ppp_max_pages);
/* resent bulk, enc iov might have been allocated previously */
if (desc->bd_enc_vec != NULL)
if (desc->bd_enc_vec == NULL)
return -ENOMEM;
- rc = __sptlrpc_pool_get_pages((void *)desc, desc->bd_iov_count,
- PAGES_POOL, page_from_bulkdesc);
+ rc = __sptlrpc_pool_get_pages((void *)desc, desc->bd_iov_count, 0,
+ page_from_bulkdesc);
if (rc) {
OBD_FREE_LARGE(desc->bd_enc_vec,
desc->bd_iov_count *
int sptlrpc_pool_get_pages_array(struct page **pa, unsigned int count)
{
- return __sptlrpc_pool_get_pages((void *)pa, count, PAGES_POOL,
- page_from_pagearray);
+ return __sptlrpc_pool_get_pages((void *)pa, count, 0,
+ page_from_pagearray);
}
EXPORT_SYMBOL(sptlrpc_pool_get_pages_array);
int sptlrpc_pool_get_pages(void **pages, unsigned int order)
{
- return __sptlrpc_pool_get_pages((void *)pages, 1,
- PPOOL_ORDER_TO_INDEX(order),
- page_from_bufarray);
+ return __sptlrpc_pool_get_pages((void *)pages, 1, order,
+ page_from_bufarray);
}
EXPORT_SYMBOL(sptlrpc_pool_get_pages);
static int __sptlrpc_pool_put_pages(void *array, unsigned int count,
- unsigned int pool_idx,
- void **(*page_from)(void *, int))
+ unsigned int order,
+ void **(*page_from)(void *, int))
{
+ struct ptlrpc_page_pool *page_pool;
int p_idx, g_idx;
int i, rc = 0;
- struct ptlrpc_page_pool *page_pool;
- LASSERTF(pool_idx < POOLS_COUNT, "count %u, pool %u\n",
- count, pool_idx);
+ LASSERTF(order < POOLS_COUNT, "count %u, pool %u\n",
+ count, order);
if (!array) {
CERROR("Faled to put %u pages, from pool %u\n",
- count, pool_idx);
+ count, order);
return -EINVAL;
}
- page_pool = page_pools[pool_idx];
- LASSERTF(page_pool != NULL, "count %u, pool %u\n", count, pool_idx);
+ page_pool = page_pools[order];
+ LASSERTF(page_pool != NULL, "count %u, pool %u\n", count, order);
spin_lock(&page_pool->ppp_lock);
}
page_pool->ppp_free_pages += count;
- pool_wakeup(pool_idx);
+ pool_wakeup(page_pool);
out_unlock:
spin_unlock(&page_pool->ppp_lock);
if (desc->bd_enc_vec == NULL)
return;
- rc = __sptlrpc_pool_put_pages((void *)desc, desc->bd_iov_count,
- PAGES_POOL, page_from_bulkdesc);
+ rc = __sptlrpc_pool_put_pages((void *)desc, desc->bd_iov_count, 0,
+ page_from_bulkdesc);
if (rc)
CDEBUG(D_SEC, "error putting pages in pool: %d\n", rc);
{
int rc;
- rc = __sptlrpc_pool_put_pages((void *)pa, count, PAGES_POOL,
- page_from_pagearray);
+ rc = __sptlrpc_pool_put_pages((void *)pa, count, 0,
+ page_from_pagearray);
if (rc)
CDEBUG(D_SEC, "error putting pages in pool: %d\n", rc);
{
int rc;
- rc = __sptlrpc_pool_put_pages(buf, 1,
- PPOOL_ORDER_TO_INDEX(order),
- page_from_bufarray);
+ rc = __sptlrpc_pool_put_pages(buf, 1, order, page_from_bufarray);
if (rc)
CDEBUG(D_SEC, "error putting pages in pool: %d\n", rc);
}
EXPORT_SYMBOL(sptlrpc_pool_put_pages);
-
-/*
- * we don't do much stuff for add_user/del_user anymore, except adding some
- * initial pages in add_user() if current pools are empty, rest would be
- * handled by the pools's self-adaption.
- */
-int sptlrpc_pool_add_user(void)
+/* called with pool->ppp_lock held */
+static bool __grow_pool_try(int needed, struct ptlrpc_page_pool *pool)
{
- struct ptlrpc_page_pool *pool = page_pools[PAGES_POOL];
+ bool pool_grown = false;
+
+ assert_spin_locked(&pool->ppp_lock);
+
+ if (pool_should_grow(needed, pool)) {
+ unsigned int to_add;
+ int rc;
- spin_lock(&pool->ppp_lock);
- /* ask for 1 page - so if the pool is empty, it will grow
- * (this might also grow an in-use pool if it's full, which is fine)
- */
- if (pool_should_grow(1, PAGES_POOL)) {
pool->ppp_growing = 1;
+ /* the pool of single pages is grown a large amount on
+ * first use
+ */
+ if (pool->ppp_order == 0 &&
+ pool->ppp_total_pages == 0)
+ to_add = PTLRPC_MAX_BRW_PAGES * 2;
+ else /* otherwise, we add requested or at least 8 items */
+ to_add = max(needed, 8);
spin_unlock(&pool->ppp_lock);
- pool_add_pages(PTLRPC_MAX_BRW_PAGES * 2, PAGES_POOL);
+ CDEBUG(D_SEC,
+ "pool %d is %lu elements (size %d bytes), growing by %d items\n",
+ pool->ppp_order, pool->ppp_pages_short,
+ element_size(pool), to_add);
+ /* we can't hold a spinlock over page allocation */
+ rc = pool_add_pages(to_add, pool);
+ if (rc == 0)
+ pool_grown = true;
spin_lock(&pool->ppp_lock);
pool->ppp_growing = 0;
- pool_wakeup(PAGES_POOL);
+ pool_wakeup(pool);
}
+
+ return pool_grown;
+}
+
+static bool grow_pool_try(int needed, struct ptlrpc_page_pool *pool)
+{
+ bool rc;
+
+ spin_lock(&pool->ppp_lock);
+ rc = __grow_pool_try(needed, pool);
spin_unlock(&pool->ppp_lock);
- return 0;
+
+ return rc;
+}
+
+/*
+ * we don't do much stuff for add_user/del_user anymore, except adding some
+ * initial pages in add_user() if current pools are empty, rest would be
+ * handled by the pools's self-adaption.
+ */
+void sptlrpc_pool_add_user(void)
+{
+ struct ptlrpc_page_pool *pool = page_pools[0];
+
+ /* since this is startup, no one is waiting for these pages, so we
+ * don't worry about sucess or failure here
+ */
+ grow_pool_try(1, pool);
}
EXPORT_SYMBOL(sptlrpc_pool_add_user);
sizeof(*pool->ppp_pools));
}
-static inline void pool_free(unsigned int pool_index)
+static inline void pool_free(struct ptlrpc_page_pool *pool)
{
- struct ptlrpc_page_pool *pool = page_pools[pool_index];
-
LASSERT(pool->ppp_max_pools);
LASSERT(pool->ppp_pools);
int sptlrpc_pool_init(void)
{
- int pool_index = 0, to_revert;
- int rc = 0;
struct ptlrpc_page_pool *pool;
int pool_max_pages = cfs_totalram_pages() / POOLS_COUNT;
+ int pool_order = 0;
+ int to_revert;
+ int rc = 0;
ENTRY;
OBD_ALLOC(page_pools, POOLS_COUNT * sizeof(*page_pools));
if (page_pools == NULL)
RETURN(-ENOMEM);
- for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
- OBD_ALLOC(page_pools[pool_index], sizeof(**page_pools));
- if (page_pools[pool_index] == NULL)
+ for (pool_order = 0; pool_order < POOLS_COUNT; pool_order++) {
+ OBD_ALLOC(page_pools[pool_order], sizeof(**page_pools));
+ if (page_pools[pool_order] == NULL)
GOTO(fail, rc = -ENOMEM);
- pool = page_pools[pool_index];
+ pool = page_pools[pool_order];
pool->ppp_max_pages = pool_max_pages;
pool->ppp_max_pools =
pool->ppp_st_max_wait = ktime_set(0, 0);
pool_alloc(pool);
- CDEBUG(D_SEC, "Allocated pool %i\n", pool_index);
+ pool->ppp_order = pool_order;
+ CDEBUG(D_SEC, "Allocated pool %i\n", pool_order);
if (pool->ppp_pools == NULL)
GOTO(fail, rc = -ENOMEM);
/* Pass pool number as part of pool_shrinker_seeks value */
#else
pool->ppp_shops.shrink = pool_shrink;
#endif
- pool->ppp_shops.seeks = INDEX_TO_SEEKS(pool_index);
+ pool->ppp_shops.seeks = ORDER_TO_SEEKS(pool_order);
pool->pool_shrinker = ll_shrinker_create(&pool->ppp_shops, 0,
"sptlrpc_pool");
RETURN(0);
fail:
- to_revert = pool_index;
- for (pool_index = 0; pool_index <= to_revert; pool_index++) {
- pool = page_pools[pool_index];
+ to_revert = pool_order;
+ for (pool_order = 0; pool_order <= to_revert; pool_order++) {
+ pool = page_pools[pool_order];
if (pool) {
if (pool->ppp_pools)
- pool_free(pool_index);
+ pool_free(pool);
OBD_FREE(pool, sizeof(**page_pools));
}
}
void sptlrpc_pool_fini(void)
{
unsigned long cleaned, npools;
- int pool_index;
+ int pool_order;
struct ptlrpc_page_pool *pool;
- for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
- pool = page_pools[pool_index];
+ for (pool_order = 0; pool_order < POOLS_COUNT; pool_order++) {
+ pool = page_pools[pool_order];
shrinker_free(pool->pool_shrinker);
LASSERT(pool->ppp_pools);
LASSERT(pool->ppp_total_pages == pool->ppp_free_pages);
npools = npages_to_npools(pool->ppp_total_pages);
- cleaned = pool_cleanup(pool->ppp_pools, npools, pool_index);
+ cleaned = pool_cleanup(pool->ppp_pools, npools, pool);
LASSERT(cleaned == pool->ppp_total_pages);
- pool_free(pool_index);
+ pool_free(pool);
if (pool->ppp_st_access > 0) {
CDEBUG(D_SEC,