#include "qswnal.h"
-atomic_t kqswnal_packets_launched;
-atomic_t kqswnal_packets_transmitted;
-atomic_t kqswnal_packets_received;
-
-
/*
* LIB functions follow
*
return;
CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
- ktx, ktx->ktx_niov, ktx->ktx_basepage, ktx->ktx_nmappedpages);
+ ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
int
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
{
- int nfrags = ktx->ktx_niov;
- const int maxfrags = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
+ int nfrags = ktx->ktx_nfrag;
int nmapped = ktx->ktx_nmappedpages;
int maxmapped = ktx->ktx_npages;
uint32_t basepage = ktx->ktx_basepage + nmapped;
char *ptr;
LASSERT (nmapped <= maxmapped);
- LASSERT (nfrags <= maxfrags);
+ LASSERT (nfrags <= EP_MAXFRAG);
LASSERT (niov > 0);
LASSERT (nob > 0);
return (-EMSGSIZE);
}
- if (nfrags == maxfrags) {
+ if (nfrags == EP_MAXFRAG) {
CERROR("Message too fragmented in Elan VM (max %d frags)\n",
- maxfrags);
+ EP_MAXFRAG);
return (-EMSGSIZE);
}
elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
kqswnal_data.kqn_eptxdmahandle,
ptr, fraglen,
- basepage, &ktx->ktx_iov[nfrags].Base);
+ basepage, &ktx->ktx_frags.iov[nfrags].Base);
kunmap (kiov->kiov_page);
ktx->ktx_nmappedpages = nmapped;
if (nfrags > 0 && /* previous frag mapped */
- ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
- (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
+ ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
+ (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
/* just extend previous */
- ktx->ktx_iov[nfrags - 1].Len += fraglen;
+ ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
else {
- ktx->ktx_iov[nfrags].Len = fraglen;
+ ktx->ktx_frags.iov[nfrags].Len = fraglen;
nfrags++; /* new frag */
}
} while (nob > 0);
- ktx->ktx_niov = nfrags;
+ ktx->ktx_nfrag = nfrags;
CDEBUG (D_NET, "%p got %d frags over %d pages\n",
- ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
+ ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
return (0);
}
int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
{
- int nfrags = ktx->ktx_niov;
- const int maxfrags = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
+ int nfrags = ktx->ktx_nfrag;
int nmapped = ktx->ktx_nmappedpages;
int maxmapped = ktx->ktx_npages;
uint32_t basepage = ktx->ktx_basepage + nmapped;
LASSERT (nmapped <= maxmapped);
- LASSERT (nfrags <= maxfrags);
+ LASSERT (nfrags <= EP_MAXFRAG);
LASSERT (niov > 0);
LASSERT (nob > 0);
return (-EMSGSIZE);
}
- if (nfrags == maxfrags) {
+ if (nfrags == EP_MAXFRAG) {
CERROR("Message too fragmented in Elan VM (max %d frags)\n",
- maxfrags);
+ EP_MAXFRAG);
return (-EMSGSIZE);
}
elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
kqswnal_data.kqn_eptxdmahandle,
iov->iov_base, fraglen,
- basepage, &ktx->ktx_iov[nfrags].Base);
+ basepage, &ktx->ktx_frags.iov[nfrags].Base);
/* keep in loop for failure case */
ktx->ktx_nmappedpages = nmapped;
if (nfrags > 0 && /* previous frag mapped */
- ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
- (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
+ ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
+ (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
/* just extend previous */
- ktx->ktx_iov[nfrags - 1].Len += fraglen;
+ ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
else {
- ktx->ktx_iov[nfrags].Len = fraglen;
+ ktx->ktx_frags.iov[nfrags].Len = fraglen;
nfrags++; /* new frag */
}
} while (nob > 0);
- ktx->ktx_niov = nfrags;
+ ktx->ktx_nfrag = nfrags;
CDEBUG (D_NET, "%p got %d frags over %d pages\n",
- ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
+ ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
return (0);
}
+
void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
unsigned long flags;
kqswnal_unmap_tx (ktx); /* release temporary mappings */
+ ktx->ktx_state = KTX_IDLE;
spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
/* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
+
return (ktx);
}
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
{
- if (ktx->ktx_forwarding) /* router asked me to forward this packet */
+ lib_msg_t *msg;
+ lib_msg_t *repmsg;
+
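+       /* ktx_state, set at launch time, selects the completion action:
+        * FORWARDING -> notify the router, SENDING -> finalize the local
+        * message, GETTING -> also fake the REPLY the peer DMA-ed direct */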
+ switch (ktx->ktx_state) {
+ case KTX_FORWARDING: /* router asked me to forward this packet */
kpr_fwd_done (&kqswnal_data.kqn_router,
(kpr_fwd_desc_t *)ktx->ktx_args[0], error);
- else /* packet sourced locally */
+ break;
+
+ case KTX_SENDING: /* packet sourced locally */
lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
(lib_msg_t *)ktx->ktx_args[1]);
+ break;
+
+ case KTX_GETTING: /* Peer has DMA-ed direct? */
+ LASSERT (KQSW_OPTIMIZE_GETS);
+ msg = (lib_msg_t *)ktx->ktx_args[1];
+ repmsg = NULL;
+
+ if (error == 0)
+ repmsg = lib_fake_reply_msg (&kqswnal_lib,
+ ktx->ktx_nid, msg->md);
+
+ lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg);
+
+ if (repmsg != NULL)
+ lib_finalize (&kqswnal_lib, NULL, repmsg);
+ break;
+
+ default:
+ LASSERT (0);
+ }
kqswnal_put_idle_tx (ktx);
}
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg;
-
+
LASSERT (txd != NULL);
LASSERT (ktx != NULL);
CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
- if (status == EP_SUCCESS)
- atomic_inc (&kqswnal_packets_transmitted);
-
if (status != EP_SUCCESS)
{
CERROR ("Tx completion to "LPX64" failed: %d\n",
kqswnal_notify_peer_down(ktx);
status = -EIO;
+
+ } else if (ktx->ktx_state == KTX_GETTING) {
+ /* RPC completed OK; what did our peer put in the status
+ * block? */
+ LASSERT (KQSW_OPTIMIZE_GETS);
+ status = ep_txd_statusblk(txd)->Status;
+ } else {
+ status = 0;
}
kqswnal_tx_done (ktx, status);
ktx->ktx_launchtime = jiffies;
LASSERT (dest >= 0); /* must be a peer */
- rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
- ktx->ktx_port, attr, kqswnal_txhandler,
- ktx, ktx->ktx_iov, ktx->ktx_niov);
+ if (ktx->ktx_state == KTX_GETTING) {
+ LASSERT (KQSW_OPTIMIZE_GETS);
+ rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
+ ktx->ktx_port, attr, kqswnal_txhandler,
+ ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+ } else {
+ rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
+ ktx->ktx_port, attr, kqswnal_txhandler,
+ ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+ }
+
switch (rc) {
- case 0: /* success */
- atomic_inc (&kqswnal_packets_launched);
+ case ESUCCESS: /* success */
return (0);
case ENOMEM: /* can't allocate ep txd => queue for later */
{
char *type_str = hdr_type_string (hdr);
- CERROR("P3 Header at %p of type %s\n", hdr, type_str);
- CERROR(" From nid/pid "LPU64"/%u", NTOH__u64(hdr->src_nid),
+ CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
+ NTOH__u32(hdr->payload_length));
+ CERROR(" From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
NTOH__u32(hdr->src_pid));
CERROR(" To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
NTOH__u32(hdr->dest_pid));
hdr->msg.put.ack_wmd.wh_interface_cookie,
hdr->msg.put.ack_wmd.wh_object_cookie,
NTOH__u64 (hdr->msg.put.match_bits));
- CERROR(" Length %d, offset %d, hdr data "LPX64"\n",
- NTOH__u32(PTL_HDR_LENGTH(hdr)),
+ CERROR(" offset %d, hdr data "LPX64"\n",
NTOH__u32(hdr->msg.put.offset),
hdr->msg.put.hdr_data);
break;
break;
case PTL_MSG_REPLY:
- CERROR(" dst md "LPX64"."LPX64", length %d\n",
+ CERROR(" dst md "LPX64"."LPX64"\n",
hdr->msg.reply.dst_wmd.wh_interface_cookie,
- hdr->msg.reply.dst_wmd.wh_object_cookie,
- NTOH__u32 (PTL_HDR_LENGTH(hdr)));
+ hdr->msg.reply.dst_wmd.wh_object_cookie);
}
} /* end of print_hdr() */
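+/* Debug helper: dump an EP_IOVEC array (Elan base addresses and lengths) */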
+void
+kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
+{
+ int i;
+
+ CDEBUG (how, "%s: %d\n", str, n);
+ for (i = 0; i < n; i++) {
+ CDEBUG (how, " %08x for %d\n", iov[i].Base, iov[i].Len);
+ }
+}
+
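+/* Merge a source EP_IOVEC list with a destination EP_IOVEC list into a
+ * single EP_DATAVEC list for ep_complete_rpc().  The two lists are
+ * consumed in lock-step; each datavec covers the longest run that is
+ * contiguous in both.  E.g. src {0x1000/8k} and dst {0x9000/4k,
+ * 0xd000/4k} merge to {0x1000->0x9000/4k, 0x2000->0xd000/4k}. */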
+int
+kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
+ int nsrc, EP_IOVEC *src,
+ int ndst, EP_IOVEC *dst)
+{
+ int count;
+ int nob;
+
+ LASSERT (ndv > 0);
+ LASSERT (nsrc > 0);
+ LASSERT (ndst > 0);
+
+ for (count = 0; count < ndv; count++, dv++) {
+
+ if (nsrc == 0 || ndst == 0) {
+ if (nsrc != ndst) {
+ /* For now I'll barf on any left over entries */
+ CERROR ("mismatched src and dst iovs\n");
+ return (-EINVAL);
+ }
+ return (count);
+ }
+
+ nob = (src->Len < dst->Len) ? src->Len : dst->Len;
+ dv->Len = nob;
+ dv->Source = src->Base;
+ dv->Dest = dst->Base;
+
+ if (nob >= src->Len) {
+ src++;
+ nsrc--;
+ } else {
+ src->Len -= nob;
+ src->Base += nob;
+ }
+
+ if (nob >= dst->Len) {
+ dst++;
+ ndst--;
+ } else {
+                       dst->Len -= nob;
+                       dst->Base += nob;
+ }
+ }
+
+ CERROR ("DATAVEC too small\n");
+ return (-E2BIG);
+}
+
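+/* Send a REPLY by DMA-ing it straight into the GET sink buffers the
+ * peer described in the RMD appended to its GET message: validate the
+ * RMD, map the local source frags, merge them with the peer's eiovs
+ * into datavecs, then complete the RPC with ep_complete_rpc(). */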
+int
+kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
+ struct iovec *iov, ptl_kiov_t *kiov, int nob)
+{
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+ char *buffer = (char *)page_address(krx->krx_pages[0]);
+ kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
+ EP_IOVEC eiov[EP_MAXFRAG];
+ EP_STATUSBLK blk;
+ int rc;
+
+ LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed);
+ LASSERT ((iov == NULL) != (kiov == NULL));
+
+ /* see .*_pack_k?iov comment regarding endian-ness */
+ if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
+ /* msg too small to discover rmd size */
+ CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
+ krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
+ return (-EINVAL);
+ }
+
+ if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) {
+ /* rmd doesn't fit in the incoming message */
+ CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
+ krx->krx_nob, rmd->kqrmd_neiov,
+ (int)(((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer));
+ return (-EINVAL);
+ }
+
+ /* Ghastly hack part 1, uses the existing procedures to map the source data... */
+ ktx->ktx_nfrag = 0;
+ if (kiov != NULL)
+ rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
+ else
+ rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov);
+
+ if (rc != 0) {
+ CERROR ("Can't map source data: %d\n", rc);
+ return (rc);
+ }
+
+ /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... */
+ memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0]));
+
+ rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav,
+ ktx->ktx_nfrag, eiov,
+ rmd->kqrmd_neiov, rmd->kqrmd_eiov);
+ if (rc < 0) {
+ CERROR ("Can't create datavec: %d\n", rc);
+ return (rc);
+ }
+ ktx->ktx_nfrag = rc;
+
+ memset (&blk, 0, sizeof (blk)); /* zero blk.Status */
+
+       /* Our caller will start to race with kqswnal_reply_complete... */
+ LASSERT (atomic_read (&krx->krx_refcount) == 1);
+ atomic_set (&krx->krx_refcount, 2);
+
+ rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx,
+ &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag);
+ if (rc == ESUCCESS)
+ return (0);
+
+ /* reset refcount back to 1: we're not going to be racing with
+        * kqswnal_reply_complete. */
+ atomic_set (&krx->krx_refcount, 1);
+ return (-ECONNABORTED);
+}
+
static int
kqswnal_sendmsg (nal_cb_t *nal,
void *private,
- lib_msg_t *cookie,
+ lib_msg_t *libmsg,
ptl_hdr_t *hdr,
int type,
ptl_nid_t nid,
{
kqswnal_tx_t *ktx;
int rc;
- ptl_nid_t gatewaynid;
+ ptl_nid_t targetnid;
#if KQSW_CHECKSUM
int i;
kqsw_csum_t csum;
return (PTL_FAIL);
}
+ targetnid = nid;
if (kqswnal_nid2elanid (nid) < 0) { /* Can't send direct: find gateway? */
rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
- sizeof (ptl_hdr_t) + payload_nob, &gatewaynid);
+ sizeof (ptl_hdr_t) + payload_nob, &targetnid);
if (rc != 0) {
CERROR("Can't route to "LPX64": router error %d\n",
nid, rc);
return (PTL_FAIL);
}
- if (kqswnal_nid2elanid (gatewaynid) < 0) {
+ if (kqswnal_nid2elanid (targetnid) < 0) {
CERROR("Bad gateway "LPX64" for "LPX64"\n",
- gatewaynid, nid);
+ targetnid, nid);
return (PTL_FAIL);
}
- nid = gatewaynid;
}
/* I may not block for a transmit descriptor if I might block the
return (PTL_NOSPACE);
}
+ ktx->ktx_args[0] = private;
+ ktx->ktx_args[1] = libmsg;
+
+#if KQSW_OPTIMIZE_GETS
+ if (type == PTL_MSG_REPLY &&
+ ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) {
+ if (nid != targetnid ||
+ kqswnal_nid2elanid(nid) !=
+ ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
+ CERROR("Optimized reply nid conflict: "
+ "nid "LPX64" via "LPX64" elanID %d\n",
+ nid, targetnid,
+ ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
+                       kqswnal_put_idle_tx (ktx);
+                       return (PTL_FAIL);
+ }
+
+ /* peer expects RPC completion with GET data */
+ rc = kqswnal_dma_reply (ktx,
+ payload_niov, payload_iov,
+ payload_kiov, payload_nob);
+ if (rc == 0)
+                       return (PTL_OK);
+
+ CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
+ kqswnal_put_idle_tx (ktx);
+ return (PTL_FAIL);
+ }
+#endif
+
memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
}
memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
#endif
-
+
/* Set up first frag from pre-mapped buffer (it's at least the
* portals header) */
- ktx->ktx_iov[0].Base = ktx->ktx_ebuffer;
- ktx->ktx_iov[0].Len = KQSW_HDR_SIZE;
- ktx->ktx_niov = 1;
+ ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer;
+ ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE;
+ ktx->ktx_nfrag = 1;
+ ktx->ktx_state = KTX_SENDING; /* => lib_finalize() on completion */
+
+#if KQSW_OPTIMIZE_GETS
+ if (type == PTL_MSG_GET && /* doing a GET */
+ nid == targetnid) { /* not forwarding */
+ lib_md_t *md = libmsg->md;
+ kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
+
+ /* Optimised path: I send over the Elan vaddrs of the get
+ * sink buffers, and my peer DMAs directly into them.
+ *
+ * First I set up ktx as if it was going to send this
+ * payload, (it needs to map it anyway). This fills
+ * ktx_frags.iov[1] and onward with the network addresses
+ * of the get sink frags. I copy these into ktx_buffer,
+ * immediately after the header, and send that as my GET
+ * message.
+ *
+ * Note that the addresses are sent in native endian-ness.
+ * When EKC copes with different endian nodes, I'll fix
+ * this (and eat my hat :) */
+
+               if ((md->options & PTL_MD_KIOV) != 0)
+ rc = kqswnal_map_tx_kiov (ktx, md->length,
+ md->md_niov, md->md_iov.kiov);
+ else
+ rc = kqswnal_map_tx_iov (ktx, md->length,
+ md->md_niov, md->md_iov.iov);
+
+ if (rc < 0) {
+ kqswnal_put_idle_tx (ktx);
+ return (PTL_FAIL);
+ }
+
+ rmd->kqrmd_neiov = ktx->ktx_nfrag - 1;
+ memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1],
+ rmd->kqrmd_neiov * sizeof (EP_IOVEC));
+ ktx->ktx_nfrag = 1;
+ ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t,
+ kqrmd_eiov[rmd->kqrmd_neiov]);
+ payload_nob = ktx->ktx_frags.iov[0].Len;
+ ktx->ktx_state = KTX_GETTING;
+ } else
+#endif
if (payload_nob > 0) { /* got some payload (something more to do) */
/* make a single contiguous message? */
if (payload_nob <= KQSW_TX_MAXCONTIG) {
lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
payload_niov, payload_iov, payload_nob);
/* first frag includes payload */
- ktx->ktx_iov[0].Len += payload_nob;
+ ktx->ktx_frags.iov[0].Len += payload_nob;
} else {
if (payload_kiov != NULL)
rc = kqswnal_map_tx_kiov (ktx, payload_nob,
}
}
- ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
- EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
- ktx->ktx_nid = nid;
- ktx->ktx_forwarding = 0; /* => lib_finalize() on completion */
- ktx->ktx_args[0] = private;
- ktx->ktx_args[1] = cookie;
+ ktx->ktx_nid = targetnid;
+ ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
+ EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
rc = kqswnal_launch (ktx);
if (rc != 0) { /* failed? */
- CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc);
+ CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
kqswnal_put_idle_tx (ktx);
return (PTL_FAIL);
}
- CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
+ CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n",
+ payload_nob, nid, targetnid);
return (PTL_OK);
}
static int
kqswnal_send (nal_cb_t *nal,
void *private,
- lib_msg_t *cookie,
+ lib_msg_t *libmsg,
ptl_hdr_t *hdr,
int type,
ptl_nid_t nid,
struct iovec *payload_iov,
size_t payload_nob)
{
- return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
+ return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
payload_niov, payload_iov, NULL, payload_nob));
}
static int
kqswnal_send_pages (nal_cb_t *nal,
void *private,
- lib_msg_t *cookie,
+ lib_msg_t *libmsg,
ptl_hdr_t *hdr,
int type,
ptl_nid_t nid,
ptl_kiov_t *payload_kiov,
size_t payload_nob)
{
- return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
+ return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
payload_niov, NULL, payload_kiov, payload_nob));
}
{
/* send from ktx's pre-allocated/mapped contiguous buffer? */
lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
- ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
- ktx->ktx_iov[0].Len = nob;
- ktx->ktx_niov = 1;
-
+ ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
+ ktx->ktx_frags.iov[0].Len = nob;
+ ktx->ktx_nfrag = 1;
ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
}
else
{
/* zero copy */
- ktx->ktx_niov = 0; /* no frags mapped yet */
+ ktx->ktx_nfrag = 0; /* no frags mapped yet */
rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
if (rc != 0)
goto failed;
ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
}
- ktx->ktx_port = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
- EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
- ktx->ktx_nid = nid;
- ktx->ktx_forwarding = 1;
- ktx->ktx_args[0] = fwd;
+ ktx->ktx_port = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
+ EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
+ ktx->ktx_nid = nid;
+ ktx->ktx_state = KTX_FORWARDING; /* kpr_put_packet() on completion */
+ ktx->ktx_args[0] = fwd;
rc = kqswnal_launch (ktx);
if (rc == 0)
}
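+/* RPC completion callback for the reply DMA issued by
+ * kqswnal_dma_reply(): the GET data has reached the peer, so requeue
+ * the receive descriptor and finalize the local REPLY message. */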
void
+kqswnal_reply_complete (EP_RXD *rxd)
+{
+ int status = ep_rxd_status(rxd);
+ kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+ lib_msg_t *msg = (lib_msg_t *)ktx->ktx_args[1];
+
+ CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+ "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+ LASSERT (krx->krx_rxd == rxd);
+
+ krx->krx_rpc_completed = 1;
+ kqswnal_requeue_rx (krx);
+
+ lib_finalize (&kqswnal_lib, NULL, msg);
+ kqswnal_put_idle_tx (ktx);
+}
+
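+/* Completion callback for the failure status block sent from
+ * kqswnal_requeue_rx() when a sender's RPC was never completed; it
+ * marks the RPC done and requeues the receive descriptor. */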
+void
+kqswnal_rpc_complete (EP_RXD *rxd)
+{
+ int status = ep_rxd_status(rxd);
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg(rxd);
+
+ CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+ "rxd %p, krx %p, status %d\n", rxd, krx, status);
+
+ LASSERT (krx->krx_rxd == rxd);
+
+ krx->krx_rpc_completed = 1;
+ kqswnal_requeue_rx (krx);
+}
+
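+/* Drop a reference on krx; the final reference requeues the receive
+ * descriptor.  If the sender posted an RPC that nobody completed (we
+ * dropped its message), complete it here with -ECONNREFUSED so the
+ * peer's transmit doesn't hang, and requeue from that callback. */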
+void
+kqswnal_requeue_rx (kqswnal_rx_t *krx)
+{
+ EP_STATUSBLK blk;
+ int rc;
+
+ LASSERT (atomic_read (&krx->krx_refcount) > 0);
+ if (!atomic_dec_and_test (&krx->krx_refcount))
+ return;
+
+ if (!ep_rxd_isrpc(krx->krx_rxd) ||
+ krx->krx_rpc_completed) {
+
+ /* don't actually requeue on shutdown */
+ if (kqswnal_data.kqn_shuttingdown)
+ return;
+
+ ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx,
+ krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE);
+ return;
+ }
+
+ /* Sender wanted an RPC, but we didn't complete it (we must have
+ * dropped the sender's message). We complete it now with
+ * failure... */
+ memset (&blk, 0, sizeof (blk));
+ blk.Status = -ECONNREFUSED;
+
+ atomic_set (&krx->krx_refcount, 1);
+
+ rc = ep_complete_rpc (krx->krx_rxd,
+ kqswnal_rpc_complete, krx,
+ &blk, NULL, 0);
+ if (rc == ESUCCESS) {
+ /* callback will call me again to requeue, having set
+ * krx_rpc_completed... */
+ return;
+ }
+
+ CERROR("can't complete RPC: %d\n", rc);
+
+ /* we don't actually requeue on shutdown */
+ if (kqswnal_data.kqn_shuttingdown)
+ return;
+
+ /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue
+ * from scratch here... */
+ rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
+ krx->krx_elanaddr,
+ krx->krx_npages * PAGE_SIZE, 0);
+
+ LASSERT (rc == ESUCCESS);
+ /* This needs to be fixed by ep_complete_rpc NOT freeing
+ * krx->krx_rxd on failure so we can just ep_requeue_receive() */
+}
+
+void
kqswnal_rx (kqswnal_rx_t *krx)
{
ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
{
CERROR("dropping packet from "LPX64" for "LPX64
": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
+
kqswnal_requeue_rx (krx);
return;
}
krx->krx_rxd = rxd;
krx->krx_nob = nob;
-
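+       /* one ref, dropped by kqswnal_requeue_rx(); kqswnal_dma_reply()
+        * takes a second while its reply DMA is in flight */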
+ LASSERT (atomic_read (&krx->krx_refcount) == 0);
+ atomic_set (&krx->krx_refcount, 1);
+ krx->krx_rpc_completed = 0;
+
/* must receive a whole header to be able to parse */
if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
{
/* receives complete with failure when receiver is removed */
-               if (kqswnal_data.kqn_shuttingdown)
-                       return;
-               CERROR("receive status failed with status %d nob %d\n",
-                      ep_rxd_status(rxd), nob);
+               if (!kqswnal_data.kqn_shuttingdown)
+                       CERROR("receive status failed with status %d nob %d\n",
+                              ep_rxd_status(rxd), nob);
kqswnal_requeue_rx (krx);
return;
}
- atomic_inc (&kqswnal_packets_received);
+ if (!in_interrupt()) {
+ kqswnal_rx (krx);
+ return;
+ }
spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
static int
kqswnal_recvmsg (nal_cb_t *nal,
void *private,
- lib_msg_t *cookie,
+ lib_msg_t *libmsg,
unsigned int niov,
struct iovec *iov,
ptl_kiov_t *kiov,
"csum_nob %d\n",
hdr_csum, payload_csum, csum_frags, csum_nob);
#endif
- lib_finalize(nal, private, cookie);
+ lib_finalize(nal, private, libmsg);
kqswnal_requeue_rx (krx);
static int
kqswnal_recv(nal_cb_t *nal,
void *private,
- lib_msg_t *cookie,
+ lib_msg_t *libmsg,
unsigned int niov,
struct iovec *iov,
size_t mlen,
size_t rlen)
{
- return (kqswnal_recvmsg (nal, private, cookie, niov, iov, NULL, mlen, rlen));
+ return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen));
}
static int
kqswnal_recv_pages (nal_cb_t *nal,
void *private,
- lib_msg_t *cookie,
+ lib_msg_t *libmsg,
unsigned int niov,
ptl_kiov_t *kiov,
size_t mlen,
size_t rlen)
{
- return (kqswnal_recvmsg (nal, private, cookie, niov, NULL, kiov, mlen, rlen));
+ return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen));
}
int