Whamcloud - gitweb
- Added some fields for rpc double buffering
[fs/lustre-release.git] / lustre / ptlrpc / rpc.c
index df6e1bb..b61b8ce 100644 (file)
 #include <linux/lustre_net.h>
 
 static ptl_handle_eq_t req_eq, bulk_source_eq, bulk_sink_eq;
+int obd_debug_level;
+int obd_print_entry;
 
-/* This callback performs two functions:
- *
+/*
  * 1. Free the request buffer after it has gone out on the wire
  * 2. Wake up the thread waiting for the reply once it comes in.
  */
@@ -56,8 +57,46 @@ static int request_callback(ptl_event_t *ev, void *data)
 static int incoming_callback(ptl_event_t *ev, void *data)
 {
         struct ptlrpc_service *service = data;
+       int rc;
 
-        ENTRY;
+       if (ev->rlength != ev->mlength)
+               printk("Warning: Possibly truncated rpc (%d/%d)\n",
+                       ev->mlength, ev->rlength);
+
+       /* The ME is unlinked when there is less than 1024 bytes free
+        * on its MD.  This ensures we are always able to handle the rpc, 
+        * although the 1024 value is a guess as to the size of a
+         * large rpc (the known safe margin should be determined).
+        *
+        * NOTE: The portals API by default unlinks all MD's associated
+        *       with an ME when it's unlinked.  For now, this behavior
+        *       has been commented out of the portals library so the
+        *       MD can be unlinked when its ref count drops to zero.
+        *       A new MD and ME will then be created that use the same
+        *       kmalloc()'ed memory and inserted at the ring tail.
+        */
+
+       service->srv_ref_count[service->srv_md_active]++;
+
+       if (ev->offset >= (service->srv_buf_size - 1024)) {
+               printk("Unlinking ME %d\n", service->srv_me_active);
+
+               rc = PtlMEUnlink(service->srv_me_h[service->srv_me_active]);
+               service->srv_me_h[service->srv_me_active] = 0;
+
+               if (rc != PTL_OK) {
+                       printk("PtlMEUnlink failed: %d\n", rc); 
+                       return rc;
+               }
+
+               service->srv_me_active = NEXT_INDEX(service->srv_me_active,
+                       service->srv_ring_length);
+
+               if (service->srv_me_h[service->srv_me_active] == 0)
+                       printk("All %d ring ME's are unlinked!\n",
+                               service->srv_ring_length);
+
+       }
 
         if (ev->type == PTL_EVENT_PUT) {
                 wake_up(service->srv_wait_queue);
@@ -65,7 +104,6 @@ static int incoming_callback(ptl_event_t *ev, void *data)
                 printk("Unexpected event type: %d\n", ev->type);
         }
 
-        EXIT;
         return 0;
 }
 
@@ -228,10 +266,68 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         return ptl_send_buf(request, peer, request->rq_req_portal, 1);
 }
 
+/* ptl_received_rpc() should be called by the sleeping process once
+ * it finishes processing an event.  This ensures the ref count is
+ * decrimented and that the rpc ring buffer cycles properly.
+ */ 
+int ptl_received_rpc(struct ptlrpc_service *service) {
+       int rc, index;
+
+       index = service->srv_md_active;
+       CDEBUG(D_INFO, "MD index=%d Ref Count=%d\n", index,
+               service->srv_ref_count[index]);
+       service->srv_ref_count[index]--;
+
+       if ((service->srv_ref_count[index] <= 0) &&
+           (service->srv_me_h[index] == 0)) {
+
+               CDEBUG(D_INFO, "Removing MD at index %d\n", index);
+               rc = PtlMDUnlink(service->srv_md_h[index]);
+
+                if (rc)
+                        printk(__FUNCTION__ ": PtlMDUnlink failed: %d\n", rc);
+
+                /* Replace the unlinked ME and MD */
+               CDEBUG(D_INFO, "Inserting new ME and MD in ring\n");
+
+                rc = PtlMEInsert(service->srv_me_h[service->srv_me_tail],
+                        service->srv_id, 0, ~0, PTL_RETAIN,
+                        PTL_INS_AFTER, &(service->srv_me_h[index]));
+               service->srv_me_tail = index;
+                service->srv_ref_count[index] = 0;
+                
+               if (rc != PTL_OK) {
+                        printk("PtlMEInsert failed: %d\n", rc);
+                        return rc;
+                }
+
+                service->srv_md[index].start        = service->srv_buf[index];
+                service->srv_md[index].length       = service->srv_buf_size;
+                service->srv_md[index].threshold    = PTL_MD_THRESH_INF;
+                service->srv_md[index].options      = PTL_MD_OP_PUT;
+                service->srv_md[index].user_ptr     = service;
+                service->srv_md[index].eventq       = service->srv_eq_h;
+
+                rc = PtlMDAttach(service->srv_me_h[index], service->srv_md[index],
+                        PTL_RETAIN, &(service->srv_md_h[index]));
+
+                if (rc != PTL_OK) {
+                        /* cleanup */
+                        printk("PtlMDAttach failed: %d\n", rc);
+                        return rc;
+                }
+
+               service->srv_md_active = NEXT_INDEX(index,
+                       service->srv_ring_length);
+       } 
+       
+       return 0;
+}
+
 int rpc_register_service(struct ptlrpc_service *service, char *uuid)
 {
         struct lustre_peer peer;
-        int rc;
+        int rc, i;
 
         rc = kportal_uuid_to_peer(uuid, &peer);
         if (rc != 0) {
@@ -239,66 +335,94 @@ int rpc_register_service(struct ptlrpc_service *service, char *uuid)
                 return -EINVAL;
         }
 
-        OBD_ALLOC(service->srv_buf, service->srv_buf_size);
-        if (service->srv_buf == NULL) {
-                printk(__FUNCTION__ ": no memory\n");
-                return -ENOMEM;
-        }
+        service->srv_ring_length = RPC_RING_LENGTH;
+       service->srv_me_active = 0;
+       service->srv_md_active = 0;
 
         service->srv_id.addr_kind = PTL_ADDR_GID;
         service->srv_id.gid = PTL_ID_ANY;
         service->srv_id.rid = PTL_ID_ANY;
 
-       rc = PtlMEAttach(peer.peer_ni, service->srv_portal, service->srv_id,
-                         0, ~0, PTL_RETAIN, &service->srv_me_h);
-        if (rc != PTL_OK) {
-                printk("PtlMEAttach failed: %d\n", rc);
-                return rc;
-        }
+        rc = PtlEQAlloc(peer.peer_ni, 128, incoming_callback,
+                service, &(service->srv_eq_h));
 
-        rc = PtlEQAlloc(peer.peer_ni, 128, incoming_callback, service,
-                        &service->srv_eq_h);
         if (rc != PTL_OK) {
                 printk("PtlEQAlloc failed: %d\n", rc);
                 return rc;
         }
 
-        /* FIXME: Build an auto-unlinking MD and build a ring. */
-        /* FIXME: Make sure that these are reachable by DMA on well-known
-         * addresses. */
-       service->srv_md.start           = service->srv_buf;
-       service->srv_md.length          = service->srv_buf_size;
-       service->srv_md.threshold       = PTL_MD_THRESH_INF;
-       service->srv_md.options         = PTL_MD_OP_PUT;
-       service->srv_md.user_ptr        = service;
-       service->srv_md.eventq          = service->srv_eq_h;
-
-       rc = PtlMDAttach(service->srv_me_h, service->srv_md,
-                         PTL_RETAIN, &service->srv_md_h);
+        /* Attach the leading ME on which we build the ring */
+        rc = PtlMEAttach(peer.peer_ni, service->srv_portal,
+                service->srv_id, 0, ~0, PTL_RETAIN,
+                &(service->srv_me_h[0]));
+
         if (rc != PTL_OK) {
-                printk("PtlMDAttach failed: %d\n", rc);
-                /* FIXME: wow, we need to clean up. */
+                printk("PtlMEAttach failed: %d\n", rc);
                 return rc;
         }
 
+        for (i = 0; i < service->srv_ring_length; i++) {
+               OBD_ALLOC(service->srv_buf[i], service->srv_buf_size);                
+
+                if (service->srv_buf[i] == NULL) {
+                        printk(__FUNCTION__ ": no memory\n");
+                        return -ENOMEM;
+                }
+
+                /* Insert additional ME's to the ring */
+                if (i > 0) {
+                       rc = PtlMEInsert(service->srv_me_h[i-1],
+                               service->srv_id, 0, ~0, PTL_RETAIN,
+                               PTL_INS_AFTER, &(service->srv_me_h[i]));
+                       service->srv_me_tail = i;
+
+                       if (rc != PTL_OK) {
+                               printk("PtlMEInsert failed: %d\n", rc);
+                               return rc;
+                       }
+               }
+
+                service->srv_ref_count[i] = 0;
+                service->srv_md[i].start       = service->srv_buf[i];
+                service->srv_md[i].length      = service->srv_buf_size;
+                service->srv_md[i].threshold   = PTL_MD_THRESH_INF;
+                service->srv_md[i].options     = PTL_MD_OP_PUT;
+                service->srv_md[i].user_ptr    = service;
+                service->srv_md[i].eventq      = service->srv_eq_h;
+
+                rc = PtlMDAttach(service->srv_me_h[i], service->srv_md[i],
+                        PTL_RETAIN, &(service->srv_md_h[i]));
+
+                if (rc != PTL_OK) {
+                        /* cleanup */
+                        printk("PtlMDAttach failed: %d\n", rc);
+                        return rc;
+                }
+       }
+
         return 0;
 }
 
 int rpc_unregister_service(struct ptlrpc_service *service)
 {
-        int rc;
+        int rc, i;
+
+       for (i = 0; i < service->srv_ring_length; i++) {
+               rc = PtlMDUnlink(service->srv_md_h[i]);
+               if (rc)
+                       printk(__FUNCTION__ ": PtlMDUnlink failed: %d\n", rc);
+       
+               rc = PtlMEUnlink(service->srv_me_h[i]);
+               if (rc)
+                       printk(__FUNCTION__ ": PtlMEUnlink failed: %d\n", rc);
+       
+               OBD_FREE(service->srv_buf[i], service->srv_buf_size);           
+       }
 
-        rc = PtlMDUnlink(service->srv_md_h);
-        if (rc)
-                printk(__FUNCTION__ ": PtlMDUnlink failed: %d\n", rc);
         rc = PtlEQFree(service->srv_eq_h);
         if (rc)
                 printk(__FUNCTION__ ": PtlEQFree failed: %d\n", rc);
-        rc = PtlMEUnlink(service->srv_me_h);
-        if (rc)
-                printk(__FUNCTION__ ": PtlMEUnlink failed: %d\n", rc);
 
-        OBD_FREE(service->srv_buf, service->srv_buf_size);
         return 0;
 }