- if (rc != 0) {
- CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc);
- goto failed;
- }
-
- if (type == IBNAL_MSG_GET_RDMA) {
- /* reply gets finalized when tx completes */
- tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib,
- nid, libmsg);
- if (tx->tx_libmsg[1] == NULL) {
- CERROR ("Can't create reply for GET -> "LPX64"\n",
- nid);
- rc = -ENOMEM;
- goto failed;
- }
- }
-
- tx->tx_passive_rdma = 1;
-
- ibmsg = tx->tx_msg;
-
- ibmsg->ibm_u.rdma.ibrm_hdr = *hdr;
- ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
- /* map_kiov alrady filled the rdma descs for the whole_mem case */
- if (!kibnal_whole_mem()) {
- ibmsg->ibm_u.rdma.rd_key = tx->tx_md.md_rkey;
- ibmsg->ibm_u.rdma.ibrm_desc[0].rd_addr = tx->tx_md.md_addr;
- ibmsg->ibm_u.rdma.ibrm_desc[0].rd_nob = nob;
- ibmsg->ibm_u.rdma.ibrm_num_descs = 1;
- }
-
- kibnal_init_tx_msg (tx, type,
- kib_rdma_msg_len(ibmsg->ibm_u.rdma.ibrm_num_descs));
-
- CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
- LPX64", nob %d\n",
- tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
- tx->tx_md.md_addr, nob);
-
- /* libmsg gets finalized when tx completes. */
- tx->tx_libmsg[0] = libmsg;
-
- kibnal_launch_tx(tx, nid);
- return (PTL_OK);
-
- failed:
- tx->tx_status = rc;
- kibnal_tx_done (tx);
- return (PTL_FAIL);
-}
-
-/* Complete a passive RDMA set up by the peer (IBNAL_MSG_GET_RDMA /
- * IBNAL_MSG_PUT_RDMA).  For IBNAL_MSG_GET_DONE the local data is
- * RDMA-written into the peer's buffers; for IBNAL_MSG_PUT_DONE it is
- * RDMA-read out of them.  The RDMA work requests (if any) are followed
- * by a completion message carrying the peer's cookie and 'status'.
- *
- *   type    IBNAL_MSG_GET_DONE or IBNAL_MSG_PUT_DONE
- *   status  0 on success; nonzero to complete with failure (then nob == 0)
- *   rx      rx descriptor holding the peer's RDMA request (descs, cookie)
- *   libmsg  Portals message finalized when the transfer completes
- *   niov, iov, kiov, offset, nob
- *           describe the local buffer; at most one of iov/kiov is non-NULL
- */
-void
-kibnal_start_active_rdma (int type, int status,
- kib_rx_t *rx, lib_msg_t *libmsg,
- unsigned int niov,
- struct iovec *iov, ptl_kiov_t *kiov,
- size_t offset, size_t nob)
-{
- kib_msg_t *rxmsg = rx->rx_msg;
- kib_msg_t *txmsg;
- kib_tx_t *tx;
- vv_access_con_bit_mask_t access;
- vv_wr_operation_t rdma_op;
- int rc;
- __u32 i;
-
- CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
- type, status, niov, offset, nob);
-
- /* Called by scheduler */
- LASSERT (!in_interrupt ());
-
- /* Either all pages or all vaddrs */
- LASSERT (!(kiov != NULL && iov != NULL));
-
- /* No data if we're completing with failure */
- LASSERT (status == 0 || nob == 0);
-
- LASSERT (type == IBNAL_MSG_GET_DONE ||
- type == IBNAL_MSG_PUT_DONE);
-
- /* Flag I'm completing the RDMA. Even if I fail to send the
- * completion message, I will have tried my best so further
- * attempts shouldn't be tried. */
- LASSERT (!rx->rx_rdma);
- rx->rx_rdma = 1;
-
- if (type == IBNAL_MSG_GET_DONE) {
- /* RDMA write: the local buffer is only read from, so no
- * local write access is needed on the mapping */
- access = 0;
- rdma_op = vv_wr_rdma_write;
- LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
- } else {
- /* RDMA read: incoming data lands in the local buffer, which
- * must therefore be mapped with local write access */
- access = vv_acc_l_mem_write;
- rdma_op = vv_wr_rdma_read;
- LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
- }
-
- tx = kibnal_get_idle_tx (0); /* Mustn't block */
- if (tx == NULL) {
- /* No tx descriptor: can't even send the completion message;
- * finalize locally with failure and give up */
- CERROR ("tx descs exhausted on RDMA from "LPX64
- " completing locally with failure\n",
- rx->rx_conn->ibc_peer->ibp_nid);
- lib_finalize (&kibnal_lib, NULL, libmsg, PTL_NO_SPACE);
- return;
- }
- LASSERT (tx->tx_nsp == 0);
-
- /* Zero-byte transfer: skip straight to building the completion
- * message; no RDMA work requests are queued */
- if (nob == 0)
- GOTO(init_tx, 0);
-
- /* We actually need to transfer some data (the transfer
- * size could get truncated to zero when the incoming
- * message is matched) */
- if (kiov != NULL)
- rc = kibnal_map_kiov (tx, access, niov, kiov, offset, nob, 1);
- else
- rc = kibnal_map_iov (tx, access, niov, iov, offset, nob, 1);
-
- if (rc != 0) {
- CERROR ("Can't map RDMA -> "LPX64": %d\n",
- rx->rx_conn->ibc_peer->ibp_nid, rc);
- /* We'll skip the RDMA and complete with failure. */
- status = rc;
- nob = 0;
- GOTO(init_tx, rc);
- }
-
- if (!kibnal_whole_mem()) {
- /* Single mapped region: we initiate the RDMA locally, so the
- * local key (md_lkey) is used here -- cf. md_rkey in the
- * passive-side path */
- tx->tx_msg->ibm_u.rdma.rd_key = tx->tx_md.md_lkey;
- tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_addr = tx->tx_md.md_addr;
- tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_nob = nob;
- tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 1;
- }
-
- /* XXX ugh. different page-sized hosts. */
- if (tx->tx_msg->ibm_u.rdma.ibrm_num_descs !=
- rxmsg->ibm_u.rdma.ibrm_num_descs) {
- CERROR("tx descs (%u) != rx descs (%u)\n",
- tx->tx_msg->ibm_u.rdma.ibrm_num_descs,
- rxmsg->ibm_u.rdma.ibrm_num_descs);
- /* We'll skip the RDMA and complete with failure. */
- /* NOTE(review): rc is 0 here (the map call above succeeded),
- * so 'status = rc' clears status and the peer is told the
- * transfer SUCCEEDED even though the RDMA was skipped.  This
- * looks like it should assign a real error code -- confirm. */
- status = rc;
- nob = 0;
- GOTO(init_tx, rc);
- }
-
- /* map_kiov filled in the rdma descs which describe our side of the
- * rdma transfer. */
- /* ibrm_num_descs was verified in rx_callback */
- for(i = 0; i < rxmsg->ibm_u.rdma.ibrm_num_descs; i++) {
- kib_rdma_desc_t *ldesc, *rdesc; /* local, remote */
- vv_scatgat_t *ds = &tx->tx_gl[i];
- vv_wr_t *wrq = &tx->tx_wrq[i];
-
- ldesc = &tx->tx_msg->ibm_u.rdma.ibrm_desc[i];
- rdesc = &rxmsg->ibm_u.rdma.ibrm_desc[i];
-
- /* one scatter/gather entry per RDMA work request */
- ds->v_address = (void *)(unsigned long)ldesc->rd_addr;
- ds->length = ldesc->rd_nob;
- ds->l_key = tx->tx_msg->ibm_u.rdma.rd_key;
-
- wrq->wr_id = kibnal_ptr2wreqid(tx, 0);
-
-#if 0
- /* only the last rdma post triggers tx completion */
- if (i == rxmsg->ibm_u.rdma.ibrm_num_descs - 1)
- wrq->completion_notification = 1;
- else
- wrq->completion_notification = 0;
-
-#else
- /* TODO: hack. Right now complete everything, else the
- * driver will deadlock. This is less efficient than
- * requesting a notification for only a few of the
- * WQE. */
- wrq->completion_notification = 1;
-#endif
-
- wrq->scatgat_list = ds;
- wrq->num_of_data_segments = 1;
- wrq->wr_type = rdma_op;
-
- wrq->type.send.solicited_event = 0;
-
- /* remote side of the transfer comes from the peer's descs */
- wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;
- wrq->type.send.send_qp_type.rc_type.r_addr = rdesc->rd_addr;
- wrq->type.send.send_qp_type.rc_type.r_r_key = rxmsg->ibm_u.rdma.rd_key;
-
- CDEBUG(D_NET, "prepared RDMA with r_addr=%llx r_key=%x\n",
- wrq->type.send.send_qp_type.rc_type.r_addr,
- wrq->type.send.send_qp_type.rc_type.r_r_key);
-
- tx->tx_nsp++;
- }
-
-init_tx:
- /* Build the completion message; it is queued after any RDMA work
- * requests above, so the peer sees it only once the RDMA is done */
- txmsg = tx->tx_msg;
-
- txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
- txmsg->ibm_u.completion.ibcm_status = status;
-
- kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));
-
- if (status == 0 && nob != 0) {
- LASSERT (tx->tx_nsp > 1);
- /* RDMA: libmsg gets finalized when the tx completes. This
- * is after the completion message has been sent, which in
- * turn is after the RDMA has finished. */
- tx->tx_libmsg[0] = libmsg;
- } else {
- LASSERT (tx->tx_nsp == 1);
- /* No RDMA: local completion happens now! */
- CDEBUG(D_WARNING,"No data: immediate completion\n");
- lib_finalize (&kibnal_lib, NULL, libmsg,
- status == 0 ? PTL_OK : PTL_FAIL);
- }
-
- /* +1 ref for this tx... */
- CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
- rx->rx_conn, rx->rx_conn->ibc_state,
- rx->rx_conn->ibc_peer->ibp_nid,
- atomic_read (&rx->rx_conn->ibc_refcount));
- atomic_inc (&rx->rx_conn->ibc_refcount);
- /* ...and queue it up */
- kibnal_queue_tx(tx, rx->rx_conn);
-}
-
-static ptl_err_t
-kibnal_sendmsg(lib_nal_t *nal,
- void *private,
- lib_msg_t *libmsg,
- ptl_hdr_t *hdr,
- int type,
- ptl_nid_t nid,
- ptl_pid_t pid,
- unsigned int payload_niov,
- struct iovec *payload_iov,
- ptl_kiov_t *payload_kiov,
- size_t payload_offset,
- size_t payload_nob)
-{
- kib_msg_t *ibmsg;
- kib_tx_t *tx;
- int nob;
-
- /* NB 'private' is different depending on what we're sending.... */
-
- CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
- " pid %d\n", payload_nob, payload_niov, nid , pid);
-
- LASSERT (payload_nob == 0 || payload_niov > 0);
- LASSERT (payload_niov <= PTL_MD_MAX_IOV);
-
- /* Thread context if we're sending payload */
- LASSERT (!in_interrupt() || payload_niov == 0);