#include "qswnal.h"
+EP_STATUSBLK kqswnal_rpc_success;
+EP_STATUSBLK kqswnal_rpc_failed;
+
/*
* LIB functions follow
*
void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
+#if MULTIRAIL_EKC
+ int i;
+#endif
+
if (ktx->ktx_nmappedpages == 0)
return;
-
+
+#if MULTIRAIL_EKC
+ CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
+ ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
+
+ for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
+ ep_dvma_unload(kqswnal_data.kqn_ep,
+ kqswnal_data.kqn_ep_tx_nmh,
+ &ktx->ktx_frags[i]);
+#else
CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
- elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+ elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
kqswnal_data.kqn_eptxdmahandle,
ktx->ktx_basepage, ktx->ktx_nmappedpages);
+
+#endif
ktx->ktx_nmappedpages = 0;
}
int maxmapped = ktx->ktx_npages;
uint32_t basepage = ktx->ktx_basepage + nmapped;
char *ptr;
+#if MULTIRAIL_EKC
+ EP_RAILMASK railmask;
+ int rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+ EP_RAILMASK_ALL,
+ kqswnal_nid2elanid(ktx->ktx_nid));
+ if (rail < 0) {
+ CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+ return (-ENETDOWN);
+ }
+ railmask = 1 << rail;
+#endif
LASSERT (nmapped <= maxmapped);
+ LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
LASSERT (nfrags <= EP_MAXFRAG);
LASSERT (niov > 0);
LASSERT (nob > 0);
-
+
do {
int fraglen = kiov->kiov_len;
"%p[%d] loading %p for %d, page %d, %d total\n",
ktx, nfrags, ptr, fraglen, basepage, nmapped);
- elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+ ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+ ptr, fraglen,
+ kqswnal_data.kqn_ep_tx_nmh, basepage,
+ &railmask, &ktx->ktx_frags[nfrags]);
+
+ if (nfrags == ktx->ktx_firsttmpfrag ||
+ !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+ &ktx->ktx_frags[nfrags - 1],
+ &ktx->ktx_frags[nfrags])) {
+ /* new frag if this is the first or can't merge */
+ nfrags++;
+ }
+#else
+ elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
kqswnal_data.kqn_eptxdmahandle,
ptr, fraglen,
- basepage, &ktx->ktx_frags.iov[nfrags].Base);
-
- kunmap (kiov->kiov_page);
-
- /* keep in loop for failure case */
- ktx->ktx_nmappedpages = nmapped;
+ basepage, &ktx->ktx_frags[nfrags].Base);
if (nfrags > 0 && /* previous frag mapped */
- ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
- (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+ ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+ (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
/* just extend previous */
- ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+ ktx->ktx_frags[nfrags - 1].Len += fraglen;
else {
- ktx->ktx_frags.iov[nfrags].Len = fraglen;
+ ktx->ktx_frags[nfrags].Len = fraglen;
nfrags++; /* new frag */
}
+#endif
+
+ kunmap (kiov->kiov_page);
+
+ /* keep in loop for failure case */
+ ktx->ktx_nmappedpages = nmapped;
basepage++;
kiov++;
int nmapped = ktx->ktx_nmappedpages;
int maxmapped = ktx->ktx_npages;
uint32_t basepage = ktx->ktx_basepage + nmapped;
-
+#if MULTIRAIL_EKC
+ EP_RAILMASK railmask;
+ int rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+ EP_RAILMASK_ALL,
+ kqswnal_nid2elanid(ktx->ktx_nid));
+
+ if (rail < 0) {
+ CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+ return (-ENETDOWN);
+ }
+ railmask = 1 << rail;
+#endif
LASSERT (nmapped <= maxmapped);
+ LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
LASSERT (nfrags <= EP_MAXFRAG);
LASSERT (niov > 0);
LASSERT (nob > 0);
ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
nmapped);
- elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+ ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+ iov->iov_base, fraglen,
+ kqswnal_data.kqn_ep_tx_nmh, basepage,
+ &railmask, &ktx->ktx_frags[nfrags]);
+
+ if (nfrags == ktx->ktx_firsttmpfrag ||
+ !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+ &ktx->ktx_frags[nfrags - 1],
+ &ktx->ktx_frags[nfrags])) {
+ /* new frag if this is the first or can't merge */
+ nfrags++;
+ }
+#else
+ elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
kqswnal_data.kqn_eptxdmahandle,
iov->iov_base, fraglen,
- basepage, &ktx->ktx_frags.iov[nfrags].Base);
- /* keep in loop for failure case */
- ktx->ktx_nmappedpages = nmapped;
+ basepage, &ktx->ktx_frags[nfrags].Base);
if (nfrags > 0 && /* previous frag mapped */
- ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
- (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+ ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+ (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
/* just extend previous */
- ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+ ktx->ktx_frags[nfrags - 1].Len += fraglen;
else {
- ktx->ktx_frags.iov[nfrags].Len = fraglen;
+ ktx->ktx_frags[nfrags].Len = fraglen;
nfrags++; /* new frag */
}
+#endif
+
+ /* keep in loop for failure case */
+ ktx->ktx_nmappedpages = nmapped;
basepage += npages;
iov++;
break;
case KTX_GETTING: /* Peer has DMA-ed direct? */
- LASSERT (KQSW_OPTIMIZE_GETS);
msg = (lib_msg_t *)ktx->ktx_args[1];
repmsg = NULL;
CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
- if (status != EP_SUCCESS)
- {
+ if (status != EP_SUCCESS) {
+
CERROR ("Tx completion to "LPX64" failed: %d\n",
ktx->ktx_nid, status);
} else if (ktx->ktx_state == KTX_GETTING) {
/* RPC completed OK; what did our peer put in the status
* block? */
- LASSERT (KQSW_OPTIMIZE_GETS);
+#if MULTIRAIL_EKC
+ status = ep_txd_statusblk(txd)->Data[0];
+#else
status = ep_txd_statusblk(txd)->Status;
+#endif
} else {
status = 0;
}
LASSERT (dest >= 0); /* must be a peer */
if (ktx->ktx_state == KTX_GETTING) {
- LASSERT (KQSW_OPTIMIZE_GETS);
+ /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The
+ * other frags are the GET sink which we obviously don't
+ * send here :) */
+#if MULTIRAIL_EKC
+ rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
+ ktx->ktx_port, attr,
+ kqswnal_txhandler, ktx,
+ NULL, ktx->ktx_frags, 1);
+#else
rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
ktx->ktx_port, attr, kqswnal_txhandler,
- ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+ ktx, NULL, ktx->ktx_frags, 1);
+#endif
} else {
+#if MULTIRAIL_EKC
+ rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
+ ktx->ktx_port, attr,
+ kqswnal_txhandler, ktx,
+ NULL, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
- ktx->ktx_port, attr, kqswnal_txhandler,
- ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+ ktx->ktx_port, attr,
+ kqswnal_txhandler, ktx,
+ ktx->ktx_frags, ktx->ktx_nfrag);
+#endif
}
switch (rc) {
- case ESUCCESS: /* success */
+ case EP_SUCCESS: /* success */
return (0);
- case ENOMEM: /* can't allocate ep txd => queue for later */
+ case EP_ENOMEM: /* can't allocate ep txd => queue for later */
LASSERT (in_interrupt());
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
default: /* fatal error */
CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
kqswnal_notify_peer_down(ktx);
- return (rc);
+ return (-EHOSTUNREACH);
}
}
} /* end of print_hdr() */
+#if !MULTIRAIL_EKC
void
kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
{
CERROR ("DATAVEC too small\n");
return (-E2BIG);
}
+#endif
int
kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
char *buffer = (char *)page_address(krx->krx_pages[0]);
kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
- EP_IOVEC eiov[EP_MAXFRAG];
- EP_STATUSBLK blk;
int rc;
-
- LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed);
+#if MULTIRAIL_EKC
+ int i;
+#else
+ EP_DATAVEC datav[EP_MAXFRAG];
+ int ndatav;
+#endif
+ LASSERT (krx->krx_rpc_reply_needed);
LASSERT ((iov == NULL) != (kiov == NULL));
- /* see .*_pack_k?iov comment regarding endian-ness */
+ /* see kqswnal_sendmsg comment regarding endian-ness */
if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
/* msg too small to discover rmd size */
CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
return (-EINVAL);
}
- if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) {
+ if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
/* rmd doesn't fit in the incoming message */
CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
- krx->krx_nob, rmd->kqrmd_neiov,
- (int)(((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer));
+ krx->krx_nob, rmd->kqrmd_nfrag,
+ (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
return (-EINVAL);
}
- /* Ghastly hack part 1, uses the existing procedures to map the source data... */
- ktx->ktx_nfrag = 0;
+ /* Map the source data... */
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
if (kiov != NULL)
rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
else
return (rc);
}
- /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... */
- memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0]));
-
- rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav,
- ktx->ktx_nfrag, eiov,
- rmd->kqrmd_neiov, rmd->kqrmd_eiov);
- if (rc < 0) {
- CERROR ("Can't create datavec: %d\n", rc);
- return (rc);
+#if MULTIRAIL_EKC
+ if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
+ CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+ ktx->ktx_nfrag, rmd->kqrmd_nfrag);
+ return (-EINVAL);
}
- ktx->ktx_nfrag = rc;
-
- memset (&blk, 0, sizeof (blk)); /* zero blk.Status */
+
+ for (i = 0; i < rmd->kqrmd_nfrag; i++)
+ if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
+ CERROR("Can't cope with unequal frags %d(%d):"
+ " %d local %d remote\n",
+ i, rmd->kqrmd_nfrag,
+ ktx->ktx_frags[i].nmd_len,
+ rmd->kqrmd_frag[i].nmd_len);
+ return (-EINVAL);
+ }
+#else
+ ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
+ ktx->ktx_nfrag, ktx->ktx_frags,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ if (ndatav < 0) {
+ CERROR ("Can't create datavec: %d\n", ndatav);
+ return (ndatav);
+ }
+#endif
- /* Our caller will start to race with kqswnal_rpc_complete... */
+ /* Our caller will start to race with kqswnal_dma_reply_complete... */
LASSERT (atomic_read (&krx->krx_refcount) == 1);
atomic_set (&krx->krx_refcount, 2);
- rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx,
- &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag);
- if (rc == ESUCCESS)
+#if MULTIRAIL_EKC
+ rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
+ &kqswnal_rpc_success,
+ ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
+ if (rc == EP_SUCCESS)
+ return (0);
+
+ /* Well we tried... */
+ krx->krx_rpc_reply_needed = 0;
+#else
+ rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
+ &kqswnal_rpc_success, datav, ndatav);
+ if (rc == EP_SUCCESS)
return (0);
+ /* "old" EKC destroys rxd on failed completion */
+ krx->krx_rxd = NULL;
+#endif
+
+ CERROR("can't complete RPC: %d\n", rc);
+
/* reset refcount back to 1: we're not going to be racing with
- * kqswnal_rely_complete. */
+ * kqswnal_dma_reply_complete. */
atomic_set (&krx->krx_refcount, 1);
+
return (-ECONNABORTED);
}
return (PTL_NOSPACE);
}
+ ktx->ktx_nid = targetnid;
ktx->ktx_args[0] = private;
ktx->ktx_args[1] = libmsg;
-#if KQSW_OPTIMIZE_GETS
if (type == PTL_MSG_REPLY &&
- ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) {
+ ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
if (nid != targetnid ||
kqswnal_nid2elanid(nid) !=
ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
"nid "LPX64" via "LPX64" elanID %d\n",
nid, targetnid,
ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
- return(PTL_FAIL);
+ return (PTL_FAIL);
}
/* peer expects RPC completion with GET data */
payload_niov, payload_iov,
payload_kiov, payload_nob);
if (rc == 0)
- return (0);
+ return (PTL_OK);
CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
kqswnal_put_idle_tx (ktx);
return (PTL_FAIL);
}
-#endif
memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
#endif
- /* Set up first frag from pre-mapped buffer (it's at least the
- * portals header) */
- ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer;
- ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE;
- ktx->ktx_nfrag = 1;
- ktx->ktx_state = KTX_SENDING; /* => lib_finalize() on completion */
-
-#if KQSW_OPTIMIZE_GETS
- if (type == PTL_MSG_GET && /* doing a GET */
+ if (kqswnal_data.kqn_optimized_gets &&
+ type == PTL_MSG_GET && /* doing a GET */
nid == targetnid) { /* not forwarding */
lib_md_t *md = libmsg->md;
kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
*
* First I set up ktx as if it was going to send this
* payload, (it needs to map it anyway). This fills
- * ktx_frags.iov[1] and onward with the network addresses
- * of the get sink frags. I copy these into ktx_buffer,
+ * ktx_frags[1] and onward with the network addresses
+ * of the GET sink frags. I copy these into ktx_buffer,
* immediately after the header, and send that as my GET
* message.
*
* When EKC copes with different endian nodes, I'll fix
* this (and eat my hat :) */
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+ ktx->ktx_state = KTX_GETTING;
+
if ((libmsg->md->options & PTL_MD_KIOV) != 0)
rc = kqswnal_map_tx_kiov (ktx, md->length,
md->md_niov, md->md_iov.kiov);
return (PTL_FAIL);
}
- rmd->kqrmd_neiov = ktx->ktx_nfrag - 1;
- memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1],
- rmd->kqrmd_neiov * sizeof (EP_IOVEC));
+ rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
- ktx->ktx_nfrag = 1;
- ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t,
- kqrmd_eiov[rmd->kqrmd_neiov]);
- payload_nob = ktx->ktx_frags.iov[0].Len;
- ktx->ktx_state = KTX_GETTING;
- } else
+ payload_nob = offsetof(kqswnal_remotemd_t,
+ kqrmd_frag[rmd->kqrmd_nfrag]);
+ LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
+
+#if MULTIRAIL_EKC
+ memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+ rmd->kqrmd_nfrag * sizeof(EP_NMD));
+
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+ 0, KQSW_HDR_SIZE + payload_nob);
+#else
+ memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+ rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
+
+ ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+ ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
+#endif
+ } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
+
+ /* small message: single frag copied into the pre-mapped buffer */
+
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+ ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+ 0, KQSW_HDR_SIZE + payload_nob);
+#else
+ ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+ ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
- if (payload_nob > 0) { /* got some payload (something more to do) */
- /* make a single contiguous message? */
- if (payload_nob <= KQSW_TX_MAXCONTIG) {
- /* copy payload to ktx_buffer, immediately after hdr */
+ if (payload_nob > 0) {
if (payload_kiov != NULL)
lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
payload_niov, payload_kiov, payload_nob);
else
lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
payload_niov, payload_iov, payload_nob);
- /* first frag includes payload */
- ktx->ktx_frags.iov[0].Len += payload_nob;
- } else {
- if (payload_kiov != NULL)
- rc = kqswnal_map_tx_kiov (ktx, payload_nob,
- payload_niov, payload_kiov);
- else
- rc = kqswnal_map_tx_iov (ktx, payload_nob,
- payload_niov, payload_iov);
- if (rc != 0) {
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
- }
- }
+ }
+ } else {
- ktx->ktx_nid = targetnid;
+ /* large message: multiple frags: first is hdr in pre-mapped buffer */
+
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+ ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+ 0, KQSW_HDR_SIZE);
+#else
+ ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+ ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+ if (payload_kiov != NULL)
+ rc = kqswnal_map_tx_kiov (ktx, payload_nob,
+ payload_niov, payload_kiov);
+ else
+ rc = kqswnal_map_tx_iov (ktx, payload_nob,
+ payload_niov, payload_iov);
+ if (rc != 0) {
+ kqswnal_put_idle_tx (ktx);
+ return (PTL_FAIL);
+ }
+ }
+
ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
- EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
+ EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
rc = kqswnal_launch (ktx);
if (rc != 0) { /* failed? */
payload_niov, NULL, payload_kiov, payload_nob));
}
-int kqswnal_fwd_copy_contig = 0;
-
void
kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
{
LASSERT (niov > 0);
- ktx = kqswnal_get_idle_tx (fwd, FALSE);
+ ktx = kqswnal_get_idle_tx (fwd, 0);
if (ktx == NULL) /* can't get txd right now */
return; /* fwd will be scheduled when tx desc freed */
goto failed;
}
- if ((kqswnal_fwd_copy_contig || niov > 1) &&
+ ktx->ktx_port = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
+ EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
+ ktx->ktx_nid = nid;
+ ktx->ktx_state = KTX_FORWARDING;
+ ktx->ktx_args[0] = fwd;
+
+ if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
nob <= KQSW_TX_BUFFER_SIZE)
{
- /* send from ktx's pre-allocated/mapped contiguous buffer? */
+ /* send from ktx's pre-mapped contiguous buffer? */
lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
- ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
- ktx->ktx_frags.iov[0].Len = nob;
- ktx->ktx_nfrag = 1;
+#if MULTIRAIL_EKC
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+ 0, nob);
+#else
+ ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+ ktx->ktx_frags[0].Len = nob;
+#endif
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
}
else
{
/* zero copy */
- ktx->ktx_nfrag = 0; /* no frags mapped yet */
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
if (rc != 0)
goto failed;
ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
}
- ktx->ktx_port = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
- EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
- ktx->ktx_nid = nid;
- ktx->ktx_state = KTX_FORWARDING; /* kpr_put_packet() on completion */
- ktx->ktx_args[0] = fwd;
-
rc = kqswnal_launch (ktx);
if (rc == 0)
return;
}
void
-kqswnal_reply_complete (EP_RXD *rxd)
+kqswnal_dma_reply_complete (EP_RXD *rxd)
{
int status = ep_rxd_status(rxd);
kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
"rxd %p, ktx %p, status %d\n", rxd, ktx, status);
LASSERT (krx->krx_rxd == rxd);
+ LASSERT (krx->krx_rpc_reply_needed);
- krx->krx_rpc_completed = 1;
- kqswnal_requeue_rx (krx);
+ krx->krx_rpc_reply_needed = 0;
+ kqswnal_rx_done (krx);
lib_finalize (&kqswnal_lib, NULL, msg);
kqswnal_put_idle_tx (ktx);
"rxd %p, krx %p, status %d\n", rxd, krx, status);
LASSERT (krx->krx_rxd == rxd);
+ LASSERT (krx->krx_rpc_reply_needed);
- krx->krx_rpc_completed = 1;
+ krx->krx_rpc_reply_needed = 0;
kqswnal_requeue_rx (krx);
}
void
-kqswnal_requeue_rx (kqswnal_rx_t *krx)
+kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
- EP_STATUSBLK blk;
- int rc;
+ int rc;
- LASSERT (atomic_read (&krx->krx_refcount) > 0);
- if (!atomic_dec_and_test (&krx->krx_refcount))
- return;
+ LASSERT (atomic_read(&krx->krx_refcount) == 0);
- if (!ep_rxd_isrpc(krx->krx_rxd) ||
- krx->krx_rpc_completed) {
+ if (krx->krx_rpc_reply_needed) {
- /* don't actually requeue on shutdown */
- if (kqswnal_data.kqn_shuttingdown)
+ /* We failed to complete the peer's optimized GET (e.g. we
+ * couldn't map the source buffers). We complete the
+ * peer's EKC rpc now with failure. */
+#if MULTIRAIL_EKC
+ rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
+ &kqswnal_rpc_failed, NULL, NULL, 0);
+ if (rc == EP_SUCCESS)
return;
- ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx,
- krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE);
- return;
- }
-
- /* Sender wanted an RPC, but we didn't complete it (we must have
- * dropped the sender's message). We complete it now with
- * failure... */
- memset (&blk, 0, sizeof (blk));
- blk.Status = -ECONNREFUSED;
-
- atomic_set (&krx->krx_refcount, 1);
+ CERROR("can't complete RPC: %d\n", rc);
+#else
+ if (krx->krx_rxd != NULL) {
+ /* We didn't try (and fail) to complete earlier... */
+ rc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rpc_complete, krx,
+ &kqswnal_rpc_failed, NULL, 0);
+ if (rc == EP_SUCCESS)
+ return;
+
+ CERROR("can't complete RPC: %d\n", rc);
+ }
+
+ /* NB the old ep_complete_rpc() frees rxd on failure, so we
+ * have to requeue from scratch here, unless we're shutting
+ * down */
+ if (kqswnal_data.kqn_shuttingdown)
+ return;
- rc = ep_complete_rpc (krx->krx_rxd,
- kqswnal_rpc_complete, krx,
- &blk, NULL, 0);
- if (rc == ESUCCESS) {
- /* callback will call me again to requeue, having set
- * krx_rpc_completed... */
+ rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
+ krx->krx_elanbuffer,
+ krx->krx_npages * PAGE_SIZE, 0);
+ LASSERT (rc == EP_SUCCESS);
+ /* We don't handle failure here; it's incredibly rare
+ * (never reported?) and only happens with "old" EKC */
return;
+#endif
}
- CERROR("can't complete RPC: %d\n", rc);
-
- /* we don't actually requeue on shutdown */
- if (kqswnal_data.kqn_shuttingdown)
- return;
-
- /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue
- * from scratch here... */
- rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
- krx->krx_elanaddr,
- krx->krx_npages * PAGE_SIZE, 0);
-
- LASSERT (rc == ESUCCESS);
- /* This needs to be fixed by ep_complete_rpc NOT freeing
- * krx->krx_rxd on failure so we can just ep_requeue_receive() */
+#if MULTIRAIL_EKC
+ if (kqswnal_data.kqn_shuttingdown) {
+ /* free EKC rxd on shutdown */
+ ep_complete_receive(krx->krx_rxd);
+ } else {
+ /* repost receive */
+ ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+ &krx->krx_elanbuffer, 0);
+ }
+#else
+ /* don't actually requeue on shutdown */
+ if (!kqswnal_data.kqn_shuttingdown)
+ ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+ krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
+#endif
}
-
+
void
kqswnal_rx (kqswnal_rx_t *krx)
{
int nob;
int niov;
+ LASSERT (atomic_read(&krx->krx_refcount) == 0);
+
if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
- /* NB krx requeued when lib_parse() calls back kqswnal_recv */
+ atomic_set(&krx->krx_refcount, 1);
lib_parse (&kqswnal_lib, hdr, krx);
+ kqswnal_rx_done(krx);
return;
}
krx->krx_rxd = rxd;
krx->krx_nob = nob;
- LASSERT (atomic_read (&krx->krx_refcount) == 0);
- atomic_set (&krx->krx_refcount, 1);
- krx->krx_rpc_completed = 0;
+#if MULTIRAIL_EKC
+ krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
+#else
+ krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
+#endif
/* must receive a whole header to be able to parse */
if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
{
/* receives complete with failure when receiver is removed */
+#if MULTIRAIL_EKC
+ if (status == EP_SHUTDOWN)
+ LASSERT (kqswnal_data.kqn_shuttingdown);
+ else
+ CERROR("receive status failed with status %d nob %d\n",
+ ep_rxd_status(rxd), nob);
+#else
if (!kqswnal_data.kqn_shuttingdown)
CERROR("receive status failed with status %d nob %d\n",
ep_rxd_status(rxd), nob);
-
+#endif
kqswnal_requeue_rx (krx);
return;
}
#endif
lib_finalize(nal, private, libmsg);
- kqswnal_requeue_rx (krx);
-
return (rlen);
}
return ((int)pid);
atomic_inc (&kqswnal_data.kqn_nthreads);
+ atomic_inc (&kqswnal_data.kqn_nthreads_running);
return (0);
}
long flags;
int rc;
int counter = 0;
+ int shuttingdown = 0;
int did_something;
kportal_daemonize ("kqswnal_sched");
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
- while (!kqswnal_data.kqn_shuttingdown)
+ for (;;)
{
- did_something = FALSE;
+ if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
+
+ if (kqswnal_data.kqn_shuttingdown == 2)
+ break;
+
+ /* During stage 1 of shutdown we are still responsive
+ * to receives */
+
+ atomic_dec (&kqswnal_data.kqn_nthreads_running);
+ shuttingdown = kqswnal_data.kqn_shuttingdown;
+ }
+
+ did_something = 0;
if (!list_empty (&kqswnal_data.kqn_readyrxds))
{
kqswnal_rx (krx);
- did_something = TRUE;
+ did_something = 1;
spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
}
- if (!list_empty (&kqswnal_data.kqn_delayedtxds))
+ if (!shuttingdown &&
+ !list_empty (&kqswnal_data.kqn_delayedtxds))
{
ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
kqswnal_tx_t, ktx_list);
kqswnal_tx_done (ktx, rc);
}
- did_something = TRUE;
+ did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
- if (!list_empty (&kqswnal_data.kqn_delayedfwds))
+        if (!shuttingdown &&
+ !list_empty (&kqswnal_data.kqn_delayedfwds))
{
fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
list_del (&fwd->kprfd_list);
kqswnal_fwd_packet (NULL, fwd);
- did_something = TRUE;
+ did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
if (!did_something) {
rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
- kqswnal_data.kqn_shuttingdown ||
+ kqswnal_data.kqn_shuttingdown != shuttingdown ||
!list_empty(&kqswnal_data.kqn_readyrxds) ||
!list_empty(&kqswnal_data.kqn_delayedtxds) ||
!list_empty(&kqswnal_data.kqn_delayedfwds));