LU-15003 sec: use enc pool for bounce pages 49/47149/11
author     Sebastien Buisson <sbuisson@ddn.com>   Fri, 25 Mar 2022 08:24:32 +0000 (09:24 +0100)
committer  Oleg Drokin <green@whamcloud.com>      Thu, 1 Sep 2022 05:52:52 +0000 (05:52 +0000)
Take pages from the enc pool so that they can be used for
encryption, instead of letting llcrypt allocate a bounce page
for every call to the encryption primitives.
Pages are taken from the enc pool a whole array at a time.
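
For illustration only, here is a minimal sketch of how a caller can obtain
and release a whole array of pool pages with the new
sptlrpc_enc_pool_get_pages_array()/sptlrpc_enc_pool_put_pages_array()
interfaces added by this patch. The bounce_pages_get()/bounce_pages_put()
wrappers and the include lines are hypothetical, not part of the patch:

#include <obd_support.h>        /* OBD_ALLOC_PTR_ARRAY_LARGE() */
#include <lustre_sec.h>         /* sptlrpc_enc_pool_*_pages_array() */

/* Hypothetical helper: grab @count bounce pages from the enc pool in a
 * single call, instead of one bounce-page allocation per encrypted block. */
static int bounce_pages_get(struct page ***ppa, unsigned int count)
{
        struct page **pa;
        int rc;

        OBD_ALLOC_PTR_ARRAY_LARGE(pa, count);
        if (pa == NULL)
                return -ENOMEM;

        rc = sptlrpc_enc_pool_get_pages_array(pa, count);
        if (rc) {
                OBD_FREE_PTR_ARRAY_LARGE(pa, count);
                return rc;
        }

        *ppa = pa;
        return 0;
}

/* Hypothetical helper: hand the pages back to the pool once the bulk RPC
 * has completed, then free the array itself. */
static void bounce_pages_put(struct page **pa, unsigned int count)
{
        sptlrpc_enc_pool_put_pages_array(pa, count);
        OBD_FREE_PTR_ARRAY_LARGE(pa, count);
}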

This requires modifying the llcrypt API so that new functions
llcrypt_encrypt_block() and llcrypt_decrypt_block() are exported.
These functions take a destination page parameter.
Until this change is accepted into upstream fscrypt, this performance
optimization is not available when Lustre is built and run against
the in-kernel fscrypt library.
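
To illustrate the API change, a sketch only: the example_* functions are
hypothetical, and lustre_crypto.h is assumed to be the header that pulls in
the llcrypt declarations.

#include <lustre_crypto.h>      /* assumed wrapper for the llcrypt API */

/* Hypothetical example: encrypt one filesystem block from @src into a
 * caller-supplied destination page @dst (e.g. a page taken from the enc
 * pool) instead of into an llcrypt-allocated bounce page. */
static int example_encrypt_block(const struct inode *inode, struct page *src,
                                 struct page *dst, unsigned int blocksize,
                                 unsigned int offs, u64 lblk_num)
{
        return llcrypt_encrypt_block(inode, src, dst, blocksize, offs,
                                     lblk_num, GFP_NOFS);
}

/* Existing in-place call sites keep working: llcrypt_decrypt_block_inplace()
 * becomes a static inline wrapper that passes the same page as both source
 * and destination. */
static int example_decrypt_inplace(const struct inode *inode,
                                   struct page *page, unsigned int blocksize,
                                   unsigned int offs, u64 lblk_num)
{
        return llcrypt_decrypt_block_inplace(inode, page, blocksize, offs,
                                             lblk_num);
}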

Using the enc pool for bounce pages is a worthwhile performance win.
The table below shows the performance penalty incurred by encryption,
without and with this patch:

||===================|=====================|=====================||
||                   | Performance penalty | Performance penalty ||
||                   |    without patch    |      with patch     ||
||===================|=====================|=====================||
|| Bandwidth – write |       30%-35%       |   5%-10% large IOs  ||
||                   |                     |    15% small IOs    ||
||-------------------|---------------------|---------------------||
|| Bandwidth – read  |         20%         |    less than 10%    ||
||-------------------|---------------------|---------------------||
||      Metadata     |         N/A         |          5%         ||
|| creat,stat,remove |                     |                     ||
||===================|=====================|=====================||

Signed-off-by: Sebastien Buisson <sbuisson@ddn.com>
Signed-off-by: James Simmons <jsimmons@infradead.org>
Change-Id: I3078d0a3349b3d24acc5e61ab53ac434b5f9d0e3
Reviewed-on: https://review.whamcloud.com/47149
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
libcfs/include/libcfs/crypto/llcrypt.h
libcfs/libcfs/crypto/crypto.c
lustre/include/lustre_crypto.h
lustre/include/lustre_sec.h
lustre/llite/dir.c
lustre/osc/osc_request.c
lustre/ptlrpc/sec_bulk.c

libcfs/include/libcfs/crypto/llcrypt.h
index 5df68c4..893ac1e 100644 (file)
@@ -121,16 +121,26 @@ extern struct page *llcrypt_encrypt_pagecache_blocks(struct page *page,
                                                     unsigned int len,
                                                     unsigned int offs,
                                                     gfp_t gfp_flags);
-extern int llcrypt_encrypt_block_inplace(const struct inode *inode,
-                                        struct page *page, unsigned int len,
-                                        unsigned int offs, u64 lblk_num,
-                                        gfp_t gfp_flags);
+extern int llcrypt_encrypt_block(const struct inode *inode, struct page *src,
+                        struct page *dst, unsigned int len,
+                        unsigned int offs, u64 lblk_num, gfp_t gfp_flags);
 
 extern int llcrypt_decrypt_pagecache_blocks(struct page *page, unsigned int len,
                                            unsigned int offs);
-extern int llcrypt_decrypt_block_inplace(const struct inode *inode,
-                                        struct page *page, unsigned int len,
-                                        unsigned int offs, u64 lblk_num);
+
+extern int llcrypt_decrypt_block(const struct inode *inode, struct page *src,
+                        struct page *dst, unsigned int len,
+                        unsigned int offs, u64 lblk_num, gfp_t gfp_flags);
+
+static inline int llcrypt_decrypt_block_inplace(const struct inode *inode,
+                                               struct page *page,
+                                               unsigned int len,
+                                               unsigned int offs,
+                                               u64 lblk_num)
+{
+       return llcrypt_decrypt_block(inode, page, page, len, offs, lblk_num,
+                                    GFP_NOFS);
+}
 
 static inline bool llcrypt_is_bounce_page(struct page *page)
 {
@@ -336,11 +346,10 @@ static inline struct page *llcrypt_encrypt_pagecache_blocks(struct page *page,
        return ERR_PTR(-EOPNOTSUPP);
 }
 
-static inline int llcrypt_encrypt_block_inplace(const struct inode *inode,
-                                               struct page *page,
-                                               unsigned int len,
-                                               unsigned int offs, u64 lblk_num,
-                                               gfp_t gfp_flags)
+static inline int llcrypt_encrypt_block(const struct inode *inode,
+                                       struct page *src, struct page *dst,
+                                       unsigned int len, unsigned int offs,
+                                       u64 lblk_num, gfp_t gfp_flags)
 {
        return -EOPNOTSUPP;
 }
@@ -352,6 +361,14 @@ static inline int llcrypt_decrypt_pagecache_blocks(struct page *page,
        return -EOPNOTSUPP;
 }
 
+static inline int llcrypt_decrypt_block(const struct inode *inode,
+                                       struct page *src, struct page *dst,
+                                       unsigned int len, unsigned int offs,
+                                       u64 lblk_num, gfp_t gfp_flags)
+{
+       return -EOPNOTSUPP;
+}
+
 static inline int llcrypt_decrypt_block_inplace(const struct inode *inode,
                                                struct page *page,
                                                unsigned int len,
libcfs/libcfs/crypto/crypto.c
index d079b81..3d18715 100644 (file)
@@ -272,9 +272,10 @@ struct page *llcrypt_encrypt_pagecache_blocks(struct page *page,
 EXPORT_SYMBOL(llcrypt_encrypt_pagecache_blocks);
 
 /**
- * llcrypt_encrypt_block_inplace() - Encrypt a filesystem block in-place
+ * llcrypt_encrypt_block() - Encrypt a filesystem block in a page
  * @inode:     The inode to which this block belongs
- * @page:      The page containing the block to encrypt
+ * @src:       The page containing the block to encrypt
+ * @dst:       The page which will contain the encrypted data
  * @len:       Size of block to encrypt.  Doesn't need to be a multiple of the
  *             fs block size, but must be a multiple of LL_CRYPTO_BLOCK_SIZE.
  * @offs:      Byte offset within @page at which the block to encrypt begins
@@ -285,17 +286,18 @@ EXPORT_SYMBOL(llcrypt_encrypt_pagecache_blocks);
  * Encrypt a possibly-compressed filesystem block that is located in an
  * arbitrary page, not necessarily in the original pagecache page.  The @inode
  * and @lblk_num must be specified, as they can't be determined from @page.
+ * The encrypted data will be stored in @dst.
  *
  * Return: 0 on success; -errno on failure
  */
-int llcrypt_encrypt_block_inplace(const struct inode *inode, struct page *page,
-                                 unsigned int len, unsigned int offs,
-                                 u64 lblk_num, gfp_t gfp_flags)
+int llcrypt_encrypt_block(const struct inode *inode, struct page *src,
+                         struct page *dst, unsigned int len, unsigned int offs,
+                         u64 lblk_num, gfp_t gfp_flags)
 {
-       return llcrypt_crypt_block(inode, FS_ENCRYPT, lblk_num, page, page,
+       return llcrypt_crypt_block(inode, FS_ENCRYPT, lblk_num, src, dst,
                                   len, offs, gfp_flags);
 }
-EXPORT_SYMBOL(llcrypt_encrypt_block_inplace);
+EXPORT_SYMBOL(llcrypt_encrypt_block);
 
 /**
  * llcrypt_decrypt_pagecache_blocks() - Decrypt filesystem blocks in a pagecache page
@@ -341,9 +343,10 @@ int llcrypt_decrypt_pagecache_blocks(struct page *page, unsigned int len,
 EXPORT_SYMBOL(llcrypt_decrypt_pagecache_blocks);
 
 /**
- * llcrypt_decrypt_block_inplace() - Decrypt a filesystem block in-place
+ * llcrypt_decrypt_block() - Decrypt a filesystem block into a page
  * @inode:     The inode to which this block belongs
- * @page:      The page containing the block to decrypt
+ * @src:       The page containing the block to decrypt
+ * @dst:       The page which will contain the plain data
  * @len:       Size of block to decrypt.  Doesn't need to be a multiple of the
  *             fs block size, but must be a multiple of LL_CRYPTO_BLOCK_SIZE.
  * @offs:      Byte offset within @page at which the block to decrypt begins
@@ -353,17 +356,18 @@ EXPORT_SYMBOL(llcrypt_decrypt_pagecache_blocks);
  * Decrypt a possibly-compressed filesystem block that is located in an
  * arbitrary page, not necessarily in the original pagecache page.  The @inode
  * and @lblk_num must be specified, as they can't be determined from @page.
+ * The decrypted data will be stored in @dst.
  *
  * Return: 0 on success; -errno on failure
  */
-int llcrypt_decrypt_block_inplace(const struct inode *inode, struct page *page,
-                                 unsigned int len, unsigned int offs,
-                                 u64 lblk_num)
+int llcrypt_decrypt_block(const struct inode *inode, struct page *src,
+                         struct page *dst, unsigned int len, unsigned int offs,
+                         u64 lblk_num, gfp_t gfp_flags)
 {
-       return llcrypt_crypt_block(inode, FS_DECRYPT, lblk_num, page, page,
-                                  len, offs, GFP_NOFS);
+       return llcrypt_crypt_block(inode, FS_DECRYPT, lblk_num, src, dst,
+                                  len, offs, gfp_flags);
 }
-EXPORT_SYMBOL(llcrypt_decrypt_block_inplace);
+EXPORT_SYMBOL(llcrypt_decrypt_block);
 
 /*
  * Validate dentries in encrypted directories to make sure we aren't potentially
lustre/include/lustre_crypto.h
index 5d25d11..d048470 100644 (file)
@@ -50,8 +50,6 @@
 #define llcrypt_has_encryption_key(inode) fscrypt_has_encryption_key(inode)
 #define llcrypt_encrypt_pagecache_blocks(page, len, offs, gfp_flags)   \
        fscrypt_encrypt_pagecache_blocks(page, len, offs, gfp_flags)
-#define llcrypt_encrypt_block_inplace(inode, page, len, offs, lblk, gfp_flags) \
-       fscrypt_encrypt_block_inplace(inode, page, len, offs, lblk, gfp_flags)
 #define llcrypt_decrypt_pagecache_blocks(page, len, offs)      \
        fscrypt_decrypt_pagecache_blocks(page, len, offs)
 #define llcrypt_decrypt_block_inplace(inode, page, len, offs, lblk_num)        \
lustre/include/lustre_sec.h
index b70563c..831d351 100644 (file)
@@ -1165,7 +1165,9 @@ int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
 int sptlrpc_enc_pool_add_user(void);
 int sptlrpc_enc_pool_del_user(void);
 int  sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc);
+int sptlrpc_enc_pool_get_pages_array(struct page **pa, unsigned int count);
 void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc);
+void sptlrpc_enc_pool_put_pages_array(struct page **pa, unsigned int count);
 int get_free_pages_in_pool(void);
 int pool_is_at_full_capacity(void);
 
lustre/llite/dir.c
index de7c285..f430ebb 100644 (file)
@@ -2317,16 +2317,31 @@ out_detach:
        case LL_IOC_ADD_ENCRYPTION_KEY:
                if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
                        return -EOPNOTSUPP;
-               return llcrypt_ioctl_add_key(file, (void __user *)arg);
+               rc = llcrypt_ioctl_add_key(file, (void __user *)arg);
+#ifdef CONFIG_LL_ENCRYPTION
+               if (!rc)
+                       sptlrpc_enc_pool_add_user();
+#endif
+               return rc;
        case LL_IOC_REMOVE_ENCRYPTION_KEY:
                if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
                        return -EOPNOTSUPP;
-               return llcrypt_ioctl_remove_key(file, (void __user *)arg);
+               rc = llcrypt_ioctl_remove_key(file, (void __user *)arg);
+#ifdef CONFIG_LL_ENCRYPTION
+               if (!rc)
+                       sptlrpc_enc_pool_del_user();
+#endif
+               return rc;
        case LL_IOC_REMOVE_ENCRYPTION_KEY_ALL_USERS:
                if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
                        return -EOPNOTSUPP;
-               return llcrypt_ioctl_remove_key_all_users(file,
-                                                         (void __user *)arg);
+               rc = llcrypt_ioctl_remove_key_all_users(file,
+                                                       (void __user *)arg);
+#ifdef CONFIG_LL_ENCRYPTION
+               if (!rc)
+                       sptlrpc_enc_pool_del_user();
+#endif
+               return rc;
        case LL_IOC_GET_ENCRYPTION_KEY_STATUS:
                if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
                        return -EOPNOTSUPP;
lustre/osc/osc_request.c
index 5847bf2..fded520 100644 (file)
@@ -1369,22 +1369,124 @@ static int osc_checksum_bulk_rw(const char *obd_name,
        RETURN(rc);
 }
 
+#ifdef CONFIG_LL_ENCRYPTION
+/**
+ * osc_encrypt_pagecache_blocks() - overlay to llcrypt_encrypt_pagecache_blocks
+ * @srcpage:      The locked pagecache page containing the block(s) to encrypt
+ * @dstpage:      The page to put encryption result
+ * @len:       Total size of the block(s) to encrypt.  Must be a nonzero
+ *             multiple of the filesystem's block size.
+ * @offs:      Byte offset within @srcpage of the first block to encrypt.  Must be
+ *             a multiple of the filesystem's block size.
+ * @gfp_flags: Memory allocation flags
+ *
+ * This overlay function is necessary to be able to provide our own bounce page.
+ */
+static struct page *osc_encrypt_pagecache_blocks(struct page *srcpage,
+                                                struct page *dstpage,
+                                                unsigned int len,
+                                                unsigned int offs,
+                                                gfp_t gfp_flags)
+
+{
+       const struct inode *inode = srcpage->mapping->host;
+       const unsigned int blockbits = inode->i_blkbits;
+       const unsigned int blocksize = 1 << blockbits;
+       u64 lblk_num = ((u64)srcpage->index << (PAGE_SHIFT - blockbits)) +
+               (offs >> blockbits);
+       unsigned int i;
+       int err;
+
+       if (unlikely(!dstpage))
+               return llcrypt_encrypt_pagecache_blocks(srcpage, len, offs,
+                                                       gfp_flags);
+
+       if (WARN_ON_ONCE(!PageLocked(srcpage)))
+               return ERR_PTR(-EINVAL);
+
+       if (WARN_ON_ONCE(len <= 0 || !IS_ALIGNED(len | offs, blocksize)))
+               return ERR_PTR(-EINVAL);
+
+       /* Set PagePrivate2 for disambiguation in
+        * osc_finalize_bounce_page().
+        * It means the cipher page was not allocated by llcrypt.
+        */
+       SetPagePrivate2(dstpage);
+
+       for (i = offs; i < offs + len; i += blocksize, lblk_num++) {
+               err = llcrypt_encrypt_block(inode, srcpage, dstpage, blocksize,
+                                           i, lblk_num, gfp_flags);
+               if (err)
+                       return ERR_PTR(err);
+       }
+       SetPagePrivate(dstpage);
+       set_page_private(dstpage, (unsigned long)srcpage);
+       return dstpage;
+}
+
+/**
+ * osc_finalize_bounce_page() - overlay to llcrypt_finalize_bounce_page
+ *
+ * This overlay function is necessary to handle bounce pages
+ * allocated by ourselves.
+ */
+static inline void osc_finalize_bounce_page(struct page **pagep)
+{
+       struct page *page = *pagep;
+
+       /* PagePrivate2 was set in osc_encrypt_pagecache_blocks
+        * to indicate the cipher page was allocated by ourselves.
+        * So we must not free it via llcrypt.
+        */
+       if (unlikely(!page || !PagePrivate2(page)))
+               return llcrypt_finalize_bounce_page(pagep);
+
+       if (llcrypt_is_bounce_page(page)) {
+               *pagep = llcrypt_pagecache_page(page);
+               ClearPagePrivate2(page);
+               set_page_private(page, (unsigned long)NULL);
+               ClearPagePrivate(page);
+       }
+}
+#else /* !CONFIG_LL_ENCRYPTION */
+#define osc_encrypt_pagecache_blocks(srcpage, dstpage, len, offs, gfp_flags) \
+       llcrypt_encrypt_pagecache_blocks(srcpage, len, offs, gfp_flags)
+#define osc_finalize_bounce_page(page) llcrypt_finalize_bounce_page(page)
+#endif
+
 static inline void osc_release_bounce_pages(struct brw_page **pga,
                                            u32 page_count)
 {
 #ifdef HAVE_LUSTRE_CRYPTO
-       int i;
+       struct page **pa = NULL;
+       int i, j = 0;
+
+#ifdef CONFIG_LL_ENCRYPTION
+       if (PageChecked(pga[0]->pg)) {
+               OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
+               if (!pa)
+                       return;
+       }
+#endif
 
        for (i = 0; i < page_count; i++) {
-               /* Bounce pages allocated by a call to
-                * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
+               /* Bounce pages used by osc_encrypt_pagecache_blocks()
+                * called from osc_brw_prep_request()
                 * are identified thanks to the PageChecked flag.
                 */
-               if (PageChecked(pga[i]->pg))
-                       llcrypt_finalize_bounce_page(&pga[i]->pg);
+               if (PageChecked(pga[i]->pg)) {
+                       if (pa)
+                               pa[j++] = pga[i]->pg;
+                       osc_finalize_bounce_page(&pga[i]->pg);
+               }
                pga[i]->count -= pga[i]->bp_count_diff;
                pga[i]->off += pga[i]->bp_off_diff;
        }
+
+       if (pa) {
+               sptlrpc_enc_pool_put_pages_array(pa, j);
+               OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
+       }
 #endif
 }
 
@@ -1436,6 +1538,24 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
 
        if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) &&
            llcrypt_has_encryption_key(inode)) {
+               struct page **pa = NULL;
+
+#ifdef CONFIG_LL_ENCRYPTION
+               OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
+               if (pa == NULL) {
+                       ptlrpc_request_free(req);
+                       RETURN(-ENOMEM);
+               }
+
+               rc = sptlrpc_enc_pool_get_pages_array(pa, page_count);
+               if (rc) {
+                       CDEBUG(D_SEC, "failed to allocate from enc pool: %d\n",
+                              rc);
+                       ptlrpc_request_free(req);
+                       RETURN(rc);
+               }
+#endif
+
                for (i = 0; i < page_count; i++) {
                        struct brw_page *brwpg = pga[i];
                        struct page *data_page = NULL;
@@ -1465,9 +1585,10 @@ retry_encrypt:
                                brwpg->pg->index = clpage->cp_page_index;
                        }
                        data_page =
-                               llcrypt_encrypt_pagecache_blocks(brwpg->pg,
-                                                                nunits, 0,
-                                                                GFP_NOFS);
+                               osc_encrypt_pagecache_blocks(brwpg->pg,
+                                                           pa ? pa[i] : NULL,
+                                                           nunits, 0,
+                                                           GFP_NOFS);
                        if (directio) {
                                brwpg->pg->mapping = map_orig;
                                brwpg->pg->index = index_orig;
@@ -1481,6 +1602,12 @@ retry_encrypt:
                                        rc = 0;
                                        goto retry_encrypt;
                                }
+                               if (pa) {
+                                       sptlrpc_enc_pool_put_pages_array(pa + i,
+                                                               page_count - i);
+                                       OBD_FREE_PTR_ARRAY_LARGE(pa,
+                                                                page_count);
+                               }
                                ptlrpc_request_free(req);
                                RETURN(rc);
                        }
@@ -1505,6 +1632,9 @@ retry_encrypt:
                        brwpg->bp_off_diff = brwpg->off & ~PAGE_MASK;
                        brwpg->off = brwpg->off & PAGE_MASK;
                }
+
+               if (pa)
+                       OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
        } else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
                struct osc_async_page *oap = brw_page2oap(pga[0]);
                struct cl_page *clpage = oap2cl_page(oap);
lustre/ptlrpc/sec_bulk.c
index c0c6a14..bdb65dd 100644 (file)
@@ -472,10 +472,9 @@ static inline void enc_pools_wakeup(void)
 {
        assert_spin_locked(&page_pools.epp_lock);
 
-       if (unlikely(page_pools.epp_waitqlen)) {
-               LASSERT(waitqueue_active(&page_pools.epp_waitq));
+       /* only wake up if there is actually a waiter on the queue */
+       if (unlikely(waitqueue_active(&page_pools.epp_waitq)))
                wake_up(&page_pools.epp_waitq);
-       }
 }
 
 static int enc_pools_should_grow(int page_needed, time64_t now)
@@ -525,44 +524,50 @@ int pool_is_at_full_capacity(void)
 }
 EXPORT_SYMBOL(pool_is_at_full_capacity);
 
+static inline struct page **page_from_bulkdesc(void *array, int index)
+{
+       struct ptlrpc_bulk_desc *desc = (struct ptlrpc_bulk_desc *)array;
+
+       return &desc->bd_enc_vec[index].bv_page;
+}
+
+static inline struct page **page_from_pagearray(void *array, int index)
+{
+       struct page **pa = (struct page **)array;
+
+       return &pa[index];
+}
+
 /*
  * we allocate the requested pages atomically.
  */
-int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
+static inline int __sptlrpc_enc_pool_get_pages(void *array, unsigned int count,
+                                       struct page **(*page_from)(void *, int))
 {
        wait_queue_entry_t waitlink;
        unsigned long this_idle = -1;
        u64 tick_ns = 0;
        time64_t now;
        int p_idx, g_idx;
-       int i;
+       int i, rc = 0;
 
-       LASSERT(desc->bd_iov_count > 0);
-       LASSERT(desc->bd_iov_count <= page_pools.epp_max_pages);
-
-       /* resent bulk, enc iov might have been allocated previously */
-       if (desc->bd_enc_vec != NULL)
-               return 0;
-
-       OBD_ALLOC_LARGE(desc->bd_enc_vec,
-                 desc->bd_iov_count * sizeof(*desc->bd_enc_vec));
-       if (desc->bd_enc_vec == NULL)
-               return -ENOMEM;
+       if (!array || count <= 0 || count > page_pools.epp_max_pages)
+               return -EINVAL;
 
        spin_lock(&page_pools.epp_lock);
 
        page_pools.epp_st_access++;
 again:
-       if (unlikely(page_pools.epp_free_pages < desc->bd_iov_count)) {
+       if (unlikely(page_pools.epp_free_pages < count)) {
                if (tick_ns == 0)
                        tick_ns = ktime_get_ns();
 
                now = ktime_get_real_seconds();
 
                page_pools.epp_st_missings++;
-               page_pools.epp_pages_short += desc->bd_iov_count;
+               page_pools.epp_pages_short += count;
 
-               if (enc_pools_should_grow(desc->bd_iov_count, now)) {
+               if (enc_pools_should_grow(count, now)) {
                        page_pools.epp_growing = 1;
 
                        spin_unlock(&page_pools.epp_lock);
@@ -577,7 +582,7 @@ again:
                                if (++page_pools.epp_waitqlen >
                                    page_pools.epp_st_max_wqlen)
                                        page_pools.epp_st_max_wqlen =
-                                                       page_pools.epp_waitqlen;
+                                               page_pools.epp_waitqlen;
 
                                set_current_state(TASK_UNINTERRUPTIBLE);
                                init_wait(&waitlink);
@@ -588,7 +593,6 @@ again:
                                schedule();
                                remove_wait_queue(&page_pools.epp_waitq,
                                                  &waitlink);
-                               LASSERT(page_pools.epp_waitqlen > 0);
                                spin_lock(&page_pools.epp_lock);
                                page_pools.epp_waitqlen--;
                        } else {
@@ -599,17 +603,13 @@ again:
                                 * will put request back in queue.
                                 */
                                page_pools.epp_st_outofmem++;
-                               spin_unlock(&page_pools.epp_lock);
-                               OBD_FREE_LARGE(desc->bd_enc_vec,
-                                              desc->bd_iov_count *
-                                               sizeof(*desc->bd_enc_vec));
-                               desc->bd_enc_vec = NULL;
-                               return -ENOMEM;
+                               GOTO(out_unlock, rc = -ENOMEM);
                        }
                }
 
-               LASSERT(page_pools.epp_pages_short >= desc->bd_iov_count);
-               page_pools.epp_pages_short -= desc->bd_iov_count;
+               if (page_pools.epp_pages_short < count)
+                       GOTO(out_unlock, rc = -EPROTO);
+               page_pools.epp_pages_short -= count;
 
                this_idle = 0;
                goto again;
@@ -624,15 +624,17 @@ again:
        }
 
        /* proceed with rest of allocation */
-       page_pools.epp_free_pages -= desc->bd_iov_count;
+       page_pools.epp_free_pages -= count;
 
        p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
        g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
 
-       for (i = 0; i < desc->bd_iov_count; i++) {
-               LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL);
-               desc->bd_enc_vec[i].bv_page =
-                      page_pools.epp_pools[p_idx][g_idx];
+       for (i = 0; i < count; i++) {
+               struct page **pagep = page_from(array, i);
+
+               if (page_pools.epp_pools[p_idx][g_idx] == NULL)
+                       GOTO(out_unlock, rc = -EPROTO);
+               *pagep = page_pools.epp_pools[p_idx][g_idx];
                page_pools.epp_pools[p_idx][g_idx] = NULL;
 
                if (++g_idx == PAGES_PER_POOL) {
@@ -653,59 +655,120 @@ again:
        }
        page_pools.epp_idle_idx = (page_pools.epp_idle_idx * IDLE_IDX_WEIGHT +
                                   this_idle) /
-                                  (IDLE_IDX_WEIGHT + 1);
+               (IDLE_IDX_WEIGHT + 1);
 
        page_pools.epp_last_access = ktime_get_seconds();
 
+out_unlock:
        spin_unlock(&page_pools.epp_lock);
-       return 0;
+       return rc;
 }
-EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages);
 
-void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
+int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
 {
-       int p_idx, g_idx;
-       int i;
+       int rc;
+
+       LASSERT(desc->bd_iov_count > 0);
+       LASSERT(desc->bd_iov_count <= page_pools.epp_max_pages);
+
+       /* resent bulk, enc iov might have been allocated previously */
+       if (desc->bd_enc_vec != NULL)
+               return 0;
 
+       OBD_ALLOC_LARGE(desc->bd_enc_vec,
+                       desc->bd_iov_count * sizeof(*desc->bd_enc_vec));
        if (desc->bd_enc_vec == NULL)
-               return;
+               return -ENOMEM;
 
-       LASSERT(desc->bd_iov_count > 0);
+       rc = __sptlrpc_enc_pool_get_pages((void *)desc, desc->bd_iov_count,
+                                         page_from_bulkdesc);
+       if (rc) {
+               OBD_FREE_LARGE(desc->bd_enc_vec,
+                              desc->bd_iov_count *
+                              sizeof(*desc->bd_enc_vec));
+               desc->bd_enc_vec = NULL;
+       }
+       return rc;
+}
+EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages);
+
+int sptlrpc_enc_pool_get_pages_array(struct page **pa, unsigned int count)
+{
+       return __sptlrpc_enc_pool_get_pages((void *)pa, count,
+                                           page_from_pagearray);
+}
+EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages_array);
+
+static int __sptlrpc_enc_pool_put_pages(void *array, unsigned int count,
+                                       struct page **(*page_from)(void *, int))
+{
+       int p_idx, g_idx;
+       int i, rc = 0;
+
+       if (!array || count <= 0)
+               return -EINVAL;
 
        spin_lock(&page_pools.epp_lock);
 
        p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
        g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
 
-       LASSERT(page_pools.epp_free_pages + desc->bd_iov_count <=
-               page_pools.epp_total_pages);
-       LASSERT(page_pools.epp_pools[p_idx]);
+       if (page_pools.epp_free_pages + count > page_pools.epp_total_pages)
+               GOTO(out_unlock, rc = -EPROTO);
+       if (!page_pools.epp_pools[p_idx])
+               GOTO(out_unlock, rc = -EPROTO);
 
-       for (i = 0; i < desc->bd_iov_count; i++) {
-               LASSERT(desc->bd_enc_vec[i].bv_page);
-               LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]);
-               LASSERT(page_pools.epp_pools[p_idx][g_idx] == NULL);
+       for (i = 0; i < count; i++) {
+               struct page **pagep = page_from(array, i);
 
-               page_pools.epp_pools[p_idx][g_idx] =
-                       desc->bd_enc_vec[i].bv_page;
+               if (!*pagep ||
+                   page_pools.epp_pools[p_idx][g_idx] != NULL)
+                       GOTO(out_unlock, rc = -EPROTO);
 
+               page_pools.epp_pools[p_idx][g_idx] = *pagep;
                if (++g_idx == PAGES_PER_POOL) {
                        p_idx++;
                        g_idx = 0;
                }
        }
 
-       page_pools.epp_free_pages += desc->bd_iov_count;
-
+       page_pools.epp_free_pages += count;
        enc_pools_wakeup();
 
+out_unlock:
        spin_unlock(&page_pools.epp_lock);
+       return rc;
+}
+
+void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
+{
+       int rc;
+
+       if (desc->bd_enc_vec == NULL)
+               return;
+
+       rc = __sptlrpc_enc_pool_put_pages((void *)desc, desc->bd_iov_count,
+                                         page_from_bulkdesc);
+       if (rc)
+               CDEBUG(D_SEC, "error putting pages in enc pool: %d\n", rc);
 
        OBD_FREE_LARGE(desc->bd_enc_vec,
-                desc->bd_iov_count * sizeof(*desc->bd_enc_vec));
+                      desc->bd_iov_count * sizeof(*desc->bd_enc_vec));
        desc->bd_enc_vec = NULL;
 }
 
+void sptlrpc_enc_pool_put_pages_array(struct page **pa, unsigned int count)
+{
+       int rc;
+
+       rc = __sptlrpc_enc_pool_put_pages((void *)pa, count,
+                                         page_from_pagearray);
+
+       if (rc)
+               CDEBUG(D_SEC, "error putting pages in enc pool: %d\n", rc);
+}
+EXPORT_SYMBOL(sptlrpc_enc_pool_put_pages_array);
+
 /*
  * we don't do much stuff for add_user/del_user anymore, except adding some
  * initial pages in add_user() if current pools are empty, rest would be