{
int rc;
mm_segment_t oldfs = get_fs();
- /* dst->addr is a user address, but in a different task! */
- set_fs(KERNEL_DS);
- rc = generic_file_read(file, (char *)(long)dst->addr,
- PAGE_SIZE, &offset);
- set_fs(oldfs);
-
- if (rc != PAGE_SIZE)
- return -EIO;
+
+ if (req->rq_peer.peer_nid == 0) {
+ /* dst->addr is a user address, but in a different task! */
+ set_fs(KERNEL_DS);
+ rc = generic_file_read(file, (char *)(long)dst->addr,
+ PAGE_SIZE, &offset);
+ set_fs(oldfs);
+
+ if (rc != PAGE_SIZE)
+ return -EIO;
+ } else {
+ char *buf;
+
+ buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
+ if (!buf) {
+ return -ENOMEM;
+ }
+
+ set_fs(KERNEL_DS);
+ rc = generic_file_read(file, buf, PAGE_SIZE, &offset);
+ set_fs(oldfs);
+
+ if (rc != PAGE_SIZE)
+ return -EIO;
+
+ req->rq_bulkbuf = buf;
+ req->rq_bulklen = PAGE_SIZE;
+ rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
+ init_waitqueue_head(&req->rq_wait_for_bulk);
+ sleep_on(&req->rq_wait_for_bulk);
+ kfree(buf);
+ req->rq_bulklen = 0; /* FIXME: eek. */
+ }
+
return 0;
}
#include <linux/obd_support.h>
#include <linux/lustre_net.h>
-static ptl_handle_eq_t req_eq;
+static ptl_handle_eq_t req_eq, bulk_source_eq, bulk_sink_eq;
static int request_callback(ptl_event_t *ev, void *data)
{
return 0;
}
+static int bulk_source_callback(ptl_event_t *ev, void *data)
+{
+ struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+
+ ENTRY;
+
+ if (ev->type == PTL_EVENT_SENT) {
+ ;
+ } else if (ev->type == PTL_EVENT_ACK) {
+ wake_up_interruptible(&rpc->rq_wait_for_bulk);
+ } else {
+ printk("Unexpected event type in " __FUNCTION__ "!\n");
+ }
+
+ EXIT;
+ return 1;
+}
+
+static int bulk_sink_callback(ptl_event_t *ev, void *data)
+{
+ struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+
+ ENTRY;
+
+ if (ev->type == PTL_EVENT_PUT) {
+ if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset)
+ printk(__FUNCTION__ ": bulkbuf != mem_desc -- why?\n");
+ wake_up_interruptible(&rpc->rq_wait_for_bulk);
+ } else {
+ printk("Unexpected event type in " __FUNCTION__ "!\n");
+ }
+
+ EXIT;
+ return 1;
+}
+
int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
int portal, int is_request)
{
ptl_process_id_t remote_id;
ptl_handle_md_t md_h;
- if (is_request) {
+ /* FIXME: This is bad. */
+ if (request->rq_bulklen) {
+ request->rq_req_md.start = request->rq_bulkbuf;
+ request->rq_req_md.length = request->rq_bulklen;
+ request->rq_req_md.eventq = bulk_source_eq;
+ } else if (is_request) {
request->rq_req_md.start = request->rq_reqbuf;
request->rq_req_md.length = request->rq_reqlen;
+ request->rq_req_md.eventq = req_eq;
} else {
request->rq_req_md.start = request->rq_repbuf;
request->rq_req_md.length = request->rq_replen;
+ request->rq_req_md.eventq = req_eq;
}
- request->rq_req_md.threshold = PTL_MD_THRESH_INF;
+ request->rq_req_md.threshold = 1;
request->rq_req_md.options = PTL_MD_OP_PUT;
request->rq_req_md.user_ptr = request;
- request->rq_req_md.eventq = req_eq;
rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h);
if (rc != 0) {
remote_id.nid = peer->peer_nid;
remote_id.pid = 0;
- rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, request->rq_xid,
- 0, 0);
+ if (request->rq_bulklen) {
+ rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0,
+ request->rq_xid, 0, 0);
+ } else {
+ rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0,
+ request->rq_xid, 0, 0);
+ }
if (rc != PTL_OK) {
printk(__FUNCTION__ ": PtlPut failed: %d\n", rc);
/* FIXME: tear down md */
int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
{
- ptl_handle_me_t me_h;
+ ptl_handle_me_t me_h, bulk_me_h;
ptl_process_id_t local_id;
int rc;
return rc;
}
+ if (request->rq_bulklen != 0) {
+ rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
+ local_id, request->rq_xid, 0, PTL_UNLINK,
+ &bulk_me_h);
+ if (rc != PTL_OK) {
+ EXIT;
+ return rc;
+ }
+
+ request->rq_bulk_md.start = request->rq_bulkbuf;
+ request->rq_bulk_md.length = request->rq_bulklen;
+ request->rq_bulk_md.threshold = 1;
+ request->rq_bulk_md.options = PTL_MD_OP_PUT;
+ request->rq_bulk_md.user_ptr = request;
+ request->rq_bulk_md.eventq = bulk_sink_eq;
+
+ rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK,
+ &request->rq_bulk_md_h);
+ if (rc != PTL_OK) {
+ EXIT;
+ return rc;
+ }
+ }
+
return ptl_send_buf(request, peer, request->rq_req_portal, 1);
}
if (rc != PTL_OK)
printk("PtlEQAlloc failed: %d\n", rc);
+ rc = PtlEQAlloc(ni, 128, bulk_source_callback, NULL, &bulk_source_eq);
+ if (rc != PTL_OK)
+ printk("PtlEQAlloc failed: %d\n", rc);
+
+ rc = PtlEQAlloc(ni, 128, bulk_sink_callback, NULL, &bulk_sink_eq);
+ if (rc != PTL_OK)
+ printk("PtlEQAlloc failed: %d\n", rc);
+
return rc;
}