Whamcloud - gitweb
LU-16724 ptlrpc: refactor page pools patch 2 45/52645/10
authorPatrick Farrell <paf0187@gmail.com>
Wed, 27 Mar 2024 21:24:18 +0000 (17:24 -0400)
committerOleg Drokin <green@whamcloud.com>
Mon, 15 Apr 2024 16:51:46 +0000 (16:51 +0000)
This is a combined series that refactors the page pools
code to make it more readable.  It used to be many
separate patches but has been combined in to just three,
and this is the second.

LU-16724 ptlrpc: stop passing around pool_index

We pass pool_index around from function to function over
and over, but it's easier to just pass the pool around.

This does require the pool to know its own index, but
that seems better anyway.

LU-16724 ptlrpc: convert to void

Convert functions without meaningful return to void.

Change-Id-Was: I81f0baefd5b77b60ba699fa8749eaa83acadd8dd

LU-16724 ptlrpc: refactor pool growing code

This refactors the pool growing code, combining two
separate instances of it in to a single function.

Change-Id-Was: I175abc7e61d55563e989f87207a8c59da852f5f9

LU-16724 ptlrpc: replace ELEMENT_SIZE

The ELEMENT_SIZE macro is fine, but it takes a pool index
and doesn't handle the pool of order 0.  Change it to a
function.  (This is marginally less efficient in one spot,
since it replaces a shift with a divide, but it should be
just fine.)

Change-Id-Was: I322037e50bbdb8e0274b37f82618b6907b6d2906

LU-16724 ptlrpc: simplify pool arrays

Currently, we do a fancy trick where we have a pool of
order 0, then subsequent pools start at
PPOOL_MIN_CHUNK_BITS (which is actually the minimum
compresison size).

So pool index 1 isn't a pool of order 1 (2 pages), it's a
pool of order PPOOL_MIN_CHUNK_BITS.

All this saves us is the cost of the empty pools below
PPOOL_MIN_CHUNK_BITS, but it makes the code notably harder
to read.

With this change, the order of the pool and the pool index
are the same.  This simplification will be embraced more
in subsequent patches.

Change-Id-Was: I650e05d25727f10b0ca2d556cba17e9c4fccc309

LU-16724 ptlrpc: begin renaming pool_index to order

Replace local variables for pool_index with pool_order.

Other renames will be in a subsequent patch, to keep these
as simple as possible.

Change-Id-Was: If347ff39776f9a75c0f7d9d9981d01e19bc2cbc9

LU-16724 ptlrpc: rename ppp_index to ppp_order

Rename ppp_index to ppp_order.

Other renames will be in a subsequent patch, to keep these
as simple as possible.

Change-Id-Was: I96559e27a67b7cc4e56e06378e5686370438850c

LU-16724 ptlrpc: rename INDEX macros

Rename INDEX macros to ORDER.

Change-Id-Was: Ic1123d25bc855dc7671c9cb587a0d6680662b729

LU-16724 ptlrpc: remove PAGES_POOL macro

PAGES_POOL is just the order 0 pool now, so remove the
special naming, and adjust a few associated functions.

Change-Id-Was: I09e1debeadecbce33c7be43a8859815084623358

Signed-off-by: Patrick Farrell <patrick.farrell@oracle.com>
Change-Id: I42dc8b8094212c69b7a29cc3766bd0a10860f7af
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/52645
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Artem Blagodarenko <ablagodarenko@ddn.com>
lustre/include/lustre_sec.h
lustre/ptlrpc/client.c
lustre/ptlrpc/sec_bulk.c

index bad288e..94ceabf 100644 (file)
@@ -1191,16 +1191,15 @@ int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
                                struct ptlrpc_cli_ctx *ctx);
 
 /* bulk security api */
-#define PAGES_POOL 0
-int sptlrpc_pool_add_user(void);
+void sptlrpc_pool_add_user(void);
 int sptlrpc_pool_get_desc_pages(struct ptlrpc_bulk_desc *desc);
 int sptlrpc_pool_get_pages_array(struct page **pa, unsigned int count);
 int sptlrpc_pool_get_pages(void **buf, unsigned int order);
 void sptlrpc_pool_put_desc_pages(struct ptlrpc_bulk_desc *desc);
 void sptlrpc_pool_put_pages_array(struct page **pa, unsigned int count);
 void sptlrpc_pool_put_pages(void *buf, unsigned int order);
-int sptlrpc_pool_get_free_pages(unsigned int pool);
-int pool_is_at_full_capacity(void);
+int sptlrpc_pool_get_free_pages(unsigned int order);
+int pool_is_at_full_capacity(int order);
 
 int sptlrpc_cli_wrap_bulk(struct ptlrpc_request *req,
                          struct ptlrpc_bulk_desc *desc);
index 4efeb20..fcfd417 100644 (file)
@@ -1670,11 +1670,11 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
        ENTRY;
        LASSERT(req->rq_phase == RQ_PHASE_NEW);
 
-       /* do not try to go further if there is not enough memory in enc_pool */
+       /* do not try to go further if there is not enough memory in pool */
        if (req->rq_sent && req->rq_bulk)
                if (req->rq_bulk->bd_iov_count >
-                   sptlrpc_pool_get_free_pages(PAGES_POOL) &&
-                   pool_is_at_full_capacity())
+                   sptlrpc_pool_get_free_pages(0) &&
+                   pool_is_at_full_capacity(0))
                        RETURN(-ENOMEM);
 
        if (req->rq_sent && (req->rq_sent > ktime_get_real_seconds()) &&
index d8d8080..96590ca 100644 (file)
 
 #include "ptlrpc_internal.h"
 
-#define PPOOL_MIN_CHUNK_BITS 16 /* 2^16 bytes = 64KiB */
-#define PPOOL_MAX_CHUNK_BITS PTLRPC_MAX_BRW_BITS
-#define POOLS_COUNT (PPOOL_MAX_CHUNK_BITS - PPOOL_MIN_CHUNK_BITS + 1)
-#define PPOOL_ORDER_TO_INDEX(bits) ((bits) - PPOOL_MIN_CHUNK_BITS + 1)
-#define POOL_BITS(pool) ((pool) + PPOOL_MIN_CHUNK_BITS - 1)
-#define ELEMENT_SIZE(pool) (1 << (PPOOL_MIN_CHUNK_BITS + (pool) - 1))
+/* we have a pool for every power of 2 number of pages <= MAX_BRW_BITS.
+ * most pools will be unused, but that's OK - unused pools are very cheap
+ */
+#define POOLS_COUNT (PTLRPC_MAX_BRW_BITS + 1)
 #define PAGES_TO_MiB(pages)    ((pages) >> (20 - PAGE_SHIFT))
 #define MiB_TO_PAGES(mb)       ((mb) << (20 - PAGE_SHIFT))
 /* deprecated - see pool_max_memory_mb below */
@@ -89,6 +87,9 @@ static struct ptlrpc_page_pool {
        unsigned int ppp_waitqlen;    /* wait queue length */
        unsigned long ppp_pages_short; /* # of pages wanted of in-q users */
        unsigned int ppp_growing:1;   /* during adding pages */
+       unsigned int ppp_order;       /* page pool order and index in pools
+                                      * array (element size is 2^order pages),
+                                      */
 
        /*
         * indicating how idle the pools are, from 0 to MAX_IDLE_IDX
@@ -131,6 +132,11 @@ static struct ptlrpc_page_pool {
        struct mutex add_pages_mutex;
 } **page_pools;
 
+static int element_size(struct ptlrpc_page_pool *pool)
+{
+       return 1 << pool->ppp_order;
+}
+
 /*
  * Keep old name (encrypt_page_pool vs page_pool) for compatibility with user
  * tools pulling stats
@@ -139,7 +145,7 @@ static struct ptlrpc_page_pool {
  */
 int encrypt_page_pools_seq_show(struct seq_file *m, void *v)
 {
-       struct ptlrpc_page_pool *pool = page_pools[PAGES_POOL];
+       struct ptlrpc_page_pool *pool = page_pools[0];
 
        spin_lock(&pool->ppp_lock);
        seq_printf(m, "physical pages:          %lu\n"
@@ -189,7 +195,7 @@ int encrypt_page_pools_seq_show(struct seq_file *m, void *v)
  */
 int page_pools_seq_show(struct seq_file *m, void *v)
 {
-       int pool_index;
+       int pool_order;
        struct ptlrpc_page_pool *pool;
 
        seq_printf(m, "physical_pages: %lu\n"
@@ -197,12 +203,12 @@ int page_pools_seq_show(struct seq_file *m, void *v)
                      "pools:\n",
                      cfs_totalram_pages(), PAGES_PER_POOL);
 
-       for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
-               pool = page_pools[pool_index];
+       for (pool_order = 0; pool_order < POOLS_COUNT; pool_order++) {
+               pool = page_pools[pool_order];
                if (!pool->ppp_st_access)
                        continue;
                spin_lock(&pool->ppp_lock);
-               seq_printf(m, "  pool_%luk:\n"
+               seq_printf(m, "  pool_%dk:\n"
                           "    max_pages: %lu\n"
                           "    max_pools: %u\n"
                           "    total_pages: %lu\n"
@@ -220,8 +226,8 @@ int page_pools_seq_show(struct seq_file *m, void *v)
                           "    max_waitqueue_depth: %u\n"
                           "    max_wait_time_ms: %lld\n"
                           "    out_of_mem: %lu\n",
-                          (pool_index ? ELEMENT_SIZE(pool_index - 10) :
-                          PAGE_SIZE >> 10),
+                          /* convert from bytes to KiB */
+                          element_size(pool) >> 10,
                           pool->ppp_max_pages,
                           pool->ppp_max_pools,
                           pool->ppp_total_pages,
@@ -245,11 +251,10 @@ int page_pools_seq_show(struct seq_file *m, void *v)
        return 0;
 }
 
-static void pool_release_free_pages(long npages, unsigned int pool_idx)
+static void pool_release_free_pages(long npages, struct ptlrpc_page_pool *pool)
 {
        int p_idx, g_idx;
        int p_idx_max1, p_idx_max2;
-       struct ptlrpc_page_pool *pool = page_pools[pool_idx];
 
        LASSERT(npages > 0);
        LASSERT(npages <= pool->ppp_free_pages);
@@ -273,11 +278,11 @@ static void pool_release_free_pages(long npages, unsigned int pool_idx)
                LASSERT(pool->ppp_pools[p_idx]);
                LASSERT(pool->ppp_pools[p_idx][g_idx] != NULL);
 
-               if (pool_idx == PAGES_POOL)
+               if (pool->ppp_order == 0)
                        __free_page(pool->ppp_pools[p_idx][g_idx]);
                else
                        OBD_FREE_LARGE(pool->ppp_pools[p_idx][g_idx],
-                                      ELEMENT_SIZE(pool_idx));
+                                      element_size(pool));
                pool->ppp_pools[p_idx][g_idx] = NULL;
 
                if (++g_idx == PAGES_PER_POOL) {
@@ -295,16 +300,16 @@ static void pool_release_free_pages(long npages, unsigned int pool_idx)
        }
 }
 
-#define SEEKS_TO_INDEX(s) (((s)->seeks >> 8) & 0xff)
-#define INDEX_TO_SEEKS(i) (DEFAULT_SEEKS | (i << 8))
+#define SEEKS_TO_ORDER(s) (((s)->seeks >> 8) & 0xff)
+#define ORDER_TO_SEEKS(i) (DEFAULT_SEEKS | (i << 8))
 /*
  * we try to keep at least PTLRPC_MAX_BRW_PAGES pages in the pool.
  */
 static unsigned long pool_shrink_count(struct shrinker *s,
                                       struct shrink_control *sc)
 {
-       unsigned int pool_index = SEEKS_TO_INDEX(s);
-       struct ptlrpc_page_pool *pool = page_pools[pool_index];
+       unsigned int pool_order = SEEKS_TO_ORDER(s);
+       struct ptlrpc_page_pool *pool = page_pools[pool_order];
        /*
         * if no pool access for a long time, we consider it's fully
         * idle. A little race here is fine.
@@ -330,8 +335,8 @@ static unsigned long pool_shrink_scan(struct shrinker *s,
                                      struct shrink_control *sc)
 {
        /* Get pool number passed as part of pool_shrinker_seeks value */
-       unsigned int pool_index = SEEKS_TO_INDEX(s);
-       struct ptlrpc_page_pool *pool = page_pools[pool_index];
+       unsigned int pool_order = SEEKS_TO_ORDER(s);
+       struct ptlrpc_page_pool *pool = page_pools[pool_order];
 
        spin_lock(&pool->ppp_lock);
        if (pool->ppp_free_pages <= PTLRPC_MAX_BRW_PAGES)
@@ -340,7 +345,7 @@ static unsigned long pool_shrink_scan(struct shrinker *s,
                sc->nr_to_scan = min_t(unsigned long, sc->nr_to_scan,
                              pool->ppp_free_pages - PTLRPC_MAX_BRW_PAGES);
        if (sc->nr_to_scan > 0) {
-               pool_release_free_pages(sc->nr_to_scan, pool_index);
+               pool_release_free_pages(sc->nr_to_scan, pool);
                CDEBUG(D_SEC, "released %ld pages, %ld left\n",
                       (long)sc->nr_to_scan, pool->ppp_free_pages);
 
@@ -387,7 +392,8 @@ int npages_to_npools(unsigned long npages)
 /*
  * return how many pages cleaned up.
  */
-static unsigned long pool_cleanup(void ***pools, int npools, int pool_idx)
+static unsigned long pool_cleanup(void ***pools, int npools,
+                                 struct ptlrpc_page_pool *pool)
 {
        unsigned long cleaned = 0;
        int i, j;
@@ -396,11 +402,11 @@ static unsigned long pool_cleanup(void ***pools, int npools, int pool_idx)
                if (pools[i]) {
                        for (j = 0; j < PAGES_PER_POOL; j++) {
                                if (pools[i][j]) {
-                                       if (pool_idx == PAGES_POOL) {
+                                       if (pool->ppp_order == 0) {
                                                __free_page(pools[i][j]);
                                        } else {
                                                OBD_FREE_LARGE(pools[i][j],
-                                                       ELEMENT_SIZE(pool_idx));
+                                                       element_size(pool));
                                        }
                                        cleaned++;
                                }
@@ -421,12 +427,11 @@ static unsigned long pool_cleanup(void ***pools, int npools, int pool_idx)
  * the simplest way to avoid complexity. It's not frequently called.
  */
 static void pool_insert(void ***pools, int npools, int npages,
-                       unsigned int pool_idx)
+                       struct ptlrpc_page_pool *page_pool)
 {
        int freeslot;
        int op_idx, np_idx, og_idx, ng_idx;
        int cur_npools, end_npools;
-       struct ptlrpc_page_pool *page_pool = page_pools[pool_idx];
 
        LASSERT(npages > 0);
        LASSERT(page_pool->ppp_total_pages+npages <= page_pool->ppp_max_pages);
@@ -517,20 +522,15 @@ static void pool_insert(void ***pools, int npools, int npages,
 }
 
 #define POOL_INIT_SIZE (PTLRPC_MAX_BRW_SIZE / 4)
-static int pool_add_pages(int npages, int pool_index)
+static int pool_add_pages(int npages, struct ptlrpc_page_pool *page_pool)
 {
        void ***pools;
        int npools, alloced = 0;
        int i, j, rc = -ENOMEM;
-       struct ptlrpc_page_pool *page_pool = page_pools[pool_index];
+       unsigned int pool_order = page_pool->ppp_order;
 
-       if (pool_index == PAGES_POOL) {
-               if (npages < POOL_INIT_SIZE >> PAGE_SHIFT)
-                       npages = POOL_INIT_SIZE >> PAGE_SHIFT;
-       } else {
-               if (npages < POOL_INIT_SIZE / ELEMENT_SIZE(pool_index))
-                       npages = POOL_INIT_SIZE / ELEMENT_SIZE(pool_index);
-       }
+       if (npages < POOL_INIT_SIZE / element_size(page_pool))
+               npages = POOL_INIT_SIZE / element_size(page_pool);
 
        mutex_lock(&page_pool->add_pages_mutex);
 
@@ -551,12 +551,12 @@ static int pool_add_pages(int npages, int pool_index)
                        goto out_pools;
 
                for (j = 0; j < PAGES_PER_POOL && alloced < npages; j++) {
-                       if (pool_index == PAGES_POOL)
+                       if (pool_order == 0)
                                pools[i][j] = alloc_page(GFP_NOFS |
                                        __GFP_HIGHMEM);
                        else {
                                OBD_ALLOC_LARGE(pools[i][j],
-                                       ELEMENT_SIZE(pool_index));
+                                       element_size(page_pool));
                        }
                        if (pools[i][j] == NULL)
                                goto out_pools;
@@ -566,14 +566,14 @@ static int pool_add_pages(int npages, int pool_index)
        }
        LASSERT(alloced == npages);
 
-       pool_insert(pools, npools, npages, pool_index);
+       pool_insert(pools, npools, npages, page_pool);
        CDEBUG(D_SEC, "added %d pages into pools\n", npages);
        OBD_FREE_PTR_ARRAY(pools, npools);
        rc = 0;
 
 out_pools:
        if (rc) {
-               pool_cleanup(pools, npools, pool_index);
+               pool_cleanup(pools, npools, page_pool);
        }
 out:
        if (rc) {
@@ -585,19 +585,17 @@ out:
        return rc;
 }
 
-static inline void pool_wakeup(unsigned int pool)
+static inline void pool_wakeup(struct ptlrpc_page_pool *pool)
 {
-       assert_spin_locked(&page_pools[pool]->ppp_lock);
+       assert_spin_locked(&pool->ppp_lock);
 
        /* waitqueue_active */
-       if (unlikely(waitqueue_active(&page_pools[pool]->ppp_waitq)))
-               wake_up_all(&page_pools[pool]->ppp_waitq);
+       if (unlikely(waitqueue_active(&pool->ppp_waitq)))
+               wake_up_all(&pool->ppp_waitq);
 }
 
-static int pool_should_grow(int needed, unsigned int pool_index)
+static int pool_should_grow(int needed, struct ptlrpc_page_pool *pool)
 {
-       struct ptlrpc_page_pool *pool = page_pools[pool_index];
-
        /*
         * don't grow if someone else is growing the pools right now,
         * or the pools has reached its full capacity
@@ -624,29 +622,21 @@ static int pool_should_grow(int needed, unsigned int pool_index)
 }
 
 /*
- * Export the number of free pages in the pool
+ * Export the number of free pages in the pool of 'order'
  */
-int sptlrpc_pool_get_free_pages(unsigned int pool)
+int sptlrpc_pool_get_free_pages(unsigned int order)
 {
-       return page_pools[pool]->ppp_free_pages;
+       return page_pools[order]->ppp_free_pages;
 }
 EXPORT_SYMBOL(sptlrpc_pool_get_free_pages);
 
 /*
  * Let outside world know if pool full capacity is reached
  */
-int __pool_is_at_full_capacity(unsigned int pool)
-{
-       return (page_pools[pool]->ppp_total_pages ==
-               page_pools[pool]->ppp_max_pages);
-}
-
-/*
- * Let outside world know if pool full capacity is reached
- */
-int pool_is_at_full_capacity(void)
+int pool_is_at_full_capacity(int order)
 {
-       return __pool_is_at_full_capacity(PAGES_POOL);
+       return (page_pools[order]->ppp_total_pages ==
+               page_pools[order]->ppp_max_pages);
 }
 EXPORT_SYMBOL(pool_is_at_full_capacity);
 
@@ -669,14 +659,16 @@ static inline void **page_from_bufarray(void *array, int index)
        return (void **)array;
 }
 
+static bool __grow_pool_try(int needed, struct ptlrpc_page_pool *pool);
+
 /*
  * we allocate the requested pages atomically.
  */
 static inline int __sptlrpc_pool_get_pages(void *array, unsigned int count,
-                                          unsigned int pool_idx,
+                                          unsigned int order,
                                           void **(*page_from)(void *, int))
 {
-       struct ptlrpc_page_pool *page_pool = page_pools[pool_idx];
+       struct ptlrpc_page_pool *page_pool = page_pools[order];
        wait_queue_entry_t waitlink;
        unsigned long this_idle = -1;
        u64 tick_ns = 0;
@@ -697,19 +689,17 @@ again:
                page_pool->ppp_st_missings++;
                page_pool->ppp_pages_short += count;
 
-               if (pool_should_grow(count, pool_idx)) {
-                       page_pool->ppp_growing = 1;
-
-                       spin_unlock(&page_pool->ppp_lock);
-                       CDEBUG(D_SEC, "ppp_pages_short: %lu\n",
-                              page_pool->ppp_pages_short);
-                       pool_add_pages(8, pool_idx);
-                       spin_lock(&page_pool->ppp_lock);
-
-                       page_pool->ppp_growing = 0;
-
-                       pool_wakeup(pool_idx);
-               } else {
+               /* if we aren't able to add pages, check if someone else is
+                * growing the pool and sleep if so, otherwise we return
+                * ENOMEM because we can't sleep here waiting for other ops to
+                * complete (main user is ptlrpcd, which must not sleep waiting
+                * for other ops...  technically sleeping for pool growth is
+                * also questionable but it's very unlikely in practice to get
+                * stuck from this)
+                *
+                * if ENOMEM is returned here, the RPC will go back in the queue
+                */
+               if (!__grow_pool_try(count, page_pool)) {
                        if (page_pool->ppp_growing) {
                                if (++page_pool->ppp_waitqlen >
                                    page_pool->ppp_st_max_wqlen)
@@ -802,7 +792,7 @@ int sptlrpc_pool_get_desc_pages(struct ptlrpc_bulk_desc *desc)
        int rc;
 
        LASSERT(desc->bd_iov_count > 0);
-       LASSERT(desc->bd_iov_count <= page_pools[PAGES_POOL]->ppp_max_pages);
+       LASSERT(desc->bd_iov_count <= page_pools[0]->ppp_max_pages);
 
        /* resent bulk, enc iov might have been allocated previously */
        if (desc->bd_enc_vec != NULL)
@@ -813,8 +803,8 @@ int sptlrpc_pool_get_desc_pages(struct ptlrpc_bulk_desc *desc)
        if (desc->bd_enc_vec == NULL)
                return -ENOMEM;
 
-       rc = __sptlrpc_pool_get_pages((void *)desc, desc->bd_iov_count,
-                                     PAGES_POOL, page_from_bulkdesc);
+       rc = __sptlrpc_pool_get_pages((void *)desc, desc->bd_iov_count, 0,
+                                     page_from_bulkdesc);
        if (rc) {
                OBD_FREE_LARGE(desc->bd_enc_vec,
                               desc->bd_iov_count *
@@ -827,37 +817,36 @@ EXPORT_SYMBOL(sptlrpc_pool_get_desc_pages);
 
 int sptlrpc_pool_get_pages_array(struct page **pa, unsigned int count)
 {
-       return __sptlrpc_pool_get_pages((void *)pa, count, PAGES_POOL,
-                                           page_from_pagearray);
+       return __sptlrpc_pool_get_pages((void *)pa, count, 0,
+                                       page_from_pagearray);
 }
 EXPORT_SYMBOL(sptlrpc_pool_get_pages_array);
 
 int sptlrpc_pool_get_pages(void **pages, unsigned int order)
 {
-       return __sptlrpc_pool_get_pages((void *)pages, 1,
-                                           PPOOL_ORDER_TO_INDEX(order),
-                                           page_from_bufarray);
+       return __sptlrpc_pool_get_pages((void *)pages, 1, order,
+                                       page_from_bufarray);
 }
 EXPORT_SYMBOL(sptlrpc_pool_get_pages);
 
 static int __sptlrpc_pool_put_pages(void *array, unsigned int count,
-                                       unsigned int pool_idx,
-                                       void **(*page_from)(void *, int))
+                                   unsigned int order,
+                                   void **(*page_from)(void *, int))
 {
+       struct ptlrpc_page_pool *page_pool;
        int p_idx, g_idx;
        int i, rc = 0;
-       struct ptlrpc_page_pool *page_pool;
 
-       LASSERTF(pool_idx < POOLS_COUNT, "count %u, pool %u\n",
-                count, pool_idx);
+       LASSERTF(order < POOLS_COUNT, "count %u, pool %u\n",
+                count, order);
        if (!array) {
                CERROR("Faled to put %u pages, from pool %u\n",
-                      count, pool_idx);
+                      count, order);
                return -EINVAL;
        }
 
-       page_pool = page_pools[pool_idx];
-       LASSERTF(page_pool != NULL, "count %u, pool %u\n", count, pool_idx);
+       page_pool = page_pools[order];
+       LASSERTF(page_pool != NULL, "count %u, pool %u\n", count, order);
 
        spin_lock(&page_pool->ppp_lock);
 
@@ -884,7 +873,7 @@ static int __sptlrpc_pool_put_pages(void *array, unsigned int count,
        }
 
        page_pool->ppp_free_pages += count;
-       pool_wakeup(pool_idx);
+       pool_wakeup(page_pool);
 
 out_unlock:
        spin_unlock(&page_pool->ppp_lock);
@@ -898,8 +887,8 @@ void sptlrpc_pool_put_desc_pages(struct ptlrpc_bulk_desc *desc)
        if (desc->bd_enc_vec == NULL)
                return;
 
-       rc = __sptlrpc_pool_put_pages((void *)desc, desc->bd_iov_count,
-                                         PAGES_POOL, page_from_bulkdesc);
+       rc = __sptlrpc_pool_put_pages((void *)desc, desc->bd_iov_count, 0,
+                                     page_from_bulkdesc);
        if (rc)
                CDEBUG(D_SEC, "error putting pages in pool: %d\n", rc);
 
@@ -912,8 +901,8 @@ void sptlrpc_pool_put_pages_array(struct page **pa, unsigned int count)
 {
        int rc;
 
-       rc = __sptlrpc_pool_put_pages((void *)pa, count, PAGES_POOL,
-                                         page_from_pagearray);
+       rc = __sptlrpc_pool_put_pages((void *)pa, count, 0,
+                                     page_from_pagearray);
 
        if (rc)
                CDEBUG(D_SEC, "error putting pages in pool: %d\n", rc);
@@ -924,40 +913,75 @@ void sptlrpc_pool_put_pages(void *buf, unsigned int order)
 {
        int rc;
 
-       rc = __sptlrpc_pool_put_pages(buf, 1,
-                                         PPOOL_ORDER_TO_INDEX(order),
-                                         page_from_bufarray);
+       rc = __sptlrpc_pool_put_pages(buf, 1, order, page_from_bufarray);
        if (rc)
                CDEBUG(D_SEC, "error putting pages in pool: %d\n", rc);
 }
 EXPORT_SYMBOL(sptlrpc_pool_put_pages);
 
-
-/*
- * we don't do much stuff for add_user/del_user anymore, except adding some
- * initial pages in add_user() if current pools are empty, rest would be
- * handled by the pools's self-adaption.
- */
-int sptlrpc_pool_add_user(void)
+/* called with pool->ppp_lock held */
+static bool __grow_pool_try(int needed, struct ptlrpc_page_pool *pool)
 {
-       struct ptlrpc_page_pool *pool = page_pools[PAGES_POOL];
+       bool pool_grown = false;
+
+       assert_spin_locked(&pool->ppp_lock);
+
+       if (pool_should_grow(needed, pool)) {
+               unsigned int to_add;
+               int rc;
 
-       spin_lock(&pool->ppp_lock);
-       /* ask for 1 page - so if the pool is empty, it will grow
-        * (this might also grow an in-use pool if it's full, which is fine)
-        */
-       if (pool_should_grow(1, PAGES_POOL)) {
                pool->ppp_growing = 1;
+               /* the pool of single pages is grown a large amount on
+                * first use
+                */
+               if (pool->ppp_order == 0 &&
+                   pool->ppp_total_pages == 0)
+                       to_add = PTLRPC_MAX_BRW_PAGES * 2;
+               else /* otherwise, we add requested or at least 8 items */
+                       to_add = max(needed, 8);
                spin_unlock(&pool->ppp_lock);
 
-               pool_add_pages(PTLRPC_MAX_BRW_PAGES * 2, PAGES_POOL);
+               CDEBUG(D_SEC,
+                      "pool %d is %lu elements (size %d bytes), growing by %d items\n",
+                       pool->ppp_order, pool->ppp_pages_short,
+                       element_size(pool), to_add);
+               /* we can't hold a spinlock over page allocation */
+               rc = pool_add_pages(to_add, pool);
+               if (rc == 0)
+                       pool_grown = true;
 
                spin_lock(&pool->ppp_lock);
                pool->ppp_growing = 0;
-               pool_wakeup(PAGES_POOL);
+               pool_wakeup(pool);
        }
+
+       return pool_grown;
+}
+
+static bool grow_pool_try(int needed, struct ptlrpc_page_pool *pool)
+{
+       bool rc;
+
+       spin_lock(&pool->ppp_lock);
+       rc = __grow_pool_try(needed, pool);
        spin_unlock(&pool->ppp_lock);
-       return 0;
+
+       return rc;
+}
+
+/*
+ * we don't do much stuff for add_user/del_user anymore, except adding some
+ * initial pages in add_user() if current pools are empty, rest would be
+ * handled by the pools's self-adaption.
+ */
+void sptlrpc_pool_add_user(void)
+{
+       struct ptlrpc_page_pool *pool = page_pools[0];
+
+       /* since this is startup, no one is waiting for these pages, so we
+        * don't worry about sucess or failure here
+        */
+       grow_pool_try(1, pool);
 }
 EXPORT_SYMBOL(sptlrpc_pool_add_user);
 
@@ -969,10 +993,8 @@ static inline void pool_alloc(struct ptlrpc_page_pool *pool)
                        sizeof(*pool->ppp_pools));
 }
 
-static inline void pool_free(unsigned int pool_index)
+static inline void pool_free(struct ptlrpc_page_pool *pool)
 {
-       struct ptlrpc_page_pool *pool = page_pools[pool_index];
-
        LASSERT(pool->ppp_max_pools);
        LASSERT(pool->ppp_pools);
 
@@ -982,10 +1004,11 @@ static inline void pool_free(unsigned int pool_index)
 
 int sptlrpc_pool_init(void)
 {
-       int pool_index = 0, to_revert;
-       int rc = 0;
        struct ptlrpc_page_pool *pool;
        int pool_max_pages = cfs_totalram_pages() / POOLS_COUNT;
+       int pool_order = 0;
+       int to_revert;
+       int rc = 0;
 
        ENTRY;
 
@@ -998,12 +1021,12 @@ int sptlrpc_pool_init(void)
        OBD_ALLOC(page_pools, POOLS_COUNT * sizeof(*page_pools));
        if (page_pools == NULL)
                RETURN(-ENOMEM);
-       for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
-               OBD_ALLOC(page_pools[pool_index], sizeof(**page_pools));
-               if (page_pools[pool_index] == NULL)
+       for (pool_order = 0; pool_order < POOLS_COUNT; pool_order++) {
+               OBD_ALLOC(page_pools[pool_order], sizeof(**page_pools));
+               if (page_pools[pool_order] == NULL)
                        GOTO(fail, rc = -ENOMEM);
 
-               pool = page_pools[pool_index];
+               pool = page_pools[pool_order];
                pool->ppp_max_pages = pool_max_pages;
 
                pool->ppp_max_pools =
@@ -1017,7 +1040,8 @@ int sptlrpc_pool_init(void)
                pool->ppp_st_max_wait = ktime_set(0, 0);
 
                pool_alloc(pool);
-               CDEBUG(D_SEC, "Allocated pool %i\n", pool_index);
+               pool->ppp_order = pool_order;
+               CDEBUG(D_SEC, "Allocated pool %i\n", pool_order);
                if (pool->ppp_pools == NULL)
                        GOTO(fail, rc = -ENOMEM);
                /* Pass pool number as part of pool_shrinker_seeks value */
@@ -1027,7 +1051,7 @@ int sptlrpc_pool_init(void)
 #else
                pool->ppp_shops.shrink = pool_shrink;
 #endif
-               pool->ppp_shops.seeks = INDEX_TO_SEEKS(pool_index);
+               pool->ppp_shops.seeks = ORDER_TO_SEEKS(pool_order);
 
                pool->pool_shrinker = ll_shrinker_create(&pool->ppp_shops, 0,
                                                         "sptlrpc_pool");
@@ -1039,12 +1063,12 @@ int sptlrpc_pool_init(void)
 
        RETURN(0);
 fail:
-       to_revert = pool_index;
-       for (pool_index = 0; pool_index <= to_revert; pool_index++) {
-               pool = page_pools[pool_index];
+       to_revert = pool_order;
+       for (pool_order = 0; pool_order <= to_revert; pool_order++) {
+               pool = page_pools[pool_order];
                if (pool) {
                        if (pool->ppp_pools)
-                               pool_free(pool_index);
+                               pool_free(pool);
                        OBD_FREE(pool, sizeof(**page_pools));
                }
        }
@@ -1056,20 +1080,20 @@ fail:
 void sptlrpc_pool_fini(void)
 {
        unsigned long cleaned, npools;
-       int pool_index;
+       int pool_order;
        struct ptlrpc_page_pool *pool;
 
-       for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
-               pool = page_pools[pool_index];
+       for (pool_order = 0; pool_order < POOLS_COUNT; pool_order++) {
+               pool = page_pools[pool_order];
                shrinker_free(pool->pool_shrinker);
                LASSERT(pool->ppp_pools);
                LASSERT(pool->ppp_total_pages == pool->ppp_free_pages);
 
                npools = npages_to_npools(pool->ppp_total_pages);
-               cleaned = pool_cleanup(pool->ppp_pools, npools, pool_index);
+               cleaned = pool_cleanup(pool->ppp_pools, npools, pool);
                LASSERT(cleaned == pool->ppp_total_pages);
 
-               pool_free(pool_index);
+               pool_free(pool);
 
                if (pool->ppp_st_access > 0) {
                        CDEBUG(D_SEC,