Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
index 9452faf..43926c9 100644 (file)
 
 #include "qswnal.h"
 
-atomic_t kqswnal_packets_launched;
-atomic_t kqswnal_packets_transmitted;
-atomic_t kqswnal_packets_received;
-
-
 /*
  *  LIB functions follow
  *
@@ -137,7 +132,7 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx)
                 return;
 
         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
-                ktx, ktx->ktx_niov, ktx->ktx_basepage, ktx->ktx_nmappedpages);
+                ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
 
         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
@@ -152,15 +147,14 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx)
 int
 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
 {
-        int       nfrags    = ktx->ktx_niov;
-        const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
+        int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
         char     *ptr;
         
         LASSERT (nmapped <= maxmapped);
-        LASSERT (nfrags <= maxfrags);
+        LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
         
@@ -179,9 +173,9 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                         return (-EMSGSIZE);
                 }
 
-                if (nfrags == maxfrags) {
+                if (nfrags == EP_MAXFRAG) {
                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
-                               maxfrags);
+                               EP_MAXFRAG);
                         return (-EMSGSIZE);
                 }
 
@@ -197,7 +191,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
                                        ptr, fraglen,
-                                       basepage, &ktx->ktx_iov[nfrags].Base);
+                                       basepage, &ktx->ktx_frags.iov[nfrags].Base);
 
                 kunmap (kiov->kiov_page);
                 
@@ -205,12 +199,12 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                 ktx->ktx_nmappedpages = nmapped;
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
+                    ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags.iov[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
 
@@ -224,9 +218,9 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
 
         } while (nob > 0);
 
-        ktx->ktx_niov = nfrags;
+        ktx->ktx_nfrag = nfrags;
         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
-                ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
+                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
 
         return (0);
 }
@@ -234,14 +228,13 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
 int
 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
 {
-        int       nfrags    = ktx->ktx_niov;
-        const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
+        int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
 
         LASSERT (nmapped <= maxmapped);
-        LASSERT (nfrags <= maxfrags);
+        LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
 
@@ -259,9 +252,9 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
                         return (-EMSGSIZE);
                 }
 
-                if (nfrags == maxfrags) {
+                if (nfrags == EP_MAXFRAG) {
                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
-                               maxfrags);
+                               EP_MAXFRAG);
                         return (-EMSGSIZE);
                 }
 
@@ -273,17 +266,17 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
                                        iov->iov_base, fraglen,
-                                       basepage, &ktx->ktx_iov[nfrags].Base);
+                                       basepage, &ktx->ktx_frags.iov[nfrags].Base);
                 /* keep in loop for failure case */
                 ktx->ktx_nmappedpages = nmapped;
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
+                    ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags.iov[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
 
@@ -297,13 +290,14 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
 
         } while (nob > 0);
 
-        ktx->ktx_niov = nfrags;
+        ktx->ktx_nfrag = nfrags;
         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
-                ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
+                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
 
         return (0);
 }
 
+
 void
 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
 {
@@ -311,6 +305,7 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
         unsigned long     flags;
 
         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
+        ktx->ktx_state = KTX_IDLE;
 
         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
 
@@ -407,18 +402,45 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
 
         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
+
         return (ktx);
 }
 
 void
 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 {
-        if (ktx->ktx_forwarding)                /* router asked me to forward this packet */
+        lib_msg_t     *msg;
+        lib_msg_t     *repmsg;
+
+        switch (ktx->ktx_state) {
+        case KTX_FORWARDING:       /* router asked me to forward this packet */
                 kpr_fwd_done (&kqswnal_data.kqn_router,
                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
-        else                                    /* packet sourced locally */
+                break;
+
+        case KTX_SENDING:          /* packet sourced locally */
                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
                               (lib_msg_t *)ktx->ktx_args[1]);
+                break;
+
+        case KTX_GETTING:          /* Peer has DMA-ed direct? */
+                LASSERT (KQSW_OPTIMIZE_GETS);
+                msg = (lib_msg_t *)ktx->ktx_args[1];
+                repmsg = NULL;
+
+                if (error == 0) 
+                        repmsg = lib_fake_reply_msg (&kqswnal_lib, 
+                                                     ktx->ktx_nid, msg->md);
+                
+                lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg);
+
+                if (repmsg != NULL) 
+                        lib_finalize (&kqswnal_lib, NULL, repmsg);
+                break;
+
+        default:
+                LASSERT (0);
+        }
 
         kqswnal_put_idle_tx (ktx);
 }
@@ -427,15 +449,12 @@ static void
 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 {
         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
-        
+
         LASSERT (txd != NULL);
         LASSERT (ktx != NULL);
 
         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
 
-        if (status == EP_SUCCESS)
-                atomic_inc (&kqswnal_packets_transmitted);
-
         if (status != EP_SUCCESS)
         {
                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
@@ -443,6 +462,14 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 
                 kqswnal_notify_peer_down(ktx);
                 status = -EIO;
+
+        } else if (ktx->ktx_state == KTX_GETTING) {
+                /* RPC completed OK; what did our peer put in the status
+                 * block? */
+                LASSERT (KQSW_OPTIMIZE_GETS);
+                status = ep_txd_statusblk(txd)->Status;
+        } else {
+                status = 0;
         }
 
         kqswnal_tx_done (ktx, status);
@@ -460,12 +487,19 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         ktx->ktx_launchtime = jiffies;
 
         LASSERT (dest >= 0);                    /* must be a peer */
-        rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
-                               ktx->ktx_port, attr, kqswnal_txhandler,
-                               ktx, ktx->ktx_iov, ktx->ktx_niov);
+        if (ktx->ktx_state == KTX_GETTING) {
+                LASSERT (KQSW_OPTIMIZE_GETS);
+                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
+                                     ktx->ktx_port, attr, kqswnal_txhandler,
+                                     ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+        } else {
+                rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
+                                       ktx->ktx_port, attr, kqswnal_txhandler,
+                                       ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+        }
+
         switch (rc) {
-        case 0: /* success */
-                atomic_inc (&kqswnal_packets_launched);
+        case ESUCCESS: /* success */
                 return (0);
 
         case ENOMEM: /* can't allocate ep txd => queue for later */
@@ -508,8 +542,9 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
 {
         char *type_str = hdr_type_string (hdr);
 
-        CERROR("P3 Header at %p of type %s\n", hdr, type_str);
-        CERROR("    From nid/pid "LPU64"/%u", NTOH__u64(hdr->src_nid),
+        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
+               NTOH__u32(hdr->payload_length));
+        CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
                NTOH__u32(hdr->src_pid));
         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
                NTOH__u32(hdr->dest_pid));
@@ -522,8 +557,7 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
                        hdr->msg.put.ack_wmd.wh_interface_cookie,
                        hdr->msg.put.ack_wmd.wh_object_cookie,
                        NTOH__u64 (hdr->msg.put.match_bits));
-                CERROR("    Length %d, offset %d, hdr data "LPX64"\n",
-                       NTOH__u32(PTL_HDR_LENGTH(hdr)),
+                CERROR("    offset %d, hdr data "LPX64"\n",
                        NTOH__u32(hdr->msg.put.offset),
                        hdr->msg.put.hdr_data);
                 break;
@@ -548,18 +582,148 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
                 break;
 
         case PTL_MSG_REPLY:
-                CERROR("    dst md "LPX64"."LPX64", length %d\n",
+                CERROR("    dst md "LPX64"."LPX64"\n",
                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
-                       hdr->msg.reply.dst_wmd.wh_object_cookie,
-                       NTOH__u32 (PTL_HDR_LENGTH(hdr)));
+                       hdr->msg.reply.dst_wmd.wh_object_cookie);
         }
 
 }                               /* end of print_hdr() */
 
+void
+kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
+{
+        int          i;
+
+        CDEBUG (how, "%s: %d\n", str, n);
+        for (i = 0; i < n; i++) {
+                CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
+        }
+}
+
+int
+kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
+                     int nsrc, EP_IOVEC *src,
+                     int ndst, EP_IOVEC *dst) 
+{
+        int        count;
+        int        nob;
+
+        LASSERT (ndv > 0);
+        LASSERT (nsrc > 0);
+        LASSERT (ndst > 0);
+
+        for (count = 0; count < ndv; count++, dv++) {
+
+                if (nsrc == 0 || ndst == 0) {
+                        if (nsrc != ndst) {
+                                /* For now I'll barf on any left over entries */
+                                CERROR ("mismatched src and dst iovs\n");
+                                return (-EINVAL);
+                        }
+                        return (count);
+                }
+
+                nob = (src->Len < dst->Len) ? src->Len : dst->Len;
+                dv->Len    = nob;
+                dv->Source = src->Base;
+                dv->Dest   = dst->Base;
+
+                if (nob >= src->Len) {
+                        src++;
+                        nsrc--;
+                } else {
+                        src->Len -= nob;
+                        src->Base += nob;
+                }
+                
+                if (nob >= dst->Len) {
+                        dst++;
+                        ndst--;
+                } else {
+                        src->Len -= nob;
+                        src->Base += nob;
+                }
+        }
+
+        CERROR ("DATAVEC too small\n");
+        return (-E2BIG);
+}
+
+int
+kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
+                   struct iovec *iov, ptl_kiov_t *kiov, int nob)
+{
+        kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        char               *buffer = (char *)page_address(krx->krx_pages[0]);
+        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
+        EP_IOVEC            eiov[EP_MAXFRAG];
+        EP_STATUSBLK        blk;
+        int                 rc;
+
+        LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed);
+        LASSERT ((iov == NULL) != (kiov == NULL));
+
+        /* see .*_pack_k?iov comment regarding endian-ness */
+        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
+                /* msg too small to discover rmd size */
+                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
+                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
+                return (-EINVAL);
+        }
+        
+        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) {
+                /* rmd doesn't fit in the incoming message */
+                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
+                        krx->krx_nob, rmd->kqrmd_neiov,
+                        (int)(((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer));
+                return (-EINVAL);
+        }
+
+        /* Ghastly hack part 1, uses the existing procedures to map the source data... */
+        ktx->ktx_nfrag = 0;
+        if (kiov != NULL)
+                rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
+        else
+                rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov);
+
+        if (rc != 0) {
+                CERROR ("Can't map source data: %d\n", rc);
+                return (rc);
+        }
+
+        /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... */
+        memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0]));
+
+        rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav,
+                                  ktx->ktx_nfrag, eiov,
+                                  rmd->kqrmd_neiov, rmd->kqrmd_eiov);
+        if (rc < 0) {
+                CERROR ("Can't create datavec: %d\n", rc);
+                return (rc);
+        }
+        ktx->ktx_nfrag = rc;
+
+        memset (&blk, 0, sizeof (blk));         /* zero blk.Status */
+
+        /* Our caller will start to race with kqswnal_rpc_complete... */
+        LASSERT (atomic_read (&krx->krx_refcount) == 1);
+        atomic_set (&krx->krx_refcount, 2);
+
+        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx,
+                              &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag);
+        if (rc == ESUCCESS)
+                return (0);
+
+        /* reset refcount back to 1: we're not going to be racing with
+         * kqswnal_rely_complete. */
+        atomic_set (&krx->krx_refcount, 1);
+        return (-ECONNABORTED);
+}
+
 static int
 kqswnal_sendmsg (nal_cb_t     *nal,
                  void         *private,
-                 lib_msg_t    *cookie,
+                 lib_msg_t    *libmsg,
                  ptl_hdr_t    *hdr,
                  int           type,
                  ptl_nid_t     nid,
@@ -571,7 +735,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 {
         kqswnal_tx_t      *ktx;
         int                rc;
-        ptl_nid_t          gatewaynid;
+        ptl_nid_t          targetnid;
 #if KQSW_CHECKSUM
         int                i;
         kqsw_csum_t        csum;
@@ -595,20 +759,20 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 return (PTL_FAIL);
         }
 
+        targetnid = nid;
         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
-                                 sizeof (ptl_hdr_t) + payload_nob, &gatewaynid);
+                                 sizeof (ptl_hdr_t) + payload_nob, &targetnid);
                 if (rc != 0) {
                         CERROR("Can't route to "LPX64": router error %d\n",
                                nid, rc);
                         return (PTL_FAIL);
                 }
-                if (kqswnal_nid2elanid (gatewaynid) < 0) {
+                if (kqswnal_nid2elanid (targetnid) < 0) {
                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
-                               gatewaynid, nid);
+                               targetnid, nid);
                         return (PTL_FAIL);
                 }
-                nid = gatewaynid;
         }
 
         /* I may not block for a transmit descriptor if I might block the
@@ -621,6 +785,35 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 return (PTL_NOSPACE);
         }
 
+        ktx->ktx_args[0] = private;
+        ktx->ktx_args[1] = libmsg;
+
+#if KQSW_OPTIMIZE_GETS
+        if (type == PTL_MSG_REPLY &&
+            ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) {
+                if (nid != targetnid ||
+                    kqswnal_nid2elanid(nid) != 
+                    ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
+                        CERROR("Optimized reply nid conflict: "
+                               "nid "LPX64" via "LPX64" elanID %d\n",
+                               nid, targetnid,
+                               ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
+                        return(PTL_FAIL);
+                }
+
+                /* peer expects RPC completion with GET data */
+                rc = kqswnal_dma_reply (ktx,
+                                        payload_niov, payload_iov, 
+                                        payload_kiov, payload_nob);
+                if (rc == 0)
+                        return (0);
+                
+                CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
+                kqswnal_put_idle_tx (ktx);
+                return (PTL_FAIL);
+        }
+#endif
+
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
 
@@ -644,13 +837,57 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         }
         memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
 #endif
-
+        
         /* Set up first frag from pre-mapped buffer (it's at least the
          * portals header) */
-        ktx->ktx_iov[0].Base = ktx->ktx_ebuffer;
-        ktx->ktx_iov[0].Len = KQSW_HDR_SIZE;
-        ktx->ktx_niov = 1;
+        ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer;
+        ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE;
+        ktx->ktx_nfrag = 1;
+        ktx->ktx_state = KTX_SENDING;   /* => lib_finalize() on completion */
+
+#if KQSW_OPTIMIZE_GETS
+        if (type == PTL_MSG_GET &&              /* doing a GET */
+            nid == targetnid) {                 /* not forwarding */
+                lib_md_t           *md = libmsg->md;
+                kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
+                
+                /* Optimised path: I send over the Elan vaddrs of the get
+                 * sink buffers, and my peer DMAs directly into them.
+                 *
+                 * First I set up ktx as if it was going to send this
+                 * payload, (it needs to map it anyway).  This fills
+                 * ktx_frags.iov[1] and onward with the network addresses
+                 * of the get sink frags.  I copy these into ktx_buffer,
+                 * immediately after the header, and send that as my GET
+                 * message.
+                 *
+                 * Note that the addresses are sent in native endian-ness.
+                 * When EKC copes with different endian nodes, I'll fix
+                 * this (and eat my hat :) */
+
+                if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
+                        rc = kqswnal_map_tx_kiov (ktx, md->length,
+                                                  md->md_niov, md->md_iov.kiov);
+                else
+                        rc = kqswnal_map_tx_iov (ktx, md->length,
+                                                 md->md_niov, md->md_iov.iov);
+
+                if (rc < 0) {
+                        kqswnal_put_idle_tx (ktx);
+                        return (PTL_FAIL);
+                }
+
+                rmd->kqrmd_neiov = ktx->ktx_nfrag - 1;
+                memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1],
+                        rmd->kqrmd_neiov * sizeof (EP_IOVEC));
 
+                ktx->ktx_nfrag = 1;
+                ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t,
+                                                       kqrmd_eiov[rmd->kqrmd_neiov]);
+                payload_nob = ktx->ktx_frags.iov[0].Len;
+                ktx->ktx_state = KTX_GETTING;
+        } else 
+#endif
         if (payload_nob > 0) { /* got some payload (something more to do) */
                 /* make a single contiguous message? */
                 if (payload_nob <= KQSW_TX_MAXCONTIG) {
@@ -662,7 +899,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                   payload_niov, payload_iov, payload_nob);
                         /* first frag includes payload */
-                        ktx->ktx_iov[0].Len += payload_nob;
+                        ktx->ktx_frags.iov[0].Len += payload_nob;
                 } else {
                         if (payload_kiov != NULL)
                                 rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
@@ -677,28 +914,26 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 } 
         }
 
-        ktx->ktx_port       = (payload_nob <= KQSW_SMALLPAYLOAD) ?
-                              EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
-        ktx->ktx_nid        = nid;
-        ktx->ktx_forwarding = 0;   /* => lib_finalize() on completion */
-        ktx->ktx_args[0]    = private;
-        ktx->ktx_args[1]    = cookie;
+        ktx->ktx_nid  = targetnid;
+        ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
+                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
 
         rc = kqswnal_launch (ktx);
         if (rc != 0) {                    /* failed? */
-                CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc);
+                CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
                 kqswnal_put_idle_tx (ktx);
                 return (PTL_FAIL);
         }
 
-        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
+        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
+               payload_nob, nid, targetnid);
         return (PTL_OK);
 }
 
 static int
 kqswnal_send (nal_cb_t     *nal,
               void         *private,
-              lib_msg_t    *cookie,
+              lib_msg_t    *libmsg,
               ptl_hdr_t    *hdr,
               int           type,
               ptl_nid_t     nid,
@@ -707,14 +942,14 @@ kqswnal_send (nal_cb_t     *nal,
               struct iovec *payload_iov,
               size_t        payload_nob)
 {
-        return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
+        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                  payload_niov, payload_iov, NULL, payload_nob));
 }
 
 static int
 kqswnal_send_pages (nal_cb_t     *nal,
                     void         *private,
-                    lib_msg_t    *cookie,
+                    lib_msg_t    *libmsg,
                     ptl_hdr_t    *hdr,
                     int           type,
                     ptl_nid_t     nid,
@@ -723,7 +958,7 @@ kqswnal_send_pages (nal_cb_t     *nal,
                     ptl_kiov_t   *payload_kiov,
                     size_t        payload_nob)
 {
-        return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
+        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                  payload_niov, NULL, payload_kiov, payload_nob));
 }
 
@@ -775,16 +1010,15 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         {
                 /* send from ktx's pre-allocated/mapped contiguous buffer? */
                 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
-                ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
-                ktx->ktx_iov[0].Len = nob;
-                ktx->ktx_niov = 1;
-
+                ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
+                ktx->ktx_frags.iov[0].Len = nob;
+                ktx->ktx_nfrag = 1;
                 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
         }
         else
         {
                 /* zero copy */
-                ktx->ktx_niov = 0;        /* no frags mapped yet */
+                ktx->ktx_nfrag = 0;       /* no frags mapped yet */
                 rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
                 if (rc != 0)
                         goto failed;
@@ -792,11 +1026,11 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
         }
 
-        ktx->ktx_port       = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
-                              EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
-        ktx->ktx_nid        = nid;
-        ktx->ktx_forwarding = 1;
-        ktx->ktx_args[0]    = fwd;
+        ktx->ktx_port    = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
+                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
+        ktx->ktx_nid     = nid;
+        ktx->ktx_state   = KTX_FORWARDING; /* kpr_put_packet() on completion */
+        ktx->ktx_args[0] = fwd;
 
         rc = kqswnal_launch (ktx);
         if (rc == 0)
@@ -830,6 +1064,97 @@ kqswnal_fwd_callback (void *arg, int error)
 }
 
 void
+kqswnal_reply_complete (EP_RXD *rxd) 
+{
+        int           status = ep_rxd_status(rxd);
+        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+        LASSERT (krx->krx_rxd == rxd);
+
+        krx->krx_rpc_completed = 1;
+        kqswnal_requeue_rx (krx);
+
+        lib_finalize (&kqswnal_lib, NULL, msg);
+        kqswnal_put_idle_tx (ktx);
+}
+
+void
+kqswnal_rpc_complete (EP_RXD *rxd)
+{
+        int           status = ep_rxd_status(rxd);
+        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, krx %p, status %d\n", rxd, krx, status);
+
+        LASSERT (krx->krx_rxd == rxd);
+        
+        krx->krx_rpc_completed = 1;
+        kqswnal_requeue_rx (krx);
+}
+
+void
+kqswnal_requeue_rx (kqswnal_rx_t *krx)
+{
+        EP_STATUSBLK  blk;
+        int           rc;
+
+        LASSERT (atomic_read (&krx->krx_refcount) > 0);
+        if (!atomic_dec_and_test (&krx->krx_refcount))
+                return;
+
+        if (!ep_rxd_isrpc(krx->krx_rxd) ||
+            krx->krx_rpc_completed) {
+
+                /* don't actually requeue on shutdown */
+                if (kqswnal_data.kqn_shuttingdown)
+                        return;
+                
+                ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx,
+                                    krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE);
+                return;
+        }
+
+        /* Sender wanted an RPC, but we didn't complete it (we must have
+         * dropped the sender's message).  We complete it now with
+         * failure... */
+        memset (&blk, 0, sizeof (blk));
+        blk.Status = -ECONNREFUSED;
+
+        atomic_set (&krx->krx_refcount, 1);
+
+        rc = ep_complete_rpc (krx->krx_rxd, 
+                              kqswnal_rpc_complete, krx,
+                              &blk, NULL, 0);
+        if (rc == ESUCCESS) {
+                /* callback will call me again to requeue, having set
+                 * krx_rpc_completed... */
+                return;
+        }
+
+        CERROR("can't complete RPC: %d\n", rc);
+
+        /* we don't actually requeue on shutdown */
+        if (kqswnal_data.kqn_shuttingdown)
+                return;
+
+        /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue
+         * from scratch here... */
+        rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
+                              krx->krx_elanaddr, 
+                              krx->krx_npages * PAGE_SIZE, 0);
+
+        LASSERT (rc == ESUCCESS);
+        /* This needs to be fixed by ep_complete_rpc NOT freeing
+         * krx->krx_rxd on failure so we can just ep_requeue_receive() */
+}
+
+void
 kqswnal_rx (kqswnal_rx_t *krx)
 {
         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
@@ -851,6 +1176,7 @@ kqswnal_rx (kqswnal_rx_t *krx)
         {
                 CERROR("dropping packet from "LPX64" for "LPX64
                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
+
                 kqswnal_requeue_rx (krx);
                 return;
         }
@@ -886,21 +1212,26 @@ kqswnal_rxhandler(EP_RXD *rxd)
 
         krx->krx_rxd = rxd;
         krx->krx_nob = nob;
-
+        LASSERT (atomic_read (&krx->krx_refcount) == 0);
+        atomic_set (&krx->krx_refcount, 1);
+        krx->krx_rpc_completed = 0;
+        
         /* must receive a whole header to be able to parse */
         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
         {
                 /* receives complete with failure when receiver is removed */
-                if (kqswnal_data.kqn_shuttingdown)
-                        return;
+                if (!kqswnal_data.kqn_shuttingdown)
+                        CERROR("receive status failed with status %d nob %d\n",
+                               ep_rxd_status(rxd), nob);
 
-                CERROR("receive status failed with status %d nob %d\n",
-                       ep_rxd_status(rxd), nob);
                 kqswnal_requeue_rx (krx);
                 return;
         }
 
-        atomic_inc (&kqswnal_packets_received);
+        if (!in_interrupt()) {
+                kqswnal_rx (krx);
+                return;
+        }
 
         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
@@ -960,7 +1291,7 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 static int
 kqswnal_recvmsg (nal_cb_t     *nal,
                  void         *private,
-                 lib_msg_t    *cookie,
+                 lib_msg_t    *libmsg,
                  unsigned int  niov,
                  struct iovec *iov,
                  ptl_kiov_t   *kiov,
@@ -1084,7 +1415,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                        "csum_nob %d\n",
                         hdr_csum, payload_csum, csum_frags, csum_nob);
 #endif
-        lib_finalize(nal, private, cookie);
+        lib_finalize(nal, private, libmsg);
 
         kqswnal_requeue_rx (krx);
 
@@ -1094,25 +1425,25 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 static int
 kqswnal_recv(nal_cb_t     *nal,
              void         *private,
-             lib_msg_t    *cookie,
+             lib_msg_t    *libmsg,
              unsigned int  niov,
              struct iovec *iov,
              size_t        mlen,
              size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, cookie, niov, iov, NULL, mlen, rlen));
+        return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen));
 }
 
 static int
 kqswnal_recv_pages (nal_cb_t     *nal,
                     void         *private,
-                    lib_msg_t    *cookie,
+                    lib_msg_t    *libmsg,
                     unsigned int  niov,
                     ptl_kiov_t   *kiov,
                     size_t        mlen,
                     size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, cookie, niov, NULL, kiov, mlen, rlen));
+        return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen));
 }
 
 int