-
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
#include <linux/lustre_net.h>
extern ptl_handle_eq_t bulk_source_eq, sent_pkt_eq, rcvd_rep_eq, bulk_sink_eq;
+static ptl_process_id_t local_id = {PTL_ADDR_GID, PTL_ID_ANY, PTL_ID_ANY};
+
+
+int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
+{
+ if (bulk->b_flags == PTL_BULK_SENT) {
+ EXIT;
+ return 1;
+ }
+
+ if (sigismember(&(current->pending.signal), SIGKILL) ||
+ sigismember(&(current->pending.signal), SIGINT)) {
+ bulk->b_flags = PTL_RPC_INTR;
+ EXIT;
+ return 1;
+ }
+
+ CERROR("no event yet\n");
+ return 0;
+}
int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
- int portal, int is_request)
+ int portal)
{
int rc;
ptl_process_id_t remote_id;
ptl_handle_md_t md_h;
+ ptl_ack_req_t ack;
- /* FIXME: This is bad. */
- if (request->rq_bulklen) {
+ switch (request->rq_type) {
+ case PTL_RPC_BULK:
request->rq_req_md.start = request->rq_bulkbuf;
request->rq_req_md.length = request->rq_bulklen;
request->rq_req_md.eventq = bulk_source_eq;
- } else if (is_request) {
+ request->rq_req_md.threshold = 2; /* SENT and ACK events */
+ ack = PTL_ACK_REQ;
+ break;
+ case PTL_RPC_REQUEST:
request->rq_req_md.start = request->rq_reqbuf;
request->rq_req_md.length = request->rq_reqlen;
request->rq_req_md.eventq = sent_pkt_eq;
- } else {
+ request->rq_req_md.threshold = 1;
+ ack = PTL_NOACK_REQ;
+ break;
+ case PTL_RPC_REPLY:
request->rq_req_md.start = request->rq_repbuf;
request->rq_req_md.length = request->rq_replen;
request->rq_req_md.eventq = sent_pkt_eq;
+ request->rq_req_md.threshold = 1;
+ ack = PTL_NOACK_REQ;
+ break;
+ default:
+ BUG();
+ return -1; /* notreached */
}
- request->rq_req_md.threshold = 1;
request->rq_req_md.options = PTL_MD_OP_PUT;
request->rq_req_md.user_ptr = request;
remote_id.nid = peer->peer_nid;
remote_id.pid = 0;
- if (request->rq_bulklen) {
- rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0,
- request->rq_xid, 0, 0);
- } else {
- rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0,
- request->rq_xid, 0, 0);
- }
+ CERROR("Sending %d bytes to portal %d, xid %d\n",
+ request->rq_req_md.length, portal, request->rq_xid);
+
+ rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, 0, 0);
if (rc != PTL_OK) {
BUG();
CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
return rc;
}
+int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal)
+{
+ int rc;
+ ptl_process_id_t remote_id;
+ ptl_handle_md_t md_h;
+
+ bulk->b_md.start = bulk->b_buf;
+ bulk->b_md.length = bulk->b_buflen;
+ bulk->b_md.eventq = bulk_source_eq;
+ bulk->b_md.threshold = 2; /* SENT and ACK events */
+ bulk->b_md.options = PTL_MD_OP_PUT;
+ bulk->b_md.user_ptr = bulk;
+
+ rc = PtlMDBind(bulk->b_peer.peer_ni, bulk->b_md, &md_h);
+ if (rc != 0) {
+ BUG();
+ CERROR("PtlMDBind failed: %d\n", rc);
+ return rc;
+ }
+
+ remote_id.addr_kind = PTL_ADDR_NID;
+ remote_id.nid = bulk->b_peer.peer_nid;
+ remote_id.pid = 0;
+
+ CERROR("Sending %d bytes to portal %d, xid %d\n",
+ bulk->b_md.length, portal, bulk->b_xid);
+
+ rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0, bulk->b_xid, 0, 0);
+ if (rc != PTL_OK) {
+ BUG();
+ CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
+ portal, bulk->b_xid, rc);
+ /* FIXME: tear down md */
+ }
+
+ return rc;
+}
+
+int ptlrpc_wait_bulk(struct ptlrpc_bulk_desc *bulk)
+{
+ int rc;
+
+ ENTRY;
+
+ rc = PtlMEPrepend(bulk->b_peer.peer_ni, bulk->b_portal, local_id,
+ bulk->b_xid, 0, PTL_UNLINK, &bulk->b_me_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach failed: %d\n", rc);
+ BUG();
+ EXIT;
+ goto cleanup1;
+ }
+
+ bulk->b_md.start = bulk->b_buf;
+ bulk->b_md.length = bulk->b_buflen;
+ bulk->b_md.threshold = 1;
+ bulk->b_md.options = PTL_MD_OP_PUT;
+ bulk->b_md.user_ptr = bulk;
+ bulk->b_md.eventq = bulk_sink_eq;
+
+ rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMDAttach failed: %d\n", rc);
+ BUG();
+ EXIT;
+ goto cleanup2;
+ }
+
+ CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n",
+ bulk->b_buflen, bulk->b_xid, bulk->b_portal);
+
+ cleanup2:
+ PtlMEUnlink(bulk->b_me_h);
+ cleanup1:
+ PtlMDUnlink(bulk->b_md_h);
+
+ return rc;
+}
+
+int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ struct ptlrpc_request *clnt_req = req->rq_reply_handle;
+ ENTRY;
+
+ if (req->rq_reply_handle == NULL) {
+ /* This is a request that came from the network via portals. */
+
+ /* FIXME: we need to increment the count of handled events */
+ req->rq_type = PTL_RPC_REPLY;
+ req->rq_reqhdr->xid = req->rq_reqhdr->xid;
+ ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal);
+ } else {
+ /* This is a local request that came from another thread. */
+
+ /* move the reply to the client */
+ clnt_req->rq_replen = req->rq_replen;
+ clnt_req->rq_repbuf = req->rq_repbuf;
+ req->rq_repbuf = NULL;
+ req->rq_replen = 0;
+
+ /* free the request buffer */
+ OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
+ req->rq_reqbuf = NULL;
+
+ /* wake up the client */
+ wake_up_interruptible(&clnt_req->rq_wait_for_rep);
+ }
+
+ EXIT;
+ return 0;
+}
+
+int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ struct ptlrep_hdr *hdr;
+
+ ENTRY;
+
+ OBD_ALLOC(hdr, sizeof(*hdr));
+ if (!hdr) {
+ EXIT;
+ return -ENOMEM;
+ }
+
+ memset(hdr, 0, sizeof(*hdr));
+
+ hdr->xid = req->rq_reqhdr->xid;
+ hdr->status = req->rq_status;
+ hdr->type = OST_TYPE_ERR;
+
+ if (req->rq_repbuf) {
+ CERROR("req has repbuf\n");
+ BUG();
+ }
+
+ req->rq_repbuf = (char *)hdr;
+ req->rq_replen = sizeof(*hdr);
+
+ EXIT;
+ return ptlrpc_reply(obddev, svc, req);
+}
+
int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
{
- ptl_handle_me_t me_h, bulk_me_h;
ptl_process_id_t local_id;
int rc;
char *repbuf;
local_id.rid = PTL_ID_ANY;
//CERROR("sending req %d\n", request->rq_xid);
- rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
- request->rq_xid, 0, PTL_UNLINK, &me_h);
+ rc = PtlMEPrepend(peer->peer_ni, request->rq_reply_portal, local_id,
+ request->rq_xid, 0, PTL_UNLINK,
+ &request->rq_reply_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
BUG();
goto cleanup;
}
+ request->rq_type = PTL_RPC_REQUEST;
request->rq_reply_md.start = repbuf;
request->rq_reply_md.length = request->rq_replen;
request->rq_reply_md.threshold = 1;
request->rq_reply_md.user_ptr = request;
request->rq_reply_md.eventq = rcvd_rep_eq;
- rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK,
- &request->rq_reply_md_h);
+ rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
+ PTL_UNLINK, &request->rq_reply_md_h);
if (rc != PTL_OK) {
CERROR("PtlMDAttach failed: %d\n", rc);
BUG();
goto cleanup2;
}
- if (request->rq_bulklen != 0) {
- rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
- local_id, request->rq_xid, 0, PTL_UNLINK,
- &bulk_me_h);
- if (rc != PTL_OK) {
- CERROR("PtlMEAttach failed: %d\n", rc);
- BUG();
- EXIT;
- goto cleanup3;
- }
-
- request->rq_bulk_md.start = request->rq_bulkbuf;
- request->rq_bulk_md.length = request->rq_bulklen;
- request->rq_bulk_md.threshold = 1;
- request->rq_bulk_md.options = PTL_MD_OP_PUT;
- request->rq_bulk_md.user_ptr = request;
- request->rq_bulk_md.eventq = bulk_sink_eq;
-
- rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK,
- &request->rq_bulk_md_h);
- if (rc != PTL_OK) {
- CERROR("PtlMDAttach failed: %d\n", rc);
- BUG();
- EXIT;
- goto cleanup4;
- }
- }
+ CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n",
+ request->rq_replen, request->rq_xid, request->rq_reply_portal);
- return ptl_send_buf(request, peer, request->rq_req_portal, 1);
+ return ptl_send_buf(request, peer, request->rq_req_portal);
- cleanup4:
- PtlMEUnlink(bulk_me_h);
- cleanup3:
- PtlMDUnlink(request->rq_reply_md_h);
cleanup2:
- PtlMEUnlink(me_h);
+ PtlMEUnlink(request->rq_reply_me_h);
cleanup:
OBD_FREE(repbuf, request->rq_replen);