Whamcloud - gitweb
- Added a queue length and waitqueue to the client structure in an
author: braam <braam>
Thu, 28 Mar 2002 03:42:42 +0000 (03:42 +0000)
committer: braam <braam>
Thu, 28 Mar 2002 03:42:42 +0000 (03:42 +0000)
attempt to limit the number of concurrent requests outstanding at a
given time.

lustre/include/linux/lustre_net.h
lustre/mds/handler.c
lustre/mds/mds_reint.c
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c

index 78a14fd..d6ab12a 100644 (file)
@@ -84,6 +84,8 @@ struct ptlrpc_client {
 
         spinlock_t cli_lock;
         __u32 cli_xid;
+        atomic_t cli_queue_length;
+        wait_queue_head_t cli_waitq;
 };
 
 /* These do double-duty in rq_type and rq_flags */
index 1fa95aa..49452f3 100644 (file)
@@ -461,6 +461,7 @@ int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
 out:
         if (rc) {
                 CERROR("no header\n");
+                LBUG();
                 return 0;
         }
 
index fad45cd..5ee12fb 100644 (file)
@@ -171,6 +171,9 @@ static int mds_reint_create(struct mds_update_record *rec,
                 dchild->d_inode->i_gid = rec->ur_gid;
                 rep->ino = dchild->d_inode->i_ino;
                 rep->generation = dchild->d_inode->i_generation;
+        } else {
+                CERROR("error during create: %d\n", rc);
+                LBUG();
         }
 
 out_reint_create:
index 7e25b67..7f0d835 100644 (file)
@@ -81,6 +81,8 @@ int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal,
         cl->cli_reply_portal = rep_portal;
         cl->cli_rep_unpack = rep_unpack;
         cl->cli_req_pack = req_pack;
+        atomic_set(&cl->cli_queue_length, 32);
+        init_waitqueue_head(&cl->cli_waitq);
 
         /* non networked client */
         if (dev >= 0 && dev < MAX_OBD_DEVICES) {
@@ -171,26 +173,19 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
 {
         int rc = 0;
 
-        //spin_lock(&req->rq_lock);
         if (req->rq_repbuf != NULL) {
                 req->rq_flags = PTL_RPC_REPLY;
-                EXIT;
-                rc = 1;
-                goto out;
+                GOTO(out, rc = 1);
         }
 
         if (sigismember(&(current->pending.signal), SIGKILL) ||
             sigismember(&(current->pending.signal), SIGTERM) ||
-            sigismember(&(current->pending.signal), SIGSTOP) ||
             sigismember(&(current->pending.signal), SIGINT)) { 
                 req->rq_flags = PTL_RPC_INTR;
-                EXIT;
-                rc = 1; 
-                goto out;
+                GOTO(out, rc = 1);
         }
 
  out:
-        //spin_unlock(&req->rq_lock);
         return rc;
 }
 
@@ -243,6 +238,20 @@ int ptlrpc_abort(struct ptlrpc_request *request)
         return 0;
 }
 
+static int ptlrpc_check_queue_depth(void *data)
+{
+        struct ptlrpc_client *cl = data;
+
+        if (atomic_read(&cl->cli_queue_length) > 0)
+                return 1;
+
+        if (sigismember(&(current->pending.signal), SIGKILL) ||
+            sigismember(&(current->pending.signal), SIGTERM) ||
+            sigismember(&(current->pending.signal), SIGINT))
+                return 1;
+
+        return 0;
+}
 
 int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
 {
@@ -251,6 +260,10 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
 
         init_waitqueue_head(&req->rq_wait_for_rep);
 
+        if (atomic_dec_and_test(&cl->cli_queue_length))
+                wait_event_interruptible(cl->cli_waitq,
+                                         ptlrpc_check_queue_depth(cl));
+
         if (cl->cli_obd) {
                 /* Local delivery */
                 rc = ptlrpc_enqueue(cl, req); 
@@ -262,36 +275,34 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
         }
         if (rc) { 
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc);
-                return -rc;
+                atomic_inc(&cl->cli_queue_length);
+                wake_up(&cl->cli_waitq);
+                RETURN(-rc);
         }
 
         CDEBUG(D_OTHER, "-- sleeping\n");
         wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
         CDEBUG(D_OTHER, "-- done\n");
-        //spin_lock(&req->rq_lock);
+        atomic_inc(&cl->cli_queue_length);
+        wake_up(&cl->cli_waitq);
         if (req->rq_flags == PTL_RPC_INTR) { 
                 /* Clean up the dangling reply buffers */
                 ptlrpc_abort(req);
-                EXIT;
-                rc = -EINTR;
-                goto out;
+                GOTO(out, rc = -EINTR);
         }
 
         if (req->rq_flags != PTL_RPC_REPLY) { 
                 CERROR("Unknown reason for wakeup\n");
                 /* XXX Phil - I end up here when I kill obdctl */
                 ptlrpc_abort(req); 
-                //LBUG();
-                EXIT;
-                rc = -EINTR;
-                goto out;
+                GOTO(out, rc = -EINTR);
         }
 
         rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen,
                                 &req->rq_rephdr, &req->rq_rep);
         if (rc) {
                 CERROR("unpack_rep failed: %d\n", rc);
-                goto out;
+                GOTO(out, rc);
         }
         CDEBUG(D_NET, "got rep %d\n", req->rq_rephdr->xid);
 
@@ -301,6 +312,5 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
 
         EXIT;
  out:
-        //spin_unlock(&req->rq_lock);
         return rc;
 }
index 9738531..cf0a43c 100644 (file)
@@ -293,7 +293,6 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         local_id.nid = PTL_ID_ANY;
         local_id.pid = PTL_ID_ANY;
 
-        //CERROR("sending req %d\n", request->rq_xid);
         rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
                           request->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
                           &request->rq_reply_me_h);
@@ -314,7 +313,6 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
 
         rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
                          PTL_UNLINK, &request->rq_reply_md_h);
-        //CERROR("MDAttach (send RPC): %Lu\n", (__u64)request->rq_reply_md_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMDAttach failed: %d\n", rc);
                 LBUG();