LU-10157 ptlrpc: fill md correctly. 87/37387/10
author Alexey Lyashkov <c17817@cray.com>
Mon, 1 Jun 2020 13:00:11 +0000 (09:00 -0400)
committer Oleg Drokin <green@whamcloud.com>
Sat, 6 Jun 2020 14:02:30 +0000 (14:02 +0000)
MD fill should be limited by the overall transfer size (LNET_MTU)
in addition to the number of fragments (LNET_MAX_IOV).
Let's do this.
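
For illustration, a minimal userspace sketch of the per-MD accounting this
patch introduces; the constants, struct, and function names below are
assumptions made for the sketch, not the Lustre definitions:

#include <assert.h>

#define LNET_MTU              (1U << 20) /* assumed: 1 MiB byte cap per MD */
#define LNET_MAX_IOV          256        /* assumed: fragment cap per MD */
#define PTLRPC_BULK_OPS_COUNT 16         /* assumed: max MDs per bulk */

struct bulk_acct {                 /* hypothetical stand-in for the     */
        unsigned int iov_count;    /* ptlrpc_bulk_desc fields the patch */
        unsigned int md_count;     /* adds or repurposes                */
        unsigned int nob_last;
        unsigned int mds_off[PTLRPC_BULK_OPS_COUNT];
};

/* Open a new MD whenever either cap would be exceeded: the current MD
 * already holds LNET_MAX_IOV fragments, or adding 'len' bytes would
 * push it past LNET_MTU. mds_off[] remembers which fragment starts
 * each MD so the fill path can slice the iov array later. */
static void add_frag(struct bulk_acct *a, unsigned int len)
{
        if ((a->iov_count % LNET_MAX_IOV) == 0 ||
            a->nob_last + len > LNET_MTU) {
                assert(a->md_count < PTLRPC_BULK_OPS_COUNT);
                a->mds_off[a->md_count++] = a->iov_count;
                a->nob_last = 0;
        }
        a->nob_last += len;
        a->iov_count++;
}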

Cray-bug-id: LUS-8139
Signed-off-by: Alexey Lyashkov <c17817@cray.com>
Change-Id: I45219ffd8206f89f54688e7ecb0ccbb65ed3e3c1
Reviewed-on: https://review.whamcloud.com/37387
Reviewed-by: James Simmons <jsimmons@infradead.org>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Shaun Tancheff <shaun.tancheff@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_net.h
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pers.c

diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 916470a..eea505a 100644
@@ -1448,6 +1448,7 @@ struct ptlrpc_bulk_desc {
        int                    bd_max_iov;      /* allocated size of bd_iov */
        int                    bd_nob;          /* # bytes covered */
        int                    bd_nob_transferred; /* # bytes GOT/PUT */
+       unsigned int            bd_nob_last;    /* # bytes in last MD */
 
        __u64                  bd_last_mbits;
 
@@ -1455,6 +1456,9 @@ struct ptlrpc_bulk_desc {
        lnet_nid_t             bd_sender;       /* stash event::sender */
        int                     bd_md_count;    /* # valid entries in bd_mds */
        int                     bd_md_max_brw;  /* max entries in bd_mds */
+
+       /** array of offsets for each MD */
+       unsigned int            bd_mds_off[PTLRPC_BULK_OPS_COUNT];
        /** array of associated MDs */
        struct lnet_handle_md   bd_mds[PTLRPC_BULK_OPS_COUNT];
 
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 606abd7..a902a5b 100644
@@ -169,6 +169,12 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
 
        LASSERT(ops->add_kiov_frag != NULL);
 
+       if (max_brw > PTLRPC_BULK_OPS_COUNT)
+               RETURN(NULL);
+
+       if (nfrags > LNET_MAX_IOV * max_brw)
+               RETURN(NULL);
+
        OBD_ALLOC_PTR(desc);
        if (!desc)
                return NULL;
@@ -185,6 +191,7 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
        desc->bd_portal = portal;
        desc->bd_type = type;
        desc->bd_md_count = 0;
+       desc->bd_nob_last = LNET_MTU;
        desc->bd_frag_ops = ops;
        LASSERT(max_brw > 0);
        desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
@@ -253,6 +260,15 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
 
        kiov = &desc->bd_vec[desc->bd_iov_count];
 
+       if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+            ((desc->bd_nob_last + len) > LNET_MTU)) {
+               desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+               desc->bd_md_count++;
+               desc->bd_nob_last = 0;
+               LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+       }
+
+       desc->bd_nob_last += len;
        desc->bd_nob += len;
 
        if (pin)
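
A quick way to see why both caps in the hunk above matter (a hypothetical
standalone check; assumes 4 KiB pages and the 1 MiB LNET_MTU): with 4 KiB
fragments the two limits coincide at 256 fragments per MD, but larger
fragments hit the byte cap first, which fragment-count-only math missed:

#include <stdio.h>

#define LNET_MTU     (1U << 20) /* assumed: 1 MiB */
#define LNET_MAX_IOV 256        /* assumed */

/* How many fragments of 'len' bytes fit in one MD under both caps? */
static unsigned int frags_per_md(unsigned int len)
{
        unsigned int by_bytes = LNET_MTU / len;

        return by_bytes < LNET_MAX_IOV ? by_bytes : LNET_MAX_IOV;
}

int main(void)
{
        printf("4 KiB frags: %u per MD\n", frags_per_md(4096));   /* 256 */
        printf("64 KiB frags: %u per MD\n", frags_per_md(65536)); /* 16 */
        return 0;
}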
@@ -3444,9 +3460,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
                    || req->rq_mbits == 0) {
                        req->rq_mbits = req->rq_xid;
                } else {
-                       int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) /
-                                       LNET_MAX_IOV;
-                       req->rq_mbits -= total_md - 1;
+                       req->rq_mbits -= bd->bd_md_count - 1;
                }
        } else {
                /*
@@ -3461,8 +3475,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
         * that server can infer the number of bulks that were prepared,
         * see LU-1431
         */
-       req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
-                         LNET_MAX_IOV) - 1;
+       req->rq_mbits += bd->bd_md_count - 1;
 
        /*
         * Set rq_xid as rq_mbits to indicate the final bulk for the old
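
The matchbits arithmetic both hunks above rely on is small enough to state
standalone (a sketch; rq_mbits names the final MD, as the comment notes):

#include <assert.h>
#include <stdint.h>

/* MDs of one bulk get consecutive matchbits and rq_mbits names the
 * final one, so the first MD's matchbits are recovered by stepping
 * back md_count - 1. */
static uint64_t first_mbits(uint64_t last_mbits, int md_count)
{
        assert(md_count >= 1);
        return last_mbits - (uint64_t)(md_count - 1);
}

The old expression, (bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV,
undercounts MDs whenever the LNET_MTU byte cap closes an MD early, so the
two ends could disagree on the matchbits range; bd_md_count is the number
of MDs actually built.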
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index b19dbdc..9fde3ac 100644
@@ -171,7 +171,6 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
                RETURN(0);
 
        /* NB no locking required until desc is on the network */
-       LASSERT(desc->bd_md_count == 0);
        LASSERT(ptlrpc_is_bulk_op_active(desc->bd_type));
 
        LASSERT(desc->bd_cbid.cbid_fn == server_bulk_callback);
@@ -327,7 +326,6 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 
        /* NB no locking required until desc is on the network */
        LASSERT(desc->bd_nob > 0);
-       LASSERT(desc->bd_md_count == 0);
        LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
        LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
        LASSERT(desc->bd_req != NULL);
@@ -349,9 +347,9 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback);
        LASSERT(desc->bd_cbid.cbid_arg == desc);
 
-       total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
+       total_md = desc->bd_md_count;
        /* rq_mbits is matchbits of the final bulk */
-       mbits = req->rq_mbits - total_md + 1;
+       mbits = req->rq_mbits - desc->bd_md_count + 1;
 
        LASSERTF(mbits == (req->rq_mbits & PTLRPC_BULK_OPS_MASK),
                 "first mbits = x%llu, last mbits = x%llu\n",
@@ -369,13 +367,14 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        md.handler = ptlrpc_handler;
        md.threshold = 1;                       /* PUT or GET */
 
-       for (posted_md = 0; posted_md < total_md; posted_md++, mbits++) {
+       for (posted_md = 0; posted_md < desc->bd_md_count;
+            posted_md++, mbits++) {
                md.options = PTLRPC_MD_OPTIONS |
                             (ptlrpc_is_bulk_op_get(desc->bd_type) ?
                              LNET_MD_OP_GET : LNET_MD_OP_PUT);
                ptlrpc_fill_bulk_md(&md, desc, posted_md);
 
-               if (posted_md > 0 && posted_md + 1 == total_md &&
+               if (posted_md > 0 && posted_md + 1 == desc->bd_md_count &&
                    OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_ATTACH)) {
                        rc = -ENOMEM;
                } else {
diff --git a/lustre/ptlrpc/pers.c b/lustre/ptlrpc/pers.c
index 7183e37..14e11a6 100644
 void ptlrpc_fill_bulk_md(struct lnet_md *md, struct ptlrpc_bulk_desc *desc,
                         int mdidx)
 {
+       unsigned int start = desc->bd_mds_off[mdidx];
+
        BUILD_BUG_ON(PTLRPC_MAX_BRW_PAGES >= LI_POISON);
 
        LASSERT(mdidx < desc->bd_md_max_brw);
        LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
        LASSERT(!(md->options & (LNET_MD_KIOV | LNET_MD_PHYS)));
 
-       md->length = max(0, desc->bd_iov_count - mdidx * LNET_MAX_IOV);
-       md->length = min_t(unsigned int, LNET_MAX_IOV, md->length);
+       /* just send a lnet header */
+       if (mdidx >= desc->bd_md_count) {
+               md->options |= LNET_MD_KIOV;
+               md->length = 0;
+               md->start = NULL;
+               return;
+       }
+
+       if (mdidx == (desc->bd_md_count - 1))
+               md->length = desc->bd_iov_count - start;
+       else
+               md->length = desc->bd_mds_off[mdidx + 1] - start;
 
        md->options |= LNET_MD_KIOV;
        if (desc->bd_enc_vec)
-               md->start = &desc->bd_enc_vec[mdidx *
-                                             LNET_MAX_IOV];
+               md->start = &desc->bd_enc_vec[start];
        else
-               md->start = &desc->bd_vec[mdidx * LNET_MAX_IOV];
+               md->start = &desc->bd_vec[start];
 }
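
The slice computation in ptlrpc_fill_bulk_md reduces to the following
(a hypothetical standalone version; names are illustrative):

#include <assert.h>

/* Fragment range covered by MD 'mdidx' is [mds_off[mdidx],
 * mds_off[mdidx] + length). The last populated MD runs to the end of
 * the fragment array; an index past md_count gets length 0, so only
 * an LNet header is sent. */
static unsigned int md_length(const unsigned int *mds_off,
                              unsigned int md_count,
                              unsigned int iov_count,
                              unsigned int mdidx)
{
        if (mdidx >= md_count)
                return 0;
        if (mdidx == md_count - 1)
                return iov_count - mds_off[mdidx];
        return mds_off[mdidx + 1] - mds_off[mdidx];
}

int main(void)
{
        /* three MDs over 600 fragments, boundaries at 0, 256, 512 */
        unsigned int off[] = { 0, 256, 512 };

        assert(md_length(off, 3, 600, 0) == 256);
        assert(md_length(off, 3, 600, 2) == 88);
        assert(md_length(off, 3, 600, 3) == 0);
        return 0;
}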