+int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal)
+{
+ int rc;
+ ptl_process_id_t remote_id;
+ ptl_handle_md_t md_h;
+
+ bulk->b_md.start = bulk->b_buf;
+ bulk->b_md.length = bulk->b_buflen;
+ bulk->b_md.eventq = bulk_source_eq;
+ bulk->b_md.threshold = 2; /* SENT and ACK events */
+ bulk->b_md.options = PTL_MD_OP_PUT;
+ bulk->b_md.user_ptr = bulk;
+
+ rc = PtlMDBind(bulk->b_peer.peer_ni, bulk->b_md, &md_h);
+ if (rc != 0) {
+ BUG();
+ CERROR("PtlMDBind failed: %d\n", rc);
+ return rc;
+ }
+
+ remote_id.addr_kind = PTL_ADDR_NID;
+ remote_id.nid = bulk->b_peer.peer_nid;
+ remote_id.pid = 0;
+
+ CERROR("Sending %d bytes to portal %d, xid %d\n",
+ bulk->b_md.length, portal, bulk->b_xid);
+
+ rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0, bulk->b_xid, 0, 0);
+ if (rc != PTL_OK) {
+ BUG();
+ CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
+ portal, bulk->b_xid, rc);
+ /* FIXME: tear down md */
+ }
+
+ return rc;
+}
+
+int ptlrpc_wait_bulk(struct ptlrpc_bulk_desc *bulk)
+{
+ int rc;
+
+ ENTRY;
+
+ rc = PtlMEPrepend(bulk->b_peer.peer_ni, bulk->b_portal, local_id,
+ bulk->b_xid, 0, PTL_UNLINK, &bulk->b_me_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach failed: %d\n", rc);
+ BUG();
+ EXIT;
+ goto cleanup1;
+ }
+
+ bulk->b_md.start = bulk->b_buf;
+ bulk->b_md.length = bulk->b_buflen;
+ bulk->b_md.threshold = 1;
+ bulk->b_md.options = PTL_MD_OP_PUT;
+ bulk->b_md.user_ptr = bulk;
+ bulk->b_md.eventq = bulk_sink_eq;
+
+ rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMDAttach failed: %d\n", rc);
+ BUG();
+ EXIT;
+ goto cleanup2;
+ }
+
+ CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n",
+ bulk->b_buflen, bulk->b_xid, bulk->b_portal);
+
+ cleanup2:
+ PtlMEUnlink(bulk->b_me_h);
+ cleanup1:
+ PtlMDUnlink(bulk->b_md_h);
+
+ return rc;
+}
+
+int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ struct ptlrpc_request *clnt_req = req->rq_reply_handle;
+ ENTRY;
+
+ if (req->rq_reply_handle == NULL) {
+ /* This is a request that came from the network via portals. */
+
+ /* FIXME: we need to increment the count of handled events */
+ req->rq_type = PTL_RPC_REPLY;
+ req->rq_reqhdr->xid = req->rq_reqhdr->xid;
+ ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal);
+ } else {
+ /* This is a local request that came from another thread. */
+
+ /* move the reply to the client */
+ clnt_req->rq_replen = req->rq_replen;
+ clnt_req->rq_repbuf = req->rq_repbuf;
+ req->rq_repbuf = NULL;
+ req->rq_replen = 0;
+
+ /* free the request buffer */
+ OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
+ req->rq_reqbuf = NULL;
+
+ /* wake up the client */
+ wake_up_interruptible(&clnt_req->rq_wait_for_rep);
+ }
+
+ EXIT;
+ return 0;
+}
+
+int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ struct ptlrep_hdr *hdr;
+
+ ENTRY;
+
+ OBD_ALLOC(hdr, sizeof(*hdr));
+ if (!hdr) {
+ EXIT;
+ return -ENOMEM;
+ }
+
+ memset(hdr, 0, sizeof(*hdr));
+
+ hdr->xid = req->rq_reqhdr->xid;
+ hdr->status = req->rq_status;
+ hdr->type = OST_TYPE_ERR;
+
+ if (req->rq_repbuf) {
+ CERROR("req has repbuf\n");
+ BUG();
+ }
+
+ req->rq_repbuf = (char *)hdr;
+ req->rq_replen = sizeof(*hdr);
+
+ EXIT;
+ return ptlrpc_reply(obddev, svc, req);
+}
+