From e1ac9e74844dc75d77ef740b3a44fad2efde30c5 Mon Sep 17 00:00:00 2001
From: Alexey Lyashkov
Date: Mon, 1 Jun 2020 09:00:11 -0400
Subject: [PATCH] LU-10157 ptlrpc: fill md correctly.

MD fill should limit to the overall transfer size in addition to the
number of fragments. Let's do this.

Cray-bug-id: LUS-8139
Signed-off-by: Alexey Lyashkov
Change-Id: I45219ffd8206f89f54688e7ecb0ccbb65ed3e3c1
Reviewed-on: https://review.whamcloud.com/37387
Reviewed-by: James Simmons
Tested-by: jenkins
Tested-by: Maloo
Reviewed-by: Shaun Tancheff
Reviewed-by: Oleg Drokin
---
 lustre/include/lustre_net.h |  4 ++++
 lustre/ptlrpc/client.c      | 23 ++++++++++++++++++-----
 lustre/ptlrpc/niobuf.c      | 11 +++++------
 lustre/ptlrpc/pers.c        | 21 ++++++++++++++++-----
 4 files changed, 43 insertions(+), 16 deletions(-)

diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 916470ad..eea505a 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -1448,6 +1448,7 @@ struct ptlrpc_bulk_desc {
 	int bd_max_iov;			/* allocated size of bd_iov */
 	int bd_nob;			/* # bytes covered */
 	int bd_nob_transferred;		/* # bytes GOT/PUT */
+	unsigned int bd_nob_last;	/* # bytes in last MD */

 	__u64 bd_last_mbits;

@@ -1455,6 +1456,9 @@ struct ptlrpc_bulk_desc {
 	lnet_nid_t bd_sender;		/* stash event::sender */
 	int bd_md_count;		/* # valid entries in bd_mds */
 	int bd_md_max_brw;		/* max entries in bd_mds */
+
+	/** array of offsets for each MD */
+	unsigned int bd_mds_off[PTLRPC_BULK_OPS_COUNT];
 	/** array of associated MDs */
 	struct lnet_handle_md bd_mds[PTLRPC_BULK_OPS_COUNT];

diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 606abd7..a902a5b 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -169,6 +169,12 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
 	LASSERT(ops->add_kiov_frag != NULL);

+	if (max_brw > PTLRPC_BULK_OPS_COUNT)
+		RETURN(NULL);
+
+	if (nfrags > LNET_MAX_IOV * max_brw)
+		RETURN(NULL);
+
 	OBD_ALLOC_PTR(desc);
 	if (!desc)
 		return NULL;
@@ -185,6 +191,7 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
 	desc->bd_portal = portal;
 	desc->bd_type = type;
 	desc->bd_md_count = 0;
+	desc->bd_nob_last = LNET_MTU;
 	desc->bd_frag_ops = ops;
 	LASSERT(max_brw > 0);
 	desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
@@ -253,6 +260,15 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
 	kiov = &desc->bd_vec[desc->bd_iov_count];

+	if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+	    ((desc->bd_nob_last + len) > LNET_MTU)) {
+		desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+		desc->bd_md_count++;
+		desc->bd_nob_last = 0;
+		LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+	}
+
+	desc->bd_nob_last += len;
 	desc->bd_nob += len;

 	if (pin)
@@ -3444,9 +3460,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
 		    req->rq_mbits == 0) {
 			req->rq_mbits = req->rq_xid;
 		} else {
-			int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) /
-				       LNET_MAX_IOV;
-			req->rq_mbits -= total_md - 1;
+			req->rq_mbits -= bd->bd_md_count - 1;
 		}
 	} else {
 		/*
@@ -3461,8 +3475,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
 	 * that server can infer the number of bulks that were prepared,
 	 * see LU-1431
 	 */
-	req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
-			  LNET_MAX_IOV) - 1;
+	req->rq_mbits += bd->bd_md_count - 1;

 	/*
 	 * Set rq_xid as rq_mbits to indicate the final bulk for the old
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index b19dbdc..9fde3ac 100644
--- a/lustre/ptlrpc/niobuf.c
+++ b/lustre/ptlrpc/niobuf.c
@@ -171,7 +171,6 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
 		RETURN(0);

 	/* NB no locking required until desc is on the network */
-	LASSERT(desc->bd_md_count == 0);
 	LASSERT(ptlrpc_is_bulk_op_active(desc->bd_type));
 	LASSERT(desc->bd_cbid.cbid_fn == server_bulk_callback);

@@ -327,7 +326,6 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 	/* NB no locking required until desc is on the network */
 	LASSERT(desc->bd_nob > 0);
-	LASSERT(desc->bd_md_count == 0);
 	LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
 	LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
 	LASSERT(desc->bd_req != NULL);
@@ -349,9 +347,9 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 	LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback);
 	LASSERT(desc->bd_cbid.cbid_arg == desc);

-	total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
+	total_md = desc->bd_md_count;
 	/* rq_mbits is matchbits of the final bulk */
-	mbits = req->rq_mbits - total_md + 1;
+	mbits = req->rq_mbits - desc->bd_md_count + 1;

 	LASSERTF(mbits == (req->rq_mbits & PTLRPC_BULK_OPS_MASK),
 		 "first mbits = x%llu, last mbits = x%llu\n",
@@ -369,13 +367,14 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 	md.handler = ptlrpc_handler;
 	md.threshold = 1;                       /* PUT or GET */

-	for (posted_md = 0; posted_md < total_md; posted_md++, mbits++) {
+	for (posted_md = 0; posted_md < desc->bd_md_count;
+	     posted_md++, mbits++) {
 		md.options = PTLRPC_MD_OPTIONS |
 			     (ptlrpc_is_bulk_op_get(desc->bd_type) ?
 			      LNET_MD_OP_GET : LNET_MD_OP_PUT);
 		ptlrpc_fill_bulk_md(&md, desc, posted_md);

-		if (posted_md > 0 && posted_md + 1 == total_md &&
+		if (posted_md > 0 && posted_md + 1 == desc->bd_md_count &&
 		    OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_ATTACH)) {
 			rc = -ENOMEM;
 		} else {
diff --git a/lustre/ptlrpc/pers.c b/lustre/ptlrpc/pers.c
index 7183e37..14e11a6 100644
--- a/lustre/ptlrpc/pers.c
+++ b/lustre/ptlrpc/pers.c
@@ -44,21 +44,32 @@
 void ptlrpc_fill_bulk_md(struct lnet_md *md, struct ptlrpc_bulk_desc *desc,
 			 int mdidx)
 {
+	unsigned int start = desc->bd_mds_off[mdidx];
+
 	BUILD_BUG_ON(PTLRPC_MAX_BRW_PAGES >= LI_POISON);

 	LASSERT(mdidx < desc->bd_md_max_brw);
 	LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
 	LASSERT(!(md->options & (LNET_MD_KIOV | LNET_MD_PHYS)));

-	md->length = max(0, desc->bd_iov_count - mdidx * LNET_MAX_IOV);
-	md->length = min_t(unsigned int, LNET_MAX_IOV, md->length);
+	/* just send a lnet header */
+	if (mdidx >= desc->bd_md_count) {
+		md->options |= LNET_MD_KIOV;
+		md->length = 0;
+		md->start = NULL;
+		return;
+	}
+
+	if (mdidx == (desc->bd_md_count - 1))
+		md->length = desc->bd_iov_count - start;
+	else
+		md->length = desc->bd_mds_off[mdidx + 1] - start;

 	md->options |= LNET_MD_KIOV;
 	if (desc->bd_enc_vec)
-		md->start = &desc->bd_enc_vec[mdidx *
-					      LNET_MAX_IOV];
+		md->start = &desc->bd_enc_vec[start];
 	else
-		md->start = &desc->bd_vec[mdidx * LNET_MAX_IOV];
+		md->start = &desc->bd_vec[start];
 }
-- 
1.8.3.1