Whamcloud - gitweb
- zero out the request structure after allocation
[fs/lustre-release.git] / lustre / ptlrpc / rpc.c
index 9c0e38a..4969603 100644 (file)
@@ -29,7 +29,7 @@
 #include <linux/obd_support.h>
 #include <linux/lustre_net.h>
 
-static ptl_handle_eq_t req_eq;
+static ptl_handle_eq_t req_eq, bulk_source_eq, bulk_sink_eq;
 
 static int request_callback(ptl_event_t *ev, void *data)
 {
@@ -64,6 +64,42 @@ static int incoming_callback(ptl_event_t *ev, void *data)
         return 0;
 }
 
+static int bulk_source_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+
+        ENTRY;
+
+        if (ev->type == PTL_EVENT_SENT) {
+                ;
+        } else if (ev->type == PTL_EVENT_ACK) {
+                wake_up_interruptible(&rpc->rq_wait_for_bulk);
+        } else {
+                printk("Unexpected event type in " __FUNCTION__ "!\n");
+        }
+
+        EXIT;
+        return 1;
+}
+
+static int bulk_sink_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+
+        ENTRY;
+
+        if (ev->type == PTL_EVENT_PUT) {
+                if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset)
+                        printk(__FUNCTION__ ": bulkbuf != mem_desc -- why?\n");
+                wake_up_interruptible(&rpc->rq_wait_for_bulk);
+        } else {
+                printk("Unexpected event type in " __FUNCTION__ "!\n");
+        }
+
+        EXIT;
+        return 1;
+}
+
 int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
                  int portal, int is_request)
 {
@@ -71,17 +107,23 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
         ptl_process_id_t remote_id;
         ptl_handle_md_t md_h;
 
-        if (is_request) {
+        /* FIXME: This is bad. */
+        if (request->rq_bulklen) {
+                request->rq_req_md.start = request->rq_bulkbuf;
+                request->rq_req_md.length = request->rq_bulklen;
+                request->rq_req_md.eventq = bulk_source_eq;
+        } else if (is_request) {
                 request->rq_req_md.start = request->rq_reqbuf;
                 request->rq_req_md.length = request->rq_reqlen;
+                request->rq_req_md.eventq = req_eq;
         } else {
                 request->rq_req_md.start = request->rq_repbuf;
                 request->rq_req_md.length = request->rq_replen;
+                request->rq_req_md.eventq = req_eq;
         }
-        request->rq_req_md.threshold = PTL_MD_THRESH_INF;
+        request->rq_req_md.threshold = 1;
         request->rq_req_md.options = PTL_MD_OP_PUT;
         request->rq_req_md.user_ptr = request;
-        request->rq_req_md.eventq = req_eq;
 
         rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h);
         if (rc != 0) {
@@ -93,8 +135,13 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
         remote_id.nid = peer->peer_nid;
         remote_id.pid = 0;
 
-        rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, request->rq_xid,
-                    0, 0);
+        if (request->rq_bulklen) {
+                rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0,
+                            request->rq_xid, 0, 0);
+        } else {
+                rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0,
+                            request->rq_xid, 0, 0);
+        }
         if (rc != PTL_OK) {
                 printk(__FUNCTION__ ": PtlPut failed: %d\n", rc);
                 /* FIXME: tear down md */
@@ -105,7 +152,7 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
 
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
 {
-        ptl_handle_me_t me_h;
+        ptl_handle_me_t me_h, bulk_me_h;
         ptl_process_id_t local_id;
         int rc;
 
@@ -143,6 +190,30 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                 return rc;
         }
 
+        if (request->rq_bulklen != 0) {
+                rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
+                                 local_id, request->rq_xid, 0, PTL_UNLINK,
+                                 &bulk_me_h);
+                if (rc != PTL_OK) {
+                        EXIT;
+                        return rc;
+                }
+
+                request->rq_bulk_md.start = request->rq_bulkbuf;
+                request->rq_bulk_md.length = request->rq_bulklen;
+                request->rq_bulk_md.threshold = 1;
+                request->rq_bulk_md.options = PTL_MD_OP_PUT;
+                request->rq_bulk_md.user_ptr = request;
+                request->rq_bulk_md.eventq = bulk_sink_eq;
+
+                rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK,
+                                 &request->rq_bulk_md_h);
+                if (rc != PTL_OK) {
+                        EXIT;
+                        return rc;
+                }
+        }
+
         return ptl_send_buf(request, peer, request->rq_req_portal, 1);
 }
 
@@ -219,6 +290,14 @@ static int req_init_portals(void)
         if (rc != PTL_OK)
                 printk("PtlEQAlloc failed: %d\n", rc);
 
+        rc = PtlEQAlloc(ni, 128, bulk_source_callback, NULL, &bulk_source_eq);
+        if (rc != PTL_OK)
+                printk("PtlEQAlloc failed: %d\n", rc);
+
+        rc = PtlEQAlloc(ni, 128, bulk_sink_callback, NULL, &bulk_sink_eq);
+        if (rc != PTL_OK)
+                printk("PtlEQAlloc failed: %d\n", rc);
+
         return rc;
 }