#include "ptlrpc_internal.h"
struct lnet_eq *ptlrpc_eq;
+struct percpu_ref ptlrpc_pending;
/*
* Client's outgoing request callback
static void ptlrpc_master_callback(struct lnet_event *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
void (*callback)(struct lnet_event *ev) = cbid->cbid_fn;
- /* Honestly, it's best to find out early. */
- LASSERT (cbid->cbid_arg != LP_POISON);
- LASSERT (callback == request_out_callback ||
- callback == reply_in_callback ||
- callback == client_bulk_callback ||
- callback == request_in_callback ||
- callback == reply_out_callback
+ /* Honestly, it's best to find out early. */
+ LASSERT(cbid->cbid_arg != LP_POISON);
+ LASSERT(callback == request_out_callback ||
+ callback == reply_in_callback ||
+ callback == client_bulk_callback ||
+ callback == request_in_callback ||
+ callback == reply_out_callback
#ifdef HAVE_SERVER_SUPPORT
- || callback == server_bulk_callback
+ || callback == server_bulk_callback
#endif
- );
+ );
- callback (ev);
+ callback(ev);
+ if (ev->unlinked)
+ percpu_ref_put(&ptlrpc_pending);
}
int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
return rc;
}
-void ptlrpc_ni_fini(void)
+static struct completion ptlrpc_done;
+
+static void ptlrpc_release(struct percpu_ref *ref)
{
- int rc;
- int retries;
+ complete(&ptlrpc_done);
+}
+static void ptlrpc_ni_fini(void)
+{
/* Wait for the event queue to become idle since there may still be
* messages in flight with pending events (i.e. the fire-and-forget
* messages == client requests and "non-difficult" server
* replies */
- for (retries = 0;; retries++) {
- rc = LNetEQFree(ptlrpc_eq);
- switch (rc) {
- default:
- LBUG();
+ init_completion(&ptlrpc_done);
+ percpu_ref_kill(&ptlrpc_pending);
+ wait_for_completion(&ptlrpc_done);
- case 0:
- LNetNIFini();
- return;
-
- case -EBUSY:
- if (retries != 0)
- CWARN("Event queue still busy\n");
-
- /* Wait for a bit */
- ssleep(2);
- break;
- }
- }
- /* notreached */
+ LNetEQFree(ptlrpc_eq);
+ LNetNIFini();
}
lnet_pid_t ptl_get_pid(void)
int ptlrpc_ni_init(void)
{
- int rc;
- lnet_pid_t pid;
+ int rc;
+ lnet_pid_t pid;
- pid = ptl_get_pid();
- CDEBUG(D_NET, "My pid is: %x\n", pid);
+ pid = ptl_get_pid();
+ CDEBUG(D_NET, "My pid is: %x\n", pid);
- /* We're not passing any limits yet... */
- rc = LNetNIInit(pid);
- if (rc < 0) {
- CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
+ /* We're not passing any limits yet... */
+ rc = LNetNIInit(pid);
+ if (rc < 0) {
+ CDEBUG(D_NET, "ptlrpc: Can't init network interface: rc = %d\n",
+ rc);
return rc;
- }
+ }
- /* CAVEAT EMPTOR: how we process portals events is _radically_
- * different depending on... */
+ rc = percpu_ref_init(&ptlrpc_pending, ptlrpc_release, 0, GFP_KERNEL);
+ if (rc) {
+ CERROR("ptlrpc: Can't init percpu refcount: rc = %d\n", rc);
+ return rc;
+ }
+ /* CAVEAT EMPTOR: how we process portals events is _radically_
+ * different depending on...
+ */
/* kernel LNet calls our master callback when there are new event,
* because we are guaranteed to get every event via callback,
* so we just set EQ size to 0 to avoid overhread of serializing
CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %lld, offset %u\n",
len, portal, xid, offset);
+ percpu_ref_get(&ptlrpc_pending);
+
rc = LNetPut(self, *mdh, ack,
peer_id, portal, xid, offset, 0);
if (unlikely(rc != 0)) {
}
break;
}
+ percpu_ref_get(&ptlrpc_pending);
/* sanity.sh 224c: lets skip last md */
if (posted_md == desc->bd_md_max_brw - 1)
posted_md, rc);
break;
}
+ percpu_ref_get(&ptlrpc_pending);
/* About to let the network at it... */
rc = LNetMDAttach(me, md, LNET_UNLINK,
/* ...but the MD attach didn't succeed... */
request->rq_receiving_reply = 0;
spin_unlock(&request->rq_lock);
- GOTO(cleanup_me, rc = -ENOMEM);
- }
+ GOTO(cleanup_me, rc = -ENOMEM);
+ }
+ percpu_ref_get(&ptlrpc_pending);
- CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %llu"
- ", portal %u\n",
- request->rq_repbuf_len, request->rq_xid,
- request->rq_reply_portal);
- }
+ CDEBUG(D_NET,
+ "Setup reply buffer: %u bytes, xid %llu, portal %u\n",
+ request->rq_repbuf_len, request->rq_xid,
+ request->rq_reply_portal);
+ }
/* add references on request for request_out_callback */
ptlrpc_request_addref(request);
md.eq_handle = ptlrpc_eq;
rc = LNetMDAttach(me, md, LNET_UNLINK, &rqbd->rqbd_md_h);
- if (rc == 0)
+ if (rc == 0) {
+ percpu_ref_get(&ptlrpc_pending);
return 0;
+ }
- CERROR("LNetMDAttach failed: %d;\n", rc);
+ CERROR("ptlrpc: LNetMDAttach failed: rc = %d\n", rc);
LASSERT(rc == -ENOMEM);
LNetMEUnlink(me);
LASSERT(rc == 0);