Whamcloud - gitweb
LU-7990 rpc: increase bulk size 66/19366/3
authorJinshan Xiong <jinshan.xiong@intel.com>
Fri, 9 May 2014 00:13:19 +0000 (17:13 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Sun, 17 Apr 2016 20:52:11 +0000 (20:52 +0000)
To make the ptlrpc be able to size 16MB IO

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Signed-off-by: Gu Zheng <gzheng@ddn.com>
Change-Id: I32727beaed08cb0e9048e7fe0e27489e330fce60
Reviewed-on: http://review.whamcloud.com/19366
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_net.h
lustre/ptlrpc/client.c
lustre/ptlrpc/sec_bulk.c
lustre/ptlrpc/wiretest.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 713f99d..2b9d6b9 100644 (file)
@@ -1781,8 +1781,10 @@ struct obd_ioobj {
        __u32           ioo_bufcnt;     /* number of niobufs for this object */
 };
 
+/* NOTE: IOOBJ_MAX_BRW_BITS defines the _offset_ of the max_brw field in
+ * ioo_max_brw, NOT the maximum number of bits in PTLRPC_BULK_OPS_BITS.
+ * That said, ioo_max_brw is a 32-bit field so the limit is also 16 bits. */
 #define IOOBJ_MAX_BRW_BITS     16
-#define IOOBJ_TYPE_MASK                ((1U << IOOBJ_MAX_BRW_BITS) - 1)
 #define ioobj_max_brw_get(ioo) (((ioo)->ioo_max_brw >> IOOBJ_MAX_BRW_BITS) + 1)
 #define ioobj_max_brw_set(ioo, num)                                    \
 do { (ioo)->ioo_max_brw = ((num) - 1) << IOOBJ_MAX_BRW_BITS; } while (0)
index b74307f..8858384 100644 (file)
 #define PTLRPC_MD_OPTIONS  0
 
 /**
- * Max # of bulk operations in one request.
+ * log2 max # of bulk operations in one request: 2=4MB/RPC, 5=32MB/RPC, ...
  * In order for the client and server to properly negotiate the maximum
  * possible transfer size, PTLRPC_BULK_OPS_COUNT must be a power-of-two
  * value.  The client is free to limit the actual RPC size for any bulk
- * transfer via cl_max_pages_per_rpc to some non-power-of-two value. */
-#define PTLRPC_BULK_OPS_BITS   2
+ * transfer via cl_max_pages_per_rpc to some non-power-of-two value.
+ * NOTE: This is limited to 16 (=64GB RPCs) by IOOBJ_MAX_BRW_BITS. */
+#define PTLRPC_BULK_OPS_BITS   4
+#if PTLRPC_BULK_OPS_BITS > 16
+#error "More than 65536 BRW RPCs not allowed by IOOBJ_MAX_BRW_BITS."
+#endif
 #define PTLRPC_BULK_OPS_COUNT  (1U << PTLRPC_BULK_OPS_BITS)
 /**
  * PTLRPC_BULK_OPS_MASK is for the convenience of the client only, and
@@ -1466,12 +1470,12 @@ struct ptlrpc_bulk_desc {
                         * encrypt iov, size is either 0 or bd_iov_count.
                         */
                        lnet_kiov_t *bd_enc_vec;
-                       lnet_kiov_t bd_vec[0];
+                       lnet_kiov_t *bd_vec;
                } bd_kiov;
 
                struct {
                        struct kvec *bd_enc_kvec;
-                       struct kvec bd_kvec[0];
+                       struct kvec *bd_kvec;
                } bd_kvec;
        } bd_u;
 
index 2749d04..c9f293c 100644 (file)
@@ -128,19 +128,21 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
                (ptlrpc_is_bulk_desc_kvec(type) &&
                 ops->add_iov_frag != NULL));
 
+       OBD_ALLOC_PTR(desc);
+       if (desc == NULL)
+               return NULL;
        if (type & PTLRPC_BULK_BUF_KIOV) {
-               OBD_ALLOC(desc,
-                         offsetof(struct ptlrpc_bulk_desc,
-                                  bd_u.bd_kiov.bd_vec[nfrags]));
+               OBD_ALLOC_LARGE(GET_KIOV(desc),
+                               nfrags * sizeof(*GET_KIOV(desc)));
+               if (GET_KIOV(desc) == NULL)
+                       goto out;
        } else {
-               OBD_ALLOC(desc,
-                         offsetof(struct ptlrpc_bulk_desc,
-                                  bd_u.bd_kvec.bd_kvec[nfrags]));
+               OBD_ALLOC_LARGE(GET_KVEC(desc),
+                               nfrags * sizeof(*GET_KVEC(desc)));
+               if (GET_KVEC(desc) == NULL)
+                       goto out;
        }
 
-       if (!desc)
-               return NULL;
-
        spin_lock_init(&desc->bd_lock);
        init_waitqueue_head(&desc->bd_waitq);
        desc->bd_max_iov = nfrags;
@@ -157,6 +159,9 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
                LNetInvalidateHandle(&desc->bd_mds[i]);
 
        return desc;
+out:
+       OBD_FREE_PTR(desc);
+       return NULL;
 }
 
 /**
@@ -271,13 +276,12 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
                desc->bd_frag_ops->release_frags(desc);
 
        if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
-               OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
-                                       bd_u.bd_kiov.bd_vec[desc->bd_max_iov]));
+               OBD_FREE_LARGE(GET_KIOV(desc),
+                       desc->bd_max_iov * sizeof(*GET_KIOV(desc)));
        else
-               OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
-                                       bd_u.bd_kvec.bd_kvec[desc->
-                                               bd_max_iov]));
-
+               OBD_FREE_LARGE(GET_KVEC(desc),
+                       desc->bd_max_iov * sizeof(*GET_KVEC(desc)));
+       OBD_FREE_PTR(desc);
        EXIT;
 }
 EXPORT_SYMBOL(ptlrpc_free_bulk);
index 87b570e..4b5609c 100644 (file)
@@ -560,7 +560,7 @@ int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
        if (GET_ENC_KIOV(desc) != NULL)
                return 0;
 
-       OBD_ALLOC(GET_ENC_KIOV(desc),
+       OBD_ALLOC_LARGE(GET_ENC_KIOV(desc),
                  desc->bd_iov_count * sizeof(*GET_ENC_KIOV(desc)));
        if (GET_ENC_KIOV(desc) == NULL)
                return -ENOMEM;
@@ -614,8 +614,8 @@ again:
                                 * will put request back in queue. */
                                page_pools.epp_st_outofmem++;
                                spin_unlock(&page_pools.epp_lock);
-                               OBD_FREE(GET_ENC_KIOV(desc),
-                                        desc->bd_iov_count *
+                               OBD_FREE_LARGE(GET_ENC_KIOV(desc),
+                                              desc->bd_iov_count *
                                                sizeof(*GET_ENC_KIOV(desc)));
                                GET_ENC_KIOV(desc) = NULL;
                                return -ENOMEM;
@@ -716,7 +716,7 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
 
        spin_unlock(&page_pools.epp_lock);
 
-       OBD_FREE(GET_ENC_KIOV(desc),
+       OBD_FREE_LARGE(GET_ENC_KIOV(desc),
                 desc->bd_iov_count * sizeof(*GET_ENC_KIOV(desc)));
        GET_ENC_KIOV(desc) = NULL;
 }
index 33ea7b1..0272c77 100644 (file)
@@ -1683,6 +1683,8 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct obd_ioobj, ioo_bufcnt));
        LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt));
+       LASSERTF(IOOBJ_MAX_BRW_BITS == 16, "found %lld\n",
+                (long long)IOOBJ_MAX_BRW_BITS);
 
        /* Checks for union lquota_id */
        LASSERTF((int)sizeof(union lquota_id) == 16, "found %lld\n",
index 9b65370..b7d1ca7 100644 (file)
@@ -775,6 +775,7 @@ check_obd_ioobj(void)
        CHECK_MEMBER(obd_ioobj, ioo_oid);
        CHECK_MEMBER(obd_ioobj, ioo_max_brw);
        CHECK_MEMBER(obd_ioobj, ioo_bufcnt);
+       CHECK_VALUE(IOOBJ_MAX_BRW_BITS);
 }
 
 static void
index f965bac..95aa57e 100644 (file)
@@ -1698,6 +1698,8 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct obd_ioobj, ioo_bufcnt));
        LASSERTF((int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct obd_ioobj *)0)->ioo_bufcnt));
+       LASSERTF(IOOBJ_MAX_BRW_BITS == 16, "found %lld\n",
+                (long long)IOOBJ_MAX_BRW_BITS);
 
        /* Checks for union lquota_id */
        LASSERTF((int)sizeof(union lquota_id) == 16, "found %lld\n",