- add a figure
author    braam <braam>
Mon, 18 Mar 2002 15:00:07 +0000 (15:00 +0000)
committer braam <braam>
Mon, 18 Mar 2002 15:00:07 +0000 (15:00 +0000)
- fix the problem where the refcount is decremented on the wrong MD in the
  ring buffer
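
In outline: the old ptl_received_rpc() decremented
service->srv_ref_count[service->srv_md_active], presumably because srv_md_active
has moved on by the time the request handler finishes, so the wrong MD's count
was dropped. The new ptl_handled_rpc() instead takes the memory descriptor's
start address, recorded when the event arrived, and walks the request ring to
find the slot that actually received the request. Below is a minimal sketch of
that lookup, factored into a hypothetical helper (the patch open-codes the loop
inside ptl_handled_rpc()) and assuming only the ptlrpc_service fields visible
in the diff:

/* Hypothetical helper; the patch inlines this loop in ptl_handled_rpc().
 * Only fields shown in the diff are used: srv_md[], srv_ring_length. */
static int find_request_md(struct ptlrpc_service *service, void *start)
{
        int index;

        for (index = 0; index < service->srv_ring_length; index++)
                if (service->srv_md[index].start == start)
                        return index;   /* the MD whose buffer got this request */

        BUG();          /* event for a buffer this service never posted */
        return -1;      /* not reached */
}
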

lustre/include/linux/lustre_net.h
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/service.c

diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h
index 609f2e1..f30f0b8 100644
@@ -97,6 +97,7 @@ struct ptlrpc_client {
 
 struct ptlrpc_request { 
         int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */
+        spinlock_t rq_lock;
        struct list_head rq_list;
        struct obd_device *rq_obd;
        int rq_status;
@@ -207,7 +208,6 @@ int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
 int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
                  struct ptlrpc_request *req);
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer);
-int ptl_received_rpc(struct ptlrpc_service *service);
 
 /* rpc/client.c */
 int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal, 
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 4755dda..4393f45 100644
@@ -60,7 +60,9 @@ int ptlrpc_enqueue(struct ptlrpc_client *peer, struct ptlrpc_request *req)
        /* remember where it came from */
        srv_req->rq_reply_handle = req;
 
+        spin_lock(&peer->cli_lock);
        list_add(&srv_req->rq_list, &peer->cli_obd->obd_req_list); 
+        spin_unlock(&peer->cli_lock);
        wake_up(&peer->cli_obd->obd_req_waitq);
        return 0;
 }
@@ -136,6 +138,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
        }
 
        memset(request, 0, sizeof(*request));
+        spin_lock_init(&request->rq_lock);
 
         spin_lock(&cl->cli_lock);
        request->rq_xid = cl->cli_xid++;
@@ -167,10 +170,14 @@ void ptlrpc_free_req(struct ptlrpc_request *request)
 
 static int ptlrpc_check_reply(struct ptlrpc_request *req)
 {
+        int rc = 0;
+
+        spin_lock(&req->rq_lock);
         if (req->rq_repbuf != NULL) {
                 req->rq_flags = PTL_RPC_REPLY;
                 EXIT;
-                return 1;
+                rc = 1;
+                goto out;
         }
 
         if (sigismember(&(current->pending.signal), SIGKILL) ||
@@ -178,10 +185,13 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
             sigismember(&(current->pending.signal), SIGINT)) { 
                 req->rq_flags = PTL_RPC_INTR;
                 EXIT;
-                return 1;
+                rc = 1; 
+                goto out;
         }
 
-        return 0;
+ out:
+        spin_unlock(&req->rq_lock);
+        return rc;
 }
 
 int ptlrpc_check_status(struct ptlrpc_request *req, int err)
@@ -223,19 +233,22 @@ int ptlrpc_abort(struct ptlrpc_request *request)
 {
         /* First remove the MD for the reply; in theory, this means
          * that we can tear down the buffer safely. */
+        spin_lock(&request->rq_lock);
         PtlMEUnlink(request->rq_reply_me_h);
         PtlMDUnlink(request->rq_reply_md_h);
         OBD_FREE(request->rq_repbuf, request->rq_replen);
         request->rq_repbuf = NULL;
         request->rq_replen = 0;
+        spin_unlock(&request->rq_lock);
 
         return 0;
 }
 
+
 int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
                              
 {
-       int rc;
+       int rc = 0;
         ENTRY;
 
        init_waitqueue_head(&req->rq_wait_for_rep);
@@ -258,12 +271,13 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
         CDEBUG(D_OTHER, "-- sleeping\n");
         wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
         CDEBUG(D_OTHER, "-- done\n");
-        
+        spin_lock(&req->rq_lock);
         if (req->rq_flags == PTL_RPC_INTR) { 
                 /* Clean up the dangling reply buffers */
                 ptlrpc_abort(req);
                 EXIT;
-                return -EINTR;
+                rc = -EINTR;
+                goto out;
         }
 
         if (req->rq_flags != PTL_RPC_REPLY) { 
@@ -272,14 +286,15 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
                 ptlrpc_abort(req); 
                 //BUG();
                 EXIT;
-                return -EINTR;
+                rc = -EINTR;
+                goto out;
         }
 
        rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen,
                                 &req->rq_rephdr, &req->rq_rep);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
-               return rc;
+                goto out;
        }
         CDEBUG(D_NET, "got rep %d\n", req->rq_rephdr->xid);
 
@@ -288,5 +303,7 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
                        req->rq_replen, req->rq_rephdr->status);
 
        EXIT;
-       return 0;
+ out:
+        spin_unlock(&req->rq_lock);
+       return rc;
 }
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index e734aac..e941587 100644
@@ -339,17 +339,28 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
  * it finishes processing an event.  This ensures the ref count is
  * decremented and that the rpc ring buffer cycles properly.
  */ 
-int ptl_received_rpc(struct ptlrpc_service *service)
+int ptl_handled_rpc(struct ptlrpc_service *service, void *start)
 {
-        int rc, index;
+        int rc, index = 0;
 
         spin_lock(&service->srv_lock);
-        index = service->srv_md_active;
+        /* XXX this is wrong must find index on which request arrived!!!*/ 
+        while (index < service->srv_ring_length) {
+                if ( service->srv_md[index].start == start) 
+                        break;
+                index++;
+        }
+        if (index == service->srv_ring_length)
+                BUG();
+
         CDEBUG(D_INFO, "MD index=%d Ref Count=%d\n", index,
                service->srv_ref_count[index]);
         service->srv_ref_count[index]--;
 
-        if ((service->srv_ref_count[index] <= 0) &&
+        if (service->srv_ref_count[index] < 0)
+                BUG();
+        
+        if ((service->srv_ref_count[index] == 0) &&
             (service->srv_me_h[index] == 0)) {
 
                 /* Replace the unlinked ME and MD */
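
The hunk above stops where ptl_handled_rpc() replaces the unlinked ME and MD.
Pulling the comment at the top of the function together with the new
bookkeeping, the ring-buffer cycle looks roughly like the sketch below;
handle_rpc_done() and ptlrpc_repost_buffer() are hypothetical names (the
repost step is the part cut off by the hunk), while the locking, the lookup
by start address, and the refcount checks follow the diff:

/* Sketch only -- not the committed code.  ptlrpc_repost_buffer() stands in
 * for the ME/MD re-attach that the real function performs. */
static void ptlrpc_repost_buffer(struct ptlrpc_service *service, int index);

static int handle_rpc_done(struct ptlrpc_service *service, void *start)
{
        int index;

        spin_lock(&service->srv_lock);
        index = find_request_md(service, start);  /* lookup sketched earlier */

        service->srv_ref_count[index]--;
        if (service->srv_ref_count[index] < 0)
                BUG();          /* more completions than requests */

        /* Recycle the slot once every event on it has been handled and
         * Portals has unlinked its match entry. */
        if (service->srv_ref_count[index] == 0 &&
            service->srv_me_h[index] == 0)
                ptlrpc_repost_buffer(service, index);   /* hypothetical */

        spin_unlock(&service->srv_lock);
        return 0;
}
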
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index f59b267..652da36 100644
@@ -33,6 +33,7 @@
 #include <linux/lustre_net.h>
 
 extern int server_request_callback(ptl_event_t *ev, void *data);
+extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start);
 
 static int ptlrpc_check_event(struct ptlrpc_service *svc)
 {
@@ -174,11 +175,13 @@ static int ptlrpc_main(void *arg)
 
                 if (svc->srv_flags & SVC_EVENT) { 
                         struct ptlrpc_request request;
+                        void *start;
                         svc->srv_flags = SVC_RUNNING; 
 
                         /* FIXME: If we move to an event-driven model,
                          * we should put the request on the stack of
                          * mds_handle instead. */
+                        start = svc->srv_ev.mem_desc.start;
                         memset(&request, 0, sizeof(request));
                         request.rq_obd = obddev;
                         request.rq_reqbuf = (svc->srv_ev.mem_desc.start +
@@ -195,7 +198,7 @@ static int ptlrpc_main(void *arg)
 
                         spin_unlock(&svc->srv_lock);
                         rc = svc->srv_handler(obddev, svc, &request);
-                        ptl_received_rpc(svc);
+                        ptl_handled_rpc(svc, start);
                         continue;
                 }