Whamcloud - gitweb
- Added a queue length and waitqueue to the client structure in an
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 7e25b67..7f0d835 100644 (file)
@@ -81,6 +81,8 @@ int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal,
         cl->cli_reply_portal = rep_portal;
         cl->cli_rep_unpack = rep_unpack;
         cl->cli_req_pack = req_pack;
+        atomic_set(&cl->cli_queue_length, 32);
+        init_waitqueue_head(&cl->cli_waitq);
 
         /* non networked client */
         if (dev >= 0 && dev < MAX_OBD_DEVICES) {
@@ -171,26 +173,19 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
 {
         int rc = 0;
 
-        //spin_lock(&req->rq_lock);
         if (req->rq_repbuf != NULL) {
                 req->rq_flags = PTL_RPC_REPLY;
-                EXIT;
-                rc = 1;
-                goto out;
+                GOTO(out, rc = 1);
         }
 
         if (sigismember(&(current->pending.signal), SIGKILL) ||
             sigismember(&(current->pending.signal), SIGTERM) ||
-            sigismember(&(current->pending.signal), SIGSTOP) ||
             sigismember(&(current->pending.signal), SIGINT)) { 
                 req->rq_flags = PTL_RPC_INTR;
-                EXIT;
-                rc = 1; 
-                goto out;
+                GOTO(out, rc = 1);
         }
 
  out:
-        //spin_unlock(&req->rq_lock);
         return rc;
 }
 
@@ -243,6 +238,20 @@ int ptlrpc_abort(struct ptlrpc_request *request)
         return 0;
 }
 
+static int ptlrpc_check_queue_depth(void *data)
+{
+        struct ptlrpc_client *cl = data;
+
+        if (atomic_read(&cl->cli_queue_length) > 0)
+                return 1;
+
+        if (sigismember(&(current->pending.signal), SIGKILL) ||
+            sigismember(&(current->pending.signal), SIGTERM) ||
+            sigismember(&(current->pending.signal), SIGINT))
+                return 1;
+
+        return 0;
+}
 
 int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
 {
@@ -251,6 +260,10 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
 
         init_waitqueue_head(&req->rq_wait_for_rep);
 
+        if (atomic_dec_and_test(&cl->cli_queue_length))
+                wait_event_interruptible(cl->cli_waitq,
+                                         ptlrpc_check_queue_depth(cl));
+
         if (cl->cli_obd) {
                 /* Local delivery */
                 rc = ptlrpc_enqueue(cl, req); 
@@ -262,36 +275,34 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
         }
         if (rc) { 
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc);
-                return -rc;
+                atomic_inc(&cl->cli_queue_length);
+                wake_up(&cl->cli_waitq);
+                RETURN(-rc);
         }
 
         CDEBUG(D_OTHER, "-- sleeping\n");
         wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
         CDEBUG(D_OTHER, "-- done\n");
-        //spin_lock(&req->rq_lock);
+        atomic_inc(&cl->cli_queue_length);
+        wake_up(&cl->cli_waitq);
         if (req->rq_flags == PTL_RPC_INTR) { 
                 /* Clean up the dangling reply buffers */
                 ptlrpc_abort(req);
-                EXIT;
-                rc = -EINTR;
-                goto out;
+                GOTO(out, rc = -EINTR);
         }
 
         if (req->rq_flags != PTL_RPC_REPLY) { 
                 CERROR("Unknown reason for wakeup\n");
                 /* XXX Phil - I end up here when I kill obdctl */
                 ptlrpc_abort(req); 
-                //LBUG();
-                EXIT;
-                rc = -EINTR;
-                goto out;
+                GOTO(out, rc = -EINTR);
         }
 
         rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen,
                                 &req->rq_rephdr, &req->rq_rep);
         if (rc) {
                 CERROR("unpack_rep failed: %d\n", rc);
-                goto out;
+                GOTO(out, rc);
         }
         CDEBUG(D_NET, "got rep %d\n", req->rq_rephdr->xid);
 
@@ -301,6 +312,5 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
 
         EXIT;
  out:
-        //spin_unlock(&req->rq_lock);
         return rc;
 }