#include "qswnal.h"
+EP_STATUSBLK kqswnal_rpc_success;
+EP_STATUSBLK kqswnal_rpc_failed;
+
/*
* LIB functions follow
*
*/
-static int
+static ptl_err_t
kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
size_t len)
{
nal->ni.nid, len, src_addr, dst_addr );
memcpy( dst_addr, src_addr, len );
- return (0);
+ return (PTL_OK);
}
-static int
+static ptl_err_t
kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
size_t len)
{
nal->ni.nid, len, src_addr, dst_addr );
memcpy( dst_addr, src_addr, len );
- return (0);
+ return (PTL_OK);
}
static void *
void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
+#if MULTIRAIL_EKC
+ int i;
+#endif
+
if (ktx->ktx_nmappedpages == 0)
return;
-
+
+#if MULTIRAIL_EKC
+ CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
+ ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
+
+ for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
+ ep_dvma_unload(kqswnal_data.kqn_ep,
+ kqswnal_data.kqn_ep_tx_nmh,
+ &ktx->ktx_frags[i]);
+#else
CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
- elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+ elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
kqswnal_data.kqn_eptxdmahandle,
ktx->ktx_basepage, ktx->ktx_nmappedpages);
+#endif
ktx->ktx_nmappedpages = 0;
}
int
-kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
+kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
{
int nfrags = ktx->ktx_nfrag;
int nmapped = ktx->ktx_nmappedpages;
int maxmapped = ktx->ktx_npages;
uint32_t basepage = ktx->ktx_basepage + nmapped;
char *ptr;
+#if MULTIRAIL_EKC
+ EP_RAILMASK railmask;
+ int rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+ EP_RAILMASK_ALL,
+ kqswnal_nid2elanid(ktx->ktx_nid));
+ if (rail < 0) {
+ CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+ return (-ENETDOWN);
+ }
+ railmask = 1 << rail;
+#endif
LASSERT (nmapped <= maxmapped);
+ LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
LASSERT (nfrags <= EP_MAXFRAG);
LASSERT (niov > 0);
LASSERT (nob > 0);
-
+
+ /* skip complete frags before 'offset' */
+ while (offset >= kiov->kiov_len) {
+ offset -= kiov->kiov_len;
+ kiov++;
+ niov--;
+ LASSERT (niov > 0);
+ }
+
do {
- int fraglen = kiov->kiov_len;
+ int fraglen = kiov->kiov_len - offset;
/* nob exactly spans the iovs */
LASSERT (fraglen <= nob);
/* XXX this is really crap, but we'll have to kmap until
* EKC has a page (rather than vaddr) mapping interface */
- ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
+ ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
CDEBUG(D_NET,
"%p[%d] loading %p for %d, page %d, %d total\n",
ktx, nfrags, ptr, fraglen, basepage, nmapped);
- elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+ ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+ ptr, fraglen,
+ kqswnal_data.kqn_ep_tx_nmh, basepage,
+ &railmask, &ktx->ktx_frags[nfrags]);
+
+ if (nfrags == ktx->ktx_firsttmpfrag ||
+ !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+ &ktx->ktx_frags[nfrags - 1],
+ &ktx->ktx_frags[nfrags])) {
+ /* new frag if this is the first or can't merge */
+ nfrags++;
+ }
+#else
+ elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
kqswnal_data.kqn_eptxdmahandle,
ptr, fraglen,
- basepage, &ktx->ktx_frags.iov[nfrags].Base);
-
- kunmap (kiov->kiov_page);
-
- /* keep in loop for failure case */
- ktx->ktx_nmappedpages = nmapped;
+ basepage, &ktx->ktx_frags[nfrags].Base);
if (nfrags > 0 && /* previous frag mapped */
- ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
- (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+ ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+ (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
/* just extend previous */
- ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+ ktx->ktx_frags[nfrags - 1].Len += fraglen;
else {
- ktx->ktx_frags.iov[nfrags].Len = fraglen;
+ ktx->ktx_frags[nfrags].Len = fraglen;
nfrags++; /* new frag */
}
+#endif
+
+ kunmap (kiov->kiov_page);
+
+ /* keep in loop for failure case */
+ ktx->ktx_nmappedpages = nmapped;
basepage++;
kiov++;
niov--;
nob -= fraglen;
+ offset = 0;
/* iov must not run out before end of data */
LASSERT (nob == 0 || niov > 0);
}
int
-kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
+kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
+ int niov, struct iovec *iov)
{
int nfrags = ktx->ktx_nfrag;
int nmapped = ktx->ktx_nmappedpages;
int maxmapped = ktx->ktx_npages;
uint32_t basepage = ktx->ktx_basepage + nmapped;
-
+#if MULTIRAIL_EKC
+ EP_RAILMASK railmask;
+ int rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+ EP_RAILMASK_ALL,
+ kqswnal_nid2elanid(ktx->ktx_nid));
+
+ if (rail < 0) {
+ CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+ return (-ENETDOWN);
+ }
+ railmask = 1 << rail;
+#endif
LASSERT (nmapped <= maxmapped);
+ LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
LASSERT (nfrags <= EP_MAXFRAG);
LASSERT (niov > 0);
LASSERT (nob > 0);
+ /* skip complete frags before offset */
+ while (offset >= iov->iov_len) {
+ offset -= iov->iov_len;
+ iov++;
+ niov--;
+ LASSERT (niov > 0);
+ }
+
do {
- int fraglen = iov->iov_len;
+ int fraglen = iov->iov_len - offset;
long npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
/* nob exactly spans the iovs */
CDEBUG(D_NET,
"%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
- ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
- nmapped);
-
- elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+ ktx, nfrags, iov->iov_base + offset, fraglen,
+ basepage, npages, nmapped);
+
+#if MULTIRAIL_EKC
+ ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+ iov->iov_base + offset, fraglen,
+ kqswnal_data.kqn_ep_tx_nmh, basepage,
+ &railmask, &ktx->ktx_frags[nfrags]);
+
+ if (nfrags == ktx->ktx_firsttmpfrag ||
+ !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+ &ktx->ktx_frags[nfrags - 1],
+ &ktx->ktx_frags[nfrags])) {
+ /* new frag if this is the first or can't merge */
+ nfrags++;
+ }
+#else
+ elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
kqswnal_data.kqn_eptxdmahandle,
- iov->iov_base, fraglen,
- basepage, &ktx->ktx_frags.iov[nfrags].Base);
- /* keep in loop for failure case */
- ktx->ktx_nmappedpages = nmapped;
+ iov->iov_base + offset, fraglen,
+ basepage, &ktx->ktx_frags[nfrags].Base);
if (nfrags > 0 && /* previous frag mapped */
- ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
- (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+ ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+ (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
/* just extend previous */
- ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+ ktx->ktx_frags[nfrags - 1].Len += fraglen;
else {
- ktx->ktx_frags.iov[nfrags].Len = fraglen;
+ ktx->ktx_frags[nfrags].Len = fraglen;
nfrags++; /* new frag */
}
+#endif
+
+ /* keep in loop for failure case */
+ ktx->ktx_nmappedpages = nmapped;
basepage += npages;
iov++;
niov--;
nob -= fraglen;
+ offset = 0;
/* iov must not run out before end of data */
LASSERT (nob == 0 || niov > 0);
kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
{
lib_msg_t *msg;
- lib_msg_t *repmsg;
+ lib_msg_t *repmsg = NULL;
switch (ktx->ktx_state) {
case KTX_FORWARDING: /* router asked me to forward this packet */
case KTX_SENDING: /* packet sourced locally */
lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
- (lib_msg_t *)ktx->ktx_args[1]);
+ (lib_msg_t *)ktx->ktx_args[1],
+ (error == 0) ? PTL_OK :
+ (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
break;
case KTX_GETTING: /* Peer has DMA-ed direct? */
- LASSERT (KQSW_OPTIMIZE_GETS);
msg = (lib_msg_t *)ktx->ktx_args[1];
- repmsg = NULL;
- if (error == 0)
+ if (error == 0) {
repmsg = lib_fake_reply_msg (&kqswnal_lib,
ktx->ktx_nid, msg->md);
+ if (repmsg == NULL)
+ error = -ENOMEM;
+ }
- lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg);
-
- if (repmsg != NULL)
- lib_finalize (&kqswnal_lib, NULL, repmsg);
+ if (error == 0) {
+ lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
+ msg, PTL_OK);
+ lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
+ } else {
+ lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
+ (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
+ }
break;
default:
CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
- if (status != EP_SUCCESS)
- {
+ if (status != EP_SUCCESS) {
+
CERROR ("Tx completion to "LPX64" failed: %d\n",
ktx->ktx_nid, status);
kqswnal_notify_peer_down(ktx);
- status = -EIO;
+ status = -EHOSTDOWN;
} else if (ktx->ktx_state == KTX_GETTING) {
/* RPC completed OK; what did our peer put in the status
* block? */
- LASSERT (KQSW_OPTIMIZE_GETS);
+#if MULTIRAIL_EKC
+ status = ep_txd_statusblk(txd)->Data[0];
+#else
status = ep_txd_statusblk(txd)->Status;
+#endif
} else {
status = 0;
}
LASSERT (dest >= 0); /* must be a peer */
if (ktx->ktx_state == KTX_GETTING) {
- LASSERT (KQSW_OPTIMIZE_GETS);
+ /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The
+ * other frags are the GET sink which we obviously don't
+ * send here :) */
+#if MULTIRAIL_EKC
+ rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
+ ktx->ktx_port, attr,
+ kqswnal_txhandler, ktx,
+ NULL, ktx->ktx_frags, 1);
+#else
rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
ktx->ktx_port, attr, kqswnal_txhandler,
- ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+ ktx, NULL, ktx->ktx_frags, 1);
+#endif
} else {
+#if MULTIRAIL_EKC
+ rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
+ ktx->ktx_port, attr,
+ kqswnal_txhandler, ktx,
+ NULL, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
- ktx->ktx_port, attr, kqswnal_txhandler,
- ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+ ktx->ktx_port, attr,
+ kqswnal_txhandler, ktx,
+ ktx->ktx_frags, ktx->ktx_nfrag);
+#endif
}
switch (rc) {
- case ESUCCESS: /* success */
+ case EP_SUCCESS: /* success */
return (0);
- case ENOMEM: /* can't allocate ep txd => queue for later */
+ case EP_ENOMEM: /* can't allocate ep txd => queue for later */
LASSERT (in_interrupt());
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
default: /* fatal error */
CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
kqswnal_notify_peer_down(ktx);
- return (rc);
+ return (-EHOSTUNREACH);
}
}
} /* end of print_hdr() */
+#if !MULTIRAIL_EKC
void
kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
{
CERROR ("DATAVEC too small\n");
return (-E2BIG);
}
+#endif
int
kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
- struct iovec *iov, ptl_kiov_t *kiov, int nob)
+ struct iovec *iov, ptl_kiov_t *kiov,
+ int offset, int nob)
{
kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
- char *buffer = (char *)page_address(krx->krx_pages[0]);
+ char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
- EP_IOVEC eiov[EP_MAXFRAG];
- EP_STATUSBLK blk;
int rc;
-
- LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed);
+#if MULTIRAIL_EKC
+ int i;
+#else
+ EP_DATAVEC datav[EP_MAXFRAG];
+ int ndatav;
+#endif
+ LASSERT (krx->krx_rpc_reply_needed);
LASSERT ((iov == NULL) != (kiov == NULL));
- /* see .*_pack_k?iov comment regarding endian-ness */
+ /* see kqswnal_sendmsg comment regarding endian-ness */
if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
/* msg too small to discover rmd size */
CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
return (-EINVAL);
}
- if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) {
+ if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
/* rmd doesn't fit in the incoming message */
CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
- krx->krx_nob, rmd->kqrmd_neiov,
- (int)(((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer));
+ krx->krx_nob, rmd->kqrmd_nfrag,
+ (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
return (-EINVAL);
}
- /* Ghastly hack part 1, uses the existing procedures to map the source data... */
- ktx->ktx_nfrag = 0;
+ /* Map the source data... */
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
if (kiov != NULL)
- rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
+ rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
else
- rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov);
+ rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
if (rc != 0) {
CERROR ("Can't map source data: %d\n", rc);
return (rc);
}
- /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... */
- memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0]));
-
- rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav,
- ktx->ktx_nfrag, eiov,
- rmd->kqrmd_neiov, rmd->kqrmd_eiov);
- if (rc < 0) {
- CERROR ("Can't create datavec: %d\n", rc);
- return (rc);
+#if MULTIRAIL_EKC
+ if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
+ CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+ ktx->ktx_nfrag, rmd->kqrmd_nfrag);
+ return (-EINVAL);
}
- ktx->ktx_nfrag = rc;
-
- memset (&blk, 0, sizeof (blk)); /* zero blk.Status */
+
+ for (i = 0; i < rmd->kqrmd_nfrag; i++)
+ if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
+ CERROR("Can't cope with unequal frags %d(%d):"
+ " %d local %d remote\n",
+ i, rmd->kqrmd_nfrag,
+ ktx->ktx_frags[i].nmd_len,
+ rmd->kqrmd_frag[i].nmd_len);
+ return (-EINVAL);
+ }
+#else
+ ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
+ ktx->ktx_nfrag, ktx->ktx_frags,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ if (ndatav < 0) {
+ CERROR ("Can't create datavec: %d\n", ndatav);
+ return (ndatav);
+ }
+#endif
- /* Our caller will start to race with kqswnal_rpc_complete... */
+ /* Our caller will start to race with kqswnal_dma_reply_complete... */
LASSERT (atomic_read (&krx->krx_refcount) == 1);
atomic_set (&krx->krx_refcount, 2);
- rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx,
- &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag);
- if (rc == ESUCCESS)
+#if MULTIRAIL_EKC
+ rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
+ &kqswnal_rpc_success,
+ ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
+ if (rc == EP_SUCCESS)
+ return (0);
+
+ /* Well we tried... */
+ krx->krx_rpc_reply_needed = 0;
+#else
+ rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
+ &kqswnal_rpc_success, datav, ndatav);
+ if (rc == EP_SUCCESS)
return (0);
+ /* "old" EKC destroys rxd on failed completion */
+ krx->krx_rxd = NULL;
+#endif
+
+ CERROR("can't complete RPC: %d\n", rc);
+
/* reset refcount back to 1: we're not going to be racing with
- * kqswnal_rely_complete. */
+ * kqswnal_dma_reply_complete. */
atomic_set (&krx->krx_refcount, 1);
+
return (-ECONNABORTED);
}
-static int
+static ptl_err_t
kqswnal_sendmsg (nal_cb_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int payload_niov,
struct iovec *payload_iov,
ptl_kiov_t *payload_kiov,
+ size_t payload_offset,
size_t payload_nob)
{
kqswnal_tx_t *ktx;
#if KQSW_CHECKSUM
int i;
kqsw_csum_t csum;
+ int sumoff;
int sumnob;
#endif
return (PTL_NOSPACE);
}
+ ktx->ktx_nid = targetnid;
ktx->ktx_args[0] = private;
ktx->ktx_args[1] = libmsg;
-#if KQSW_OPTIMIZE_GETS
if (type == PTL_MSG_REPLY &&
- ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) {
+ ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
if (nid != targetnid ||
kqswnal_nid2elanid(nid) !=
ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
"nid "LPX64" via "LPX64" elanID %d\n",
nid, targetnid,
ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
- return(PTL_FAIL);
+ return (PTL_FAIL);
}
/* peer expects RPC completion with GET data */
- rc = kqswnal_dma_reply (ktx,
- payload_niov, payload_iov,
- payload_kiov, payload_nob);
+ rc = kqswnal_dma_reply (ktx, payload_niov,
+ payload_iov, payload_kiov,
+ payload_offset, payload_nob);
if (rc == 0)
- return (0);
+ return (PTL_OK);
CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
kqswnal_put_idle_tx (ktx);
return (PTL_FAIL);
}
-#endif
memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
#if KQSW_CHECKSUM
csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
- for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
+ for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
+ LASSERT(i < niov);
if (payload_kiov != NULL) {
ptl_kiov_t *kiov = &payload_kiov[i];
- char *addr = ((char *)kmap (kiov->kiov_page)) +
- kiov->kiov_offset;
-
- csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
- sumnob -= kiov->kiov_len;
+
+ if (sumoff >= kiov->kiov_len) {
+ sumoff -= kiov->kiov_len;
+ } else {
+ char *addr = ((char *)kmap (kiov->kiov_page)) +
+ kiov->kiov_offset + sumoff;
+ int fragnob = kiov->kiov_len - sumoff;
+
+ csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+ sumnob -= fragnob;
+ sumoff = 0;
+ kunmap(kiov->kiov_page);
+ }
} else {
struct iovec *iov = &payload_iov[i];
- csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
- sumnob -= iov->iov_len;
+ if (sumoff > iov->iov_len) {
+ sumoff -= iov->iov_len;
+ } else {
+ char *addr = iov->iov_base + sumoff;
+ int fragnob = iov->iov_len - sumoff;
+
+ csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+ sumnob -= fragnob;
+ sumoff = 0;
+ }
}
}
- memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
+ memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
#endif
-
- /* Set up first frag from pre-mapped buffer (it's at least the
- * portals header) */
- ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer;
- ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE;
- ktx->ktx_nfrag = 1;
- ktx->ktx_state = KTX_SENDING; /* => lib_finalize() on completion */
-
-#if KQSW_OPTIMIZE_GETS
- if (type == PTL_MSG_GET && /* doing a GET */
+
+ if (kqswnal_data.kqn_optimized_gets &&
+ type == PTL_MSG_GET && /* doing a GET */
nid == targetnid) { /* not forwarding */
lib_md_t *md = libmsg->md;
kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
*
* First I set up ktx as if it was going to send this
* payload, (it needs to map it anyway). This fills
- * ktx_frags.iov[1] and onward with the network addresses
- * of the get sink frags. I copy these into ktx_buffer,
+ * ktx_frags[1] and onward with the network addresses
+ * of the GET sink frags. I copy these into ktx_buffer,
* immediately after the header, and send that as my GET
* message.
*
* When EKC copes with different endian nodes, I'll fix
* this (and eat my hat :) */
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+ ktx->ktx_state = KTX_GETTING;
+
if ((libmsg->md->options & PTL_MD_KIOV) != 0)
- rc = kqswnal_map_tx_kiov (ktx, md->length,
+ rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
md->md_niov, md->md_iov.kiov);
else
- rc = kqswnal_map_tx_iov (ktx, md->length,
+ rc = kqswnal_map_tx_iov (ktx, 0, md->length,
md->md_niov, md->md_iov.iov);
if (rc < 0) {
return (PTL_FAIL);
}
- rmd->kqrmd_neiov = ktx->ktx_nfrag - 1;
- memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1],
- rmd->kqrmd_neiov * sizeof (EP_IOVEC));
+ rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
- ktx->ktx_nfrag = 1;
- ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t,
- kqrmd_eiov[rmd->kqrmd_neiov]);
- payload_nob = ktx->ktx_frags.iov[0].Len;
- ktx->ktx_state = KTX_GETTING;
- } else
+ payload_nob = offsetof(kqswnal_remotemd_t,
+ kqrmd_frag[rmd->kqrmd_nfrag]);
+ LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
+
+#if MULTIRAIL_EKC
+ memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+ rmd->kqrmd_nfrag * sizeof(EP_NMD));
+
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+ 0, KQSW_HDR_SIZE + payload_nob);
+#else
+ memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+ rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
+
+ ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+ ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
+#endif
+ } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
+
+ /* small message: single frag copied into the pre-mapped buffer */
+
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+ ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+ 0, KQSW_HDR_SIZE + payload_nob);
+#else
+ ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+ ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
- if (payload_nob > 0) { /* got some payload (something more to do) */
- /* make a single contiguous message? */
- if (payload_nob <= KQSW_TX_MAXCONTIG) {
- /* copy payload to ktx_buffer, immediately after hdr */
+ if (payload_nob > 0) {
if (payload_kiov != NULL)
lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
- payload_niov, payload_kiov, payload_nob);
+ payload_niov, payload_kiov,
+ payload_offset, payload_nob);
else
lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
- payload_niov, payload_iov, payload_nob);
- /* first frag includes payload */
- ktx->ktx_frags.iov[0].Len += payload_nob;
- } else {
- if (payload_kiov != NULL)
- rc = kqswnal_map_tx_kiov (ktx, payload_nob,
- payload_niov, payload_kiov);
- else
- rc = kqswnal_map_tx_iov (ktx, payload_nob,
- payload_niov, payload_iov);
- if (rc != 0) {
- kqswnal_put_idle_tx (ktx);
- return (PTL_FAIL);
- }
- }
- }
+ payload_niov, payload_iov,
+ payload_offset, payload_nob);
+ }
+ } else {
+
+ /* large message: multiple frags: first is hdr in pre-mapped buffer */
- ktx->ktx_nid = targetnid;
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+ ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+ 0, KQSW_HDR_SIZE);
+#else
+ ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+ ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+ if (payload_kiov != NULL)
+ rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
+ payload_niov, payload_kiov);
+ else
+ rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
+ payload_niov, payload_iov);
+ if (rc != 0) {
+ kqswnal_put_idle_tx (ktx);
+ return (PTL_FAIL);
+ }
+ }
+
ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
- EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
+ EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
rc = kqswnal_launch (ktx);
if (rc != 0) { /* failed? */
return (PTL_OK);
}
-static int
+static ptl_err_t
kqswnal_send (nal_cb_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_pid_t pid,
unsigned int payload_niov,
struct iovec *payload_iov,
+ size_t payload_offset,
size_t payload_nob)
{
return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
- payload_niov, payload_iov, NULL, payload_nob));
+ payload_niov, payload_iov, NULL,
+ payload_offset, payload_nob));
}
-static int
+static ptl_err_t
kqswnal_send_pages (nal_cb_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_pid_t pid,
unsigned int payload_niov,
ptl_kiov_t *payload_kiov,
+ size_t payload_offset,
size_t payload_nob)
{
return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
- payload_niov, NULL, payload_kiov, payload_nob));
+ payload_niov, NULL, payload_kiov,
+ payload_offset, payload_nob));
}
-int kqswnal_fwd_copy_contig = 0;
-
void
kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
{
int rc;
kqswnal_tx_t *ktx;
- struct iovec *iov = fwd->kprfd_iov;
+ ptl_kiov_t *kiov = fwd->kprfd_kiov;
int niov = fwd->kprfd_niov;
int nob = fwd->kprfd_nob;
ptl_nid_t nid = fwd->kprfd_gateway_nid;
LBUG ();
#endif
/* The router wants this NAL to forward a packet */
- CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
+ CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
fwd, nid, niov, nob);
- LASSERT (niov > 0);
-
- ktx = kqswnal_get_idle_tx (fwd, FALSE);
+ ktx = kqswnal_get_idle_tx (fwd, 0);
if (ktx == NULL) /* can't get txd right now */
return; /* fwd will be scheduled when tx desc freed */
goto failed;
}
- if (nob > KQSW_NRXMSGBYTES_LARGE) {
- CERROR ("Can't forward [%p] to "LPX64
- ": size %d bigger than max packet size %ld\n",
- fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
- rc = -EMSGSIZE;
- goto failed;
- }
+ /* copy hdr into pre-mapped buffer */
+ memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
+ ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
- if ((kqswnal_fwd_copy_contig || niov > 1) &&
- nob <= KQSW_TX_BUFFER_SIZE)
+ ktx->ktx_port = (nob <= KQSW_SMALLPAYLOAD) ?
+ EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
+ ktx->ktx_nid = nid;
+ ktx->ktx_state = KTX_FORWARDING;
+ ktx->ktx_args[0] = fwd;
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+
+ if (nob <= KQSW_TX_MAXCONTIG)
{
- /* send from ktx's pre-allocated/mapped contiguous buffer? */
- lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
- ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
- ktx->ktx_frags.iov[0].Len = nob;
- ktx->ktx_nfrag = 1;
- ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
+ /* send payload from ktx's pre-mapped contiguous buffer */
+#if MULTIRAIL_EKC
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+ 0, KQSW_HDR_SIZE + nob);
+#else
+ ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+ ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
+#endif
+ if (nob > 0)
+ lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
+ niov, kiov, 0, nob);
}
else
{
- /* zero copy */
- ktx->ktx_nfrag = 0; /* no frags mapped yet */
- rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
+ /* zero copy payload */
+#if MULTIRAIL_EKC
+ ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+ 0, KQSW_HDR_SIZE);
+#else
+ ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+ ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+ rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
if (rc != 0)
goto failed;
-
- ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
}
- ktx->ktx_port = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
- EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
- ktx->ktx_nid = nid;
- ktx->ktx_state = KTX_FORWARDING; /* kpr_put_packet() on completion */
- ktx->ktx_args[0] = fwd;
-
rc = kqswnal_launch (ktx);
if (rc == 0)
return;
if (error != 0)
{
- ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
}
void
-kqswnal_reply_complete (EP_RXD *rxd)
+kqswnal_dma_reply_complete (EP_RXD *rxd)
{
int status = ep_rxd_status(rxd);
kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
"rxd %p, ktx %p, status %d\n", rxd, ktx, status);
LASSERT (krx->krx_rxd == rxd);
+ LASSERT (krx->krx_rpc_reply_needed);
- krx->krx_rpc_completed = 1;
- kqswnal_requeue_rx (krx);
+ krx->krx_rpc_reply_needed = 0;
+ kqswnal_rx_done (krx);
- lib_finalize (&kqswnal_lib, NULL, msg);
+ lib_finalize (&kqswnal_lib, NULL, msg,
+ (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
kqswnal_put_idle_tx (ktx);
}
"rxd %p, krx %p, status %d\n", rxd, krx, status);
LASSERT (krx->krx_rxd == rxd);
+ LASSERT (krx->krx_rpc_reply_needed);
- krx->krx_rpc_completed = 1;
+ krx->krx_rpc_reply_needed = 0;
kqswnal_requeue_rx (krx);
}
void
-kqswnal_requeue_rx (kqswnal_rx_t *krx)
+kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
- EP_STATUSBLK blk;
- int rc;
+ int rc;
- LASSERT (atomic_read (&krx->krx_refcount) > 0);
- if (!atomic_dec_and_test (&krx->krx_refcount))
- return;
+ LASSERT (atomic_read(&krx->krx_refcount) == 0);
- if (!ep_rxd_isrpc(krx->krx_rxd) ||
- krx->krx_rpc_completed) {
+ if (krx->krx_rpc_reply_needed) {
- /* don't actually requeue on shutdown */
- if (kqswnal_data.kqn_shuttingdown)
+ /* We failed to complete the peer's optimized GET (e.g. we
+ * couldn't map the source buffers). We complete the
+ * peer's EKC rpc now with failure. */
+#if MULTIRAIL_EKC
+ rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
+ &kqswnal_rpc_failed, NULL, NULL, 0);
+ if (rc == EP_SUCCESS)
return;
- ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx,
- krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE);
- return;
- }
-
- /* Sender wanted an RPC, but we didn't complete it (we must have
- * dropped the sender's message). We complete it now with
- * failure... */
- memset (&blk, 0, sizeof (blk));
- blk.Status = -ECONNREFUSED;
-
- atomic_set (&krx->krx_refcount, 1);
+ CERROR("can't complete RPC: %d\n", rc);
+#else
+ if (krx->krx_rxd != NULL) {
+ /* We didn't try (and fail) to complete earlier... */
+ rc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rpc_complete, krx,
+ &kqswnal_rpc_failed, NULL, 0);
+ if (rc == EP_SUCCESS)
+ return;
+
+ CERROR("can't complete RPC: %d\n", rc);
+ }
+
+ /* NB the old ep_complete_rpc() frees rxd on failure, so we
+ * have to requeue from scratch here, unless we're shutting
+ * down */
+ if (kqswnal_data.kqn_shuttingdown)
+ return;
- rc = ep_complete_rpc (krx->krx_rxd,
- kqswnal_rpc_complete, krx,
- &blk, NULL, 0);
- if (rc == ESUCCESS) {
- /* callback will call me again to requeue, having set
- * krx_rpc_completed... */
+ rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
+ krx->krx_elanbuffer,
+ krx->krx_npages * PAGE_SIZE, 0);
+ LASSERT (rc == EP_SUCCESS);
+ /* We don't handle failure here; it's incredibly rare
+ * (never reported?) and only happens with "old" EKC */
return;
+#endif
}
- CERROR("can't complete RPC: %d\n", rc);
-
- /* we don't actually requeue on shutdown */
- if (kqswnal_data.kqn_shuttingdown)
- return;
-
- /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue
- * from scratch here... */
- rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
- krx->krx_elanaddr,
- krx->krx_npages * PAGE_SIZE, 0);
-
- LASSERT (rc == ESUCCESS);
- /* This needs to be fixed by ep_complete_rpc NOT freeing
- * krx->krx_rxd on failure so we can just ep_requeue_receive() */
+#if MULTIRAIL_EKC
+ if (kqswnal_data.kqn_shuttingdown) {
+ /* free EKC rxd on shutdown */
+ ep_complete_receive(krx->krx_rxd);
+ } else {
+ /* repost receive */
+ ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+ &krx->krx_elanbuffer, 0);
+ }
+#else
+ /* don't actually requeue on shutdown */
+ if (!kqswnal_data.kqn_shuttingdown)
+ ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+ krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
+#endif
}
-
+
void
kqswnal_rx (kqswnal_rx_t *krx)
{
- ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
+ ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid);
+ int payload_nob;
int nob;
int niov;
+ LASSERT (atomic_read(&krx->krx_refcount) == 0);
+
if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
- /* NB krx requeued when lib_parse() calls back kqswnal_recv */
+ atomic_set(&krx->krx_refcount, 1);
lib_parse (&kqswnal_lib, hdr, krx);
+ kqswnal_rx_done(krx);
return;
}
return;
}
- /* NB forwarding may destroy iov; rebuild every time */
- for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
- {
- LASSERT (niov < krx->krx_npages);
- krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
- krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
+ nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
+ niov = 0;
+ if (nob > 0) {
+ krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
+ krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
+ niov = 1;
+ nob -= PAGE_SIZE - KQSW_HDR_SIZE;
+
+ while (nob > 0) {
+ LASSERT (niov < krx->krx_npages);
+
+ krx->krx_kiov[niov].kiov_offset = 0;
+ krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
+ niov++;
+ nob -= PAGE_SIZE;
+ }
}
- kpr_fwd_init (&krx->krx_fwd, dest_nid,
- krx->krx_nob, niov, krx->krx_iov,
+ kpr_fwd_init (&krx->krx_fwd, dest_nid,
+ hdr, payload_nob, niov, krx->krx_kiov,
kqswnal_fwd_callback, krx);
kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
krx->krx_rxd = rxd;
krx->krx_nob = nob;
- LASSERT (atomic_read (&krx->krx_refcount) == 0);
- atomic_set (&krx->krx_refcount, 1);
- krx->krx_rpc_completed = 0;
+#if MULTIRAIL_EKC
+ krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
+#else
+ krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
+#endif
/* must receive a whole header to be able to parse */
if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
{
/* receives complete with failure when receiver is removed */
+#if MULTIRAIL_EKC
+ if (status == EP_SHUTDOWN)
+ LASSERT (kqswnal_data.kqn_shuttingdown);
+ else
+ CERROR("receive status failed with status %d nob %d\n",
+ ep_rxd_status(rxd), nob);
+#else
if (!kqswnal_data.kqn_shuttingdown)
CERROR("receive status failed with status %d nob %d\n",
ep_rxd_status(rxd), nob);
-
+#endif
kqswnal_requeue_rx (krx);
return;
}
void
kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
{
- ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
", dpid %d, spid %d, type %d\n",
}
#endif
-static int
+static ptl_err_t
kqswnal_recvmsg (nal_cb_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
struct iovec *iov,
ptl_kiov_t *kiov,
+ size_t offset,
size_t mlen,
size_t rlen)
{
kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
+ char *buffer = page_address(krx->krx_kiov[0].kiov_page);
int page;
char *page_ptr;
int page_nob;
#if KQSW_CHECKSUM
kqsw_csum_t senders_csum;
kqsw_csum_t payload_csum = 0;
- kqsw_csum_t hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
- sizeof(ptl_hdr_t));
+ kqsw_csum_t hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
size_t csum_len = mlen;
int csum_frags = 0;
int csum_nob = 0;
atomic_inc (&csum_counter);
- memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
- sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
+ memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
if (senders_csum != hdr_csum)
kqswnal_csum_error (krx, 1);
#endif
CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
- /* What was actually received must be >= payload.
- * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
- LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
+ /* What was actually received must be >= payload. */
LASSERT (mlen <= rlen);
+ if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
+ CERROR("Bad message size: have %d, need %d + %d\n",
+ krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
+ return (PTL_FAIL);
+ }
/* It must be OK to kmap() if required */
LASSERT (kiov == NULL || !in_interrupt ());
/* Either all pages or all vaddrs */
LASSERT (!(kiov != NULL && iov != NULL));
-
- if (mlen != 0)
- {
+
+ if (mlen != 0) {
page = 0;
- page_ptr = ((char *) page_address(krx->krx_pages[0])) +
- KQSW_HDR_SIZE;
+ page_ptr = buffer + KQSW_HDR_SIZE;
page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
LASSERT (niov > 0);
+
if (kiov != NULL) {
- iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
- iov_nob = kiov->kiov_len;
+ /* skip complete frags */
+ while (offset >= kiov->kiov_len) {
+ offset -= kiov->kiov_len;
+ kiov++;
+ niov--;
+ LASSERT (niov > 0);
+ }
+ iov_ptr = ((char *)kmap (kiov->kiov_page)) +
+ kiov->kiov_offset + offset;
+ iov_nob = kiov->kiov_len - offset;
} else {
- iov_ptr = iov->iov_base;
- iov_nob = iov->iov_len;
+ /* skip complete frags */
+ while (offset >= iov->iov_len) {
+ offset -= iov->iov_len;
+ iov++;
+ niov--;
+ LASSERT (niov > 0);
+ }
+ iov_ptr = iov->iov_base + offset;
+ iov_nob = iov->iov_len - offset;
}
-
+
for (;;)
{
- /* We expect the iov to exactly match mlen */
- LASSERT (iov_nob <= mlen);
-
- frag = MIN (page_nob, iov_nob);
+ frag = mlen;
+ if (frag > page_nob)
+ frag = page_nob;
+ if (frag > iov_nob)
+ frag = iov_nob;
+
memcpy (iov_ptr, page_ptr, frag);
#if KQSW_CHECKSUM
payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
{
page++;
LASSERT (page < krx->krx_npages);
- page_ptr = page_address(krx->krx_pages[page]);
+ page_ptr = page_address(krx->krx_kiov[page].kiov_page);
page_nob = PAGE_SIZE;
}
}
#if KQSW_CHECKSUM
- memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
- sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
+ memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t),
+ sizeof(kqsw_csum_t));
if (csum_len != rlen)
CERROR("Unable to checksum data in user's buffer\n");
"csum_nob %d\n",
hdr_csum, payload_csum, csum_frags, csum_nob);
#endif
- lib_finalize(nal, private, libmsg);
-
- kqswnal_requeue_rx (krx);
+ lib_finalize(nal, private, libmsg, PTL_OK);
- return (rlen);
+ return (PTL_OK);
}
-static int
+static ptl_err_t
kqswnal_recv(nal_cb_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
struct iovec *iov,
+ size_t offset,
size_t mlen,
size_t rlen)
{
- return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen));
+ return (kqswnal_recvmsg(nal, private, libmsg,
+ niov, iov, NULL,
+ offset, mlen, rlen));
}
-static int
+static ptl_err_t
kqswnal_recv_pages (nal_cb_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
ptl_kiov_t *kiov,
+ size_t offset,
size_t mlen,
size_t rlen)
{
- return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen));
+ return (kqswnal_recvmsg(nal, private, libmsg,
+ niov, NULL, kiov,
+ offset, mlen, rlen));
}
int
return ((int)pid);
atomic_inc (&kqswnal_data.kqn_nthreads);
+ atomic_inc (&kqswnal_data.kqn_nthreads_running);
return (0);
}
long flags;
int rc;
int counter = 0;
+ int shuttingdown = 0;
int did_something;
kportal_daemonize ("kqswnal_sched");
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
- while (!kqswnal_data.kqn_shuttingdown)
+ for (;;)
{
- did_something = FALSE;
+ if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
+
+ if (kqswnal_data.kqn_shuttingdown == 2)
+ break;
+
+ /* During stage 1 of shutdown we are still responsive
+ * to receives */
+
+ atomic_dec (&kqswnal_data.kqn_nthreads_running);
+ shuttingdown = kqswnal_data.kqn_shuttingdown;
+ }
+
+ did_something = 0;
if (!list_empty (&kqswnal_data.kqn_readyrxds))
{
kqswnal_rx (krx);
- did_something = TRUE;
+ did_something = 1;
spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
}
- if (!list_empty (&kqswnal_data.kqn_delayedtxds))
+ if (!shuttingdown &&
+ !list_empty (&kqswnal_data.kqn_delayedtxds))
{
ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
kqswnal_tx_t, ktx_list);
kqswnal_tx_done (ktx, rc);
}
- did_something = TRUE;
+ did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
- if (!list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!shuttingdown &&
+ !list_empty (&kqswnal_data.kqn_delayedfwds))
{
fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
list_del (&fwd->kprfd_list);
kqswnal_fwd_packet (NULL, fwd);
- did_something = TRUE;
+ did_something = 1;
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
}
if (!did_something) {
rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
- kqswnal_data.kqn_shuttingdown ||
+ kqswnal_data.kqn_shuttingdown != shuttingdown ||
!list_empty(&kqswnal_data.kqn_readyrxds) ||
!list_empty(&kqswnal_data.kqn_delayedtxds) ||
!list_empty(&kqswnal_data.kqn_delayedfwds));