Whamcloud - gitweb
LU-13005 ptlrpc: use percpu refcount to track requests.
[fs/lustre-release.git] / lustre / ptlrpc / events.c
index fb4dfc4..34d1282 100644 (file)
@@ -41,6 +41,7 @@
 #include "ptlrpc_internal.h"
 
 struct lnet_eq *ptlrpc_eq;
+struct percpu_ref ptlrpc_pending;
 
 /*
  *  Client's outgoing request callback
@@ -481,22 +482,24 @@ void server_bulk_callback(struct lnet_event *ev)
 
 static void ptlrpc_master_callback(struct lnet_event *ev)
 {
-        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+       struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
        void (*callback)(struct lnet_event *ev) = cbid->cbid_fn;
 
-        /* Honestly, it's best to find out early. */
-        LASSERT (cbid->cbid_arg != LP_POISON);
-        LASSERT (callback == request_out_callback ||
-                 callback == reply_in_callback ||
-                 callback == client_bulk_callback ||
-                 callback == request_in_callback ||
-                 callback == reply_out_callback
+       /* Honestly, it's best to find out early. */
+       LASSERT(cbid->cbid_arg != LP_POISON);
+       LASSERT(callback == request_out_callback ||
+               callback == reply_in_callback ||
+               callback == client_bulk_callback ||
+               callback == request_in_callback ||
+               callback == reply_out_callback
 #ifdef HAVE_SERVER_SUPPORT
-                 || callback == server_bulk_callback
+               || callback == server_bulk_callback
 #endif
-                 );
+               );
 
-        callback (ev);
+       callback(ev);
+       if (ev->unlinked)
+               percpu_ref_put(&ptlrpc_pending);
 }
 
 int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
@@ -545,36 +548,26 @@ int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
        return rc;
 }
 
-void ptlrpc_ni_fini(void)
+static struct completion ptlrpc_done;
+
+static void ptlrpc_release(struct percpu_ref *ref)
 {
-       int                 rc;
-       int                 retries;
+       complete(&ptlrpc_done);
+}
 
+static void ptlrpc_ni_fini(void)
+{
        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies */
 
-       for (retries = 0;; retries++) {
-               rc = LNetEQFree(ptlrpc_eq);
-               switch (rc) {
-               default:
-                       LBUG();
+       init_completion(&ptlrpc_done);
+       percpu_ref_kill(&ptlrpc_pending);
+       wait_for_completion(&ptlrpc_done);
 
-               case 0:
-                       LNetNIFini();
-                       return;
-
-               case -EBUSY:
-                       if (retries != 0)
-                               CWARN("Event queue still busy\n");
-
-                       /* Wait for a bit */
-                       ssleep(2);
-                       break;
-               }
-       }
-       /* notreached */
+       LNetEQFree(ptlrpc_eq);
+       LNetNIFini();
 }
 
 lnet_pid_t ptl_get_pid(void)
@@ -584,21 +577,28 @@ lnet_pid_t ptl_get_pid(void)
 
 int ptlrpc_ni_init(void)
 {
-        int              rc;
-        lnet_pid_t       pid;
+       int rc;
+       lnet_pid_t pid;
 
-        pid = ptl_get_pid();
-        CDEBUG(D_NET, "My pid is: %x\n", pid);
+       pid = ptl_get_pid();
+       CDEBUG(D_NET, "My pid is: %x\n", pid);
 
-        /* We're not passing any limits yet... */
-        rc = LNetNIInit(pid);
-        if (rc < 0) {
-                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
+       /* We're not passing any limits yet... */
+       rc = LNetNIInit(pid);
+       if (rc < 0) {
+               CDEBUG(D_NET, "ptlrpc: Can't init network interface: rc = %d\n",
+                      rc);
                return rc;
-        }
+       }
 
-        /* CAVEAT EMPTOR: how we process portals events is _radically_
-         * different depending on... */
+       rc = percpu_ref_init(&ptlrpc_pending, ptlrpc_release, 0, GFP_KERNEL);
+       if (rc) {
+               CERROR("ptlrpc: Can't init percpu refcount: rc = %d\n", rc);
+               return rc;
+       }
+       /* CAVEAT EMPTOR: how we process portals events is _radically_
+        * different depending on...
+        */
        /* kernel LNet calls our master callback when there are new event,
         * because we are guaranteed to get every event via callback,
         * so we just set EQ size to 0 to avoid overhread of serializing