Whamcloud - gitweb
LU-13005 ptlrpc: use percpu refcount to track requests. 45/36845/10
author Mr NeilBrown <neilb@suse.de>
Wed, 20 Nov 2019 02:16:04 +0000 (13:16 +1100)
committer Oleg Drokin <green@whamcloud.com>
Tue, 24 Mar 2020 05:16:54 +0000 (05:16 +0000)
ptlrpc needs to wait for outstanding RPC requests to complete before
ptlrpc_ni_fini() finishes.
It currently does this using a refcount in the event-queue, but that
refcount is otherwise unused, and it will shortly be removed.

So add a percpu refcount that explicitly tracks these requests, and
wait for it with a completion.

Also move the declaration of ptlrpc_eq to ptlrpc_internal.h which is a
more appropriate location.

Signed-off-by: Mr NeilBrown <neilb@suse.de>
Change-Id: I479df29d1ddfb4e3d82a26b5fb567f3432dad8af
Reviewed-on: https://review.whamcloud.com/36845
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Shaun Tancheff <shaun.tancheff@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_net.h
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/ptlrpc_internal.h

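diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 1632b3b..2bb3b46 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h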
@@ -1937,7 +1937,6 @@ static inline bool nrs_policy_compat_one(const struct ptlrpc_service *svc,
 /** @} nrs */
 
 /* ptlrpc/events.c */
-extern struct lnet_eq *ptlrpc_eq;
 extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
                               struct lnet_process_id *peer, lnet_nid_t *self);
 /**
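diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c
index fb4dfc4..34d1282 100644
--- a/lustre/ptlrpc/events.c
+++ b/lustre/ptlrpc/events.c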
@@ -41,6 +41,7 @@
 #include "ptlrpc_internal.h"
 
 struct lnet_eq *ptlrpc_eq;
+struct percpu_ref ptlrpc_pending;
 
 /*
  *  Client's outgoing request callback
@@ -481,22 +482,24 @@ void server_bulk_callback(struct lnet_event *ev)
 
 static void ptlrpc_master_callback(struct lnet_event *ev)
 {
-        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+       struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
        void (*callback)(struct lnet_event *ev) = cbid->cbid_fn;
 
-        /* Honestly, it's best to find out early. */
-        LASSERT (cbid->cbid_arg != LP_POISON);
-        LASSERT (callback == request_out_callback ||
-                 callback == reply_in_callback ||
-                 callback == client_bulk_callback ||
-                 callback == request_in_callback ||
-                 callback == reply_out_callback
+       /* Honestly, it's best to find out early. */
+       LASSERT(cbid->cbid_arg != LP_POISON);
+       LASSERT(callback == request_out_callback ||
+               callback == reply_in_callback ||
+               callback == client_bulk_callback ||
+               callback == request_in_callback ||
+               callback == reply_out_callback
 #ifdef HAVE_SERVER_SUPPORT
-                 || callback == server_bulk_callback
+               || callback == server_bulk_callback
 #endif
-                 );
+               );
 
-        callback (ev);
+       callback(ev);
+       if (ev->unlinked)
+               percpu_ref_put(&ptlrpc_pending);
 }
 
 int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
@@ -545,36 +548,26 @@ int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
        return rc;
 }
 
-void ptlrpc_ni_fini(void)
+static struct completion ptlrpc_done;
+
+static void ptlrpc_release(struct percpu_ref *ref)
 {
-       int                 rc;
-       int                 retries;
+       complete(&ptlrpc_done);
+}
 
+static void ptlrpc_ni_fini(void)
+{
        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies */
 
-       for (retries = 0;; retries++) {
-               rc = LNetEQFree(ptlrpc_eq);
-               switch (rc) {
-               default:
-                       LBUG();
+       init_completion(&ptlrpc_done);
+       percpu_ref_kill(&ptlrpc_pending);
+       wait_for_completion(&ptlrpc_done);
 
-               case 0:
-                       LNetNIFini();
-                       return;
-
-               case -EBUSY:
-                       if (retries != 0)
-                               CWARN("Event queue still busy\n");
-
-                       /* Wait for a bit */
-                       ssleep(2);
-                       break;
-               }
-       }
-       /* notreached */
+       LNetEQFree(ptlrpc_eq);
+       LNetNIFini();
 }
 
 lnet_pid_t ptl_get_pid(void)
@@ -584,21 +577,28 @@ lnet_pid_t ptl_get_pid(void)
 
 int ptlrpc_ni_init(void)
 {
-        int              rc;
-        lnet_pid_t       pid;
+       int rc;
+       lnet_pid_t pid;
 
-        pid = ptl_get_pid();
-        CDEBUG(D_NET, "My pid is: %x\n", pid);
+       pid = ptl_get_pid();
+       CDEBUG(D_NET, "My pid is: %x\n", pid);
 
-        /* We're not passing any limits yet... */
-        rc = LNetNIInit(pid);
-        if (rc < 0) {
-                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
+       /* We're not passing any limits yet... */
+       rc = LNetNIInit(pid);
+       if (rc < 0) {
+               CDEBUG(D_NET, "ptlrpc: Can't init network interface: rc = %d\n",
+                      rc);
                return rc;
-        }
+       }
 
-        /* CAVEAT EMPTOR: how we process portals events is _radically_
-         * different depending on... */
+       rc = percpu_ref_init(&ptlrpc_pending, ptlrpc_release, 0, GFP_KERNEL);
+       if (rc) {
+               CERROR("ptlrpc: Can't init percpu refcount: rc = %d\n", rc);
+               return rc;
+       }
+       /* CAVEAT EMPTOR: how we process portals events is _radically_
+        * different depending on...
+        */
        /* kernel LNet calls our master callback when there are new event,
         * because we are guaranteed to get every event via callback,
         * so we just set EQ size to 0 to avoid overhread of serializing
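diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index 82b4666..9d1a954 100644
--- a/lustre/ptlrpc/niobuf.c
+++ b/lustre/ptlrpc/niobuf.c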
@@ -85,6 +85,8 @@ static int ptl_send_buf(struct lnet_handle_md *mdh, void *base, int len,
        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %lld, offset %u\n",
               len, portal, xid, offset);
 
+       percpu_ref_get(&ptlrpc_pending);
+
        rc = LNetPut(self, *mdh, ack,
                     peer_id, portal, xid, offset, 0);
        if (unlikely(rc != 0)) {
@@ -216,6 +218,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
                        }
                        break;
                }
+               percpu_ref_get(&ptlrpc_pending);
 
                /* sanity.sh 224c: lets skip last md */
                if (posted_md == desc->bd_md_max_brw - 1)
@@ -386,6 +389,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
                               posted_md, rc);
                        break;
                }
+               percpu_ref_get(&ptlrpc_pending);
 
                /* About to let the network at it... */
                rc = LNetMDAttach(me, md, LNET_UNLINK,
@@ -866,14 +870,15 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                        /* ...but the MD attach didn't succeed... */
                        request->rq_receiving_reply = 0;
                        spin_unlock(&request->rq_lock);
-                        GOTO(cleanup_me, rc = -ENOMEM);
-                }
+                       GOTO(cleanup_me, rc = -ENOMEM);
+               }
+               percpu_ref_get(&ptlrpc_pending);
 
-               CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %llu"
-                       ", portal %u\n",
-                       request->rq_repbuf_len, request->rq_xid,
-                       request->rq_reply_portal);
-        }
+               CDEBUG(D_NET,
+                      "Setup reply buffer: %u bytes, xid %llu, portal %u\n",
+                      request->rq_repbuf_len, request->rq_xid,
+                      request->rq_reply_portal);
+       }
 
         /* add references on request for request_out_callback */
         ptlrpc_request_addref(request);
@@ -981,10 +986,12 @@ int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
        md.eq_handle = ptlrpc_eq;
 
        rc = LNetMDAttach(me, md, LNET_UNLINK, &rqbd->rqbd_md_h);
-       if (rc == 0)
+       if (rc == 0) {
+               percpu_ref_get(&ptlrpc_pending);
                return 0;
+       }
 
-       CERROR("LNetMDAttach failed: %d;\n", rc);
+       CERROR("ptlrpc: LNetMDAttach failed: rc = %d\n", rc);
        LASSERT(rc == -ENOMEM);
        LNetMEUnlink(me);
        LASSERT(rc == 0);
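diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h
index ffa31c6..a19788d 100644
--- a/lustre/ptlrpc/ptlrpc_internal.h
+++ b/lustre/ptlrpc/ptlrpc_internal.h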
@@ -63,6 +63,9 @@ extern struct nrs_core nrs_core;
 extern struct mutex ptlrpcd_mutex;
 extern struct mutex pinger_mutex;
 
+extern struct lnet_eq *ptlrpc_eq;
+extern struct percpu_ref ptlrpc_pending;
+
 int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait);
 /* ptlrpcd.c */
 int ptlrpcd_start(struct ptlrpcd_ctl *pc);