#include "qswnal.h"
-EP_STATUSBLK kqswnal_rpc_success;
-EP_STATUSBLK kqswnal_rpc_failed;
-
/*
* LIB functions follow
*
*/
-static ptl_err_t
-kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
- size_t len)
-{
- CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
- nal->ni.nid, len, src_addr, dst_addr );
- memcpy( dst_addr, src_addr, len );
-
- return (PTL_OK);
-}
-
-static ptl_err_t
-kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
- size_t len)
-{
- CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
- nal->ni.nid, len, src_addr, dst_addr );
- memcpy( dst_addr, src_addr, len );
-
- return (PTL_OK);
-}
-
-static void *
-kqswnal_malloc(nal_cb_t *nal, size_t len)
-{
- void *buf;
-
- PORTAL_ALLOC(buf, len);
- return (buf);
-}
-
-static void
-kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
-{
- PORTAL_FREE(buf, len);
-}
-
-static void
-kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
-{
- va_list ap;
- char msg[256];
-
- va_start (ap, fmt);
- vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
- va_end (ap);
-
- msg[sizeof (msg) - 1] = 0; /* ensure terminated */
-
- CDEBUG (D_NET, "%s", msg);
-}
-
-#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
-# error "Can't save/restore irq contexts in different procedures"
-#endif
-
-static void
-kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
-{
- kqswnal_data_t *data= nal->nal_data;
-
- spin_lock_irqsave(&data->kqn_statelock, *flags);
-}
-
-
-static void
-kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
-{
- kqswnal_data_t *data= nal->nal_data;
-
- spin_unlock_irqrestore(&data->kqn_statelock, *flags);
-}
-
-static void
-kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
-{
- /* holding kqn_statelock */
-
- if (eq->event_callback != NULL)
- eq->event_callback(ev);
-
- if (waitqueue_active(&kqswnal_data.kqn_yield_waitq))
- wake_up_all(&kqswnal_data.kqn_yield_waitq);
-}
-
static int
-kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
+kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
- if (nid == nal->ni.nid)
+ if (nid == nal->libnal_ni.ni_pid.nid)
*dist = 0; /* it's me */
else if (kqswnal_nid2elanid (nid) >= 0)
*dist = 1; /* it's my peer */
do {
int fraglen = kiov->kiov_len - offset;
- /* nob exactly spans the iovs */
- LASSERT (fraglen <= nob);
- /* each frag fits in a page */
+ /* each page frag is contained in one page */
LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
+ if (fraglen > nob)
+ fraglen = nob;
+
nmapped++;
if (nmapped > maxmapped) {
CERROR("Can't map message in %d pages (max %d)\n",
do {
int fraglen = iov->iov_len - offset;
- long npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
-
- /* nob exactly spans the iovs */
- LASSERT (fraglen <= nob);
+ long npages;
+ if (fraglen > nob)
+ fraglen = nob;
+ npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
+
nmapped += npages;
if (nmapped > maxmapped) {
CERROR("Can't map message in %d pages (max %d)\n",
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
{
- lib_msg_t *msg;
- lib_msg_t *repmsg = NULL;
-
switch (ktx->ktx_state) {
case KTX_FORWARDING: /* router asked me to forward this packet */
kpr_fwd_done (&kqswnal_data.kqn_router,
(kpr_fwd_desc_t *)ktx->ktx_args[0], error);
break;
- case KTX_SENDING: /* packet sourced locally */
- lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
+ case KTX_RDMAING: /* optimized GET/PUT handled */
+ case KTX_PUTTING: /* optimized PUT sent */
+ case KTX_SENDING: /* normal send */
+ lib_finalize (&kqswnal_lib, NULL,
(lib_msg_t *)ktx->ktx_args[1],
- (error == 0) ? PTL_OK :
- (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
+ (error == 0) ? PTL_OK : PTL_FAIL);
break;
- case KTX_GETTING: /* Peer has DMA-ed direct? */
- msg = (lib_msg_t *)ktx->ktx_args[1];
-
- if (error == 0) {
- repmsg = lib_create_reply_msg (&kqswnal_lib,
- ktx->ktx_nid, msg);
- if (repmsg == NULL)
- error = -ENOMEM;
- }
-
- if (error == 0) {
- lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
- msg, PTL_OK);
- lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
- } else {
- lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
- (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
- }
+ case KTX_GETTING: /* optimized GET sent & REPLY received */
+ /* Complete the GET with success since we can't avoid
+ * delivering a REPLY event; we committed to it when we
+ * launched the GET */
+ lib_finalize (&kqswnal_lib, NULL,
+ (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
+ lib_finalize (&kqswnal_lib, NULL,
+ (lib_msg_t *)ktx->ktx_args[2],
+ (error == 0) ? PTL_OK : PTL_FAIL);
break;
default:
kqswnal_notify_peer_down(ktx);
status = -EHOSTDOWN;
- } else if (ktx->ktx_state == KTX_GETTING) {
- /* RPC completed OK; what did our peer put in the status
+ } else switch (ktx->ktx_state) {
+
+ case KTX_GETTING:
+ case KTX_PUTTING:
+ /* RPC completed OK; but what did our peer put in the status
* block? */
#if MULTIRAIL_EKC
status = ep_txd_statusblk(txd)->Data[0];
#else
status = ep_txd_statusblk(txd)->Status;
#endif
- } else {
+ break;
+
+ case KTX_FORWARDING:
+ case KTX_SENDING:
status = 0;
+ break;
+
+ default:
+ LBUG();
+ break;
}
kqswnal_tx_done (ktx, status);
return (-ESHUTDOWN);
LASSERT (dest >= 0); /* must be a peer */
- if (ktx->ktx_state == KTX_GETTING) {
- /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The
- * other frags are the GET sink which we obviously don't
- * send here :) */
-#if MULTIRAIL_EKC
+
+ switch (ktx->ktx_state) {
+ case KTX_GETTING:
+ case KTX_PUTTING:
+ /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
+ * The other frags are the payload, awaiting RDMA */
rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
ktx->ktx_port, attr,
kqswnal_txhandler, ktx,
NULL, ktx->ktx_frags, 1);
-#else
- rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
- ktx->ktx_port, attr, kqswnal_txhandler,
- ktx, NULL, ktx->ktx_frags, 1);
-#endif
- } else {
+ break;
+
+ case KTX_FORWARDING:
+ case KTX_SENDING:
#if MULTIRAIL_EKC
rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
ktx->ktx_port, attr,
kqswnal_txhandler, ktx,
ktx->ktx_frags, ktx->ktx_nfrag);
#endif
+ break;
+
+ default:
+ LBUG();
+ rc = -EINVAL; /* no compiler warning please */
+ break;
}
switch (rc) {
}
}
+#if 0
static char *
hdr_type_string (ptl_hdr_t *hdr)
{
}
} /* end of print_hdr() */
+#endif
#if !MULTIRAIL_EKC
void
CERROR ("DATAVEC too small\n");
return (-E2BIG);
}
+#else
+int
+kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
+ int nrfrag, EP_NMD *rfrag)
+{
+ int i;
+
+ if (nlfrag != nrfrag) {
+ CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+ nlfrag, nrfrag);
+ return (-EINVAL);
+ }
+
+ for (i = 0; i < nlfrag; i++)
+ if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
+ CERROR("Can't cope with unequal frags %d(%d):"
+ " %d local %d remote\n",
+ i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
+ return (-EINVAL);
+ }
+
+ return (0);
+}
#endif
-int
-kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
- struct iovec *iov, ptl_kiov_t *kiov,
- int offset, int nob)
+kqswnal_remotemd_t *
+kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
{
- kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)buffer;
kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
- int rc;
-#if MULTIRAIL_EKC
- int i;
-#else
- EP_DATAVEC datav[EP_MAXFRAG];
- int ndatav;
-#endif
- LASSERT (krx->krx_rpc_reply_needed);
- LASSERT ((iov == NULL) != (kiov == NULL));
+ ptl_nid_t nid = kqswnal_rx_nid(krx);
+
+ /* Note (1) lib_parse has already flipped hdr.
+ * (2) RDMA addresses are sent in native endian-ness. When
+ * EKC copes with different endian nodes, I'll fix this (and
+ * eat my hat :) */
+
+ LASSERT (krx->krx_nob >= sizeof(*hdr));
+
+ if (hdr->type != type) {
+ CERROR ("Unexpected optimized get/put type %d (%d expected) "
+ "from "LPX64"\n", hdr->type, type, nid);
+ return (NULL);
+ }
+
+ if (hdr->src_nid != nid) {
+ CERROR ("Unexpected optimized get/put source NID "
+ LPX64" from "LPX64"\n", hdr->src_nid, nid);
+ return (NULL);
+ }
+
+ LASSERT (nid == expected_nid);
- /* see kqswnal_sendmsg comment regarding endian-ness */
if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
/* msg too small to discover rmd size */
CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
- return (-EINVAL);
+ return (NULL);
}
-
+
if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
/* rmd doesn't fit in the incoming message */
CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
krx->krx_nob, rmd->kqrmd_nfrag,
(int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
- return (-EINVAL);
+ return (NULL);
}
- /* Map the source data... */
+ return (rmd);
+}
+
+void
+kqswnal_rdma_store_complete (EP_RXD *rxd)
+{
+ int status = ep_rxd_status(rxd);
+ kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+
+ CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+ "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+ LASSERT (ktx->ktx_state == KTX_RDMAING);
+ LASSERT (krx->krx_rxd == rxd);
+ LASSERT (krx->krx_rpc_reply_needed);
+
+ krx->krx_rpc_reply_needed = 0;
+ kqswnal_rx_decref (krx);
+
+ /* free ktx & finalize() its lib_msg_t */
+ kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
+}
+
+void
+kqswnal_rdma_fetch_complete (EP_RXD *rxd)
+{
+ /* Completed fetching the PUT data */
+ int status = ep_rxd_status(rxd);
+ kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+ unsigned long flags;
+
+ CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+ "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+ LASSERT (ktx->ktx_state == KTX_RDMAING);
+ LASSERT (krx->krx_rxd == rxd);
+ LASSERT (krx->krx_rpc_reply_needed);
+
+ /* Set the RPC completion status */
+ status = (status == EP_SUCCESS) ? 0 : -ECONNABORTED;
+ krx->krx_rpc_reply_status = status;
+
+ /* free ktx & finalize() its lib_msg_t */
+ kqswnal_tx_done(ktx, status);
+
+ if (!in_interrupt()) {
+ /* OK to complete the RPC now (iff I had the last ref) */
+ kqswnal_rx_decref (krx);
+ return;
+ }
+
+ LASSERT (krx->krx_state == KRX_PARSE);
+ krx->krx_state = KRX_COMPLETING;
+
+ /* Complete the RPC in thread context */
+ spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+
+ list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
+ wake_up (&kqswnal_data.kqn_sched_waitq);
+
+ spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+}
+
+int
+kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
+ int niov, struct iovec *iov, ptl_kiov_t *kiov,
+ size_t offset, size_t len)
+{
+ kqswnal_remotemd_t *rmd;
+ kqswnal_tx_t *ktx;
+ int eprc;
+ int rc;
+#if !MULTIRAIL_EKC
+ EP_DATAVEC datav[EP_MAXFRAG];
+ int ndatav;
+#endif
+
+ LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
+ /* Not both mapped and paged payload */
+ LASSERT (iov == NULL || kiov == NULL);
+ /* RPC completes with failure by default */
+ LASSERT (krx->krx_rpc_reply_needed);
+ LASSERT (krx->krx_rpc_reply_status != 0);
+
+ rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
+ if (rmd == NULL)
+ return (-EPROTO);
+
+ if (len == 0) {
+ /* data got truncated to nothing. */
+ lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
+ /* Let kqswnal_rx_done() complete the RPC with success */
+ krx->krx_rpc_reply_status = 0;
+ return (0);
+ }
+
+ /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
+ actually sending a portals message with it */
+ ktx = kqswnal_get_idle_tx(NULL, 0);
+ if (ktx == NULL) {
+ CERROR ("Can't get txd for RDMA with "LPX64"\n",
+ libmsg->ev.initiator.nid);
+ return (-ENOMEM);
+ }
+
+ ktx->ktx_state = KTX_RDMAING;
+ ktx->ktx_nid = libmsg->ev.initiator.nid;
+ ktx->ktx_args[0] = krx;
+ ktx->ktx_args[1] = libmsg;
+
+ /* Start mapping at offset 0 (we're not mapping any headers) */
ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
+
if (kiov != NULL)
- rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
+ rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
else
- rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
+ rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
if (rc != 0) {
- CERROR ("Can't map source data: %d\n", rc);
- return (rc);
+ CERROR ("Can't map local RDMA data: %d\n", rc);
+ goto out;
}
#if MULTIRAIL_EKC
- if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
- CERROR("Can't cope with unequal # frags: %d local %d remote\n",
- ktx->ktx_nfrag, rmd->kqrmd_nfrag);
- return (-EINVAL);
+ rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ if (rc != 0) {
+ CERROR ("Incompatible RDMA descriptors\n");
+ goto out;
}
-
- for (i = 0; i < rmd->kqrmd_nfrag; i++)
- if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
- CERROR("Can't cope with unequal frags %d(%d):"
- " %d local %d remote\n",
- i, rmd->kqrmd_nfrag,
- ktx->ktx_frags[i].nmd_len,
- rmd->kqrmd_frag[i].nmd_len);
- return (-EINVAL);
- }
#else
- ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
- ktx->ktx_nfrag, ktx->ktx_frags,
- rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ switch (type) {
+ default:
+ LBUG();
+
+ case PTL_MSG_GET:
+ ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+ ktx->ktx_nfrag, ktx->ktx_frags,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ break;
+
+ case PTL_MSG_PUT:
+ ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag,
+ ktx->ktx_nfrag, ktx->ktx_frags);
+ break;
+ }
+
if (ndatav < 0) {
CERROR ("Can't create datavec: %d\n", ndatav);
- return (ndatav);
+ rc = ndatav;
+ goto out;
}
#endif
- /* Our caller will start to race with kqswnal_dma_reply_complete... */
- LASSERT (atomic_read (&krx->krx_refcount) == 1);
- atomic_set (&krx->krx_refcount, 2);
+ LASSERT (atomic_read(&krx->krx_refcount) > 0);
+ /* Take an extra ref for the completion callback */
+ atomic_inc(&krx->krx_refcount);
-#if MULTIRAIL_EKC
- rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
- &kqswnal_rpc_success,
- ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
- if (rc == EP_SUCCESS)
- return (0);
+ switch (type) {
+ default:
+ LBUG();
- /* Well we tried... */
- krx->krx_rpc_reply_needed = 0;
+ case PTL_MSG_GET:
+#if MULTIRAIL_EKC
+ eprc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rdma_store_complete, ktx,
+ &kqswnal_data.kqn_rpc_success,
+ ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
#else
- rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
- &kqswnal_rpc_success, datav, ndatav);
- if (rc == EP_SUCCESS)
- return (0);
-
- /* "old" EKC destroys rxd on failed completion */
- krx->krx_rxd = NULL;
+ eprc = ep_complete_rpc (krx->krx_rxd,
+ kqswnal_rdma_store_complete, ktx,
+ &kqswnal_data.kqn_rpc_success,
+ datav, ndatav);
+ if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
+ krx->krx_rxd = NULL;
#endif
+ if (eprc != EP_SUCCESS) {
+ CERROR("can't complete RPC: %d\n", eprc);
+ /* don't re-attempt RPC completion */
+ krx->krx_rpc_reply_needed = 0;
+ rc = -ECONNABORTED;
+ }
+ break;
+
+ case PTL_MSG_PUT:
+#if MULTIRAIL_EKC
+ eprc = ep_rpc_get (krx->krx_rxd,
+ kqswnal_rdma_fetch_complete, ktx,
+ rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
+ eprc = ep_rpc_get (krx->krx_rxd,
+ kqswnal_rdma_fetch_complete, ktx,
+ datav, ndatav);
+#endif
+ if (eprc != EP_SUCCESS) {
+ CERROR("ep_rpc_get failed: %d\n", eprc);
+ rc = -ECONNABORTED;
+ }
+ break;
+ }
- CERROR("can't complete RPC: %d\n", rc);
-
- /* reset refcount back to 1: we're not going to be racing with
- * kqswnal_dma_reply_complete. */
- atomic_set (&krx->krx_refcount, 1);
+ out:
+ if (rc != 0) {
+ kqswnal_rx_decref(krx); /* drop callback's ref */
+ kqswnal_put_idle_tx (ktx);
+ }
- return (-ECONNABORTED);
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
+ return (rc);
}
static ptl_err_t
-kqswnal_sendmsg (nal_cb_t *nal,
+kqswnal_sendmsg (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_hdr_t *hdr,
int sumoff;
int sumnob;
#endif
+ /* NB 1. hdr is in network byte order */
+ /* 2. 'private' depends on the message type */
CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
" pid %u\n", payload_nob, payload_niov, nid, pid);
return (PTL_FAIL);
}
+ if (type == PTL_MSG_REPLY && /* can I look in 'private' */
+ ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
+ /* Must be a REPLY for an optimized GET */
+ rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
+ payload_niov, payload_iov, payload_kiov,
+ payload_offset, payload_nob);
+ return ((rc == 0) ? PTL_OK : PTL_FAIL);
+ }
+
targetnid = nid;
if (kqswnal_nid2elanid (nid) < 0) { /* Can't send direct: find gateway? */
rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
type == PTL_MSG_REPLY ||
in_interrupt()));
if (ktx == NULL) {
- kqswnal_cerror_hdr (hdr);
+ CERROR ("Can't get txd for msg type %d for "LPX64"\n",
+ type, libmsg->ev.initiator.nid);
return (PTL_NO_SPACE);
}
+ ktx->ktx_state = KTX_SENDING;
ktx->ktx_nid = targetnid;
ktx->ktx_args[0] = private;
ktx->ktx_args[1] = libmsg;
-
- if (type == PTL_MSG_REPLY &&
- ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
- if (nid != targetnid ||
- kqswnal_nid2elanid(nid) !=
- ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
- CERROR("Optimized reply nid conflict: "
- "nid "LPX64" via "LPX64" elanID %d\n",
- nid, targetnid,
- ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
- rc = -EINVAL;
- goto out;
- }
-
- /* peer expects RPC completion with GET data */
- rc = kqswnal_dma_reply (ktx, payload_niov,
- payload_iov, payload_kiov,
- payload_offset, payload_nob);
- if (rc != 0)
- CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
- goto out;
- }
+ ktx->ktx_args[2] = NULL; /* set when a GET commits to REPLY */
memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
#endif
- if (kqswnal_tunables.kqn_optimized_gets &&
- type == PTL_MSG_GET && /* doing a GET */
- nid == targetnid) { /* not forwarding */
+ /* The first frag will be the pre-mapped buffer for (at least) the
+ * portals header. */
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+
+ if (nid == targetnid && /* not forwarding */
+ ((type == PTL_MSG_GET && /* optimize GET? */
+ kqswnal_tunables.kqn_optimized_gets != 0 &&
+ NTOH__u32(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
+ (type == PTL_MSG_PUT && /* optimize PUT? */
+ kqswnal_tunables.kqn_optimized_puts != 0 &&
+ payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
lib_md_t *md = libmsg->md;
kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
- /* Optimised path: I send over the Elan vaddrs of the get
- * sink buffers, and my peer DMAs directly into them.
+ /* Optimised path: I send over the Elan vaddrs of the local
+ * buffers, and my peer DMAs directly to/from them.
*
* First I set up ktx as if it was going to send this
* payload, (it needs to map it anyway). This fills
* ktx_frags[1] and onward with the network addresses
* of the GET sink frags. I copy these into ktx_buffer,
- * immediately after the header, and send that as my GET
- * message.
- *
- * Note that the addresses are sent in native endian-ness.
- * When EKC copes with different endian nodes, I'll fix
- * this (and eat my hat :) */
+ * immediately after the header, and send that as my
+ * message. */
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- ktx->ktx_state = KTX_GETTING;
+ ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;
if ((libmsg->md->options & PTL_MD_KIOV) != 0)
rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
+ if (type == PTL_MSG_GET) {
+ /* Allocate reply message now while I'm in thread context */
+ ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,
+ nid, libmsg);
+ if (ktx->ktx_args[2] == NULL)
+ goto out;
+
+ /* NB finalizing the REPLY message is my
+ * responsibility now, whatever happens. */
+ }
+
} else if (payload_nob <= KQSW_TX_MAXCONTIG) {
/* small message: single frag copied into the pre-mapped buffer */
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
0, KQSW_HDR_SIZE + payload_nob);
/* large message: multiple frags: first is hdr in pre-mapped buffer */
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
0, KQSW_HDR_SIZE);
rc == 0 ? "Sent" : "Failed to send",
payload_nob, nid, targetnid, rc);
- if (rc != 0)
+ if (rc != 0) {
+ if (ktx->ktx_state == KTX_GETTING &&
+ ktx->ktx_args[2] != NULL) {
+ /* We committed to reply, but there was a problem
+ * launching the GET. We can't avoid delivering a
+ * REPLY event since we committed above, so we
+ * pretend the GET succeeded but the REPLY
+ * failed. */
+ rc = 0;
+ lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
+ lib_finalize (&kqswnal_lib, private,
+ (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);
+ }
+
kqswnal_put_idle_tx (ktx);
-
+ }
+
atomic_dec(&kqswnal_data.kqn_pending_txs);
return (rc == 0 ? PTL_OK : PTL_FAIL);
}
static ptl_err_t
-kqswnal_send (nal_cb_t *nal,
+kqswnal_send (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_hdr_t *hdr,
}
static ptl_err_t
-kqswnal_send_pages (nal_cb_t *nal,
+kqswnal_send_pages (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_hdr_t *hdr,
if (ktx == NULL) /* can't get txd right now */
return; /* fwd will be scheduled when tx desc freed */
- if (nid == kqswnal_lib.ni.nid) /* gateway is me */
+ if (nid == kqswnal_lib.libnal_ni.ni_pid.nid) /* gateway is me */
nid = fwd->kprfd_target_nid; /* target is final dest */
if (kqswnal_nid2elanid (nid) < 0) {
if (rc != 0) {
CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
- kqswnal_put_idle_tx (ktx);
/* complete now (with failure) */
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+ kqswnal_tx_done (ktx, rc);
}
atomic_dec(&kqswnal_data.kqn_pending_txs);
NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
}
- kqswnal_requeue_rx (krx);
+ LASSERT (atomic_read(&krx->krx_refcount) == 1);
+ kqswnal_rx_decref (krx);
}
void
-kqswnal_dma_reply_complete (EP_RXD *rxd)
+kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
- int status = ep_rxd_status(rxd);
- kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
- kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
- lib_msg_t *msg = (lib_msg_t *)ktx->ktx_args[1];
-
- CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
- "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+ LASSERT (atomic_read(&krx->krx_refcount) == 0);
+ LASSERT (!krx->krx_rpc_reply_needed);
- LASSERT (krx->krx_rxd == rxd);
- LASSERT (krx->krx_rpc_reply_needed);
+ krx->krx_state = KRX_POSTED;
- krx->krx_rpc_reply_needed = 0;
- kqswnal_rx_done (krx);
+#if MULTIRAIL_EKC
+ if (kqswnal_data.kqn_shuttingdown) {
+ /* free EKC rxd on shutdown */
+ ep_complete_receive(krx->krx_rxd);
+ } else {
+ /* repost receive */
+ ep_requeue_receive(krx->krx_rxd,
+ kqswnal_rxhandler, krx,
+ &krx->krx_elanbuffer, 0);
+ }
+#else
+ if (kqswnal_data.kqn_shuttingdown)
+ return;
- lib_finalize (&kqswnal_lib, NULL, msg,
- (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
- kqswnal_put_idle_tx (ktx);
+ if (krx->krx_rxd == NULL) {
+ /* We had a failed ep_complete_rpc() which nukes the
+ * descriptor in "old" EKC */
+ int eprc = ep_queue_receive(krx->krx_eprx,
+ kqswnal_rxhandler, krx,
+ krx->krx_elanbuffer,
+ krx->krx_npages * PAGE_SIZE, 0);
+ LASSERT (eprc == EP_SUCCESS);
+ /* We don't handle failure here; it's incredibly rare
+ * (never reported?) and only happens with "old" EKC */
+ } else {
+ ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+ krx->krx_elanbuffer,
+ krx->krx_npages * PAGE_SIZE);
+ }
+#endif
}
void
}
void
-kqswnal_requeue_rx (kqswnal_rx_t *krx)
+kqswnal_rx_done (kqswnal_rx_t *krx)
{
- int rc;
+ int rc;
+ EP_STATUSBLK *sblk;
LASSERT (atomic_read(&krx->krx_refcount) == 0);
if (krx->krx_rpc_reply_needed) {
+ /* We've not completed the peer's RPC yet... */
+ sblk = (krx->krx_rpc_reply_status == 0) ?
+ &kqswnal_data.kqn_rpc_success :
+ &kqswnal_data.kqn_rpc_failed;
- /* We failed to complete the peer's optimized GET (e.g. we
- * couldn't map the source buffers). We complete the
- * peer's EKC rpc now with failure. */
+ LASSERT (!in_interrupt());
#if MULTIRAIL_EKC
- rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
- &kqswnal_rpc_failed, NULL, NULL, 0);
+ rc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rpc_complete, krx,
+ sblk, NULL, NULL, 0);
if (rc == EP_SUCCESS)
return;
-
- CERROR("can't complete RPC: %d\n", rc);
#else
- if (krx->krx_rxd != NULL) {
- /* We didn't try (and fail) to complete earlier... */
- rc = ep_complete_rpc(krx->krx_rxd,
- kqswnal_rpc_complete, krx,
- &kqswnal_rpc_failed, NULL, 0);
- if (rc == EP_SUCCESS)
- return;
-
- CERROR("can't complete RPC: %d\n", rc);
- }
-
- /* NB the old ep_complete_rpc() frees rxd on failure, so we
- * have to requeue from scratch here, unless we're shutting
- * down */
- if (kqswnal_data.kqn_shuttingdown)
+ rc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rpc_complete, krx,
+ sblk, NULL, 0);
+ if (rc == EP_SUCCESS)
return;
- rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
- krx->krx_elanbuffer,
- krx->krx_npages * PAGE_SIZE, 0);
- LASSERT (rc == EP_SUCCESS);
- /* We don't handle failure here; it's incredibly rare
- * (never reported?) and only happens with "old" EKC */
- return;
+ /* "old" EKC destroys rxd on failed completion */
+ krx->krx_rxd = NULL;
#endif
+ CERROR("can't complete RPC: %d\n", rc);
+ krx->krx_rpc_reply_needed = 0;
}
-#if MULTIRAIL_EKC
- if (kqswnal_data.kqn_shuttingdown) {
- /* free EKC rxd on shutdown */
- ep_complete_receive(krx->krx_rxd);
- } else {
- /* repost receive */
- ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
- &krx->krx_elanbuffer, 0);
- }
-#else
- /* don't actually requeue on shutdown */
- if (!kqswnal_data.kqn_shuttingdown)
- ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
- krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
-#endif
+ kqswnal_requeue_rx(krx);
}
void
-kqswnal_rx (kqswnal_rx_t *krx)
+kqswnal_parse (kqswnal_rx_t *krx)
{
ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid);
int nob;
int niov;
- LASSERT (atomic_read(&krx->krx_refcount) == 0);
+ LASSERT (atomic_read(&krx->krx_refcount) == 1);
+
+ if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
+ /* I ignore parse errors since I'm not consuming a byte
+ * stream */
+ (void)lib_parse (&kqswnal_lib, hdr, krx);
- if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
- atomic_set(&krx->krx_refcount, 1);
- lib_parse (&kqswnal_lib, hdr, krx);
- kqswnal_rx_done(krx);
+ /* Drop my ref; any RDMA activity takes an additional ref */
+ kqswnal_rx_decref(krx);
return;
}
#if KQSW_CHECKSUM
- CERROR ("checksums for forwarded packets not implemented\n");
- LBUG ();
+ LASSERTF (0, "checksums for forwarded packets not implemented\n");
#endif
+
if (kqswnal_nid2elanid (dest_nid) >= 0) /* should have gone direct to peer */
{
CERROR("dropping packet from "LPX64" for "LPX64
": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
- kqswnal_requeue_rx (krx);
+ kqswnal_rx_decref (krx);
return;
}
rxd, krx, nob, status);
LASSERT (krx != NULL);
-
+ LASSERT (krx->krx_state == KRX_POSTED);
+
+ krx->krx_state = KRX_PARSE;
krx->krx_rxd = rxd;
krx->krx_nob = nob;
#if MULTIRAIL_EKC
#else
krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
#endif
-
+ /* Default to failure if an RPC reply is requested but not handled */
+ krx->krx_rpc_reply_status = -EPROTO;
+ atomic_set (&krx->krx_refcount, 1);
+
/* must receive a whole header to be able to parse */
if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
{
CERROR("receive status failed with status %d nob %d\n",
ep_rxd_status(rxd), nob);
#endif
- kqswnal_requeue_rx (krx);
+ kqswnal_rx_decref(krx);
return;
}
if (!in_interrupt()) {
- kqswnal_rx (krx);
+ kqswnal_parse(krx);
return;
}
#endif
static ptl_err_t
-kqswnal_recvmsg (nal_cb_t *nal,
+kqswnal_recvmsg (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
{
kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
char *buffer = page_address(krx->krx_kiov[0].kiov_page);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)buffer;
int page;
char *page_ptr;
int page_nob;
char *iov_ptr;
int iov_nob;
int frag;
+ int rc;
#if KQSW_CHECKSUM
kqsw_csum_t senders_csum;
kqsw_csum_t payload_csum = 0;
- kqsw_csum_t hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
+ kqsw_csum_t hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
size_t csum_len = mlen;
int csum_frags = 0;
int csum_nob = 0;
if (senders_csum != hdr_csum)
kqswnal_csum_error (krx, 1);
#endif
+ /* NB lib_parse() has already flipped *hdr */
+
CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
+ if (krx->krx_rpc_reply_needed &&
+ hdr->type == PTL_MSG_PUT) {
+ /* This must be an optimized PUT */
+ rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
+ niov, iov, kiov, offset, mlen);
+ return (rc == 0 ? PTL_OK : PTL_FAIL);
+ }
+
/* What was actually received must be >= payload. */
LASSERT (mlen <= rlen);
if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
}
static ptl_err_t
-kqswnal_recv(nal_cb_t *nal,
+kqswnal_recv(lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
}
static ptl_err_t
-kqswnal_recv_pages (nal_cb_t *nal,
+kqswnal_recv_pages (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
flags);
- kqswnal_rx (krx);
+ switch (krx->krx_state) {
+ case KRX_PARSE:
+ kqswnal_parse (krx);
+ break;
+ case KRX_COMPLETING:
+ /* Drop last ref to reply to RPC and requeue */
+ LASSERT (krx->krx_rpc_reply_needed);
+ kqswnal_rx_decref (krx);
+ break;
+ default:
+ LBUG();
+ }
did_something = 1;
spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
return (0);
}
-nal_cb_t kqswnal_lib =
+lib_nal_t kqswnal_lib =
{
- nal_data: &kqswnal_data, /* NAL private data */
- cb_send: kqswnal_send,
- cb_send_pages: kqswnal_send_pages,
- cb_recv: kqswnal_recv,
- cb_recv_pages: kqswnal_recv_pages,
- cb_read: kqswnal_read,
- cb_write: kqswnal_write,
- cb_malloc: kqswnal_malloc,
- cb_free: kqswnal_free,
- cb_printf: kqswnal_printf,
- cb_cli: kqswnal_cli,
- cb_sti: kqswnal_sti,
- cb_callback: kqswnal_callback,
- cb_dist: kqswnal_dist
+ libnal_data: &kqswnal_data, /* NAL private data */
+ libnal_send: kqswnal_send,
+ libnal_send_pages: kqswnal_send_pages,
+ libnal_recv: kqswnal_recv,
+ libnal_recv_pages: kqswnal_recv_pages,
+ libnal_dist: kqswnal_dist
};