Whamcloud - gitweb
LU-16724 ptlrc: ptlrpc: extend sec bulk functionality 35/52335/9
authorArtem Blagodarenko <ablagodarenko@ddn.com>
Wed, 11 Oct 2023 21:20:40 +0000 (17:20 -0400)
committerOleg Drokin <green@whamcloud.com>
Fri, 23 Feb 2024 07:11:29 +0000 (07:11 +0000)
Features such as client-side-data-compression and unaligned
direct I/O need page/buffer pools for good performance.

This patch extends sec bulk functionality to allocate different
size buffers. Memory shrinking and other usefull features
should still work as expected.

Signed-off-by: Artem Blagodarenko <ablagodarenko@ddn.com>
Change-Id: I929b4dfdcb0e8197f3804629b000af0d4bd6f2a0
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/52335
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
lustre/include/lustre_sec.h
lustre/ptlrpc/client.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/sec_bulk.c
lustre/ptlrpc/sec_lproc.c
lustre/tests/sanity.sh

index 91f7e4f..77f970e 100644 (file)
@@ -1190,12 +1190,15 @@ int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
                                struct ptlrpc_cli_ctx *ctx);
 
 /* bulk security api */
+#define PAGES_POOL 0
 int sptlrpc_enc_pool_add_user(void);
-int  sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc);
+int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc);
 int sptlrpc_enc_pool_get_pages_array(struct page **pa, unsigned int count);
+int sptlrpc_enc_pool_get_buf(void **buf, unsigned int size_bits);
 void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc);
 void sptlrpc_enc_pool_put_pages_array(struct page **pa, unsigned int count);
-int get_free_pages_in_pool(void);
+void sptlrpc_enc_pool_put_buf(void *buf, unsigned int size_bits);
+int sptlrpc_enc_pool_get_free_pages(unsigned int pool);
 int pool_is_at_full_capacity(void);
 
 int sptlrpc_cli_wrap_bulk(struct ptlrpc_request *req,
index 96fde33..a157a50 100644 (file)
@@ -1670,7 +1670,8 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 
        /* do not try to go further if there is not enough memory in enc_pool */
        if (req->rq_sent && req->rq_bulk)
-               if (req->rq_bulk->bd_iov_count > get_free_pages_in_pool() &&
+               if (req->rq_bulk->bd_iov_count >
+                   sptlrpc_enc_pool_get_free_pages(PAGES_POOL) &&
                    pool_is_at_full_capacity())
                        RETURN(-ENOMEM);
 
index 4b1e15f..ff10538 100644 (file)
@@ -293,7 +293,8 @@ void sptlrpc_plain_fini(void);
 /* sec_bulk.c */
 int  sptlrpc_enc_pool_init(void);
 void sptlrpc_enc_pool_fini(void);
-int sptlrpc_proc_enc_pool_seq_show(struct seq_file *m, void *v);
+int encrypt_page_pools_seq_show(struct seq_file *m, void *v);
+int page_pools_seq_show(struct seq_file *m, void *v);
 
 /* sec_lproc.c */
 int  sptlrpc_lproc_init(void);
index cc4c06d..bbf7374 100644 (file)
 
 #include "ptlrpc_internal.h"
 
-static int mult = 20 - PAGE_SHIFT;
+#define PPOOL_MIN_CHUNK_BITS 16 /* 2^16 bytes = 64KiB */
+#define PPOOL_MAX_CHUNK_BITS PTLRPC_MAX_BRW_BITS
+#define POOLS_COUNT (PPOOL_MAX_CHUNK_BITS - PPOOL_MIN_CHUNK_BITS + 1)
+#define PPOOL_SIZE_TO_INDEX(bits) ((bits) - PPOOL_MIN_CHUNK_BITS + 1)
+#define POOL_BITS(pool) ((pool) + PPOOL_MIN_CHUNK_BITS - 1)
+#define ELEMENT_SIZE(pool) (1 << (PPOOL_MIN_CHUNK_BITS + (pool) - 1))
+#define mult (20 - PAGE_SHIFT)
 static int enc_pool_max_memory_mb;
 module_param(enc_pool_max_memory_mb, int, 0644);
 MODULE_PARM_DESC(enc_pool_max_memory_mb,
@@ -110,87 +116,157 @@ static struct ptlrpc_enc_page_pool {
        /*
         * pointers to pools, may be vmalloc'd
         */
-       struct page ***epp_pools;
-} page_pools;
+       void ***epp_pools;
+       /*
+        * memory shrinker
+        */
+       struct shrinker pool_shrinker;
+       struct mutex add_pages_mutex;
+} **page_pools;
 
 /*
- * /proc/fs/lustre/sptlrpc/encrypt_page_pools
+ * /sys/kernel/debug/lustre/sptlrpc/encrypt_page_pools
  */
-int sptlrpc_proc_enc_pool_seq_show(struct seq_file *m, void *v)
+int encrypt_page_pools_seq_show(struct seq_file *m, void *v)
 {
-       spin_lock(&page_pools.epp_lock);
-
+       spin_lock(&page_pools[PAGES_POOL]->epp_lock);
        seq_printf(m, "physical pages:          %lu\n"
-                  "pages per pool:          %lu\n"
-                  "max pages:               %lu\n"
-                  "max pools:               %u\n"
-                  "total pages:             %lu\n"
-                  "total free:              %lu\n"
-                  "idle index:              %lu/100\n"
-                  "last shrink:             %llds\n"
-                  "last access:             %llds\n"
-                  "max pages reached:       %lu\n"
-                  "grows:                   %u\n"
-                  "grows failure:           %u\n"
-                  "shrinks:                 %u\n"
-                  "cache access:            %lu\n"
-                  "cache missing:           %lu\n"
-                  "low free mark:           %lu\n"
-                  "max waitqueue depth:     %u\n"
-                  "max wait time ms:        %lld\n"
-                  "out of mem:              %lu\n",
-                  cfs_totalram_pages(), PAGES_PER_POOL,
-                  page_pools.epp_max_pages,
-                  page_pools.epp_max_pools,
-                  page_pools.epp_total_pages,
-                  page_pools.epp_free_pages,
-                  page_pools.epp_idle_idx,
-                  ktime_get_seconds() - page_pools.epp_last_shrink,
-                  ktime_get_seconds() - page_pools.epp_last_access,
-                  page_pools.epp_st_max_pages,
-                  page_pools.epp_st_grows,
-                  page_pools.epp_st_grow_fails,
-                  page_pools.epp_st_shrinks,
-                  page_pools.epp_st_access,
-                  page_pools.epp_st_missings,
-                  page_pools.epp_st_lowfree,
-                  page_pools.epp_st_max_wqlen,
-                  ktime_to_ms(page_pools.epp_st_max_wait),
-                  page_pools.epp_st_outofmem);
-
-       spin_unlock(&page_pools.epp_lock);
+               "pages per pool:          %lu\n"
+               "max pages:               %lu\n"
+               "max pools:               %u\n"
+               "total pages:             %lu\n"
+               "total free:              %lu\n"
+               "idle index:              %lu/100\n"
+               "last shrink:             %llds\n"
+               "last access:             %llds\n"
+               "max pages reached:       %lu\n"
+               "grows:                   %u\n"
+               "grows failure:           %u\n"
+               "shrinks:                 %u\n"
+               "cache access:            %lu\n"
+               "cache missing:           %lu\n"
+               "low free mark:           %lu\n"
+               "max waitqueue depth:     %u\n"
+               "max wait time ms:        %lld\n"
+               "out of mem:              %lu\n",
+               cfs_totalram_pages(), PAGES_PER_POOL,
+               page_pools[PAGES_POOL]->epp_max_pages,
+               page_pools[PAGES_POOL]->epp_max_pools,
+               page_pools[PAGES_POOL]->epp_total_pages,
+               page_pools[PAGES_POOL]->epp_free_pages,
+               page_pools[PAGES_POOL]->epp_idle_idx,
+               ktime_get_seconds() - page_pools[PAGES_POOL]->epp_last_shrink,
+               ktime_get_seconds() - page_pools[PAGES_POOL]->epp_last_access,
+               page_pools[PAGES_POOL]->epp_st_max_pages,
+               page_pools[PAGES_POOL]->epp_st_grows,
+               page_pools[PAGES_POOL]->epp_st_grow_fails,
+               page_pools[PAGES_POOL]->epp_st_shrinks,
+               page_pools[PAGES_POOL]->epp_st_access,
+               page_pools[PAGES_POOL]->epp_st_missings,
+               page_pools[PAGES_POOL]->epp_st_lowfree,
+               page_pools[PAGES_POOL]->epp_st_max_wqlen,
+               ktime_to_ms(page_pools[PAGES_POOL]->epp_st_max_wait),
+               page_pools[PAGES_POOL]->epp_st_outofmem);
+       spin_unlock(&page_pools[PAGES_POOL]->epp_lock);
+
+       return 0;
+}
+
+/*
+ * /sys/kernel/debug/lustre/sptlrpc/page_pools
+ */
+int page_pools_seq_show(struct seq_file *m, void *v)
+{
+       int pool_index;
+       struct ptlrpc_enc_page_pool *pool;
+
+       seq_printf(m, "physical_pages: %lu\n"
+                     "pages per pool: %lu\n\n"
+                     "pools:\n",
+                     cfs_totalram_pages(), PAGES_PER_POOL);
+
+       for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
+               pool = page_pools[pool_index];
+               if (!pool->epp_st_access)
+                       continue;
+               spin_lock(&pool->epp_lock);
+               seq_printf(m, "  pool_%luk:\n"
+                          "    max_pages: %lu\n"
+                          "    max_pools: %u\n"
+                          "    total_pages: %lu\n"
+                          "    total_free: %lu\n"
+                          "    idle_index: %lu/100\n"
+                          "    last_shrink: %llds\n"
+                          "    last_access: %llds\n"
+                          "    max_pages_reached: %lu\n"
+                          "    grows: %u\n"
+                          "    grows_failure: %u\n"
+                          "    shrinks: %u\n"
+                          "    cache_access: %lu\n"
+                          "    cache_missing: %lu\n"
+                          "    low_free_mark: %lu\n"
+                          "    max_waitqueue_depth: %u\n"
+                          "    max_wait_time_ms: %lld\n"
+                          "    out_of_mem: %lu\n",
+                          (pool_index ? ELEMENT_SIZE(pool_index - 10) :
+                          PAGE_SIZE >> 10),
+                          pool->epp_max_pages,
+                          pool->epp_max_pools,
+                          pool->epp_total_pages,
+                          pool->epp_free_pages,
+                          pool->epp_idle_idx,
+                          ktime_get_seconds() - pool->epp_last_shrink,
+                          ktime_get_seconds() - pool->epp_last_access,
+                          pool->epp_st_max_pages,
+                          pool->epp_st_grows,
+                          pool->epp_st_grow_fails,
+                          pool->epp_st_shrinks,
+                          pool->epp_st_access,
+                          pool->epp_st_missings,
+                          pool->epp_st_lowfree,
+                          pool->epp_st_max_wqlen,
+                          ktime_to_ms(pool->epp_st_max_wait),
+                          pool->epp_st_outofmem);
+
+               spin_unlock(&pool->epp_lock);
+       }
        return 0;
 }
 
-static void enc_pools_release_free_pages(long npages)
+static void enc_pools_release_free_pages(long npages, unsigned int pool_idx)
 {
        int p_idx, g_idx;
        int p_idx_max1, p_idx_max2;
+       struct ptlrpc_enc_page_pool *pool = page_pools[pool_idx];
 
        LASSERT(npages > 0);
-       LASSERT(npages <= page_pools.epp_free_pages);
-       LASSERT(page_pools.epp_free_pages <= page_pools.epp_total_pages);
+       LASSERT(npages <= pool->epp_free_pages);
+       LASSERT(pool->epp_free_pages <= pool->epp_total_pages);
 
        /* max pool index before the release */
-       p_idx_max2 = (page_pools.epp_total_pages - 1) / PAGES_PER_POOL;
+       p_idx_max2 = (pool->epp_total_pages - 1) / PAGES_PER_POOL;
 
-       page_pools.epp_free_pages -= npages;
-       page_pools.epp_total_pages -= npages;
+       pool->epp_free_pages -= npages;
+       pool->epp_total_pages -= npages;
 
        /* max pool index after the release */
-       p_idx_max1 = page_pools.epp_total_pages == 0 ? -1 :
-               ((page_pools.epp_total_pages - 1) / PAGES_PER_POOL);
+       p_idx_max1 = pool->epp_total_pages == 0 ? -1 :
+               ((pool->epp_total_pages - 1) / PAGES_PER_POOL);
 
-       p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
-       g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
-       LASSERT(page_pools.epp_pools[p_idx]);
+       p_idx = pool->epp_free_pages / PAGES_PER_POOL;
+       g_idx = pool->epp_free_pages % PAGES_PER_POOL;
+       LASSERT(pool->epp_pools[p_idx]);
 
        while (npages--) {
-               LASSERT(page_pools.epp_pools[p_idx]);
-               LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL);
+               LASSERT(pool->epp_pools[p_idx]);
+               LASSERT(pool->epp_pools[p_idx][g_idx] != NULL);
 
-               __free_page(page_pools.epp_pools[p_idx][g_idx]);
-               page_pools.epp_pools[p_idx][g_idx] = NULL;
+               if (pool_idx == 0)
+                       __free_page(pool->epp_pools[p_idx][g_idx]);
+               else
+                       OBD_FREE_LARGE(pool->epp_pools[p_idx][g_idx],
+                                      ELEMENT_SIZE(pool_idx));
+               pool->epp_pools[p_idx][g_idx] = NULL;
 
                if (++g_idx == PAGES_PER_POOL) {
                        p_idx++;
@@ -200,34 +276,39 @@ static void enc_pools_release_free_pages(long npages)
 
        /* free unused pools */
        while (p_idx_max1 < p_idx_max2) {
-               LASSERT(page_pools.epp_pools[p_idx_max2]);
-               OBD_FREE(page_pools.epp_pools[p_idx_max2], PAGE_SIZE);
-               page_pools.epp_pools[p_idx_max2] = NULL;
+               LASSERT(pool->epp_pools[p_idx_max2]);
+               OBD_FREE(pool->epp_pools[p_idx_max2], PAGE_SIZE);
+               pool->epp_pools[p_idx_max2] = NULL;
                p_idx_max2--;
        }
 }
 
+#define SEEKS_TO_INDEX(s) (((s)->seeks >> 8) & 0xff)
+#define INDEX_TO_SEEKS(i) (DEFAULT_SEEKS | (i << 8))
 /*
  * we try to keep at least PTLRPC_MAX_BRW_PAGES pages in the pool.
  */
 static unsigned long enc_pools_shrink_count(struct shrinker *s,
                                            struct shrink_control *sc)
 {
+       unsigned int pool_index = SEEKS_TO_INDEX(s);
+       struct ptlrpc_enc_page_pool *pool = page_pools[pool_index];
        /*
-        * if no pool access for a long time, we consider it's fully idle.
-        * a little race here is fine.
+        * if no pool access for a long time, we consider it's fully
+        * idle. A little race here is fine.
         */
-       if (unlikely(ktime_get_seconds() - page_pools.epp_last_access >
+       if (unlikely(ktime_get_seconds() - pool->epp_last_access >
                     CACHE_QUIESCENT_PERIOD)) {
-               spin_lock(&page_pools.epp_lock);
-               page_pools.epp_idle_idx = IDLE_IDX_MAX;
-               spin_unlock(&page_pools.epp_lock);
+               spin_lock(&pool->epp_lock);
+               pool->epp_idle_idx = IDLE_IDX_MAX;
+               spin_unlock(&pool->epp_lock);
        }
 
-       LASSERT(page_pools.epp_idle_idx <= IDLE_IDX_MAX);
-       return (page_pools.epp_free_pages <= PTLRPC_MAX_BRW_PAGES) ? 0 :
-               (page_pools.epp_free_pages - PTLRPC_MAX_BRW_PAGES) *
-               (IDLE_IDX_MAX - page_pools.epp_idle_idx) / IDLE_IDX_MAX;
+       LASSERT(pool->epp_idle_idx <= IDLE_IDX_MAX);
+
+       return (pool->epp_free_pages <= PTLRPC_MAX_BRW_PAGES) ? 0 :
+               (pool->epp_free_pages - PTLRPC_MAX_BRW_PAGES) *
+               (IDLE_IDX_MAX - pool->epp_idle_idx) / IDLE_IDX_MAX;
 }
 
 /*
@@ -236,44 +317,43 @@ static unsigned long enc_pools_shrink_count(struct shrinker *s,
 static unsigned long enc_pools_shrink_scan(struct shrinker *s,
                                           struct shrink_control *sc)
 {
-       spin_lock(&page_pools.epp_lock);
-       if (page_pools.epp_free_pages <= PTLRPC_MAX_BRW_PAGES)
+       /* Get pool number passed as part of pools_shrinker_seeks value */
+       unsigned int pool_index = SEEKS_TO_INDEX(s);
+       struct ptlrpc_enc_page_pool *pool = page_pools[pool_index];
+
+       spin_lock(&pool->epp_lock);
+       if (pool->epp_free_pages <= PTLRPC_MAX_BRW_PAGES)
                sc->nr_to_scan = 0;
        else
                sc->nr_to_scan = min_t(unsigned long, sc->nr_to_scan,
-                             page_pools.epp_free_pages - PTLRPC_MAX_BRW_PAGES);
+                             pool->epp_free_pages - PTLRPC_MAX_BRW_PAGES);
        if (sc->nr_to_scan > 0) {
-               enc_pools_release_free_pages(sc->nr_to_scan);
+               enc_pools_release_free_pages(sc->nr_to_scan, pool_index);
                CDEBUG(D_SEC, "released %ld pages, %ld left\n",
-                      (long)sc->nr_to_scan, page_pools.epp_free_pages);
+                      (long)sc->nr_to_scan, pool->epp_free_pages);
 
-               page_pools.epp_st_shrinks++;
-               page_pools.epp_last_shrink = ktime_get_seconds();
+               pool->epp_st_shrinks++;
+               pool->epp_last_shrink = ktime_get_seconds();
        }
-       spin_unlock(&page_pools.epp_lock);
+       spin_unlock(&pool->epp_lock);
 
        /*
         * if no pool access for a long time, we consider it's fully idle.
         * a little race here is fine.
         */
-       if (unlikely(ktime_get_seconds() - page_pools.epp_last_access >
+       if (unlikely(ktime_get_seconds() - pool->epp_last_access >
                     CACHE_QUIESCENT_PERIOD)) {
-               spin_lock(&page_pools.epp_lock);
-               page_pools.epp_idle_idx = IDLE_IDX_MAX;
-               spin_unlock(&page_pools.epp_lock);
+               spin_lock(&pool->epp_lock);
+               pool->epp_idle_idx = IDLE_IDX_MAX;
+               spin_unlock(&pool->epp_lock);
        }
 
-       LASSERT(page_pools.epp_idle_idx <= IDLE_IDX_MAX);
+       LASSERT(pool->epp_idle_idx <= IDLE_IDX_MAX);
+
        return sc->nr_to_scan;
 }
 
-#ifdef HAVE_SHRINKER_COUNT
-static struct shrinker pools_shrinker = {
-       .count_objects  = enc_pools_shrink_count,
-       .scan_objects   = enc_pools_shrink_scan,
-       .seeks          = DEFAULT_SEEKS,
-};
-#else
+#ifndef HAVE_SHRINKER_COUNT
 /*
  * could be called frequently for query (@nr_to_scan == 0).
  * we try to keep at least PTLRPC_MAX_BRW_PAGES pages in the pool.
@@ -285,11 +365,6 @@ static int enc_pools_shrink(struct shrinker *shrinker,
 
        return enc_pools_shrink_count(shrinker, sc);
 }
-
-static struct shrinker pools_shrinker = {
-       .shrink  = enc_pools_shrink,
-       .seeks   = DEFAULT_SEEKS,
-};
 #endif /* HAVE_SHRINKER_COUNT */
 
 static inline
@@ -301,7 +376,7 @@ int npages_to_npools(unsigned long npages)
 /*
  * return how many pages cleaned up.
  */
-static unsigned long enc_pools_cleanup(struct page ***pools, int npools)
+static unsigned long enc_pools_cleanup(void ***pools, int npools, int pool_idx)
 {
        unsigned long cleaned = 0;
        int i, j;
@@ -310,7 +385,12 @@ static unsigned long enc_pools_cleanup(struct page ***pools, int npools)
                if (pools[i]) {
                        for (j = 0; j < PAGES_PER_POOL; j++) {
                                if (pools[i][j]) {
-                                       __free_page(pools[i][j]);
+                                       if (pool_idx == 0) {
+                                               __free_page(pools[i][j]);
+                                       } else {
+                                               OBD_FREE_LARGE(pools[i][j],
+                                                       ELEMENT_SIZE(pool_idx));
+                                       }
                                        cleaned++;
                                }
                        }
@@ -329,18 +409,20 @@ static unsigned long enc_pools_cleanup(struct page ***pools, int npools)
  * we have options to avoid most memory copy with some tricks. but we choose
  * the simplest way to avoid complexity. It's not frequently called.
  */
-static void enc_pools_insert(struct page ***pools, int npools, int npages)
+static void enc_pools_insert(void ***pools, int npools, int npages,
+                            unsigned int pool_idx)
 {
        int freeslot;
        int op_idx, np_idx, og_idx, ng_idx;
        int cur_npools, end_npools;
+       struct ptlrpc_enc_page_pool *page_pool = page_pools[pool_idx];
 
        LASSERT(npages > 0);
-       LASSERT(page_pools.epp_total_pages+npages <= page_pools.epp_max_pages);
+       LASSERT(page_pool->epp_total_pages+npages <= page_pool->epp_max_pages);
        LASSERT(npages_to_npools(npages) == npools);
-       LASSERT(page_pools.epp_growing);
+       LASSERT(page_pool->epp_growing);
 
-       spin_lock(&page_pools.epp_lock);
+       spin_lock(&page_pool->epp_lock);
 
        /*
         * (1) fill all the free slots of current pools.
@@ -349,21 +431,21 @@ static void enc_pools_insert(struct page ***pools, int npools, int npages)
         * free slots are those left by rent pages, and the extra ones with
         * index >= total_pages, locate at the tail of last pool.
         */
-       freeslot = page_pools.epp_total_pages % PAGES_PER_POOL;
+       freeslot = page_pool->epp_total_pages % PAGES_PER_POOL;
        if (freeslot != 0)
                freeslot = PAGES_PER_POOL - freeslot;
-       freeslot += page_pools.epp_total_pages - page_pools.epp_free_pages;
+       freeslot += page_pool->epp_total_pages - page_pool->epp_free_pages;
 
-       op_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
-       og_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
+       op_idx = page_pool->epp_free_pages / PAGES_PER_POOL;
+       og_idx = page_pool->epp_free_pages % PAGES_PER_POOL;
        np_idx = npools - 1;
        ng_idx = (npages - 1) % PAGES_PER_POOL;
 
        while (freeslot) {
-               LASSERT(page_pools.epp_pools[op_idx][og_idx] == NULL);
+               LASSERT(page_pool->epp_pools[op_idx][og_idx] == NULL);
                LASSERT(pools[np_idx][ng_idx] != NULL);
 
-               page_pools.epp_pools[op_idx][og_idx] = pools[np_idx][ng_idx];
+               page_pool->epp_pools[op_idx][og_idx] = pools[np_idx][ng_idx];
                pools[np_idx][ng_idx] = NULL;
 
                freeslot--;
@@ -383,52 +465,69 @@ static void enc_pools_insert(struct page ***pools, int npools, int npages)
        /*
         * (2) add pools if needed.
         */
-       cur_npools = (page_pools.epp_total_pages + PAGES_PER_POOL - 1) /
+       cur_npools = (page_pool->epp_total_pages + PAGES_PER_POOL - 1) /
                      PAGES_PER_POOL;
-       end_npools = (page_pools.epp_total_pages + npages +
+       end_npools = (page_pool->epp_total_pages + npages +
                      PAGES_PER_POOL - 1) / PAGES_PER_POOL;
-       LASSERT(end_npools <= page_pools.epp_max_pools);
+       LASSERT(end_npools <= page_pool->epp_max_pools);
 
        np_idx = 0;
        while (cur_npools < end_npools) {
-               LASSERT(page_pools.epp_pools[cur_npools] == NULL);
+               LASSERT(page_pool->epp_pools[cur_npools] == NULL);
                LASSERT(np_idx < npools);
                LASSERT(pools[np_idx] != NULL);
 
-               page_pools.epp_pools[cur_npools++] = pools[np_idx];
+               page_pool->epp_pools[cur_npools++] = pools[np_idx];
                pools[np_idx++] = NULL;
        }
 
-       page_pools.epp_total_pages += npages;
-       page_pools.epp_free_pages += npages;
-       page_pools.epp_st_lowfree = page_pools.epp_free_pages;
+       /*
+        * (3) free useless source pools
+        */
+       while (np_idx < npools) {
+               LASSERT(pools[np_idx] != NULL);
+               CDEBUG(D_SEC, "Free useless pool buffer: %i, %p\n", np_idx,
+                      pools[np_idx]);
+               OBD_FREE(pools[np_idx], PAGE_SIZE);
+               pools[np_idx++] = NULL;
+       }
 
-       if (page_pools.epp_total_pages > page_pools.epp_st_max_pages)
-               page_pools.epp_st_max_pages = page_pools.epp_total_pages;
+       page_pool->epp_total_pages += npages;
+       page_pool->epp_free_pages += npages;
+       page_pool->epp_st_lowfree = page_pool->epp_free_pages;
+
+       if (page_pool->epp_total_pages > page_pool->epp_st_max_pages)
+               page_pool->epp_st_max_pages = page_pool->epp_total_pages;
 
        CDEBUG(D_SEC, "add %d pages to total %lu\n", npages,
-              page_pools.epp_total_pages);
+              page_pool->epp_total_pages);
 
-       spin_unlock(&page_pools.epp_lock);
+       spin_unlock(&page_pool->epp_lock);
 }
 
-static int enc_pools_add_pages(int npages)
+#define POOL_INIT_SIZE (PTLRPC_MAX_BRW_SIZE / 4)
+static int enc_pools_add_pages(int npages, int pool_index)
 {
-       static DEFINE_MUTEX(add_pages_mutex);
-       struct page ***pools;
+       void ***pools;
        int npools, alloced = 0;
        int i, j, rc = -ENOMEM;
+       struct ptlrpc_enc_page_pool *page_pool = page_pools[pool_index];
 
-       if (npages < PTLRPC_MAX_BRW_PAGES)
-               npages = PTLRPC_MAX_BRW_PAGES;
+       if (pool_index == 0) {
+               if (npages < POOL_INIT_SIZE >> PAGE_SHIFT)
+                       npages = POOL_INIT_SIZE >> PAGE_SHIFT;
+       } else {
+               if (npages < POOL_INIT_SIZE / ELEMENT_SIZE(pool_index))
+                       npages = POOL_INIT_SIZE / ELEMENT_SIZE(pool_index);
+       }
 
-       mutex_lock(&add_pages_mutex);
+       mutex_lock(&page_pool->add_pages_mutex);
 
-       if (npages + page_pools.epp_total_pages > page_pools.epp_max_pages)
-               npages = page_pools.epp_max_pages - page_pools.epp_total_pages;
+       if (npages + page_pool->epp_total_pages > page_pool->epp_max_pages)
+               npages = page_pool->epp_max_pages - page_pool->epp_total_pages;
        LASSERT(npages > 0);
 
-       page_pools.epp_st_grows++;
+       page_pool->epp_st_grows++;
 
        npools = npages_to_npools(npages);
        OBD_ALLOC_PTR_ARRAY(pools, npools);
@@ -441,8 +540,13 @@ static int enc_pools_add_pages(int npages)
                        goto out_pools;
 
                for (j = 0; j < PAGES_PER_POOL && alloced < npages; j++) {
-                       pools[i][j] = alloc_page(GFP_NOFS |
-                                                __GFP_HIGHMEM);
+                       if (pool_index == 0)
+                               pools[i][j] = alloc_page(GFP_NOFS |
+                                       __GFP_HIGHMEM);
+                       else {
+                               OBD_ALLOC_LARGE(pools[i][j],
+                                       ELEMENT_SIZE(pool_index));
+                       }
                        if (pools[i][j] == NULL)
                                goto out_pools;
 
@@ -451,46 +555,49 @@ static int enc_pools_add_pages(int npages)
        }
        LASSERT(alloced == npages);
 
-       enc_pools_insert(pools, npools, npages);
+       enc_pools_insert(pools, npools, npages, pool_index);
        CDEBUG(D_SEC, "added %d pages into pools\n", npages);
+       OBD_FREE_PTR_ARRAY(pools, npools);
        rc = 0;
 
 out_pools:
-       enc_pools_cleanup(pools, npools);
-       OBD_FREE_PTR_ARRAY(pools, npools);
+       if (rc) {
+               enc_pools_cleanup(pools, npools, pool_index);
+       }
 out:
        if (rc) {
-               page_pools.epp_st_grow_fails++;
+               page_pool->epp_st_grow_fails++;
                CERROR("Failed to allocate %d enc pages\n", npages);
        }
 
-       mutex_unlock(&add_pages_mutex);
+       mutex_unlock(&page_pool->add_pages_mutex);
        return rc;
 }
 
-static inline void enc_pools_wakeup(void)
+static inline void enc_pools_wakeup(unsigned int pool)
 {
-       assert_spin_locked(&page_pools.epp_lock);
+       assert_spin_locked(&page_pools[pool]->epp_lock);
 
        /* waitqueue_active */
-       if (unlikely(waitqueue_active(&page_pools.epp_waitq)))
-               wake_up(&page_pools.epp_waitq);
+       if (unlikely(waitqueue_active(&page_pools[pool]->epp_waitq)))
+               wake_up_all(&page_pools[pool]->epp_waitq);
 }
 
-static int enc_pools_should_grow(int page_needed, time64_t now)
+static int enc_pools_should_grow(int page_needed, time64_t now,
+                                unsigned int pool_index)
 {
        /*
         * don't grow if someone else is growing the pools right now,
         * or the pools has reached its full capacity
         */
-       if (page_pools.epp_growing ||
-           page_pools.epp_total_pages == page_pools.epp_max_pages)
+       if (page_pools[pool_index]->epp_growing ||
+           page_pools[pool_index]->epp_total_pages ==
+           page_pools[pool_index]->epp_max_pages)
                return 0;
 
        /* if total pages is not enough, we need to grow */
-       if (page_pools.epp_total_pages < page_needed)
+       if (page_pools[pool_index]->epp_total_pages < page_needed)
                return 1;
-
        /*
         * we wanted to return 0 here if there was a shrink just
         * happened a moment ago, but this may cause deadlock if both
@@ -509,41 +616,57 @@ static int enc_pools_should_grow(int page_needed, time64_t now)
 /*
  * Export the number of free pages in the pool
  */
-int get_free_pages_in_pool(void)
+int sptlrpc_enc_pool_get_free_pages(unsigned int pool)
+{
+       return page_pools[pool]->epp_free_pages;
+}
+EXPORT_SYMBOL(sptlrpc_enc_pool_get_free_pages);
+
+/*
+ * Let outside world know if enc_pool full capacity is reached
+ */
+int __pool_is_at_full_capacity(unsigned int pool)
 {
-       return page_pools.epp_free_pages;
+       return (page_pools[pool]->epp_total_pages ==
+               page_pools[pool]->epp_max_pages);
 }
-EXPORT_SYMBOL(get_free_pages_in_pool);
 
 /*
  * Let outside world know if enc_pool full capacity is reached
  */
 int pool_is_at_full_capacity(void)
 {
-       return (page_pools.epp_total_pages == page_pools.epp_max_pages);
+       return __pool_is_at_full_capacity(PAGES_POOL);
 }
 EXPORT_SYMBOL(pool_is_at_full_capacity);
 
-static inline struct page **page_from_bulkdesc(void *array, int index)
+static inline void **page_from_bulkdesc(void *array, int index)
 {
        struct ptlrpc_bulk_desc *desc = (struct ptlrpc_bulk_desc *)array;
 
-       return &desc->bd_enc_vec[index].bv_page;
+       return (void **)&desc->bd_enc_vec[index].bv_page;
 }
 
-static inline struct page **page_from_pagearray(void *array, int index)
+static inline void **page_from_pagearray(void *array, int index)
 {
        struct page **pa = (struct page **)array;
 
-       return &pa[index];
+       return (void **)&pa[index];
+}
+
+static inline void **page_from_bufarray(void *array, int index)
+{
+       return (void **)array;
 }
 
 /*
  * we allocate the requested pages atomically.
  */
 static inline int __sptlrpc_enc_pool_get_pages(void *array, unsigned int count,
-                                       struct page **(*page_from)(void *, int))
+                                       unsigned int pool,
+                                       void **(*page_from)(void *, int))
 {
+       struct ptlrpc_enc_page_pool *page_pool = page_pools[pool];
        wait_queue_entry_t waitlink;
        unsigned long this_idle = -1;
        u64 tick_ns = 0;
@@ -551,50 +674,54 @@ static inline int __sptlrpc_enc_pool_get_pages(void *array, unsigned int count,
        int p_idx, g_idx;
        int i, rc = 0;
 
-       if (!array || count <= 0 || count > page_pools.epp_max_pages)
+       if (pool)
+               count = 1;
+
+       if (!array || count <= 0 || count > page_pool->epp_max_pages)
                return -EINVAL;
 
-       spin_lock(&page_pools.epp_lock);
+       spin_lock(&page_pool->epp_lock);
 
-       page_pools.epp_st_access++;
+       page_pool->epp_st_access++;
 again:
-       if (unlikely(page_pools.epp_free_pages < count)) {
+       if (unlikely(page_pool->epp_free_pages < count)) {
                if (tick_ns == 0)
                        tick_ns = ktime_get_ns();
 
                now = ktime_get_real_seconds();
 
-               page_pools.epp_st_missings++;
-               page_pools.epp_pages_short += count;
+               page_pool->epp_st_missings++;
+               page_pool->epp_pages_short += count;
 
-               if (enc_pools_should_grow(count, now)) {
-                       page_pools.epp_growing = 1;
+               if (enc_pools_should_grow(count, now, pool)) {
+                       page_pool->epp_growing = 1;
 
-                       spin_unlock(&page_pools.epp_lock);
-                       enc_pools_add_pages(page_pools.epp_pages_short / 2);
-                       spin_lock(&page_pools.epp_lock);
+                       spin_unlock(&page_pool->epp_lock);
+                       CDEBUG(D_SEC, "epp_pages_short: %lu\n", page_pool->epp_pages_short);
+                       enc_pools_add_pages(8, pool);
+                       spin_lock(&page_pool->epp_lock);
 
-                       page_pools.epp_growing = 0;
+                       page_pool->epp_growing = 0;
 
-                       enc_pools_wakeup();
+                       enc_pools_wakeup(pool);
                } else {
-                       if (page_pools.epp_growing) {
-                               if (++page_pools.epp_waitqlen >
-                                   page_pools.epp_st_max_wqlen)
-                                       page_pools.epp_st_max_wqlen =
-                                               page_pools.epp_waitqlen;
+                       if (page_pool->epp_growing) {
+                               if (++page_pool->epp_waitqlen >
+                                   page_pool->epp_st_max_wqlen)
+                                       page_pool->epp_st_max_wqlen =
+                                               page_pool->epp_waitqlen;
 
                                set_current_state(TASK_UNINTERRUPTIBLE);
                                init_wait(&waitlink);
-                               add_wait_queue(&page_pools.epp_waitq,
+                               add_wait_queue(&page_pool->epp_waitq,
                                               &waitlink);
 
-                               spin_unlock(&page_pools.epp_lock);
+                               spin_unlock(&page_pool->epp_lock);
                                schedule();
-                               remove_wait_queue(&page_pools.epp_waitq,
+                               remove_wait_queue(&page_pool->epp_waitq,
                                                  &waitlink);
-                               spin_lock(&page_pools.epp_lock);
-                               page_pools.epp_waitqlen--;
+                               spin_lock(&page_pool->epp_lock);
+                               page_pool->epp_waitqlen--;
                        } else {
                                /*
                                 * ptlrpcd thread should not sleep in that case,
@@ -602,14 +729,14 @@ again:
                                 * Instead, return -ENOMEM so that upper layers
                                 * will put request back in queue.
                                 */
-                               page_pools.epp_st_outofmem++;
+                               page_pool->epp_st_outofmem++;
                                GOTO(out_unlock, rc = -ENOMEM);
                        }
                }
 
-               if (page_pools.epp_pages_short < count)
+               if (page_pool->epp_pages_short < count)
                        GOTO(out_unlock, rc = -EPROTO);
-               page_pools.epp_pages_short -= count;
+               page_pool->epp_pages_short -= count;
 
                this_idle = 0;
                goto again;
@@ -619,23 +746,23 @@ again:
        if (unlikely(tick_ns)) {
                ktime_t tick = ktime_sub_ns(ktime_get(), tick_ns);
 
-               if (ktime_after(tick, page_pools.epp_st_max_wait))
-                       page_pools.epp_st_max_wait = tick;
+               if (ktime_after(tick, page_pool->epp_st_max_wait))
+                       page_pool->epp_st_max_wait = tick;
        }
 
        /* proceed with rest of allocation */
-       page_pools.epp_free_pages -= count;
+       page_pool->epp_free_pages -= count;
 
-       p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
-       g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
+       p_idx = page_pool->epp_free_pages / PAGES_PER_POOL;
+       g_idx = page_pool->epp_free_pages % PAGES_PER_POOL;
 
        for (i = 0; i < count; i++) {
-               struct page **pagep = page_from(array, i);
+               void **pagep = page_from(array, i);
 
-               if (page_pools.epp_pools[p_idx][g_idx] == NULL)
+               if (page_pool->epp_pools[p_idx][g_idx] == NULL)
                        GOTO(out_unlock, rc = -EPROTO);
-               *pagep = page_pools.epp_pools[p_idx][g_idx];
-               page_pools.epp_pools[p_idx][g_idx] = NULL;
+               *pagep = page_pool->epp_pools[p_idx][g_idx];
+               page_pool->epp_pools[p_idx][g_idx] = NULL;
 
                if (++g_idx == PAGES_PER_POOL) {
                        p_idx++;
@@ -643,24 +770,25 @@ again:
                }
        }
 
-       if (page_pools.epp_free_pages < page_pools.epp_st_lowfree)
-               page_pools.epp_st_lowfree = page_pools.epp_free_pages;
+       if (page_pool->epp_free_pages < page_pool->epp_st_lowfree)
+               page_pool->epp_st_lowfree =
+                       page_pool->epp_free_pages;
 
        /*
         * new idle index = (old * weight + new) / (weight + 1)
         */
        if (this_idle == -1) {
-               this_idle = page_pools.epp_free_pages * IDLE_IDX_MAX /
-                       page_pools.epp_total_pages;
+               this_idle = page_pool->epp_free_pages * IDLE_IDX_MAX /
+                       page_pool->epp_total_pages;
        }
-       page_pools.epp_idle_idx = (page_pools.epp_idle_idx * IDLE_IDX_WEIGHT +
-                                  this_idle) /
-               (IDLE_IDX_WEIGHT + 1);
+       page_pool->epp_idle_idx = (page_pool->epp_idle_idx *
+                       IDLE_IDX_WEIGHT + this_idle) /
+                       (IDLE_IDX_WEIGHT + 1);
 
-       page_pools.epp_last_access = ktime_get_seconds();
+       page_pool->epp_last_access = ktime_get_seconds();
 
 out_unlock:
-       spin_unlock(&page_pools.epp_lock);
+       spin_unlock(&page_pool->epp_lock);
        return rc;
 }
 
@@ -669,7 +797,7 @@ int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
        int rc;
 
        LASSERT(desc->bd_iov_count > 0);
-       LASSERT(desc->bd_iov_count <= page_pools.epp_max_pages);
+       LASSERT(desc->bd_iov_count <= page_pools[PAGES_POOL]->epp_max_pages);
 
        /* resent bulk, enc iov might have been allocated previously */
        if (desc->bd_enc_vec != NULL)
@@ -681,7 +809,7 @@ int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
                return -ENOMEM;
 
        rc = __sptlrpc_enc_pool_get_pages((void *)desc, desc->bd_iov_count,
-                                         page_from_bulkdesc);
+                                         PAGES_POOL, page_from_bulkdesc);
        if (rc) {
                OBD_FREE_LARGE(desc->bd_enc_vec,
                               desc->bd_iov_count *
@@ -694,49 +822,65 @@ EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages);
 
 int sptlrpc_enc_pool_get_pages_array(struct page **pa, unsigned int count)
 {
-       return __sptlrpc_enc_pool_get_pages((void *)pa, count,
+       return __sptlrpc_enc_pool_get_pages((void *)pa, count, PAGES_POOL,
                                            page_from_pagearray);
 }
 EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages_array);
 
+int sptlrpc_enc_pool_get_buf(void **buf, unsigned int size_bits)
+{
+       return __sptlrpc_enc_pool_get_pages((void *)buf, 0,
+                                           PPOOL_SIZE_TO_INDEX(size_bits),
+                                           page_from_bufarray);
+}
+EXPORT_SYMBOL(sptlrpc_enc_pool_get_buf);
+
 static int __sptlrpc_enc_pool_put_pages(void *array, unsigned int count,
-                                       struct page **(*page_from)(void *, int))
+                                       unsigned int pool,
+                                       void **(*page_from)(void *, int))
 {
        int p_idx, g_idx;
        int i, rc = 0;
+       struct ptlrpc_enc_page_pool *page_pool;
 
-       if (!array || count <= 0)
+       LASSERTF(pool < POOLS_COUNT, "count %u, pool %u\n", count, pool);
+       if (!array || pool >= POOLS_COUNT) {
+               CERROR("Faled to put %u pages, from pull %u\n", count, pool);
                return -EINVAL;
+       }
 
-       spin_lock(&page_pools.epp_lock);
+       page_pool = page_pools[pool];
+       LASSERTF(page_pool != NULL, "count %u, pool %u\n", count, pool);
 
-       p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
-       g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
+       spin_lock(&page_pool->epp_lock);
 
-       if (page_pools.epp_free_pages + count > page_pools.epp_total_pages)
+       p_idx = page_pool->epp_free_pages / PAGES_PER_POOL;
+       g_idx = page_pool->epp_free_pages % PAGES_PER_POOL;
+
+       if (page_pool->epp_free_pages + count > page_pool->epp_total_pages)
                GOTO(out_unlock, rc = -EPROTO);
-       if (!page_pools.epp_pools[p_idx])
+       if (!page_pool->epp_pools[p_idx])
                GOTO(out_unlock, rc = -EPROTO);
 
        for (i = 0; i < count; i++) {
-               struct page **pagep = page_from(array, i);
+               void **pagep = page_from(array, i);
 
                if (!*pagep ||
-                   page_pools.epp_pools[p_idx][g_idx] != NULL)
+                   page_pool->epp_pools[p_idx][g_idx] != NULL)
                        GOTO(out_unlock, rc = -EPROTO);
 
-               page_pools.epp_pools[p_idx][g_idx] = *pagep;
+               page_pool->epp_pools[p_idx][g_idx] = *pagep;
                if (++g_idx == PAGES_PER_POOL) {
                        p_idx++;
                        g_idx = 0;
                }
        }
 
-       page_pools.epp_free_pages += count;
-       enc_pools_wakeup();
+       page_pool->epp_free_pages += count;
+       enc_pools_wakeup(pool);
 
 out_unlock:
-       spin_unlock(&page_pools.epp_lock);
+       spin_unlock(&page_pool->epp_lock);
        return rc;
 }
 
@@ -748,7 +892,7 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
                return;
 
        rc = __sptlrpc_enc_pool_put_pages((void *)desc, desc->bd_iov_count,
-                                         page_from_bulkdesc);
+                                         PAGES_POOL, page_from_bulkdesc);
        if (rc)
                CDEBUG(D_SEC, "error putting pages in enc pool: %d\n", rc);
 
@@ -761,7 +905,7 @@ void sptlrpc_enc_pool_put_pages_array(struct page **pa, unsigned int count)
 {
        int rc;
 
-       rc = __sptlrpc_enc_pool_put_pages((void *)pa, count,
+       rc = __sptlrpc_enc_pool_put_pages((void *)pa, count, PAGES_POOL,
                                          page_from_pagearray);
 
        if (rc)
@@ -769,6 +913,19 @@ void sptlrpc_enc_pool_put_pages_array(struct page **pa, unsigned int count)
 }
 EXPORT_SYMBOL(sptlrpc_enc_pool_put_pages_array);
 
+void sptlrpc_enc_pool_put_buf(void *buf, unsigned int size_bits)
+{
+       int rc;
+
+       rc = __sptlrpc_enc_pool_put_pages(buf, 1,
+                                         PPOOL_SIZE_TO_INDEX(size_bits),
+                                         page_from_bufarray);
+       if (rc)
+               CDEBUG(D_SEC, "error putting pages in enc pool: %d\n", rc);
+}
+EXPORT_SYMBOL(sptlrpc_enc_pool_put_buf);
+
+
 /*
  * we don't do much stuff for add_user/del_user anymore, except adding some
  * initial pages in add_user() if current pools are empty, rest would be
@@ -778,118 +935,153 @@ int sptlrpc_enc_pool_add_user(void)
 {
        int need_grow = 0;
 
-       spin_lock(&page_pools.epp_lock);
-       if (page_pools.epp_growing == 0 && page_pools.epp_total_pages == 0) {
-               page_pools.epp_growing = 1;
+       spin_lock(&page_pools[PAGES_POOL]->epp_lock);
+       if (page_pools[PAGES_POOL]->epp_growing == 0 &&
+               page_pools[PAGES_POOL]->epp_total_pages == 0) {
+               page_pools[PAGES_POOL]->epp_growing = 1;
                need_grow = 1;
        }
-       spin_unlock(&page_pools.epp_lock);
+       spin_unlock(&page_pools[PAGES_POOL]->epp_lock);
+
 
        if (need_grow) {
                enc_pools_add_pages(PTLRPC_MAX_BRW_PAGES +
-                                   PTLRPC_MAX_BRW_PAGES);
+                                   PTLRPC_MAX_BRW_PAGES, 0);
 
-               spin_lock(&page_pools.epp_lock);
-               page_pools.epp_growing = 0;
-               enc_pools_wakeup();
-               spin_unlock(&page_pools.epp_lock);
+               spin_lock(&page_pools[PAGES_POOL]->epp_lock);
+               page_pools[PAGES_POOL]->epp_growing = 0;
+               enc_pools_wakeup(PAGES_POOL);
+               spin_unlock(&page_pools[PAGES_POOL]->epp_lock);
        }
        return 0;
 }
 EXPORT_SYMBOL(sptlrpc_enc_pool_add_user);
 
-static inline void enc_pools_alloc(void)
+static inline void enc_pools_alloc(struct ptlrpc_enc_page_pool *pool)
 {
-       LASSERT(page_pools.epp_max_pools);
-       OBD_ALLOC_LARGE(page_pools.epp_pools,
-                       page_pools.epp_max_pools *
-                       sizeof(*page_pools.epp_pools));
+       LASSERT(pool->epp_max_pools);
+       OBD_ALLOC_LARGE(pool->epp_pools,
+                       pool->epp_max_pools *
+                       sizeof(*pool->epp_pools));
 }
 
-static inline void enc_pools_free(void)
+static inline void enc_pools_free(unsigned int i)
 {
-       LASSERT(page_pools.epp_max_pools);
-       LASSERT(page_pools.epp_pools);
+       LASSERT(page_pools[i]->epp_max_pools);
+       LASSERT(page_pools[i]->epp_pools);
 
-       OBD_FREE_LARGE(page_pools.epp_pools,
-                      page_pools.epp_max_pools *
-                      sizeof(*page_pools.epp_pools));
+       OBD_FREE_LARGE(page_pools[i]->epp_pools,
+                      page_pools[i]->epp_max_pools *
+                      sizeof(*page_pools[i]->epp_pools));
 }
 
 int sptlrpc_enc_pool_init(void)
 {
-       int rc;
-
-       page_pools.epp_max_pages = cfs_totalram_pages() / 8;
-       if (enc_pool_max_memory_mb > 0 &&
-           enc_pool_max_memory_mb <= (cfs_totalram_pages() >> mult))
-               page_pools.epp_max_pages = enc_pool_max_memory_mb << mult;
-
-       page_pools.epp_max_pools = npages_to_npools(page_pools.epp_max_pages);
-
-       init_waitqueue_head(&page_pools.epp_waitq);
-       page_pools.epp_waitqlen = 0;
-       page_pools.epp_pages_short = 0;
-
-       page_pools.epp_growing = 0;
-
-       page_pools.epp_idle_idx = 0;
-       page_pools.epp_last_shrink = ktime_get_seconds();
-       page_pools.epp_last_access = ktime_get_seconds();
-
-       spin_lock_init(&page_pools.epp_lock);
-       page_pools.epp_total_pages = 0;
-       page_pools.epp_free_pages = 0;
+       int pool_index = 0, to_revert;
+       int rc = 0;
+       struct ptlrpc_enc_page_pool *pool;
+
+       ENTRY;
+       OBD_ALLOC(page_pools, POOLS_COUNT * sizeof(*page_pools));
+       if (page_pools == NULL)
+               RETURN(-ENOMEM);
+       for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
+               OBD_ALLOC(page_pools[pool_index], sizeof(**page_pools));
+               if (page_pools[pool_index] == NULL)
+                       GOTO(fail, rc = -ENOMEM);
+
+               pool = page_pools[pool_index];
+               pool->epp_max_pages =
+                       cfs_totalram_pages() / POOLS_COUNT;
+               if (enc_pool_max_memory_mb > 0 &&
+                   enc_pool_max_memory_mb <= (cfs_totalram_pages() >> mult))
+                       pool->epp_max_pages =
+                               enc_pool_max_memory_mb << mult;
+
+               pool->epp_max_pools =
+                       npages_to_npools(pool->epp_max_pages);
+
+               init_waitqueue_head(&pool->epp_waitq);
+               pool->epp_last_shrink = ktime_get_seconds();
+               pool->epp_last_access = ktime_get_seconds();
+
+               spin_lock_init(&pool->epp_lock);
+               pool->epp_st_max_wait = ktime_set(0, 0);
+
+               enc_pools_alloc(pool);
+               CDEBUG(D_SEC, "Allocated pool %i\n", pool_index);
+               if (pool->epp_pools == NULL)
+                       GOTO(fail, rc = -ENOMEM);
+               /* Pass pool number as part of pools_shrinker_seeks value */
+#ifdef HAVE_SHRINKER_COUNT
+               pool->pool_shrinker.count_objects = enc_pools_shrink_count;
+               pool->pool_shrinker.scan_objects = enc_pools_shrink_scan;
+#else
+               pool->pool_shrinker.shrink = enc_pools_shrink;
+#endif
+               pool->pool_shrinker.seeks = INDEX_TO_SEEKS(pool_index);
 
-       page_pools.epp_st_max_pages = 0;
-       page_pools.epp_st_grows = 0;
-       page_pools.epp_st_grow_fails = 0;
-       page_pools.epp_st_shrinks = 0;
-       page_pools.epp_st_access = 0;
-       page_pools.epp_st_missings = 0;
-       page_pools.epp_st_lowfree = 0;
-       page_pools.epp_st_max_wqlen = 0;
-       page_pools.epp_st_max_wait = ktime_set(0, 0);
-       page_pools.epp_st_outofmem = 0;
+               rc = register_shrinker(&pool->pool_shrinker);
+               if (rc)
+                       GOTO(fail, rc);
 
-       enc_pools_alloc();
-       if (page_pools.epp_pools == NULL)
-               return -ENOMEM;
+               mutex_init(&pool->add_pages_mutex);
+       }
 
-       rc = register_shrinker(&pools_shrinker);
-       if (rc)
-               enc_pools_free();
+       RETURN(0);
+fail:
+       to_revert = pool_index;
+       for (pool_index = 0; pool_index <= to_revert; pool_index++) {
+               pool = page_pools[pool_index];
+               if (pool) {
+                       if (pool->epp_pools) 
+                               enc_pools_free(pool_index);
+                       OBD_FREE(pool, sizeof(**page_pools));
+               }
+       }
+       OBD_FREE(page_pools, POOLS_COUNT * sizeof(*page_pools));
 
-       return rc;
+       RETURN(rc);
 }
 
 void sptlrpc_enc_pool_fini(void)
 {
        unsigned long cleaned, npools;
+       int pool_index;
+       struct ptlrpc_enc_page_pool *pool;
+
+       for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
+               pool = page_pools[pool_index];
+               unregister_shrinker(&pool->pool_shrinker);
+               LASSERT(pool->epp_pools);
+               LASSERT(pool->epp_total_pages == pool->epp_free_pages);
+
+               npools = npages_to_npools(pool->epp_total_pages);
+               cleaned = enc_pools_cleanup(pool->epp_pools,
+                                           npools, pool_index);
+               LASSERT(cleaned == pool->epp_total_pages);
+
+               enc_pools_free(pool_index);
+
+               if (pool->epp_st_access > 0) {
+                       CDEBUG(D_SEC,
+                              "max pages %lu, grows %u, grow fails %u, shrinks %u, access %lu, missing %lu, max qlen %u, max wait ms %lld, out of mem %lu\n",
+                              pool->epp_st_max_pages,
+                              pool->epp_st_grows,
+                              pool->epp_st_grow_fails,
+                              pool->epp_st_shrinks,
+                              pool->epp_st_access,
+                              pool->epp_st_missings,
+                              pool->epp_st_max_wqlen,
+                              ktime_to_ms(pool->epp_st_max_wait),
+                              pool->epp_st_outofmem);
+               }
 
-       LASSERT(page_pools.epp_pools);
-       LASSERT(page_pools.epp_total_pages == page_pools.epp_free_pages);
-
-       unregister_shrinker(&pools_shrinker);
-
-       npools = npages_to_npools(page_pools.epp_total_pages);
-       cleaned = enc_pools_cleanup(page_pools.epp_pools, npools);
-       LASSERT(cleaned == page_pools.epp_total_pages);
-
-       enc_pools_free();
-
-       if (page_pools.epp_st_access > 0) {
-               CDEBUG(D_SEC,
-                      "max pages %lu, grows %u, grow fails %u, shrinks %u, access %lu, missing %lu, max qlen %u, max wait ms %lld, out of mem %lu\n",
-                      page_pools.epp_st_max_pages, page_pools.epp_st_grows,
-                      page_pools.epp_st_grow_fails,
-                      page_pools.epp_st_shrinks, page_pools.epp_st_access,
-                      page_pools.epp_st_missings, page_pools.epp_st_max_wqlen,
-                      ktime_to_ms(page_pools.epp_st_max_wait),
-                      page_pools.epp_st_outofmem);
+               OBD_FREE(pool, sizeof(**page_pools));
        }
-}
 
+       OBD_FREE(page_pools, POOLS_COUNT * sizeof(*page_pools));
+}
 
 static int cfs_hash_alg_id[] = {
        [BULK_HASH_ALG_NULL]    = CFS_HASH_ALG_NULL,
index 597036a..8e8e875 100644 (file)
@@ -390,11 +390,15 @@ int sptlrpc_lprocfs_cliobd_attach(struct obd_device *obd)
 }
 EXPORT_SYMBOL(sptlrpc_lprocfs_cliobd_attach);
 
-LDEBUGFS_SEQ_FOPS_RO(sptlrpc_proc_enc_pool);
+LDEBUGFS_SEQ_FOPS_RO(encrypt_page_pools);
+LDEBUGFS_SEQ_FOPS_RO(page_pools);
 
 static struct ldebugfs_vars sptlrpc_lprocfs_vars[] = {
        { .name =       "encrypt_page_pools",
-         .fops =       &sptlrpc_proc_enc_pool_fops     },
+         .fops =       &encrypt_page_pools_fops        },
+       { .name =       "page_pools",
+         .fops =       &page_pools_fops        },
+
        { NULL }
 };
 
index 0dffc84..193c11c 100755 (executable)
@@ -30429,6 +30429,14 @@ test_442() {
 }
 run_test 442 "truncate vs read/write should not panic"
 
+test_460d() {
+       verify_yaml_available || skip_env "YAML verification not installed"
+       $LCTL get_param -n sptlrpc.page_pools
+       $LCTL get_param -n sptlrpc.page_pools | verify_yaml ||
+               error "The output of encrypt_page_pools is not an valid YAML"
+}
+run_test 460d "Check encrypt pools output"
+
 prep_801() {
        [[ $MDS1_VERSION -lt $(version_code 2.9.55) ]] ||
        [[ $OST1_VERSION -lt $(version_code 2.9.55) ]] &&