*/
#define EXPORT_SYMTAB
-
#define DEBUG_SUBSYSTEM S_RPC
#include <linux/lustre_net.h>
-extern ptl_handle_eq_t request_out_eq,
- reply_in_eq,
- reply_out_eq,
- bulk_source_eq,
- bulk_sink_eq;
+extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
+ bulk_source_eq, bulk_sink_eq;
static ptl_process_id_t local_id = {PTL_ID_ANY, PTL_ID_ANY};
-
+/* Wait-queue predicate used by ptlrpc_send_bulk(): made static because
+ * it is now referenced only from this file.  NOTE(review): this hunk
+ * elides the lines that compute 'rc' (incomplete diff context here);
+ * consult the full file for the predicate's actual completion test. */
-int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
+static int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
{
ENTRY;
return rc;
}
+/*
+ * ptlrpc_send_bulk: PtlPut() every page of a bulk descriptor to the
+ * peer, then block until the bulk-source event handler reports the
+ * transfer complete.
+ *
+ * Interface change in this hunk: the function now takes only the
+ * descriptor — the portal comes from desc->b_portal instead of a
+ * separate argument — and the old single-buffer send becomes a loop
+ * over desc->b_page_list.  Only the final page requests a Portals ACK,
+ * so bulk_source_eq sees one SENT event per page plus a single ACK for
+ * the whole transfer.
+ *
+ * Returns 0 on success, -EINTR if the wait was interrupted, or the
+ * Portals error code if binding/sending a page failed.
+ */
-int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal)
+int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc)
{
int rc;
+ struct list_head *tmp, *next;
ptl_process_id_t remote_id;
+ ENTRY;
- bulk->b_md.start = bulk->b_buf;
- bulk->b_md.length = bulk->b_buflen;
- bulk->b_md.eventq = bulk_source_eq;
- bulk->b_md.threshold = 2; /* SENT and ACK events */
- bulk->b_md.options = PTL_MD_OP_PUT;
- bulk->b_md.user_ptr = bulk;
-
- rc = PtlMDBind(bulk->b_connection->c_peer.peer_ni, bulk->b_md,
- &bulk->b_md_h);
- if (rc != 0) {
- CERROR("PtlMDBind failed: %d\n", rc);
- LBUG();
- return rc;
+ list_for_each_safe(tmp, next, &desc->b_page_list) {
+ /* only request an ACK for the last page */
+ int ack = (next == &desc->b_page_list);
+ struct ptlrpc_bulk_page *bulk;
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+
+ /* Describe this page's buffer as an MD bound to the
+ * bulk-source event queue; user_ptr lets the event handler
+ * recover the page. */
+ bulk->b_md.start = bulk->b_buf;
+ bulk->b_md.length = bulk->b_buflen;
+ bulk->b_md.eventq = bulk_source_eq;
+ bulk->b_md.threshold = 1 + ack; /* SENT and (if last) ACK */
+ bulk->b_md.options = PTL_MD_OP_PUT;
+ bulk->b_md.user_ptr = bulk;
+
+ rc = PtlMDBind(desc->b_connection->c_peer.peer_ni, bulk->b_md,
+ &bulk->b_md_h);
+ if (rc != 0) {
+ CERROR("PtlMDBind failed: %d\n", rc);
+ LBUG();
+ RETURN(rc);
+ }
+
+ remote_id.nid = desc->b_connection->c_peer.peer_nid;
+ remote_id.pid = 0;
+
+ CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n",
+ bulk->b_md.length, desc->b_portal, bulk->b_xid);
+
+ rc = PtlPut(bulk->b_md_h, (ack ? PTL_ACK_REQ : PTL_NOACK_REQ),
+ remote_id, desc->b_portal, 0, bulk->b_xid, 0, 0);
+ if (rc != PTL_OK) {
+ CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
+ desc->b_portal, bulk->b_xid, rc);
+ /* Unlink only the MD we just bound; NOTE(review):
+ * confirm how MDs of already-posted pages are
+ * reclaimed on this error path. */
+ PtlMDUnlink(bulk->b_md_h);
+ LBUG();
+ RETURN(rc);
+ }
}
- remote_id.nid = bulk->b_connection->c_peer.peer_nid;
- remote_id.pid = 0;
+ /* Sleep until the event handler satisfies the predicate (all pages
+ * sent and the final ACK seen), or a signal interrupts the wait. */
+ wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
- CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n",
- bulk->b_md.length, portal, bulk->b_xid);
+ if (desc->b_flags & PTL_RPC_FL_INTR)
+ RETURN(-EINTR);
- rc = PtlPut(bulk->b_md_h, PTL_ACK_REQ, remote_id, portal, 0,
- bulk->b_xid, 0, 0);
- if (rc != PTL_OK) {
- CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
- portal, bulk->b_xid, rc);
- PtlMDUnlink(bulk->b_md_h);
- LBUG();
- }
-
- return rc;
+ RETURN(0);
}
+/*
+ * ptlrpc_register_bulk: set up a sink-side match entry and memory
+ * descriptor for every page of a bulk descriptor, so the peer's
+ * PtlPut()s land in the right buffers.
+ *
+ * Changed in this hunk from attaching one ME/MD for a single buffer to
+ * attaching one ME/MD per page on desc->b_page_list, each matching that
+ * page's xid on desc->b_portal and delivering events to bulk_sink_eq.
+ *
+ * Returns 0 on success; on any attach failure, tears down every
+ * already-registered page via ptlrpc_abort_bulk() and returns the
+ * Portals error code.
+ */
-int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *bulk)
+int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc)
{
+ struct list_head *tmp, *next;
int rc;
ENTRY;
- rc = PtlMEAttach(bulk->b_connection->c_peer.peer_ni, bulk->b_portal,
- local_id, bulk->b_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
- &bulk->b_me_h);
- if (rc != PTL_OK) {
- CERROR("PtlMEAttach failed: %d\n", rc);
- LBUG();
- GOTO(cleanup, rc);
- }
-
- bulk->b_md.start = bulk->b_buf;
- bulk->b_md.length = bulk->b_buflen;
- bulk->b_md.threshold = 1;
- bulk->b_md.options = PTL_MD_OP_PUT;
- bulk->b_md.user_ptr = bulk;
- bulk->b_md.eventq = bulk_sink_eq;
-
- rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h);
- //CERROR("MDAttach (bulk sink): %Lu\n", (__u64)bulk->b_md_h);
- if (rc != PTL_OK) {
- CERROR("PtlMDAttach failed: %d\n", rc);
- LBUG();
- GOTO(cleanup, rc);
+ list_for_each_safe(tmp, next, &desc->b_page_list) {
+ struct ptlrpc_bulk_page *bulk;
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+
+ /* Match entry: accept a put from any peer process that
+ * carries this page's xid as its match bits. */
+ rc = PtlMEAttach(desc->b_connection->c_peer.peer_ni,
+ desc->b_portal, local_id, bulk->b_xid, 0,
+ PTL_UNLINK, PTL_INS_AFTER, &bulk->b_me_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach failed: %d\n", rc);
+ LBUG();
+ GOTO(cleanup, rc);
+ }
+
+ /* One incoming PUT consumes this MD (threshold = 1); events
+ * go to the bulk-sink queue with the page as user_ptr. */
+ bulk->b_md.start = bulk->b_buf;
+ bulk->b_md.length = bulk->b_buflen;
+ bulk->b_md.threshold = 1;
+ bulk->b_md.options = PTL_MD_OP_PUT;
+ bulk->b_md.user_ptr = bulk;
+ bulk->b_md.eventq = bulk_sink_eq;
+
+ rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK,
+ &bulk->b_md_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMDAttach failed: %d\n", rc);
+ LBUG();
+ GOTO(cleanup, rc);
+ }
+
+ CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, "
+ "portal %u\n", bulk->b_buflen, bulk->b_xid,
+ desc->b_portal);
}
- CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n",
- bulk->b_buflen, bulk->b_xid, bulk->b_portal);
RETURN(0);
- // XXX Confirm that this is safe!
cleanup:
+ /* Unwind every page registered so far (unlinking the never-attached
+ * handles of later pages is safe — see ptlrpc_abort_bulk()). */
- PtlMEUnlink(bulk->b_me_h);
+ ptlrpc_abort_bulk(desc);
+
return rc;
}
+/*
+ * ptlrpc_abort_bulk: tear down the sink-side registration of every page
+ * on desc->b_page_list by unlinking its MD and ME.
+ *
+ * Changed in this hunk from unlinking a single ME (with error
+ * reporting) to a best-effort per-page unlink loop; return values are
+ * deliberately ignored and the function now always returns 0, since
+ * never-attached handles are initialized invalid and unlinking them is
+ * harmless.
+ */
-int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk)
+int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
{
- int rc;
+ struct list_head *tmp, *next;
- rc = PtlMEUnlink(bulk->b_me_h);
- if (rc != PTL_OK)
- CERROR("PtlMEUnlink failed: %d\n", rc);
+ list_for_each_safe(tmp, next, &desc->b_page_list) {
+ struct ptlrpc_bulk_page *bulk;
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
- return rc;
+ /* This should be safe: these handles are initialized to be
+ * invalid in ptlrpc_prep_bulk_page() */
+ PtlMDUnlink(bulk->b_md_h);
+ PtlMEUnlink(bulk->b_me_h);
+ }
+
+ return 0;
}
int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)