From 8ef7c5c45567fc9cdedfe4242a6c5b73193ab9fe Mon Sep 17 00:00:00 2001
From: Alexey Lyashkov
Date: Tue, 19 Jan 2021 21:57:28 -0500
Subject: [PATCH] LU-10157 ptlrpc: fill md correctly.

MD fill should be limited by the overall transfer size in addition
to the number of fragments. Let's do this.

Lustre-change: https://review.whamcloud.com/37387
Lustre-commit: e1ac9e74844dc75d77ef740b3a44fad2efde30c5
Cray-bug-id: LUS-7159
Signed-off-by: Alexey Lyashkov
Signed-off-by: Serguei Smirnov
Change-Id: Ibd3be1989c8dd5012e1b158f3942fd041f2da350
Reviewed-on: https://review.whamcloud.com/41276
Tested-by: jenkins
Tested-by: Maloo
Reviewed-by: Alexey Lyashkov
Reviewed-by: Oleg Drokin
---
 lustre/include/lustre_net.h |  3 +++
 lustre/ptlrpc/client.c      | 26 ++++++++++++++++++++++++--
 lustre/ptlrpc/niobuf.c      | 11 +++++------
 lustre/ptlrpc/pers.c        | 29 +++++++++++++++++++++--------
 4 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index b47cc9d..d4ff8cb 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -1462,6 +1462,7 @@ struct ptlrpc_bulk_desc {
 	int			bd_max_iov;	/* allocated size of bd_iov */
 	int			bd_nob;		/* # bytes covered */
 	int			bd_nob_transferred; /* # bytes GOT/PUT */
+	unsigned int		bd_nob_last;	/* # bytes in last MD */
 
 	__u64			bd_last_mbits;
 
@@ -1469,6 +1470,8 @@ struct ptlrpc_bulk_desc {
 	lnet_nid_t		bd_sender;	/* stash event::sender */
 	int			bd_md_count;	/* # valid entries in bd_mds */
 	int			bd_md_max_brw;	/* max entries in bd_mds */
+	/** array of offsets for each MD */
+	unsigned int		bd_mds_off[PTLRPC_BULK_OPS_COUNT];
 	/** array of associated MDs */
 	struct lnet_handle_md	bd_mds[PTLRPC_BULK_OPS_COUNT];
 
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 622e501..995f4dd 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -127,6 +127,12 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
 		(ptlrpc_is_bulk_desc_kvec(type) &&
 		 ops->add_iov_frag != NULL));
 
+	if (max_brw > PTLRPC_BULK_OPS_COUNT)
+		RETURN(NULL);
+
+	if (nfrags > LNET_MAX_IOV * max_brw)
+		RETURN(NULL);
+
 	OBD_ALLOC_PTR(desc);
 	if (desc == NULL)
 		return NULL;
@@ -149,6 +155,7 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
 	desc->bd_portal = portal;
 	desc->bd_type = type;
 	desc->bd_md_count = 0;
+	desc->bd_nob_last = LNET_MTU;
 	desc->bd_frag_ops = (struct ptlrpc_bulk_frag_ops *) ops;
 	LASSERT(max_brw > 0);
 	desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
@@ -215,7 +222,15 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
 	LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
 
 	kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
 
+	if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+	     ((desc->bd_nob_last + len) > LNET_MTU)) {
+		desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+		desc->bd_md_count++;
+		desc->bd_nob_last = 0;
+		LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+	}
+	desc->bd_nob_last += len;
 	desc->bd_nob += len;
 
 	if (pin)
@@ -241,7 +256,15 @@ int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
 	LASSERT(ptlrpc_is_bulk_desc_kvec(desc->bd_type));
 
 	iovec = &BD_GET_KVEC(desc, desc->bd_iov_count);
 
+	if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+	     ((desc->bd_nob_last + len) > LNET_MTU)) {
+		desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+		desc->bd_md_count++;
+		desc->bd_nob_last = 0;
+		LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+	}
+	desc->bd_nob_last += len;
 	desc->bd_nob += len;
 
 	iovec->iov_base = frag;
@@ -3329,8 +3352,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
 	/* For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so
 	 * that server can infer the number of bulks that were prepared,
 	 * see LU-1431 */
-	req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
-			  LNET_MAX_IOV) - 1;
+	req->rq_mbits += bd->bd_md_count - 1;
 
 	/* Set rq_xid as rq_mbits to indicate the final bulk for the old
 	 * server which does not support OBD_CONNECT_BULK_MBITS. LU-6808.
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index 4d99341..d378ce7 100644
--- a/lustre/ptlrpc/niobuf.c
+++ b/lustre/ptlrpc/niobuf.c
@@ -167,7 +167,6 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
 		RETURN(0);
 
 	/* NB no locking required until desc is on the network */
-	LASSERT(desc->bd_md_count == 0);
 	LASSERT(ptlrpc_is_bulk_op_active(desc->bd_type));
 
 	LASSERT(desc->bd_cbid.cbid_fn == server_bulk_callback);
@@ -327,7 +326,6 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 
 	/* NB no locking required until desc is on the network */
 	LASSERT(desc->bd_nob > 0);
-	LASSERT(desc->bd_md_count == 0);
 	LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
 	LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
 	LASSERT(desc->bd_req != NULL);
@@ -349,9 +347,9 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 	LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback);
 	LASSERT(desc->bd_cbid.cbid_arg == desc);
 
-	total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
+	total_md = desc->bd_md_count;
 	/* rq_mbits is matchbits of the final bulk */
-	mbits = req->rq_mbits - total_md + 1;
+	mbits = req->rq_mbits - desc->bd_md_count + 1;
 
 	LASSERTF(mbits == (req->rq_mbits & PTLRPC_BULK_OPS_MASK),
 		 "first mbits = x%llu, last mbits = x%llu\n",
@@ -369,13 +367,14 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 	md.eq_handle = ptlrpc_eq_h;
 	md.threshold = 1;			/* PUT or GET */
 
-	for (posted_md = 0; posted_md < total_md; posted_md++, mbits++) {
+	for (posted_md = 0; posted_md < desc->bd_md_count;
+	     posted_md++, mbits++) {
 		md.options = PTLRPC_MD_OPTIONS |
 			     (ptlrpc_is_bulk_op_get(desc->bd_type) ?
 			      LNET_MD_OP_GET : LNET_MD_OP_PUT);
 		ptlrpc_fill_bulk_md(&md, desc, posted_md);
 
-		if (posted_md > 0 && posted_md + 1 == total_md &&
+		if (posted_md > 0 && posted_md + 1 == desc->bd_md_count &&
 		    OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_ATTACH)) {
 			rc = -ENOMEM;
 		} else {
diff --git a/lustre/ptlrpc/pers.c b/lustre/ptlrpc/pers.c
index 51e17e2..d0c8fa7 100644
--- a/lustre/ptlrpc/pers.c
+++ b/lustre/ptlrpc/pers.c
@@ -44,6 +44,8 @@ void ptlrpc_fill_bulk_md(struct lnet_md *md, struct ptlrpc_bulk_desc *desc,
 			 int mdidx)
 {
+	unsigned int start = desc->bd_mds_off[mdidx];
+
 	CLASSERT(PTLRPC_MAX_BRW_PAGES < LI_POISON);
 
 	LASSERT(mdidx < desc->bd_md_max_brw);
 
@@ -51,23 +53,34 @@ void ptlrpc_fill_bulk_md(struct lnet_md *md, struct ptlrpc_bulk_desc *desc,
 	LASSERT(!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV |
 		 LNET_MD_PHYS)));
 
-	md->length = max(0, desc->bd_iov_count - mdidx * LNET_MAX_IOV);
-	md->length = min_t(unsigned int, LNET_MAX_IOV, md->length);
+	/* just send a lnet header */
+	if (mdidx >= desc->bd_md_count) {
+		if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
+			md->options |= LNET_MD_KIOV;
+		else if (ptlrpc_is_bulk_desc_kvec(desc->bd_type))
+			md->options |= LNET_MD_IOVEC;
+		md->length = 0;
+		md->start = NULL;
+		return;
+	}
+
+	if (mdidx == (desc->bd_md_count - 1))
+		md->length = desc->bd_iov_count - start;
+	else
+		md->length = desc->bd_mds_off[mdidx + 1] - start;
 
 	if (ptlrpc_is_bulk_desc_kiov(desc->bd_type)) {
 		md->options |= LNET_MD_KIOV;
 		if (GET_ENC_KIOV(desc))
-			md->start = &BD_GET_ENC_KIOV(desc, mdidx *
-						      LNET_MAX_IOV);
+			md->start = &BD_GET_ENC_KIOV(desc, start);
 		else
-			md->start = &BD_GET_KIOV(desc, mdidx * LNET_MAX_IOV);
+			md->start = &BD_GET_KIOV(desc, start);
 	} else if (ptlrpc_is_bulk_desc_kvec(desc->bd_type)) {
 		md->options |= LNET_MD_IOVEC;
 		if (GET_ENC_KVEC(desc))
-			md->start = &BD_GET_ENC_KVEC(desc, mdidx *
-						      LNET_MAX_IOV);
+			md->start = &BD_GET_ENC_KVEC(desc, start);
 		else
-			md->start = &BD_GET_KVEC(desc, mdidx * LNET_MAX_IOV);
+			md->start = &BD_GET_KVEC(desc, start);
 	}
 }
 
--
1.8.3.1
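
Illustration only, not part of the patch: a minimal, hypothetical C sketch of the MD-splitting
rule the change introduces, where a new bulk MD is opened whenever adding a fragment would exceed
either the per-MD fragment limit or the per-MD byte limit, mirroring the checks added to
__ptlrpc_prep_bulk_page() and ptlrpc_prep_bulk_frag(). The names bulk_append_frag(), struct
bulk_state and the EX_* constants are invented for this sketch; their values only stand in for
LNET_MAX_IOV, LNET_MTU and PTLRPC_BULK_OPS_COUNT.

/*
 * Hypothetical, simplified sketch of the per-MD split rule (not Lustre code).
 * Constant values are illustrative stand-ins for the real LNet/ptlrpc limits.
 */
#include <assert.h>

#define EX_MAX_IOV	256		/* fragments per MD, cf. LNET_MAX_IOV */
#define EX_MTU		(1U << 20)	/* bytes per MD, cf. LNET_MTU */
#define EX_MAX_MDS	16		/* MDs per bulk, cf. PTLRPC_BULK_OPS_COUNT */

struct bulk_state {
	unsigned int frag_count;		/* fragments added so far */
	unsigned int md_count;			/* MDs opened so far */
	unsigned int nob_last;			/* bytes in the current (last) MD */
	unsigned int mds_off[EX_MAX_MDS];	/* first fragment index of each MD */
};

/*
 * Open a new MD when either the fragment limit or the byte limit of the
 * current MD would be exceeded, and record where that MD starts; this is
 * the rule the patch applies while preparing bulk pages/fragments.
 */
static void bulk_append_frag(struct bulk_state *bs, unsigned int len)
{
	if ((bs->frag_count % EX_MAX_IOV) == 0 ||
	    bs->nob_last + len > EX_MTU) {
		assert(bs->md_count < EX_MAX_MDS);
		bs->mds_off[bs->md_count++] = bs->frag_count;
		bs->nob_last = 0;
	}
	bs->nob_last += len;
	bs->frag_count++;
}

int main(void)
{
	struct bulk_state bs = { 0 };
	unsigned int i;

	/* 512 x 4 KiB pages = 2 MiB total: expect the bulk to split into 2 MDs. */
	for (i = 0; i < 512; i++)
		bulk_append_frag(&bs, 4096);

	assert(bs.md_count == 2 && bs.mds_off[1] == 256);
	return 0;
}

In the patch itself, bd_nob_last is seeded with LNET_MTU in ptlrpc_new_bulk() so that the very
first fragment always opens MD 0, and bd_mds_off[] later lets ptlrpc_fill_bulk_md() locate the
first fragment and length of each MD directly instead of assuming LNET_MAX_IOV fragments per MD.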