#include <linux/obd_support.h>
#include <linux/lustre_net.h>
-static ptl_handle_eq_t req_eq, bulk_source_eq, bulk_sink_eq;
+static ptl_handle_eq_t sent_pkt_eq, rcvd_rep_eq,
+ bulk_source_eq, bulk_sink_eq;
+
+struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
+ int opcode, int namelen, char *name,
+ int tgtlen, char *tgt)
+{
+ struct ptlrpc_request *request;
+ int rc;
+ ENTRY;
+
+ OBD_ALLOC(request, sizeof(*request));
+ if (!request) {
+ CERROR("request allocation out of memory\n");
+ return NULL;
+ }
+
+ memset(request, 0, sizeof(*request));
+ request->rq_xid = cl->cli_xid++;
+
+ rc = cl->cli_req_pack(name, namelen, tgt, tgtlen,
+ &request->rq_reqhdr, &request->rq_req,
+ &request->rq_reqlen, &request->rq_reqbuf);
+ if (rc) {
+ CERROR("cannot pack request %d\n", rc);
+ return NULL;
+ }
+ request->rq_reqhdr->opc = opcode;
+
+ EXIT;
+ return request;
+}
+
+void ptlrpc_free_req(struct ptlrpc_request *request)
+{
+ OBD_FREE(request, sizeof(*request));
+}
+
+int ptlrpc_queue_wait(struct ptlrpc_request *req,
+ struct ptlrpc_client *cl)
+{
+ int rc;
+ DECLARE_WAITQUEUE(wait, current);
+
+ init_waitqueue_head(&req->rq_wait_for_rep);
+
+ if (cl->cli_enqueue) {
+ /* Local delivery */
+ ENTRY;
+ rc = cl->cli_enqueue(req);
+ } else {
+ /* Remote delivery via portals. */
+ req->rq_req_portal = cl->cli_request_portal;
+ req->rq_reply_portal = cl->cli_reply_portal;
+ rc = ptl_send_rpc(req, &cl->cli_server);
+ }
+ if (rc) {
+ CERROR("error %d, opcode %d\n", rc,
+ req->rq_reqhdr->opc);
+ return -rc;
+ }
+
+ CDEBUG(0, "-- sleeping\n");
+ add_wait_queue(&req->rq_wait_for_rep, &wait);
+ while (req->rq_repbuf == NULL) {
+ set_current_state(TASK_INTERRUPTIBLE);
+
+ /* if this process really wants to die, let it go */
+ if (sigismember(&(current->pending.signal), SIGKILL) ||
+ sigismember(&(current->pending.signal), SIGINT))
+ break;
+
+ schedule();
+ }
+ remove_wait_queue(&req->rq_wait_for_rep, &wait);
+ set_current_state(TASK_RUNNING);
+ CDEBUG(0, "-- done\n");
+
+ if (req->rq_repbuf == NULL) {
+ /* We broke out because of a signal */
+ EXIT;
+ return -EINTR;
+ }
+
+ rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, &req->rq_rep);
+ if (rc) {
+ CERROR("unpack_rep failed: %d\n", rc);
+ return rc;
+ }
+
+ if ( req->rq_rephdr->status == 0 )
+ CDEBUG(0, "--> buf %p len %d status %d\n",
+ req->rq_repbuf, req->rq_replen,
+ req->rq_rephdr->status);
+
+ EXIT;
+ return 0;
+}
/*
- * 1. Free the request buffer after it has gone out on the wire
- * 2. Wake up the thread waiting for the reply once it comes in.
+ * Free the packet when it has gone out
*/
-static int client_packet_callback(ptl_event_t *ev, void *data)
+static int sent_packet_callback(ptl_event_t *ev, void *data)
{
- struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
-
ENTRY;
- // XXX make sure we understand all events, including ACK's
if (ev->type == PTL_EVENT_SENT) {
OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
- } else if (ev->type == PTL_EVENT_PUT) {
+ } else {
+ // XXX make sure we understand all events, including ACK's
+ CERROR("Unknown event %d\n", ev->type);
+ BUG();
+ }
+
+ EXIT;
+ return 1;
+}
+
+/*
+ * Wake up the thread waiting for the reply once it comes in.
+ */
+static int rcvd_reply_callback(ptl_event_t *ev, void *data)
+{
+ struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+ ENTRY;
+
+ if (ev->type == PTL_EVENT_PUT) {
rpc->rq_repbuf = ev->mem_desc.start + ev->offset;
wake_up_interruptible(&rpc->rq_wait_for_rep);
+ } else {
+ // XXX make sure we understand all events, including ACK's
+ CERROR("Unknown event %d\n", ev->type);
+ BUG();
}
EXIT;
wake_up_interruptible(&rpc->rq_wait_for_bulk);
} else {
CERROR("Unexpected event type!\n");
+ BUG();
}
EXIT;
wake_up_interruptible(&rpc->rq_wait_for_bulk);
} else {
CERROR("Unexpected event type!\n");
+ BUG();
}
EXIT;
} else if (is_request) {
request->rq_req_md.start = request->rq_reqbuf;
request->rq_req_md.length = request->rq_reqlen;
- request->rq_req_md.eventq = req_eq;
+ request->rq_req_md.eventq = sent_pkt_eq;
} else {
request->rq_req_md.start = request->rq_repbuf;
request->rq_req_md.length = request->rq_replen;
- request->rq_req_md.eventq = req_eq;
+ request->rq_req_md.eventq = sent_pkt_eq;
}
request->rq_req_md.threshold = 1;
request->rq_req_md.options = PTL_MD_OP_PUT;
rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h);
if (rc != 0) {
+ BUG();
CERROR("PtlMDBind failed: %d\n", rc);
return rc;
}
request->rq_xid, 0, 0);
}
if (rc != PTL_OK) {
+ BUG();
CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
portal, request->rq_xid, rc);
/* FIXME: tear down md */
local_id.gid = PTL_ID_ANY;
local_id.rid = PTL_ID_ANY;
+ CERROR("sending req %d\n", request->rq_xid);
rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
request->rq_xid, 0, PTL_UNLINK, &me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
+ BUG();
EXIT;
goto cleanup;
}
request->rq_reply_md.threshold = 1;
request->rq_reply_md.options = PTL_MD_OP_PUT;
request->rq_reply_md.user_ptr = request;
- request->rq_reply_md.eventq = req_eq;
+ request->rq_reply_md.eventq = rcvd_rep_eq;
rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK,
&request->rq_reply_md_h);
if (rc != PTL_OK) {
CERROR("PtlMDAttach failed: %d\n", rc);
+ BUG();
EXIT;
goto cleanup2;
}
&bulk_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
+ BUG();
EXIT;
goto cleanup3;
}
&request->rq_bulk_md_h);
if (rc != PTL_OK) {
CERROR("PtlMDAttach failed: %d\n", rc);
+ BUG();
EXIT;
goto cleanup4;
}
CDEBUG(D_INFO, "Attach MD in ring, rc %d\n", rc);
if (rc != PTL_OK) {
- /* cleanup */
+ /* XXX cleanup */
+ BUG();
CERROR("PtlMDAttach failed: %d\n", rc);
return rc;
}
}
ni = *nip;
- rc = PtlEQAlloc(ni, 128, client_packet_callback, NULL, &req_eq);
+ rc = PtlEQAlloc(ni, 128, sent_packet_callback, NULL, &sent_pkt_eq);
+ if (rc != PTL_OK)
+ CERROR("PtlEQAlloc failed: %d\n", rc);
+
+ rc = PtlEQAlloc(ni, 128, rcvd_reply_callback, NULL, &rcvd_rep_eq);
if (rc != PTL_OK)
CERROR("PtlEQAlloc failed: %d\n", rc);
static void __exit ptlrpc_exit(void)
{
- PtlEQFree(req_eq);
+ PtlEQFree(sent_pkt_eq);
+ PtlEQFree(rcvd_rep_eq);
PtlEQFree(bulk_source_eq);
PtlEQFree(bulk_sink_eq);