From f3fe144b8572e9e75bb55076e29057227476ebf5 Mon Sep 17 00:00:00 2001 From: Sebastien Buisson Date: Fri, 25 Mar 2022 09:24:32 +0100 Subject: [PATCH] LU-15003 sec: use enc pool for bounce pages MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Take pages from the enc pool so that they can be used for encryption, instead of letting llcrypt allocate a bounce page for every call to the encryption primitives. Pages are taken from the enc pool a whole array at a time. This requires modifying the llcrypt API, so that new functions llcrypt_encrypt_page() and llcrypt_decrypt_page() are exported. These functions take a destination page parameter. Until this change is pushed in upstream fscrypt, this performance optimization is not available when Lustre is built and run against the in-kernel fscrypt lib. Using enc pool for bounce pages is a worthwhile performance win. Here are performance penalties incurred by encryption, without this patch, and with this patch: ||=====================|=====================|| || Performance penalty | Performance penalty || || without patch | with patch || ||==========================================|=====================|| || Bandwidth – write | 30%-35% | 5%-10% large IOs || || | | 15% small IOs || ||------------------------------------------|---------------------|| || Bandwidth – read | 20% | less than 10% || ||------------------------------------------|---------------------|| || Metadata | N/A | 5% || || creat,stat,remove | | || ||==========================================|=====================|| Signed-off-by: Sebastien Buisson Signed-off-by: James Simmons Change-Id: I3078d0a3349b3d24acc5e61ab53ac434b5f9d0e3 Reviewed-on: https://review.whamcloud.com/47149 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin --- libcfs/include/libcfs/crypto/llcrypt.h | 41 +++++--- libcfs/libcfs/crypto/crypto.c | 34 ++++--- lustre/include/lustre_crypto.h | 2 - 
lustre/include/lustre_sec.h | 2 + lustre/llite/dir.c | 23 ++++- lustre/osc/osc_request.c | 146 ++++++++++++++++++++++++++-- lustre/ptlrpc/sec_bulk.c | 171 ++++++++++++++++++++++----------- 7 files changed, 324 insertions(+), 95 deletions(-) diff --git a/libcfs/include/libcfs/crypto/llcrypt.h b/libcfs/include/libcfs/crypto/llcrypt.h index 5df68c4..893ac1e 100644 --- a/libcfs/include/libcfs/crypto/llcrypt.h +++ b/libcfs/include/libcfs/crypto/llcrypt.h @@ -121,16 +121,26 @@ extern struct page *llcrypt_encrypt_pagecache_blocks(struct page *page, unsigned int len, unsigned int offs, gfp_t gfp_flags); -extern int llcrypt_encrypt_block_inplace(const struct inode *inode, - struct page *page, unsigned int len, - unsigned int offs, u64 lblk_num, - gfp_t gfp_flags); +extern int llcrypt_encrypt_block(const struct inode *inode, struct page *src, + struct page *dst, unsigned int len, + unsigned int offs, u64 lblk_num, gfp_t gfp_flags); extern int llcrypt_decrypt_pagecache_blocks(struct page *page, unsigned int len, unsigned int offs); -extern int llcrypt_decrypt_block_inplace(const struct inode *inode, - struct page *page, unsigned int len, - unsigned int offs, u64 lblk_num); + +extern int llcrypt_decrypt_block(const struct inode *inode, struct page *src, + struct page *dst, unsigned int len, + unsigned int offs, u64 lblk_num, gfp_t gfp_flags); + +static inline int llcrypt_decrypt_block_inplace(const struct inode *inode, + struct page *page, + unsigned int len, + unsigned int offs, + u64 lblk_num) +{ + return llcrypt_decrypt_block(inode, page, page, len, offs, lblk_num, + GFP_NOFS); +} static inline bool llcrypt_is_bounce_page(struct page *page) { @@ -336,11 +346,10 @@ static inline struct page *llcrypt_encrypt_pagecache_blocks(struct page *page, return ERR_PTR(-EOPNOTSUPP); } -static inline int llcrypt_encrypt_block_inplace(const struct inode *inode, - struct page *page, - unsigned int len, - unsigned int offs, u64 lblk_num, - gfp_t gfp_flags) +static inline int 
llcrypt_encrypt_block(const struct inode *inode, + struct page *src, struct page *dst, + unsigned int len, unsigned int offs, + u64 lblk_num, gfp_t gfp_flags) { return -EOPNOTSUPP; } @@ -352,6 +361,14 @@ static inline int llcrypt_decrypt_pagecache_blocks(struct page *page, return -EOPNOTSUPP; } +static inline int llcrypt_decrypt_block(const struct inode *inode, + struct page *src, struct page *dst, + unsigned int len, unsigned int offs, + u64 lblk_num, gfp_t gfp_flags) +{ + return -EOPNOTSUPP; +} + static inline int llcrypt_decrypt_block_inplace(const struct inode *inode, struct page *page, unsigned int len, diff --git a/libcfs/libcfs/crypto/crypto.c b/libcfs/libcfs/crypto/crypto.c index d079b81..3d18715 100644 --- a/libcfs/libcfs/crypto/crypto.c +++ b/libcfs/libcfs/crypto/crypto.c @@ -272,9 +272,10 @@ struct page *llcrypt_encrypt_pagecache_blocks(struct page *page, EXPORT_SYMBOL(llcrypt_encrypt_pagecache_blocks); /** - * llcrypt_encrypt_block_inplace() - Encrypt a filesystem block in-place + * llcrypt_encrypt_block() - Encrypt a filesystem block in a page * @inode: The inode to which this block belongs - * @page: The page containing the block to encrypt + * @src: The page containing the block to encrypt + * @dst: The page which will contain the encrypted data * @len: Size of block to encrypt. Doesn't need to be a multiple of the * fs block size, but must be a multiple of LL_CRYPTO_BLOCK_SIZE. * @offs: Byte offset within @page at which the block to encrypt begins @@ -285,17 +286,18 @@ EXPORT_SYMBOL(llcrypt_encrypt_pagecache_blocks); * Encrypt a possibly-compressed filesystem block that is located in an * arbitrary page, not necessarily in the original pagecache page. The @inode * and @lblk_num must be specified, as they can't be determined from @page. + * The encrypted data will be stored in @dst. 
* * Return: 0 on success; -errno on failure */ -int llcrypt_encrypt_block_inplace(const struct inode *inode, struct page *page, - unsigned int len, unsigned int offs, - u64 lblk_num, gfp_t gfp_flags) +int llcrypt_encrypt_block(const struct inode *inode, struct page *src, + struct page *dst, unsigned int len, unsigned int offs, + u64 lblk_num, gfp_t gfp_flags) { - return llcrypt_crypt_block(inode, FS_ENCRYPT, lblk_num, page, page, + return llcrypt_crypt_block(inode, FS_ENCRYPT, lblk_num, src, dst, len, offs, gfp_flags); } -EXPORT_SYMBOL(llcrypt_encrypt_block_inplace); +EXPORT_SYMBOL(llcrypt_encrypt_block); /** * llcrypt_decrypt_pagecache_blocks() - Decrypt filesystem blocks in a pagecache page @@ -341,9 +343,10 @@ int llcrypt_decrypt_pagecache_blocks(struct page *page, unsigned int len, EXPORT_SYMBOL(llcrypt_decrypt_pagecache_blocks); /** - * llcrypt_decrypt_block_inplace() - Decrypt a filesystem block in-place + * llcrypt_decrypt_block() - Cache a decrypted filesystem block in a page * @inode: The inode to which this block belongs - * @page: The page containing the block to decrypt + * @src: The page containing the block to decrypt + * @dst: The page which will contain the plain data * @len: Size of block to decrypt. Doesn't need to be a multiple of the * fs block size, but must be a multiple of LL_CRYPTO_BLOCK_SIZE. * @offs: Byte offset within @page at which the block to decrypt begins @@ -353,17 +356,18 @@ EXPORT_SYMBOL(llcrypt_decrypt_pagecache_blocks); * Decrypt a possibly-compressed filesystem block that is located in an * arbitrary page, not necessarily in the original pagecache page. The @inode * and @lblk_num must be specified, as they can't be determined from @page. + * The decrypted data will be stored in @dst. 
* * Return: 0 on success; -errno on failure */ -int llcrypt_decrypt_block_inplace(const struct inode *inode, struct page *page, - unsigned int len, unsigned int offs, - u64 lblk_num) +int llcrypt_decrypt_block(const struct inode *inode, struct page *src, + struct page *dst, unsigned int len, unsigned int offs, + u64 lblk_num, gfp_t gfp_flags) { - return llcrypt_crypt_block(inode, FS_DECRYPT, lblk_num, page, page, - len, offs, GFP_NOFS); + return llcrypt_crypt_block(inode, FS_DECRYPT, lblk_num, src, dst, + len, offs, gfp_flags); } -EXPORT_SYMBOL(llcrypt_decrypt_block_inplace); +EXPORT_SYMBOL(llcrypt_decrypt_block); /* * Validate dentries in encrypted directories to make sure we aren't potentially diff --git a/lustre/include/lustre_crypto.h b/lustre/include/lustre_crypto.h index 5d25d11..d048470 100644 --- a/lustre/include/lustre_crypto.h +++ b/lustre/include/lustre_crypto.h @@ -50,8 +50,6 @@ #define llcrypt_has_encryption_key(inode) fscrypt_has_encryption_key(inode) #define llcrypt_encrypt_pagecache_blocks(page, len, offs, gfp_flags) \ fscrypt_encrypt_pagecache_blocks(page, len, offs, gfp_flags) -#define llcrypt_encrypt_block_inplace(inode, page, len, offs, lblk, gfp_flags) \ - fscrypt_encrypt_block_inplace(inode, page, len, offs, lblk, gfp_flags) #define llcrypt_decrypt_pagecache_blocks(page, len, offs) \ fscrypt_decrypt_pagecache_blocks(page, len, offs) #define llcrypt_decrypt_block_inplace(inode, page, len, offs, lblk_num) \ diff --git a/lustre/include/lustre_sec.h b/lustre/include/lustre_sec.h index b70563c..831d351 100644 --- a/lustre/include/lustre_sec.h +++ b/lustre/include/lustre_sec.h @@ -1165,7 +1165,9 @@ int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp, int sptlrpc_enc_pool_add_user(void); int sptlrpc_enc_pool_del_user(void); int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc); +int sptlrpc_enc_pool_get_pages_array(struct page **pa, unsigned int count); void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc); +void 
sptlrpc_enc_pool_put_pages_array(struct page **pa, unsigned int count); int get_free_pages_in_pool(void); int pool_is_at_full_capacity(void); diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index de7c285..f430ebb 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -2317,16 +2317,31 @@ out_detach: case LL_IOC_ADD_ENCRYPTION_KEY: if (!ll_sbi_has_encrypt(ll_i2sbi(inode))) return -EOPNOTSUPP; - return llcrypt_ioctl_add_key(file, (void __user *)arg); + rc = llcrypt_ioctl_add_key(file, (void __user *)arg); +#ifdef CONFIG_LL_ENCRYPTION + if (!rc) + sptlrpc_enc_pool_add_user(); +#endif + return rc; case LL_IOC_REMOVE_ENCRYPTION_KEY: if (!ll_sbi_has_encrypt(ll_i2sbi(inode))) return -EOPNOTSUPP; - return llcrypt_ioctl_remove_key(file, (void __user *)arg); + rc = llcrypt_ioctl_remove_key(file, (void __user *)arg); +#ifdef CONFIG_LL_ENCRYPTION + if (!rc) + sptlrpc_enc_pool_del_user(); +#endif + return rc; case LL_IOC_REMOVE_ENCRYPTION_KEY_ALL_USERS: if (!ll_sbi_has_encrypt(ll_i2sbi(inode))) return -EOPNOTSUPP; - return llcrypt_ioctl_remove_key_all_users(file, - (void __user *)arg); + rc = llcrypt_ioctl_remove_key_all_users(file, + (void __user *)arg); +#ifdef CONFIG_LL_ENCRYPTION + if (!rc) + sptlrpc_enc_pool_del_user(); +#endif + return rc; case LL_IOC_GET_ENCRYPTION_KEY_STATUS: if (!ll_sbi_has_encrypt(ll_i2sbi(inode))) return -EOPNOTSUPP; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 5847bf2..fded520 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1369,22 +1369,124 @@ static int osc_checksum_bulk_rw(const char *obd_name, RETURN(rc); } +#ifdef CONFIG_LL_ENCRYPTION +/** + * osc_encrypt_pagecache_blocks() - overlay to llcrypt_encrypt_pagecache_blocks + * @srcpage: The locked pagecache page containing the block(s) to encrypt + * @dstpage: The page to put encryption result + * @len: Total size of the block(s) to encrypt. Must be a nonzero + * multiple of the filesystem's block size. 
+ * @offs: Byte offset within @page of the first block to encrypt. Must be + * a multiple of the filesystem's block size. + * @gfp_flags: Memory allocation flags + * + * This overlay function is necessary to be able to provide our own bounce page. + */ +static struct page *osc_encrypt_pagecache_blocks(struct page *srcpage, + struct page *dstpage, + unsigned int len, + unsigned int offs, + gfp_t gfp_flags) + +{ + const struct inode *inode = srcpage->mapping->host; + const unsigned int blockbits = inode->i_blkbits; + const unsigned int blocksize = 1 << blockbits; + u64 lblk_num = ((u64)srcpage->index << (PAGE_SHIFT - blockbits)) + + (offs >> blockbits); + unsigned int i; + int err; + + if (unlikely(!dstpage)) + return llcrypt_encrypt_pagecache_blocks(srcpage, len, offs, + gfp_flags); + + if (WARN_ON_ONCE(!PageLocked(srcpage))) + return ERR_PTR(-EINVAL); + + if (WARN_ON_ONCE(len <= 0 || !IS_ALIGNED(len | offs, blocksize))) + return ERR_PTR(-EINVAL); + + /* Set PagePrivate2 for disambiguation in + * osc_finalize_bounce_page(). + * It means cipher page was not allocated by llcrypt. + */ + SetPagePrivate2(dstpage); + + for (i = offs; i < offs + len; i += blocksize, lblk_num++) { + err = llcrypt_encrypt_block(inode, srcpage, dstpage, blocksize, + i, lblk_num, gfp_flags); + if (err) + return ERR_PTR(err); + } + SetPagePrivate(dstpage); + set_page_private(dstpage, (unsigned long)srcpage); + return dstpage; +} + +/** + * osc_finalize_bounce_page() - overlay to llcrypt_finalize_bounce_page + * + * This overlay function is necessary to handle bounce pages + * allocated by ourselves. + */ +static inline void osc_finalize_bounce_page(struct page **pagep) +{ + struct page *page = *pagep; + + /* PagePrivate2 was set in osc_encrypt_pagecache_blocks + * to indicate the cipher page was allocated by ourselves. + * So we must not free it via llcrypt. 
+ */ + if (unlikely(!page || !PagePrivate2(page))) + return llcrypt_finalize_bounce_page(pagep); + + if (llcrypt_is_bounce_page(page)) { + *pagep = llcrypt_pagecache_page(page); + ClearPagePrivate2(page); + set_page_private(page, (unsigned long)NULL); + ClearPagePrivate(page); + } +} +#else /* !CONFIG_LL_ENCRYPTION */ +#define osc_encrypt_pagecache_blocks(srcpage, dstpage, len, offs, gfp_flags) \ + llcrypt_encrypt_pagecache_blocks(srcpage, len, offs, gfp_flags) +#define osc_finalize_bounce_page(page) llcrypt_finalize_bounce_page(page) +#endif + static inline void osc_release_bounce_pages(struct brw_page **pga, u32 page_count) { #ifdef HAVE_LUSTRE_CRYPTO - int i; + struct page **pa = NULL; + int i, j = 0; + +#ifdef CONFIG_LL_ENCRYPTION + if (PageChecked(pga[0]->pg)) { + OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count); + if (!pa) + return; + } +#endif for (i = 0; i < page_count; i++) { - /* Bounce pages allocated by a call to - * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request() + /* Bounce pages used by osc_encrypt_pagecache_blocks() + * called from osc_brw_prep_request() * are identified thanks to the PageChecked flag. 
*/ - if (PageChecked(pga[i]->pg)) - llcrypt_finalize_bounce_page(&pga[i]->pg); + if (PageChecked(pga[i]->pg)) { + if (pa) + pa[j++] = pga[i]->pg; + osc_finalize_bounce_page(&pga[i]->pg); + } pga[i]->count -= pga[i]->bp_count_diff; pga[i]->off += pga[i]->bp_off_diff; } + + if (pa) { + sptlrpc_enc_pool_put_pages_array(pa, j); + OBD_FREE_PTR_ARRAY_LARGE(pa, page_count); + } #endif } @@ -1436,6 +1538,24 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) && llcrypt_has_encryption_key(inode)) { + struct page **pa = NULL; + +#ifdef CONFIG_LL_ENCRYPTION + OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count); + if (pa == NULL) { + ptlrpc_request_free(req); + RETURN(-ENOMEM); + } + + rc = sptlrpc_enc_pool_get_pages_array(pa, page_count); + if (rc) { + CDEBUG(D_SEC, "failed to allocate from enc pool: %d\n", + rc); + ptlrpc_request_free(req); + RETURN(rc); + } +#endif + for (i = 0; i < page_count; i++) { struct brw_page *brwpg = pga[i]; struct page *data_page = NULL; @@ -1465,9 +1585,10 @@ retry_encrypt: brwpg->pg->index = clpage->cp_page_index; } data_page = - llcrypt_encrypt_pagecache_blocks(brwpg->pg, - nunits, 0, - GFP_NOFS); + osc_encrypt_pagecache_blocks(brwpg->pg, + pa ? 
pa[i] : NULL, + nunits, 0, + GFP_NOFS); if (directio) { brwpg->pg->mapping = map_orig; brwpg->pg->index = index_orig; @@ -1481,6 +1602,12 @@ retry_encrypt: rc = 0; goto retry_encrypt; } + if (pa) { + sptlrpc_enc_pool_put_pages_array(pa + i, + page_count - i); + OBD_FREE_PTR_ARRAY_LARGE(pa, + page_count); + } ptlrpc_request_free(req); RETURN(rc); } @@ -1505,6 +1632,9 @@ retry_encrypt: brwpg->bp_off_diff = brwpg->off & ~PAGE_MASK; brwpg->off = brwpg->off & PAGE_MASK; } + + if (pa) + OBD_FREE_PTR_ARRAY_LARGE(pa, page_count); } else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) { struct osc_async_page *oap = brw_page2oap(pga[0]); struct cl_page *clpage = oap2cl_page(oap); diff --git a/lustre/ptlrpc/sec_bulk.c b/lustre/ptlrpc/sec_bulk.c index c0c6a14..bdb65dd 100644 --- a/lustre/ptlrpc/sec_bulk.c +++ b/lustre/ptlrpc/sec_bulk.c @@ -472,10 +472,9 @@ static inline void enc_pools_wakeup(void) { assert_spin_locked(&page_pools.epp_lock); - if (unlikely(page_pools.epp_waitqlen)) { - LASSERT(waitqueue_active(&page_pools.epp_waitq)); + /* waitqueue_active */ + if (unlikely(waitqueue_active(&page_pools.epp_waitq))) wake_up(&page_pools.epp_waitq); - } } static int enc_pools_should_grow(int page_needed, time64_t now) @@ -525,44 +524,50 @@ int pool_is_at_full_capacity(void) } EXPORT_SYMBOL(pool_is_at_full_capacity); +static inline struct page **page_from_bulkdesc(void *array, int index) +{ + struct ptlrpc_bulk_desc *desc = (struct ptlrpc_bulk_desc *)array; + + return &desc->bd_enc_vec[index].bv_page; +} + +static inline struct page **page_from_pagearray(void *array, int index) +{ + struct page **pa = (struct page **)array; + + return &pa[index]; +} + /* * we allocate the requested pages atomically. 
*/ -int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc) +static inline int __sptlrpc_enc_pool_get_pages(void *array, unsigned int count, + struct page **(*page_from)(void *, int)) { wait_queue_entry_t waitlink; unsigned long this_idle = -1; u64 tick_ns = 0; time64_t now; int p_idx, g_idx; - int i; + int i, rc = 0; - LASSERT(desc->bd_iov_count > 0); - LASSERT(desc->bd_iov_count <= page_pools.epp_max_pages); - - /* resent bulk, enc iov might have been allocated previously */ - if (desc->bd_enc_vec != NULL) - return 0; - - OBD_ALLOC_LARGE(desc->bd_enc_vec, - desc->bd_iov_count * sizeof(*desc->bd_enc_vec)); - if (desc->bd_enc_vec == NULL) - return -ENOMEM; + if (!array || count <= 0 || count > page_pools.epp_max_pages) + return -EINVAL; spin_lock(&page_pools.epp_lock); page_pools.epp_st_access++; again: - if (unlikely(page_pools.epp_free_pages < desc->bd_iov_count)) { + if (unlikely(page_pools.epp_free_pages < count)) { if (tick_ns == 0) tick_ns = ktime_get_ns(); now = ktime_get_real_seconds(); page_pools.epp_st_missings++; - page_pools.epp_pages_short += desc->bd_iov_count; + page_pools.epp_pages_short += count; - if (enc_pools_should_grow(desc->bd_iov_count, now)) { + if (enc_pools_should_grow(count, now)) { page_pools.epp_growing = 1; spin_unlock(&page_pools.epp_lock); @@ -577,7 +582,7 @@ again: if (++page_pools.epp_waitqlen > page_pools.epp_st_max_wqlen) page_pools.epp_st_max_wqlen = - page_pools.epp_waitqlen; + page_pools.epp_waitqlen; set_current_state(TASK_UNINTERRUPTIBLE); init_wait(&waitlink); @@ -588,7 +593,6 @@ again: schedule(); remove_wait_queue(&page_pools.epp_waitq, &waitlink); - LASSERT(page_pools.epp_waitqlen > 0); spin_lock(&page_pools.epp_lock); page_pools.epp_waitqlen--; } else { @@ -599,17 +603,13 @@ again: * will put request back in queue. 
*/ page_pools.epp_st_outofmem++; - spin_unlock(&page_pools.epp_lock); - OBD_FREE_LARGE(desc->bd_enc_vec, - desc->bd_iov_count * - sizeof(*desc->bd_enc_vec)); - desc->bd_enc_vec = NULL; - return -ENOMEM; + GOTO(out_unlock, rc = -ENOMEM); } } - LASSERT(page_pools.epp_pages_short >= desc->bd_iov_count); - page_pools.epp_pages_short -= desc->bd_iov_count; + if (page_pools.epp_pages_short < count) + GOTO(out_unlock, rc = -EPROTO); + page_pools.epp_pages_short -= count; this_idle = 0; goto again; @@ -624,15 +624,17 @@ again: } /* proceed with rest of allocation */ - page_pools.epp_free_pages -= desc->bd_iov_count; + page_pools.epp_free_pages -= count; p_idx = page_pools.epp_free_pages / PAGES_PER_POOL; g_idx = page_pools.epp_free_pages % PAGES_PER_POOL; - for (i = 0; i < desc->bd_iov_count; i++) { - LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL); - desc->bd_enc_vec[i].bv_page = - page_pools.epp_pools[p_idx][g_idx]; + for (i = 0; i < count; i++) { + struct page **pagep = page_from(array, i); + + if (page_pools.epp_pools[p_idx][g_idx] == NULL) + GOTO(out_unlock, rc = -EPROTO); + *pagep = page_pools.epp_pools[p_idx][g_idx]; page_pools.epp_pools[p_idx][g_idx] = NULL; if (++g_idx == PAGES_PER_POOL) { @@ -653,59 +655,120 @@ again: } page_pools.epp_idle_idx = (page_pools.epp_idle_idx * IDLE_IDX_WEIGHT + this_idle) / - (IDLE_IDX_WEIGHT + 1); + (IDLE_IDX_WEIGHT + 1); page_pools.epp_last_access = ktime_get_seconds(); +out_unlock: spin_unlock(&page_pools.epp_lock); - return 0; + return rc; } -EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages); -void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc) +int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc) { - int p_idx, g_idx; - int i; + int rc; + + LASSERT(desc->bd_iov_count > 0); + LASSERT(desc->bd_iov_count <= page_pools.epp_max_pages); + + /* resent bulk, enc iov might have been allocated previously */ + if (desc->bd_enc_vec != NULL) + return 0; + OBD_ALLOC_LARGE(desc->bd_enc_vec, + desc->bd_iov_count * 
sizeof(*desc->bd_enc_vec)); if (desc->bd_enc_vec == NULL) - return; + return -ENOMEM; - LASSERT(desc->bd_iov_count > 0); + rc = __sptlrpc_enc_pool_get_pages((void *)desc, desc->bd_iov_count, + page_from_bulkdesc); + if (rc) { + OBD_FREE_LARGE(desc->bd_enc_vec, + desc->bd_iov_count * + sizeof(*desc->bd_enc_vec)); + desc->bd_enc_vec = NULL; + } + return rc; +} +EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages); + +int sptlrpc_enc_pool_get_pages_array(struct page **pa, unsigned int count) +{ + return __sptlrpc_enc_pool_get_pages((void *)pa, count, + page_from_pagearray); +} +EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages_array); + +static int __sptlrpc_enc_pool_put_pages(void *array, unsigned int count, + struct page **(*page_from)(void *, int)) +{ + int p_idx, g_idx; + int i, rc = 0; + + if (!array || count <= 0) + return -EINVAL; spin_lock(&page_pools.epp_lock); p_idx = page_pools.epp_free_pages / PAGES_PER_POOL; g_idx = page_pools.epp_free_pages % PAGES_PER_POOL; - LASSERT(page_pools.epp_free_pages + desc->bd_iov_count <= - page_pools.epp_total_pages); - LASSERT(page_pools.epp_pools[p_idx]); + if (page_pools.epp_free_pages + count > page_pools.epp_total_pages) + GOTO(out_unlock, rc = -EPROTO); + if (!page_pools.epp_pools[p_idx]) + GOTO(out_unlock, rc = -EPROTO); - for (i = 0; i < desc->bd_iov_count; i++) { - LASSERT(desc->bd_enc_vec[i].bv_page); - LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]); - LASSERT(page_pools.epp_pools[p_idx][g_idx] == NULL); + for (i = 0; i < count; i++) { + struct page **pagep = page_from(array, i); - page_pools.epp_pools[p_idx][g_idx] = - desc->bd_enc_vec[i].bv_page; + if (!*pagep || + page_pools.epp_pools[p_idx][g_idx] != NULL) + GOTO(out_unlock, rc = -EPROTO); + page_pools.epp_pools[p_idx][g_idx] = *pagep; if (++g_idx == PAGES_PER_POOL) { p_idx++; g_idx = 0; } } - page_pools.epp_free_pages += desc->bd_iov_count; - + page_pools.epp_free_pages += count; enc_pools_wakeup(); +out_unlock: spin_unlock(&page_pools.epp_lock); + return rc; +} + +void 
sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc) +{ + int rc; + + if (desc->bd_enc_vec == NULL) + return; + + rc = __sptlrpc_enc_pool_put_pages((void *)desc, desc->bd_iov_count, + page_from_bulkdesc); + if (rc) + CDEBUG(D_SEC, "error putting pages in enc pool: %d\n", rc); OBD_FREE_LARGE(desc->bd_enc_vec, - desc->bd_iov_count * sizeof(*desc->bd_enc_vec)); + desc->bd_iov_count * sizeof(*desc->bd_enc_vec)); desc->bd_enc_vec = NULL; } +void sptlrpc_enc_pool_put_pages_array(struct page **pa, unsigned int count) +{ + int rc; + + rc = __sptlrpc_enc_pool_put_pages((void *)pa, count, + page_from_pagearray); + + if (rc) + CDEBUG(D_SEC, "error putting pages in enc pool: %d\n", rc); +} +EXPORT_SYMBOL(sptlrpc_enc_pool_put_pages_array); + /* * we don't do much stuff for add_user/del_user anymore, except adding some * initial pages in add_user() if current pools are empty, rest would be -- 1.8.3.1