attempt to limit the number of concurrent requests outstanding at a
given time.
spinlock_t cli_lock;
__u32 cli_xid;
spinlock_t cli_lock;
__u32 cli_xid;
+ atomic_t cli_queue_length;
+ wait_queue_head_t cli_waitq;
};
/* These do double-duty in rq_type and rq_flags */
};
/* These do double-duty in rq_type and rq_flags */
out:
if (rc) {
CERROR("no header\n");
out:
if (rc) {
CERROR("no header\n");
dchild->d_inode->i_gid = rec->ur_gid;
rep->ino = dchild->d_inode->i_ino;
rep->generation = dchild->d_inode->i_generation;
dchild->d_inode->i_gid = rec->ur_gid;
rep->ino = dchild->d_inode->i_ino;
rep->generation = dchild->d_inode->i_generation;
+ } else {
+ CERROR("error during create: %d\n", rc);
+ LBUG();
cl->cli_reply_portal = rep_portal;
cl->cli_rep_unpack = rep_unpack;
cl->cli_req_pack = req_pack;
cl->cli_reply_portal = rep_portal;
cl->cli_rep_unpack = rep_unpack;
cl->cli_req_pack = req_pack;
+ atomic_set(&cl->cli_queue_length, 32);
+ init_waitqueue_head(&cl->cli_waitq);
/* non networked client */
if (dev >= 0 && dev < MAX_OBD_DEVICES) {
/* non networked client */
if (dev >= 0 && dev < MAX_OBD_DEVICES) {
- //spin_lock(&req->rq_lock);
if (req->rq_repbuf != NULL) {
req->rq_flags = PTL_RPC_REPLY;
if (req->rq_repbuf != NULL) {
req->rq_flags = PTL_RPC_REPLY;
- EXIT;
- rc = 1;
- goto out;
}
if (sigismember(&(current->pending.signal), SIGKILL) ||
sigismember(&(current->pending.signal), SIGTERM) ||
}
if (sigismember(&(current->pending.signal), SIGKILL) ||
sigismember(&(current->pending.signal), SIGTERM) ||
- sigismember(&(current->pending.signal), SIGSTOP) ||
sigismember(&(current->pending.signal), SIGINT)) {
req->rq_flags = PTL_RPC_INTR;
sigismember(&(current->pending.signal), SIGINT)) {
req->rq_flags = PTL_RPC_INTR;
- EXIT;
- rc = 1;
- goto out;
- //spin_unlock(&req->rq_lock);
+static int ptlrpc_check_queue_depth(void *data)
+{
+ struct ptlrpc_client *cl = data;
+
+ if (atomic_read(&cl->cli_queue_length) > 0)
+ return 1;
+
+ if (sigismember(&(current->pending.signal), SIGKILL) ||
+ sigismember(&(current->pending.signal), SIGTERM) ||
+ sigismember(&(current->pending.signal), SIGINT))
+ return 1;
+
+ return 0;
+}
int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
{
int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
{
init_waitqueue_head(&req->rq_wait_for_rep);
init_waitqueue_head(&req->rq_wait_for_rep);
+ if (atomic_dec_and_test(&cl->cli_queue_length))
+ wait_event_interruptible(cl->cli_waitq,
+ ptlrpc_check_queue_depth(cl));
+
if (cl->cli_obd) {
/* Local delivery */
rc = ptlrpc_enqueue(cl, req);
if (cl->cli_obd) {
/* Local delivery */
rc = ptlrpc_enqueue(cl, req);
}
if (rc) {
CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc);
}
if (rc) {
CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc);
+ atomic_inc(&cl->cli_queue_length);
+ wake_up(&cl->cli_waitq);
+ RETURN(-rc);
}
CDEBUG(D_OTHER, "-- sleeping\n");
wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
CDEBUG(D_OTHER, "-- done\n");
}
CDEBUG(D_OTHER, "-- sleeping\n");
wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
CDEBUG(D_OTHER, "-- done\n");
- //spin_lock(&req->rq_lock);
+ atomic_inc(&cl->cli_queue_length);
+ wake_up(&cl->cli_waitq);
if (req->rq_flags == PTL_RPC_INTR) {
/* Clean up the dangling reply buffers */
ptlrpc_abort(req);
if (req->rq_flags == PTL_RPC_INTR) {
/* Clean up the dangling reply buffers */
ptlrpc_abort(req);
- EXIT;
- rc = -EINTR;
- goto out;
+ GOTO(out, rc = -EINTR);
}
if (req->rq_flags != PTL_RPC_REPLY) {
CERROR("Unknown reason for wakeup\n");
/* XXX Phil - I end up here when I kill obdctl */
ptlrpc_abort(req);
}
if (req->rq_flags != PTL_RPC_REPLY) {
CERROR("Unknown reason for wakeup\n");
/* XXX Phil - I end up here when I kill obdctl */
ptlrpc_abort(req);
- //LBUG();
- EXIT;
- rc = -EINTR;
- goto out;
+ GOTO(out, rc = -EINTR);
}
rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen,
&req->rq_rephdr, &req->rq_rep);
if (rc) {
CERROR("unpack_rep failed: %d\n", rc);
}
rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen,
&req->rq_rephdr, &req->rq_rep);
if (rc) {
CERROR("unpack_rep failed: %d\n", rc);
}
CDEBUG(D_NET, "got rep %d\n", req->rq_rephdr->xid);
}
CDEBUG(D_NET, "got rep %d\n", req->rq_rephdr->xid);
- //spin_unlock(&req->rq_lock);
local_id.nid = PTL_ID_ANY;
local_id.pid = PTL_ID_ANY;
local_id.nid = PTL_ID_ANY;
local_id.pid = PTL_ID_ANY;
- //CERROR("sending req %d\n", request->rq_xid);
rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
request->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
&request->rq_reply_me_h);
rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
request->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
&request->rq_reply_me_h);
rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
PTL_UNLINK, &request->rq_reply_md_h);
rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
PTL_UNLINK, &request->rq_reply_md_h);
- //CERROR("MDAttach (send RPC): %Lu\n", (__u64)request->rq_reply_md_h);
if (rc != PTL_OK) {
CERROR("PtlMDAttach failed: %d\n", rc);
LBUG();
if (rc != PTL_OK) {
CERROR("PtlMDAttach failed: %d\n", rc);
LBUG();