#include "ptlrpc_internal.h"
struct lnet_eq *ptlrpc_eq;
+struct percpu_ref ptlrpc_pending;
/*
* Client's outgoing request callback
static void ptlrpc_master_callback(struct lnet_event *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
void (*callback)(struct lnet_event *ev) = cbid->cbid_fn;
- /* Honestly, it's best to find out early. */
- LASSERT (cbid->cbid_arg != LP_POISON);
- LASSERT (callback == request_out_callback ||
- callback == reply_in_callback ||
- callback == client_bulk_callback ||
- callback == request_in_callback ||
- callback == reply_out_callback
+ /* Honestly, it's best to find out early. */
+ LASSERT(cbid->cbid_arg != LP_POISON);
+ LASSERT(callback == request_out_callback ||
+ callback == reply_in_callback ||
+ callback == client_bulk_callback ||
+ callback == request_in_callback ||
+ callback == reply_out_callback
#ifdef HAVE_SERVER_SUPPORT
- || callback == server_bulk_callback
+ || callback == server_bulk_callback
#endif
- );
+ );
- callback (ev);
+ callback(ev);
+ if (ev->unlinked)
+ percpu_ref_put(&ptlrpc_pending);
}
int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
return rc;
}
-void ptlrpc_ni_fini(void)
+static struct completion ptlrpc_done;
+
+static void ptlrpc_release(struct percpu_ref *ref)
{
- int rc;
- int retries;
+ complete(&ptlrpc_done);
+}
+static void ptlrpc_ni_fini(void)
+{
/* Wait for the event queue to become idle since there may still be
* messages in flight with pending events (i.e. the fire-and-forget
* messages == client requests and "non-difficult" server
* replies */
- for (retries = 0;; retries++) {
- rc = LNetEQFree(ptlrpc_eq);
- switch (rc) {
- default:
- LBUG();
+ init_completion(&ptlrpc_done);
+ percpu_ref_kill(&ptlrpc_pending);
+ wait_for_completion(&ptlrpc_done);
- case 0:
- LNetNIFini();
- return;
-
- case -EBUSY:
- if (retries != 0)
- CWARN("Event queue still busy\n");
-
- /* Wait for a bit */
- ssleep(2);
- break;
- }
- }
- /* notreached */
+ LNetEQFree(ptlrpc_eq);
+ LNetNIFini();
}
lnet_pid_t ptl_get_pid(void)
int ptlrpc_ni_init(void)
{
- int rc;
- lnet_pid_t pid;
+ int rc;
+ lnet_pid_t pid;
- pid = ptl_get_pid();
- CDEBUG(D_NET, "My pid is: %x\n", pid);
+ pid = ptl_get_pid();
+ CDEBUG(D_NET, "My pid is: %x\n", pid);
- /* We're not passing any limits yet... */
- rc = LNetNIInit(pid);
- if (rc < 0) {
- CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
+ /* We're not passing any limits yet... */
+ rc = LNetNIInit(pid);
+ if (rc < 0) {
+ CDEBUG(D_NET, "ptlrpc: Can't init network interface: rc = %d\n",
+ rc);
return rc;
- }
+ }
- /* CAVEAT EMPTOR: how we process portals events is _radically_
- * different depending on... */
+ rc = percpu_ref_init(&ptlrpc_pending, ptlrpc_release, 0, GFP_KERNEL);
+ if (rc) {
+ CERROR("ptlrpc: Can't init percpu refcount: rc = %d\n", rc);
+ return rc;
+ }
+ /* CAVEAT EMPTOR: how we process portals events is _radically_
+ * different depending on...
+ */
/* kernel LNet calls our master callback when there are new event,
* because we are guaranteed to get every event via callback,
* so we just set EQ size to 0 to avoid overhread of serializing