+#if !MULTIRAIL_EKC
+/* Debug helper: dump an array of 'n' EP_IOVEC fragments, prefixed by
+ * the caller-supplied label 'str', at debug level 'how'. */
+void
+kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
+{
+ int idx;
+
+ CDEBUG (how, "%s: %d\n", str, n);
+ for (idx = 0; idx < n; idx++)
+ CDEBUG (how, " %08x for %d\n", iov[idx].Base, iov[idx].Len);
+}
+
+/* Merge a source and a destination EP_IOVEC list into a single
+ * EP_DATAVEC of at most 'ndv' entries, splitting fragments where the
+ * two lists don't line up. 'src'/'dst' entries are consumed and
+ * adjusted in place.
+ * Returns the number of datavec entries used, -EINVAL if the two
+ * lists describe different total sizes, or -E2BIG if 'dv' is too
+ * small. */
+int
+kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
+ int nsrc, EP_IOVEC *src,
+ int ndst, EP_IOVEC *dst)
+{
+ int count;
+ int nob;
+
+ LASSERT (ndv > 0);
+ LASSERT (nsrc > 0);
+ LASSERT (ndst > 0);
+
+ for (count = 0; count < ndv; count++, dv++) {
+
+ if (nsrc == 0 || ndst == 0) {
+ if (nsrc != ndst) {
+ /* For now I'll barf on any left over entries */
+ CERROR ("mismatched src and dst iovs\n");
+ return (-EINVAL);
+ }
+ return (count);
+ }
+
+ /* next datavec entry covers the overlap of the current
+ * source and destination fragments */
+ nob = (src->Len < dst->Len) ? src->Len : dst->Len;
+ dv->Len = nob;
+ dv->Source = src->Base;
+ dv->Dest = dst->Base;
+
+ if (nob >= src->Len) {
+ src++;
+ nsrc--;
+ } else {
+ src->Len -= nob;
+ src->Base += nob;
+ }
+
+ if (nob >= dst->Len) {
+ dst++;
+ ndst--;
+ } else {
+ /* BUGFIX: consume the partial *destination*
+ * fragment here; the original adjusted 'src'
+ * again, corrupting the source list */
+ dst->Len -= nob;
+ dst->Base += nob;
+ }
+ }
+
+ CERROR ("DATAVEC too small\n");
+ return (-E2BIG);
+}
+#else
+/* Verify local and remote RDMA fragment lists are congruent: the same
+ * number of fragments, with matching lengths fragment-by-fragment.
+ * Returns 0 if they match, -EINVAL otherwise. */
+int
+kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
+ int nrfrag, EP_NMD *rfrag)
+{
+ int idx;
+
+ if (nlfrag != nrfrag) {
+ CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+ nlfrag, nrfrag);
+ return (-EINVAL);
+ }
+
+ for (idx = 0; idx < nlfrag; idx++) {
+ if (lfrag[idx].nmd_len == rfrag[idx].nmd_len)
+ continue;
+
+ CERROR("Can't cope with unequal frags %d(%d):"
+ " %d local %d remote\n",
+ idx, nlfrag, lfrag[idx].nmd_len, rfrag[idx].nmd_len);
+ return (-EINVAL);
+ }
+
+ return (0);
+}
+#endif
+
+/* Locate and validate the remote memory descriptor (RMD) that follows
+ * the portals header in the first receive page of 'krx', for an
+ * optimized GET/PUT of the given 'type'.
+ * Returns a pointer to the RMD (within krx's receive buffer), or NULL
+ * if the message type/source NID is unexpected or the RMD overruns
+ * the received byte count (krx->krx_nob). */
+kqswnal_remotemd_t *
+kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
+{
+ char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)buffer;
+ kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
+ ptl_nid_t nid = kqswnal_rx_nid(krx);
+
+ /* Note (1) lib_parse has already flipped hdr.
+ * (2) RDMA addresses are sent in native endian-ness. When
+ * EKC copes with different endian nodes, I'll fix this (and
+ * eat my hat :) */
+
+ LASSERT (krx->krx_nob >= sizeof(*hdr));
+
+ if (hdr->type != type) {
+ CERROR ("Unexpected optimized get/put type %d (%d expected)"
+ "from "LPX64"\n", hdr->type, type, nid);
+ return (NULL);
+ }
+
+ if (hdr->src_nid != nid) {
+ CERROR ("Unexpected optimized get/put source NID "
+ LPX64" from "LPX64"\n", hdr->src_nid, nid);
+ return (NULL);
+ }
+
+ LASSERT (nid == expected_nid);
+
+ /* NB this check MUST precede the variable-length one below: it
+ * proves the fixed-size part of the RMD (including kqrmd_nfrag)
+ * lies within the received bytes before we read it */
+ if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
+ /* msg too small to discover rmd size */
+ CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
+ krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
+ return (NULL);
+ }
+
+ /* now kqrmd_nfrag is safe to read; check the fragment array fits */
+ if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
+ /* rmd doesn't fit in the incoming message */
+ CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
+ krx->krx_nob, rmd->kqrmd_nfrag,
+ (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
+ return (NULL);
+ }
+
+ return (rmd);
+}
+
+/* EKC completion callback: the RDMA store for an optimized GET has
+ * finished. Marks the RPC reply as handled, drops the rx ref taken
+ * for this callback and retires the tx descriptor. */
+void
+kqswnal_rdma_store_complete (EP_RXD *rxd)
+{
+ int status = ep_rxd_status(rxd);
+ kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+ int ok = (status == EP_SUCCESS);
+
+ CDEBUG(ok ? D_NET : D_ERROR,
+ "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+ LASSERT (ktx->ktx_state == KTX_RDMAING);
+ LASSERT (krx->krx_rxd == rxd);
+ LASSERT (krx->krx_rpc_reply_needed);
+
+ krx->krx_rpc_reply_needed = 0;
+ kqswnal_rx_decref (krx);
+
+ /* free ktx & finalize() its lib_msg_t */
+ kqswnal_tx_done(ktx, ok ? 0 : -ECONNABORTED);
+}
+
+/* EKC completion callback: the RDMA get that fetches an optimized
+ * PUT's payload has finished. Updates the RPC reply status, retires
+ * the tx, then either drops the rx ref directly (process context) or
+ * queues the rx for the scheduler thread (interrupt context). */
+void
+kqswnal_rdma_fetch_complete (EP_RXD *rxd)
+{
+ /* Completed fetching the PUT data */
+ int status = ep_rxd_status(rxd);
+ kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+ unsigned long flags;
+
+ CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+ "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+ LASSERT (ktx->ktx_state == KTX_RDMAING);
+ LASSERT (krx->krx_rxd == rxd);
+ /* RPC completes with failure by default */
+ LASSERT (krx->krx_rpc_reply_needed);
+ LASSERT (krx->krx_rpc_reply_status != 0);
+
+ if (status == EP_SUCCESS) {
+ /* flip the default failure status to success */
+ status = krx->krx_rpc_reply_status = 0;
+ } else {
+ /* Abandon RPC since get failed */
+ krx->krx_rpc_reply_needed = 0;
+ status = -ECONNABORTED;
+ }
+
+ /* free ktx & finalize() its lib_msg_t */
+ kqswnal_tx_done(ktx, status);
+
+ if (!in_interrupt()) {
+ /* OK to complete the RPC now (iff I had the last ref) */
+ kqswnal_rx_decref (krx);
+ return;
+ }
+
+ /* in interrupt context: the RPC completion (triggered when the
+ * last rx ref drops) must happen in thread context instead */
+ LASSERT (krx->krx_state == KRX_PARSE);
+ krx->krx_state = KRX_COMPLETING;
+
+ /* Complete the RPC in thread context */
+ spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+
+ list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
+ wake_up (&kqswnal_data.kqn_sched_waitq);
+
+ spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+}
+
+/* Drive the RDMA phase of an optimized transfer: for PTL_MSG_GET,
+ * store the local payload into the peer's buffers described by the
+ * RMD; for PTL_MSG_PUT, fetch the payload from the peer's buffers.
+ * 'libmsg' is finalized by the completion callback (or inline here
+ * when 'len' is 0). Returns 0 on success or a -ve errno; on failure
+ * the RPC completes later with krx->krx_rpc_reply_status. */
+int
+kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
+ int niov, struct iovec *iov, ptl_kiov_t *kiov,
+ size_t offset, size_t len)
+{
+ kqswnal_remotemd_t *rmd;
+ kqswnal_tx_t *ktx;
+ int eprc;
+ int rc;
+#if !MULTIRAIL_EKC
+ EP_DATAVEC datav[EP_MAXFRAG];
+ int ndatav;
+#endif
+
+ LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
+ /* Not both mapped and paged payload */
+ LASSERT (iov == NULL || kiov == NULL);
+ /* RPC completes with failure by default */
+ LASSERT (krx->krx_rpc_reply_needed);
+ LASSERT (krx->krx_rpc_reply_status != 0);
+
+ rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
+ if (rmd == NULL)
+ return (-EPROTO);
+
+ if (len == 0) {
+ /* data got truncated to nothing. */
+ lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
+ /* Let kqswnal_rx_done() complete the RPC with success */
+ krx->krx_rpc_reply_status = 0;
+ return (0);
+ }
+
+ /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
+ actually sending a portals message with it */
+ ktx = kqswnal_get_idle_tx(NULL, 0);
+ if (ktx == NULL) {
+ CERROR ("Can't get txd for RDMA with "LPX64"\n",
+ libmsg->ev.initiator.nid);
+ return (-ENOMEM);
+ }
+
+ ktx->ktx_state = KTX_RDMAING;
+ ktx->ktx_nid = libmsg->ev.initiator.nid;
+ /* the completion callbacks recover krx/libmsg from ktx_args */
+ ktx->ktx_args[0] = krx;
+ ktx->ktx_args[1] = libmsg;
+
+#if MULTIRAIL_EKC
+ /* Map on the rail the RPC prefers */
+ ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
+ ep_rxd_railmask(krx->krx_rxd));
+#endif
+
+ /* Start mapping at offset 0 (we're not mapping any headers) */
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
+
+ if (kiov != NULL)
+ rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
+ else
+ rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
+
+ if (rc != 0) {
+ CERROR ("Can't map local RDMA data: %d\n", rc);
+ goto out;
+ }
+
+#if MULTIRAIL_EKC
+ rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ if (rc != 0) {
+ CERROR ("Incompatible RDMA descriptors\n");
+ goto out;
+ }
+#else
+ /* "old" EKC needs an explicit source/dest datavec; the transfer
+ * direction decides which side is the source */
+ switch (type) {
+ default:
+ LBUG();
+
+ case PTL_MSG_GET:
+ ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+ ktx->ktx_nfrag, ktx->ktx_frags,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ break;
+
+ case PTL_MSG_PUT:
+ ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag,
+ ktx->ktx_nfrag, ktx->ktx_frags);
+ break;
+ }
+
+ if (ndatav < 0) {
+ CERROR ("Can't create datavec: %d\n", ndatav);
+ rc = ndatav;
+ goto out;
+ }
+#endif
+
+ LASSERT (atomic_read(&krx->krx_refcount) > 0);
+ /* Take an extra ref for the completion callback */
+ atomic_inc(&krx->krx_refcount);
+
+ switch (type) {
+ default:
+ LBUG();
+
+ case PTL_MSG_GET:
+#if MULTIRAIL_EKC
+ eprc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rdma_store_complete, ktx,
+ &kqswnal_data.kqn_rpc_success,
+ ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
+#else
+ eprc = ep_complete_rpc (krx->krx_rxd,
+ kqswnal_rdma_store_complete, ktx,
+ &kqswnal_data.kqn_rpc_success,
+ datav, ndatav);
+ if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
+ krx->krx_rxd = NULL;
+#endif
+ if (eprc != EP_SUCCESS) {
+ CERROR("can't complete RPC: %d\n", eprc);
+ /* don't re-attempt RPC completion */
+ krx->krx_rpc_reply_needed = 0;
+ rc = -ECONNABORTED;
+ }
+ break;
+
+ case PTL_MSG_PUT:
+#if MULTIRAIL_EKC
+ eprc = ep_rpc_get (krx->krx_rxd,
+ kqswnal_rdma_fetch_complete, ktx,
+ rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
+ eprc = ep_rpc_get (krx->krx_rxd,
+ kqswnal_rdma_fetch_complete, ktx,
+ datav, ndatav);
+#endif
+ if (eprc != EP_SUCCESS) {
+ CERROR("ep_rpc_get failed: %d\n", eprc);
+ /* Don't attempt RPC completion:
+ * EKC nuked it when the get failed */
+ krx->krx_rpc_reply_needed = 0;
+ rc = -ECONNABORTED;
+ }
+ break;
+ }
+
+ out:
+ /* NOTE(review): the decref below is balanced against the
+ * atomic_inc taken for the callback, but this path is also
+ * reached from the map/check failures ABOVE that inc — verify
+ * the caller's ref accounting expects that extra decref */
+ if (rc != 0) {
+ kqswnal_rx_decref(krx); /* drop callback's ref */
+ kqswnal_put_idle_tx (ktx);
+ }
+
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
+ return (rc);
+}
+
+static ptl_err_t
+kqswnal_sendmsg (lib_nal_t *nal,