return -EIO;
} else {
char *buf;
+ DECLARE_WAITQUEUE(wait, current);
OBD_ALLOC(buf, PAGE_SIZE);
if (!buf)
return -EIO;
}
+ req->rq_type = PTLRPC_BULK;
req->rq_bulkbuf = buf;
req->rq_bulklen = PAGE_SIZE;
+
init_waitqueue_head(&req->rq_wait_for_bulk);
- rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
- sleep_on(&req->rq_wait_for_bulk);
+ rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL);
+ add_wait_queue(&req->rq_wait_for_bulk, &wait);
+ /* The bulk callback will set rq->bulkbuf to NULL when it's
+ * been ACKed and it's finished using it. */
+ while (req->rq_bulkbuf != NULL) {
+ set_current_state(TASK_INTERRUPTIBLE);
+
+ /* if this process really wants to die, let it go */
+ if (sigismember(&(current->pending.signal), SIGKILL) ||
+ sigismember(&(current->pending.signal), SIGINT))
+ break;
+
+ schedule();
+ }
+ remove_wait_queue(&req->rq_wait_for_bulk, &wait);
+ set_current_state(TASK_RUNNING);
+
+ if (req->rq_bulkbuf != NULL) {
+ EXIT;
+ return -EINTR;
+ }
+
OBD_FREE(buf, PAGE_SIZE);
req->rq_bulklen = 0; /* FIXME: eek. */
}
/* This is a request that came from the network via portals. */
/* FIXME: we need to increment the count of handled events */
- ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
+ req->rq_type = PTLRPC_REPLY;
+ ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL);
} else {
/* This is a local request that came from another thread. */
OBD_FREE(request, sizeof(*request));
}
-int ptlrpc_queue_wait(struct ptlrpc_request *req,
- struct ptlrpc_client *cl)
+/* Abort this request and cleanup any resources associated with it. */
+int ptl_abort_rpc(struct ptlrpc_request *request)
+{
+ /* First remove the MD for the reply; in theory, this means
+ * that we can tear down the buffer safely. */
+ PtlMEUnlink(request->rq_reply_me_h);
+ PtlMDUnlink(request->rq_reply_md_h);
+
+ if (request->rq_bulklen != 0) {
+ PtlMEUnlink(request->rq_bulk_me_h);
+ PtlMDUnlink(request->rq_bulk_md_h);
+ }
+
+ return 0;
+}
+
+int ptlrpc_queue_wait(struct ptlrpc_request *req, struct ptlrpc_client *cl)
{
int rc;
DECLARE_WAITQUEUE(wait, current);
CDEBUG(0, "-- done\n");
if (req->rq_repbuf == NULL) {
- /* We broke out because of a signal */
+ /* We broke out because of a signal. Clean up the dangling
+ * reply buffers! */
+ ptl_abort_rpc(req);
EXIT;
return -EINTR;
}
- rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, &req->rq_rep);
+ rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr,
+ &req->rq_rep);
if (rc) {
CERROR("unpack_rep failed: %d\n", rc);
return rc;
CDEBUG(D_NET, "got SENT event\n");
} else if (ev->type == PTL_EVENT_ACK) {
CDEBUG(D_NET, "got ACK event\n");
+ rpc->rq_bulkbuf = NULL;
wake_up_interruptible(&rpc->rq_wait_for_bulk);
} else {
CERROR("Unexpected event type!\n");
if (ev->type == PTL_EVENT_PUT) {
if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset)
CERROR("bulkbuf != mem_desc -- why?\n");
- wake_up_interruptible(&rpc->rq_wait_for_bulk);
+ //wake_up_interruptible(&rpc->rq_wait_for_bulk);
} else {
CERROR("Unexpected event type!\n");
BUG();
}
int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
- int portal, int is_request)
+ int portal)
{
int rc;
ptl_process_id_t remote_id;
ptl_handle_md_t md_h;
+ ptl_ack_req_t ack;
- /* FIXME: This is bad. */
- if (request->rq_bulklen) {
+ switch (request->rq_type) {
+ case PTLRPC_BULK:
request->rq_req_md.start = request->rq_bulkbuf;
request->rq_req_md.length = request->rq_bulklen;
request->rq_req_md.eventq = bulk_source_eq;
- } else if (is_request) {
+ request->rq_req_md.threshold = 2; /* SENT and ACK events */
+ ack = PTL_ACK_REQ;
+ break;
+ case PTLRPC_REQUEST:
request->rq_req_md.start = request->rq_reqbuf;
request->rq_req_md.length = request->rq_reqlen;
request->rq_req_md.eventq = sent_pkt_eq;
- } else {
+ request->rq_req_md.threshold = 1;
+ ack = PTL_NOACK_REQ;
+ break;
+ case PTLRPC_REPLY:
request->rq_req_md.start = request->rq_repbuf;
request->rq_req_md.length = request->rq_replen;
request->rq_req_md.eventq = sent_pkt_eq;
+ request->rq_req_md.threshold = 1;
+ ack = PTL_NOACK_REQ;
+ break;
+ default:
+ BUG();
}
- request->rq_req_md.threshold = 1;
request->rq_req_md.options = PTL_MD_OP_PUT;
request->rq_req_md.user_ptr = request;
remote_id.nid = peer->peer_nid;
remote_id.pid = 0;
- if (request->rq_bulklen) {
- rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0,
- request->rq_xid, 0, 0);
- } else {
- rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0,
- request->rq_xid, 0, 0);
- }
+ CERROR("Sending %d bytes to portal %d, xid %d\n",
+ request->rq_req_md.length, portal, request->rq_xid);
+
+ rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, 0, 0);
if (rc != PTL_OK) {
BUG();
CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
{
- ptl_handle_me_t me_h, bulk_me_h;
ptl_process_id_t local_id;
int rc;
char *repbuf;
CERROR("sending req %d\n", request->rq_xid);
rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
- request->rq_xid, 0, PTL_UNLINK, &me_h);
+ request->rq_xid, 0, PTL_UNLINK,
+ &request->rq_reply_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
BUG();
goto cleanup;
}
+ request->rq_type = PTLRPC_REQUEST;
request->rq_reply_md.start = repbuf;
request->rq_reply_md.length = request->rq_replen;
request->rq_reply_md.threshold = 1;
request->rq_reply_md.user_ptr = request;
request->rq_reply_md.eventq = rcvd_rep_eq;
- rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK,
- &request->rq_reply_md_h);
+ rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
+ PTL_UNLINK, &request->rq_reply_md_h);
if (rc != PTL_OK) {
CERROR("PtlMDAttach failed: %d\n", rc);
BUG();
if (request->rq_bulklen != 0) {
rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
local_id, request->rq_xid, 0, PTL_UNLINK,
- &bulk_me_h);
+ &request->rq_bulk_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
BUG();
request->rq_bulk_md.user_ptr = request;
request->rq_bulk_md.eventq = bulk_sink_eq;
- rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK,
+ rc = PtlMDAttach(request->rq_bulk_me_h,
+ request->rq_bulk_md, PTL_UNLINK,
&request->rq_bulk_md_h);
if (rc != PTL_OK) {
CERROR("PtlMDAttach failed: %d\n", rc);
}
}
- return ptl_send_buf(request, peer, request->rq_req_portal, 1);
+ return ptl_send_buf(request, peer, request->rq_req_portal);
cleanup4:
- PtlMEUnlink(bulk_me_h);
+ PtlMEUnlink(request->rq_bulk_me_h);
cleanup3:
PtlMDUnlink(request->rq_reply_md_h);
cleanup2:
- PtlMEUnlink(me_h);
+ PtlMEUnlink(request->rq_reply_me_h);
cleanup:
OBD_FREE(repbuf, request->rq_replen);