Whamcloud - gitweb
LU-10157 ptlrpc: fill md correctly. 76/41276/3
authorAlexey Lyashkov <c17817@cray.com>
Wed, 20 Jan 2021 02:57:28 +0000 (21:57 -0500)
committerOleg Drokin <green@whamcloud.com>
Thu, 4 Mar 2021 08:36:20 +0000 (08:36 +0000)
MD fill should be limited to the overall transfer size in addition
to the number of fragments.
Let's do this.

Lustre-change: https://review.whamcloud.com/37387
Lustre-commit: e1ac9e74844dc75d77ef740b3a44fad2efde30c5

Cray-bug-id: LUS-7159
Signed-off-by: Alexey Lyashkov <c17817@cray.com>
Signed-off-by: Serguei Smirnov <ssmirnov@whamcloud.com>
Change-Id: Ibd3be1989c8dd5012e1b158f3942fd041f2da350
Reviewed-on: https://review.whamcloud.com/41276
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Alexey Lyashkov <alexey.lyashkov@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_net.h
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pers.c

index b47cc9d..d4ff8cb 100644 (file)
@@ -1462,6 +1462,7 @@ struct ptlrpc_bulk_desc {
        int                    bd_max_iov;      /* allocated size of bd_iov */
        int                    bd_nob;          /* # bytes covered */
        int                    bd_nob_transferred; /* # bytes GOT/PUT */
+       unsigned int           bd_nob_last;     /* # bytes in last MD */
 
        __u64                  bd_last_mbits;
 
@@ -1469,6 +1470,8 @@ struct ptlrpc_bulk_desc {
        lnet_nid_t             bd_sender;       /* stash event::sender */
        int                     bd_md_count;    /* # valid entries in bd_mds */
        int                     bd_md_max_brw;  /* max entries in bd_mds */
+       /** array of offsets for each MD */
+       unsigned int            bd_mds_off[PTLRPC_BULK_OPS_COUNT];
        /** array of associated MDs */
        struct lnet_handle_md   bd_mds[PTLRPC_BULK_OPS_COUNT];
 
index 622e501..995f4dd 100644 (file)
@@ -127,6 +127,12 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
                (ptlrpc_is_bulk_desc_kvec(type) &&
                 ops->add_iov_frag != NULL));
 
+       if (max_brw > PTLRPC_BULK_OPS_COUNT)
+               RETURN(NULL);
+
+       if (nfrags > LNET_MAX_IOV * max_brw)
+               RETURN(NULL);
+
        OBD_ALLOC_PTR(desc);
        if (desc == NULL)
                return NULL;
@@ -149,6 +155,7 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned nfrags, unsigned max_brw,
        desc->bd_portal = portal;
        desc->bd_type = type;
        desc->bd_md_count = 0;
+       desc->bd_nob_last = LNET_MTU;
        desc->bd_frag_ops = (struct ptlrpc_bulk_frag_ops *) ops;
        LASSERT(max_brw > 0);
        desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
@@ -215,7 +222,15 @@ void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
        LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
 
        kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
+       if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+            ((desc->bd_nob_last + len) > LNET_MTU)) {
+               desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+               desc->bd_md_count++;
+               desc->bd_nob_last = 0;
+               LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+       }
 
+       desc->bd_nob_last += len;
        desc->bd_nob += len;
 
        if (pin)
@@ -241,7 +256,15 @@ int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
        LASSERT(ptlrpc_is_bulk_desc_kvec(desc->bd_type));
 
        iovec = &BD_GET_KVEC(desc, desc->bd_iov_count);
+       if (((desc->bd_iov_count % LNET_MAX_IOV) == 0) ||
+            ((desc->bd_nob_last + len) > LNET_MTU)) {
+               desc->bd_mds_off[desc->bd_md_count] = desc->bd_iov_count;
+               desc->bd_md_count++;
+               desc->bd_nob_last = 0;
+               LASSERT(desc->bd_md_count <= PTLRPC_BULK_OPS_COUNT);
+       }
 
+       desc->bd_nob_last += len;
        desc->bd_nob += len;
 
        iovec->iov_base = frag;
@@ -3329,8 +3352,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
        /* For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so
         * that server can infer the number of bulks that were prepared,
         * see LU-1431 */
-       req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
-                         LNET_MAX_IOV) - 1;
+       req->rq_mbits += bd->bd_md_count - 1;
 
        /* Set rq_xid as rq_mbits to indicate the final bulk for the old
         * server which does not support OBD_CONNECT_BULK_MBITS. LU-6808.
index 4d99341..d378ce7 100644 (file)
@@ -167,7 +167,6 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
                RETURN(0);
 
        /* NB no locking required until desc is on the network */
-       LASSERT(desc->bd_md_count == 0);
        LASSERT(ptlrpc_is_bulk_op_active(desc->bd_type));
 
        LASSERT(desc->bd_cbid.cbid_fn == server_bulk_callback);
@@ -327,7 +326,6 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 
        /* NB no locking required until desc is on the network */
        LASSERT(desc->bd_nob > 0);
-       LASSERT(desc->bd_md_count == 0);
        LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
        LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
        LASSERT(desc->bd_req != NULL);
@@ -349,9 +347,9 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        LASSERT(desc->bd_cbid.cbid_fn == client_bulk_callback);
        LASSERT(desc->bd_cbid.cbid_arg == desc);
 
-       total_md = (desc->bd_iov_count + LNET_MAX_IOV - 1) / LNET_MAX_IOV;
+       total_md = desc->bd_md_count;
        /* rq_mbits is matchbits of the final bulk */
-       mbits = req->rq_mbits - total_md + 1;
+       mbits = req->rq_mbits - desc->bd_md_count + 1;
 
        LASSERTF(mbits == (req->rq_mbits & PTLRPC_BULK_OPS_MASK),
                 "first mbits = x%llu, last mbits = x%llu\n",
@@ -369,13 +367,14 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        md.eq_handle = ptlrpc_eq_h;
        md.threshold = 1;                       /* PUT or GET */
 
-       for (posted_md = 0; posted_md < total_md; posted_md++, mbits++) {
+       for (posted_md = 0; posted_md < desc->bd_md_count;
+            posted_md++, mbits++) {
                md.options = PTLRPC_MD_OPTIONS |
                             (ptlrpc_is_bulk_op_get(desc->bd_type) ?
                              LNET_MD_OP_GET : LNET_MD_OP_PUT);
                ptlrpc_fill_bulk_md(&md, desc, posted_md);
 
-               if (posted_md > 0 && posted_md + 1 == total_md &&
+               if (posted_md > 0 && posted_md + 1 == desc->bd_md_count &&
                    OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_BULK_ATTACH)) {
                        rc = -ENOMEM;
                } else {
index 51e17e2..d0c8fa7 100644 (file)
@@ -44,6 +44,8 @@
 void ptlrpc_fill_bulk_md(struct lnet_md *md, struct ptlrpc_bulk_desc *desc,
                         int mdidx)
 {
+       unsigned int start = desc->bd_mds_off[mdidx];
+
        CLASSERT(PTLRPC_MAX_BRW_PAGES < LI_POISON);
 
        LASSERT(mdidx < desc->bd_md_max_brw);
@@ -51,23 +53,34 @@ void ptlrpc_fill_bulk_md(struct lnet_md *md, struct ptlrpc_bulk_desc *desc,
        LASSERT(!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV |
                                 LNET_MD_PHYS)));
 
-       md->length = max(0, desc->bd_iov_count - mdidx * LNET_MAX_IOV);
-       md->length = min_t(unsigned int, LNET_MAX_IOV, md->length);
+       /* just send a lnet header */
+       if (mdidx >= desc->bd_md_count) {
+               if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
+                       md->options |= LNET_MD_KIOV;
+               else if (ptlrpc_is_bulk_desc_kvec(desc->bd_type))
+                       md->options |= LNET_MD_IOVEC;
+               md->length = 0;
+               md->start = NULL;
+               return;
+       }
+
+       if (mdidx == (desc->bd_md_count - 1))
+               md->length = desc->bd_iov_count - start;
+       else
+               md->length = desc->bd_mds_off[mdidx + 1] - start;
 
        if (ptlrpc_is_bulk_desc_kiov(desc->bd_type)) {
                md->options |= LNET_MD_KIOV;
                if (GET_ENC_KIOV(desc))
-                       md->start = &BD_GET_ENC_KIOV(desc, mdidx *
-                                                    LNET_MAX_IOV);
+                       md->start = &BD_GET_ENC_KIOV(desc, start);
                else
-                       md->start = &BD_GET_KIOV(desc, mdidx * LNET_MAX_IOV);
+                       md->start = &BD_GET_KIOV(desc, start);
        } else if (ptlrpc_is_bulk_desc_kvec(desc->bd_type)) {
                md->options |= LNET_MD_IOVEC;
                if (GET_ENC_KVEC(desc))
-                       md->start = &BD_GET_ENC_KVEC(desc, mdidx *
-                                                     LNET_MAX_IOV);
+                       md->start = &BD_GET_ENC_KVEC(desc, start);
                else
-                       md->start = &BD_GET_KVEC(desc, mdidx * LNET_MAX_IOV);
+                       md->start = &BD_GET_KVEC(desc, start);
        }
 }