From 51b32ac2b9b86a600e92ab03f387717e526153d3 Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Thu, 8 May 2014 17:13:19 -0700 Subject: [PATCH] LU-7990 rpc: increase bulk size To make the ptlrpc be able to size 16MB IO Signed-off-by: Jinshan Xiong Signed-off-by: Gu Zheng Change-Id: I32727beaed08cb0e9048e7fe0e27489e330fce60 Reviewed-on: http://review.whamcloud.com/19366 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/lustre/lustre_idl.h | 4 +++- lustre/include/lustre_net.h | 14 +++++++++----- lustre/ptlrpc/client.c | 34 +++++++++++++++++++--------------- lustre/ptlrpc/sec_bulk.c | 8 ++++---- lustre/ptlrpc/wiretest.c | 2 ++ lustre/utils/wirecheck.c | 1 + lustre/utils/wiretest.c | 2 ++ 7 files changed, 40 insertions(+), 25 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 713f99d..2b9d6b9 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1781,8 +1781,10 @@ struct obd_ioobj { __u32 ioo_bufcnt; /* number of niobufs for this object */ }; +/* NOTE: IOOBJ_MAX_BRW_BITS defines the _offset_ of the max_brw field in + * ioo_max_brw, NOT the maximum number of bits in PTLRPC_BULK_OPS_BITS. + * That said, ioo_max_brw is a 32-bit field so the limit is also 16 bits. */ #define IOOBJ_MAX_BRW_BITS 16 -#define IOOBJ_TYPE_MASK ((1U << IOOBJ_MAX_BRW_BITS) - 1) #define ioobj_max_brw_get(ioo) (((ioo)->ioo_max_brw >> IOOBJ_MAX_BRW_BITS) + 1) #define ioobj_max_brw_set(ioo, num) \ do { (ioo)->ioo_max_brw = ((num) - 1) << IOOBJ_MAX_BRW_BITS; } while (0) diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index b74307f..8858384 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -73,12 +73,16 @@ #define PTLRPC_MD_OPTIONS 0 /** - * Max # of bulk operations in one request. + * log2 max # of bulk operations in one request: 2=4MB/RPC, 5=32MB/RPC, ... * In order for the client and server to properly negotiate the maximum * possible transfer size, PTLRPC_BULK_OPS_COUNT must be a power-of-two * value. The client is free to limit the actual RPC size for any bulk - * transfer via cl_max_pages_per_rpc to some non-power-of-two value. */ -#define PTLRPC_BULK_OPS_BITS 2 + * transfer via cl_max_pages_per_rpc to some non-power-of-two value. + * NOTE: This is limited to 16 (=64GB RPCs) by IOOBJ_MAX_BRW_BITS. */ +#define PTLRPC_BULK_OPS_BITS 4 +#if PTLRPC_BULK_OPS_BITS > 16 +#error "More than 65536 BRW RPCs not allowed by IOOBJ_MAX_BRW_BITS." +#endif #define PTLRPC_BULK_OPS_COUNT (1U << PTLRPC_BULK_OPS_BITS) /** * PTLRPC_BULK_OPS_MASK is for the convenience of the client only, and @@ -1466,12 +1470,12 @@ struct ptlrpc_bulk_desc { * encrypt iov, size is either 0 or bd_iov_count. */ lnet_kiov_t *bd_enc_vec; - lnet_kiov_t bd_vec[0]; + lnet_kiov_t *bd_vec; } bd_kiov; struct { struct kvec *bd_enc_kvec; - struct kvec bd_kvec[0]; + struct kvec *bd_kvec; } bd_kvec; } bd_u; diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 2749d04..c9f293c 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -128,19 +128,21 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw, (ptlrpc_is_bulk_desc_kvec(type) && ops->add_iov_frag != NULL)); + OBD_ALLOC_PTR(desc); + if (desc == NULL) + return NULL; if (type & PTLRPC_BULK_BUF_KIOV) { - OBD_ALLOC(desc, - offsetof(struct ptlrpc_bulk_desc, - bd_u.bd_kiov.bd_vec[nfrags])); + OBD_ALLOC_LARGE(GET_KIOV(desc), + nfrags * sizeof(*GET_KIOV(desc))); + if (GET_KIOV(desc) == NULL) + goto out; } else { - OBD_ALLOC(desc, - offsetof(struct ptlrpc_bulk_desc, - bd_u.bd_kvec.bd_kvec[nfrags])); + OBD_ALLOC_LARGE(GET_KVEC(desc), + nfrags * sizeof(*GET_KVEC(desc))); + if (GET_KVEC(desc) == NULL) + goto out; } - if (!desc) - return NULL; - spin_lock_init(&desc->bd_lock); init_waitqueue_head(&desc->bd_waitq); desc->bd_max_iov = nfrags; @@ -157,6 +159,9 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw, LNetInvalidateHandle(&desc->bd_mds[i]); return desc; +out: + OBD_FREE_PTR(desc); + return NULL; } /** @@ -271,13 +276,12 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc) desc->bd_frag_ops->release_frags(desc); if (ptlrpc_is_bulk_desc_kiov(desc->bd_type)) - OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, - bd_u.bd_kiov.bd_vec[desc->bd_max_iov])); + OBD_FREE_LARGE(GET_KIOV(desc), + desc->bd_max_iov * sizeof(*GET_KIOV(desc))); else - OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, - bd_u.bd_kvec.bd_kvec[desc-> - bd_max_iov])); - + OBD_FREE_LARGE(GET_KVEC(desc), + desc->bd_max_iov * sizeof(*GET_KVEC(desc))); + OBD_FREE_PTR(desc); EXIT; } EXPORT_SYMBOL(ptlrpc_free_bulk); diff --git a/lustre/ptlrpc/sec_bulk.c b/lustre/ptlrpc/sec_bulk.c index 87b570e..4b5609c 100644 --- a/lustre/ptlrpc/sec_bulk.c +++ b/lustre/ptlrpc/sec_bulk.c @@ -560,7 +560,7 @@ int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc) if (GET_ENC_KIOV(desc) != NULL) return 0; - OBD_ALLOC(GET_ENC_KIOV(desc), + OBD_ALLOC_LARGE(GET_ENC_KIOV(desc), desc->bd_iov_count * sizeof(*GET_ENC_KIOV(desc))); if (GET_ENC_KIOV(desc) == NULL) return -ENOMEM; @@ -614,8 +614,8 @@ again: * will put request back in queue. */ page_pools.epp_st_outofmem++; spin_unlock(&page_pools.epp_lock); - OBD_FREE(GET_ENC_KIOV(desc), - desc->bd_iov_count * + OBD_FREE_LARGE(GET_ENC_KIOV(desc), + desc->bd_iov_count * sizeof(*GET_ENC_KIOV(desc))); GET_ENC_KIOV(desc) = NULL; return -ENOMEM; @@ -716,7 +716,7 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc) spin_unlock(&page_pools.epp_lock); - OBD_FREE(GET_ENC_KIOV(desc), + OBD_FREE_LARGE(GET_ENC_KIOV(desc), desc->bd_iov_count * sizeof(*GET_ENC_KIOV(desc))); GET_ENC_KIOV(desc) = NULL; } diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 33ea7b1..0272c77 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -1683,6 +1683,8 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct obd_ioobj, ioo_bufcnt)); LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt) == 4, "found %lld\n", (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt)); + LASSERTF(IOOBJ_MAX_BRW_BITS == 16, "found %lld\n", + (long long)IOOBJ_MAX_BRW_BITS); /* Checks for union lquota_id */ LASSERTF((int)sizeof(union lquota_id) == 16, "found %lld\n", diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 9b65370..b7d1ca7 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -775,6 +775,7 @@ check_obd_ioobj(void) CHECK_MEMBER(obd_ioobj, ioo_oid); CHECK_MEMBER(obd_ioobj, ioo_max_brw); CHECK_MEMBER(obd_ioobj, ioo_bufcnt); + CHECK_VALUE(IOOBJ_MAX_BRW_BITS); } static void diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index f965bac..95aa57e 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -1698,6 +1698,8 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct obd_ioobj, ioo_bufcnt)); LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt) == 4, "found %lld\n", (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt)); + LASSERTF(IOOBJ_MAX_BRW_BITS == 16, "found %lld\n", + (long long)IOOBJ_MAX_BRW_BITS); /* Checks for union lquota_id */ LASSERTF((int)sizeof(union lquota_id) == 16, "found %lld\n", -- 1.8.3.1