LU-10157 ptlrpc: separate MD count and references for bulk 86/37386/10
author Alexey Lyashkov <c17817@cray.com>
Mon, 1 Jun 2020 12:57:53 +0000 (08:57 -0400)
committer Oleg Drokin <green@whamcloud.com>
Sat, 6 Jun 2020 14:02:25 +0000 (14:02 +0000)
Introduce a bulk descriptor reference count (bd_refs) that is distinct
from the MD count: ptlrpc expects to receive events from all posted
MDs whether they are filled with data or not. So the number of MDs to
post is determined by the requested transfer size, not by the number
of MDs that carry data.
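
A minimal sketch of the counting scheme described above (standalone C,
not the Lustre sources; the struct and helper names are hypothetical
simplifications of struct ptlrpc_bulk_desc and its bulk callbacks):
bd_refs starts at the number of MDs posted for the whole transfer, and
every network event drops one reference, whether or not that MD
carried data.

#include <assert.h>
#include <stdio.h>

/* Hypothetical, simplified stand-in for struct ptlrpc_bulk_desc. */
struct bulk_desc {
	unsigned int bd_md_count; /* MDs that actually carry data     */
	unsigned int bd_md_max;   /* MDs needed to cover the transfer */
	int          bd_refs;     /* events still expected            */
};

/* Post one MD per slot of the requested transfer size, not one per
 * data-bearing MD: the peer generates an event for every posted MD. */
static void post_bulk(struct bulk_desc *d)
{
	d->bd_refs = (int)d->bd_md_max;   /* like: desc->bd_refs = total_md */
}

/* Bulk event callback: every event (data, zero-send or unlink) drops
 * one reference; the transfer is done only when all have arrived. */
static int bulk_event(struct bulk_desc *d)
{
	assert(d->bd_refs > 0);           /* like: LASSERT(desc->bd_refs > 0) */
	return --d->bd_refs == 0;         /* nonzero => wake the waiter       */
}

int main(void)
{
	/* e.g. a transfer sized for 4 MDs, only 3 of which hold data */
	struct bulk_desc d = { .bd_md_count = 3, .bd_md_max = 4, .bd_refs = 0 };
	unsigned int i;

	post_bulk(&d);
	for (i = 0; i < d.bd_md_max; i++)
		if (bulk_event(&d))
			printf("all %u MD events seen, bulk complete\n",
			       d.bd_md_max);
	return 0;
}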

Cray-bug-id: LUS-8139
Signed-off-by: Alexey Lyashkov <c17817@cray.com>
Change-Id: I86a13d89eb68f469678baa842d47f5a9d910802a
Reviewed-on: https://review.whamcloud.com/37386
Reviewed-by: James Simmons <jsimmons@infradead.org>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Shaun Tancheff <shaun.tancheff@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_net.h
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c

index c2d9f07..916470a 100644 lustre/include/lustre_net.h
@@ -1425,6 +1425,7 @@ extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops;
  *  Another user is readpage for MDT.
  */
 struct ptlrpc_bulk_desc {
+       unsigned int    bd_refs; /* number MD's assigned including zero-sends */
        /** completed with failure */
        unsigned long bd_failure:1;
        /** client side */
@@ -2020,7 +2021,7 @@ static inline int ptlrpc_server_bulk_active(struct ptlrpc_bulk_desc *desc)
        LASSERT(desc != NULL);
 
        spin_lock(&desc->bd_lock);
-       rc = desc->bd_md_count;
+       rc = desc->bd_refs;
        spin_unlock(&desc->bd_lock);
        return rc;
 }
@@ -2045,7 +2046,7 @@ static inline int ptlrpc_client_bulk_active(struct ptlrpc_request *req)
 
 
        spin_lock(&desc->bd_lock);
-       rc = desc->bd_md_count;
+       rc = desc->bd_refs;
        spin_unlock(&desc->bd_lock);
        return rc;
 }
index 254852d..606abd7 100644 lustre/ptlrpc/client.c
@@ -272,7 +272,7 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 
        LASSERT(desc != NULL);
        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
-       LASSERT(desc->bd_md_count == 0);         /* network hands off */
+       LASSERT(desc->bd_refs == 0);         /* network hands off */
        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
        LASSERT(desc->bd_frag_ops != NULL);
 
index 147d406..7ef8f67 100644 lustre/ptlrpc/events.c
@@ -204,8 +204,8 @@ void client_bulk_callback(struct lnet_event *ev)
 
        spin_lock(&desc->bd_lock);
        req = desc->bd_req;
-       LASSERT(desc->bd_md_count > 0);
-       desc->bd_md_count--;
+       LASSERT(desc->bd_refs > 0);
+       desc->bd_refs--;
 
        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_nob_transferred += ev->mlength;
@@ -222,7 +222,7 @@ void client_bulk_callback(struct lnet_event *ev)
 
        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
-       if (desc->bd_md_count == 0)
+       if (desc->bd_refs == 0)
                ptlrpc_client_wake_req(desc->bd_req);
 
        spin_unlock(&desc->bd_lock);
@@ -454,7 +454,7 @@ void server_bulk_callback(struct lnet_event *ev)
 
        spin_lock(&desc->bd_lock);
 
-       LASSERT(desc->bd_md_count > 0);
+       LASSERT(desc->bd_refs > 0);
 
        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
@@ -470,9 +470,9 @@ void server_bulk_callback(struct lnet_event *ev)
                desc->bd_failure = 1;
 
        if (ev->unlinked) {
-               desc->bd_md_count--;
+               desc->bd_refs--;
                /* This is the last callback no matter what... */
-               if (desc->bd_md_count == 0)
+               if (desc->bd_refs == 0)
                        wake_up(&desc->bd_waitq);
        }
 
index bca143e..b19dbdc 100644 lustre/ptlrpc/niobuf.c
@@ -193,8 +193,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
         * off high bits to get bulk count for this RPC. LU-1431 */
        mbits = desc->bd_req->rq_mbits & ~((__u64)desc->bd_md_max_brw - 1);
        total_md = desc->bd_req->rq_mbits - mbits + 1;
-
-       desc->bd_md_count = total_md;
+       desc->bd_refs = total_md;
        desc->bd_failure = 0;
 
        md.user_ptr = &desc->bd_cbid;
@@ -250,9 +249,9 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
                 * event this creates will signal completion with failure,
                 * so we return SUCCESS here! */
                spin_lock(&desc->bd_lock);
-               desc->bd_md_count -= total_md - posted_md;
+               desc->bd_refs -= total_md - posted_md;
                spin_unlock(&desc->bd_lock);
-               LASSERT(desc->bd_md_count >= 0);
+               LASSERT(desc->bd_refs >= 0);
 
                mdunlink_iterate_helper(desc->bd_mds, posted_md);
                RETURN(0);
@@ -365,7 +364,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 
        desc->bd_registered = 1;
        desc->bd_last_mbits = mbits;
-       desc->bd_md_count = total_md;
+       desc->bd_refs = total_md;
        md.user_ptr = &desc->bd_cbid;
        md.handler = ptlrpc_handler;
        md.threshold = 1;                       /* PUT or GET */
@@ -407,9 +406,9 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        if (rc != 0) {
                LASSERT(rc == -ENOMEM);
                spin_lock(&desc->bd_lock);
-               desc->bd_md_count -= total_md - posted_md;
+               desc->bd_refs -= total_md - posted_md;
                spin_unlock(&desc->bd_lock);
-               LASSERT(desc->bd_md_count >= 0);
+               LASSERT(desc->bd_refs >= 0);
                mdunlink_iterate_helper(desc->bd_mds, desc->bd_md_max_brw);
                req->rq_status = -ENOMEM;
                desc->bd_registered = 0;
@@ -418,14 +417,15 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 
        spin_lock(&desc->bd_lock);
        /* Holler if peer manages to touch buffers before he knows the mbits */
-       if (desc->bd_md_count != total_md)
+       if (desc->bd_refs != total_md)
                CWARN("%s: Peer %s touched %d buffers while I registered\n",
                      desc->bd_import->imp_obd->obd_name, libcfs_id2str(peer),
-                     total_md - desc->bd_md_count);
+                     total_md - desc->bd_refs);
        spin_unlock(&desc->bd_lock);
 
-       CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, "
-              "mbits x%#llx-%#llx, portal %u\n", desc->bd_md_count,
+       CDEBUG(D_NET,
+              "Setup %u bulk %s buffers: %u pages %u bytes, mbits x%#llx-%#llx, portal %u\n",
+              desc->bd_refs,
               ptlrpc_is_bulk_op_get(desc->bd_type) ? "get-source" : "put-sink",
               desc->bd_iov_count, desc->bd_nob,
               desc->bd_last_mbits, req->rq_mbits, desc->bd_portal);
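
For reference, a standalone illustration of the match-bits arithmetic
that produces total_md in ptlrpc_start_bulk_transfer() and
ptlrpc_register_bulk() above (the numeric values below are made up,
not taken from the patch): masking the low bits of rq_mbits with
bd_md_max_brw - 1 recovers the match bits of the first MD, and the
offset from that base plus one is the MD count that bd_refs is
initialised to.

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	/* hypothetical values; bd_md_max_brw is always a power of two */
	uint64_t rq_mbits   = 0x100000006ULL; /* match bits of the last MD */
	uint64_t md_max_brw = 8;              /* max MDs per bulk RPC      */

	/* clear the low bits to get the match bits of the first MD */
	uint64_t mbits    = rq_mbits & ~(md_max_brw - 1);
	/* offset from that base, plus one, is how many MDs to post */
	uint64_t total_md = rq_mbits - mbits + 1;

	printf("first MD mbits %#llx, total_md %llu\n",
	       (unsigned long long)mbits, (unsigned long long)total_md);
	/* prints: first MD mbits 0x100000000, total_md 7 */
	return 0;
}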