* Applied the last patch in Bug 2306, which changes the portals router/NAL
diff --git a/lnet/klnds/qswlnd/qswlnd_cb.c b/lnet/klnds/qswlnd/qswlnd_cb.c
index 8d5f70b..157dc70 100644
--- a/lnet/klnds/qswlnd/qswlnd_cb.c
+++ b/lnet/klnds/qswlnd/qswlnd_cb.c
 
 #include "qswnal.h"
 
-atomic_t kqswnal_packets_launched;
-atomic_t kqswnal_packets_transmitted;
-atomic_t kqswnal_packets_received;
-
+EP_STATUSBLK  kqswnal_rpc_success;
+EP_STATUSBLK  kqswnal_rpc_failed;
 
 /*
  *  LIB functions follow
  *
  */
-static int
+static ptl_err_t
 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
              size_t len)
 {
@@ -43,10 +41,10 @@ kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
                 nal->ni.nid, len, src_addr, dst_addr );
         memcpy( dst_addr, src_addr, len );
 
-        return (0);
+        return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
               size_t len)
 {
@@ -54,7 +52,7 @@ kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
                 nal->ni.nid, len, src_addr, dst_addr );
         memcpy( dst_addr, src_addr, len );
 
-        return (0);
+        return (PTL_OK);
 }
 
 static void *
@@ -119,41 +117,86 @@ kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
 }
 
 void
+kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
+{
+        struct timeval     now;
+        time_t             then;
+
+        do_gettimeofday (&now);
+        then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
+
+        kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
+}
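
A hedged sketch of the timestamp arithmetic above: (jiffies - ktx_launchtime) is
the tick count elapsed since the tx was launched, so dividing by HZ and backing
that off from the current wall-clock seconds recovers the launch time, which
kpr_notify() records as the moment the peer went down.

        /* Illustration only, not part of the patch: with HZ == 100, a tx
         * launched 500 jiffies ago yields then == now.tv_sec - 5. */
        time_t launch_walltime (struct timeval *now, unsigned long launchtime)
        {
                return now->tv_sec - (jiffies - launchtime) / HZ;
        }
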
+
+void
 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
 {
+#if MULTIRAIL_EKC
+        int      i;
+#endif
+
         if (ktx->ktx_nmappedpages == 0)
                 return;
-
+        
+#if MULTIRAIL_EKC
+        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
+               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
+
+        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
+                ep_dvma_unload(kqswnal_data.kqn_ep,
+                               kqswnal_data.kqn_ep_tx_nmh,
+                               &ktx->ktx_frags[i]);
+#else
         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
-                ktx, ktx->ktx_niov, ktx->ktx_basepage, ktx->ktx_nmappedpages);
+                ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
 
         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
 
-        elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                           kqswnal_data.kqn_eptxdmahandle,
                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
+#endif
         ktx->ktx_nmappedpages = 0;
 }
 
 int
-kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
+kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
 {
-        int       nfrags    = ktx->ktx_niov;
-        const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
+        int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
         char     *ptr;
+#if MULTIRAIL_EKC
+        EP_RAILMASK railmask;
+        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                            EP_RAILMASK_ALL,
+                                            kqswnal_nid2elanid(ktx->ktx_nid));
         
+        if (rail < 0) {
+                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+#endif
         LASSERT (nmapped <= maxmapped);
-        LASSERT (nfrags <= maxfrags);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
+        LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
-        
+
+        /* skip complete frags before 'offset' */
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                kiov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+
         do {
-                int  fraglen = kiov->kiov_len;
+                int  fraglen = kiov->kiov_len - offset;
 
                 /* nob exactly spans the iovs */
                 LASSERT (fraglen <= nob);
@@ -167,74 +210,110 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                         return (-EMSGSIZE);
                 }
 
-                if (nfrags == maxfrags) {
+                if (nfrags == EP_MAXFRAG) {
                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
-                               maxfrags);
+                               EP_MAXFRAG);
                         return (-EMSGSIZE);
                 }
 
                 /* XXX this is really crap, but we'll have to kmap until
                  * EKC has a page (rather than vaddr) mapping interface */
 
-                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
+                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, page %d, %d total\n",
                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
 
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             ptr, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
+#else
+                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
                                        ptr, fraglen,
-                                       basepage, &ktx->ktx_iov[nfrags].Base);
-
-                kunmap (kiov->kiov_page);
-                
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
+                                       basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
+                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
+#endif
+
+                kunmap (kiov->kiov_page);
+                
+                /* keep in loop for failure case */
+                ktx->ktx_nmappedpages = nmapped;
 
                 basepage++;
                 kiov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
 
         } while (nob > 0);
 
-        ktx->ktx_niov = nfrags;
+        ktx->ktx_nfrag = nfrags;
         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
-                ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
+                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
 
         return (0);
 }
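
Both mapping functions now take a payload offset and open with the same skip
loop; a minimal standalone sketch of the pattern (hypothetical helper, iovec
variant):

        /* Advance (iov, niov) past whole fragments that precede 'offset',
         * returning the residual offset into the first fragment that still
         * holds payload; the caller then maps iov_base + offset for
         * iov_len - offset bytes. */
        static size_t skip_whole_frags (struct iovec **iovp, int *niovp,
                                        size_t offset)
        {
                while (offset >= (*iovp)->iov_len) {
                        offset -= (*iovp)->iov_len;
                        (*iovp)++;
                        (*niovp)--;
                }
                return offset;
        }
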
 
 int
-kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
+kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
+                    int niov, struct iovec *iov)
 {
-        int       nfrags    = ktx->ktx_niov;
-        const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
+        int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
-
+#if MULTIRAIL_EKC
+        EP_RAILMASK railmask;
+        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                            EP_RAILMASK_ALL,
+                                            kqswnal_nid2elanid(ktx->ktx_nid));
+        
+        if (rail < 0) {
+                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+#endif
         LASSERT (nmapped <= maxmapped);
-        LASSERT (nfrags <= maxfrags);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
+        LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
 
+        /* skip complete frags before offset */
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                iov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+        
         do {
-                int  fraglen = iov->iov_len;
+                int  fraglen = iov->iov_len - offset;
                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
 
                 /* nob exactly spans the iovs */
@@ -247,51 +326,69 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
                         return (-EMSGSIZE);
                 }
 
-                if (nfrags == maxfrags) {
+                if (nfrags == EP_MAXFRAG) {
                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
-                               maxfrags);
+                               EP_MAXFRAG);
                         return (-EMSGSIZE);
                 }
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
-                        ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
-                        nmapped);
-
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+                       ktx, nfrags, iov->iov_base + offset, fraglen, 
+                       basepage, npages, nmapped);
+
+#if MULTIRAIL_EKC
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             iov->iov_base + offset, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
+#else
+                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
-                                       iov->iov_base, fraglen,
-                                       basepage, &ktx->ktx_iov[nfrags].Base);
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
+                                       iov->iov_base + offset, fraglen,
+                                       basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
+                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
+#endif
+
+                /* keep in loop for failure case */
+                ktx->ktx_nmappedpages = nmapped;
 
                 basepage += npages;
                 iov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
 
         } while (nob > 0);
 
-        ktx->ktx_niov = nfrags;
+        ktx->ktx_nfrag = nfrags;
         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
-                ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
+                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
 
         return (0);
 }
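
On the elan3 (!MULTIRAIL_EKC) side, both mappers coalesce fragments whose Elan
addresses abut, so a logically fragmented payload can still fit within
EP_MAXFRAG slots; a sketch of the test, assuming the EP_IOVEC Base/Len members
used above:

        /* Nonzero if 'next' starts exactly where 'prev' ends in Elan VM;
         * the mapper then grows prev->Len instead of consuming a new slot. */
        static int frags_contiguous (EP_IOVEC *prev, EP_IOVEC *next)
        {
                return (next->Base == prev->Base + prev->Len);
        }
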
 
+
 void
 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
 {
@@ -299,6 +396,7 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
         unsigned long     flags;
 
         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
+        ktx->ktx_state = KTX_IDLE;
 
         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
 
@@ -323,12 +421,7 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
                 list_del (&fwd->kprfd_list);
         }
 
-        if (waitqueue_active (&kqswnal_data.kqn_idletxd_waitq))  /* process? */
-        {
-                /* local sender waiting for tx desc */
-                CDEBUG(D_NET,"wakeup process\n");
-                wake_up (&kqswnal_data.kqn_idletxd_waitq);
-        }
+        wake_up (&kqswnal_data.kqn_idletxd_waitq);
 
         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
 
@@ -339,8 +432,7 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
-        if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
-                wake_up (&kqswnal_data.kqn_sched_waitq);
+        wake_up (&kqswnal_data.kqn_sched_waitq);
 
         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
 }
@@ -401,18 +493,52 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
 
         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
+
         return (ktx);
 }
 
 void
 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 {
-        if (ktx->ktx_forwarding)                /* router asked me to forward this packet */
+        lib_msg_t     *msg;
+        lib_msg_t     *repmsg = NULL;
+
+        switch (ktx->ktx_state) {
+        case KTX_FORWARDING:       /* router asked me to forward this packet */
                 kpr_fwd_done (&kqswnal_data.kqn_router,
                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
-        else                                    /* packet sourced locally */
+                break;
+
+        case KTX_SENDING:          /* packet sourced locally */
                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
-                              (lib_msg_t *)ktx->ktx_args[1]);
+                              (lib_msg_t *)ktx->ktx_args[1],
+                              (error == 0) ? PTL_OK : 
+                              (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
+                break;
+
+        case KTX_GETTING:          /* Peer has DMA-ed direct? */
+                msg = (lib_msg_t *)ktx->ktx_args[1];
+
+                if (error == 0) {
+                        repmsg = lib_fake_reply_msg (&kqswnal_lib, 
+                                                     ktx->ktx_nid, msg->md);
+                        if (repmsg == NULL)
+                                error = -ENOMEM;
+                }
+                
+                if (error == 0) {
+                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
+                                      msg, PTL_OK);
+                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
+                } else {
+                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
+                                      (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
+                }
+                break;
+
+        default:
+                LASSERT (0);
+        }
 
         kqswnal_put_idle_tx (ktx);
 }
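
The errno-to-ptl_err_t translation appears in both the KTX_SENDING and
KTX_GETTING arms above; a sketch of it factored into a single (hypothetical)
helper:

        static ptl_err_t kqswnal_errno2ptl (int error)
        {
                if (error == 0)
                        return (PTL_OK);
                return (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL;
        }
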
@@ -427,13 +553,24 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 
         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
 
-        if (status == EP_SUCCESS)
-                atomic_inc (&kqswnal_packets_transmitted);
+        if (status != EP_SUCCESS) {
 
-        if (status != EP_SUCCESS)
-        {
-                CERROR ("kqswnal: Transmit failed with %d\n", status);
-                status = -EIO;
+                CERROR ("Tx completion to "LPX64" failed: %d\n", 
+                        ktx->ktx_nid, status);
+
+                kqswnal_notify_peer_down(ktx);
+                status = -EHOSTDOWN;
+
+        } else if (ktx->ktx_state == KTX_GETTING) {
+                /* RPC completed OK; what did our peer put in the status
+                 * block? */
+#if MULTIRAIL_EKC
+                status = ep_txd_statusblk(txd)->Data[0];
+#else
+                status = ep_txd_statusblk(txd)->Status;
+#endif
+        } else {
+                status = 0;
         }
 
         kqswnal_tx_done (ktx, status);
@@ -447,33 +584,60 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
         long  flags;
         int   rc;
-        
-        LASSERT (dest >= 0);                    /* must be a peer */
-        rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
-                               ktx->ktx_port, attr, kqswnal_txhandler,
-                               ktx, ktx->ktx_iov, ktx->ktx_niov);
-        if (rc == 0)
-                atomic_inc (&kqswnal_packets_launched);
 
-        if (rc != ENOMEM)
-                return (rc);
+        ktx->ktx_launchtime = jiffies;
 
-        /* can't allocate ep txd => queue for later */
+        LASSERT (dest >= 0);                    /* must be a peer */
+        if (ktx->ktx_state == KTX_GETTING) {
+                /* NB ktx_frags[0] is the GET hdr + kqswnal_remotemd_t.  The
+                 * other frags are the GET sink which we obviously don't
+                 * send here :) */
+#if MULTIRAIL_EKC
+                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
+                                     ktx->ktx_port, attr,
+                                     kqswnal_txhandler, ktx,
+                                     NULL, ktx->ktx_frags, 1);
+#else
+                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
+                                     ktx->ktx_port, attr, kqswnal_txhandler,
+                                     ktx, NULL, ktx->ktx_frags, 1);
+#endif
+        } else {
+#if MULTIRAIL_EKC
+                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
+                                         ktx->ktx_port, attr,
+                                         kqswnal_txhandler, ktx,
+                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
+                rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
+                                       ktx->ktx_port, attr, 
+                                       kqswnal_txhandler, ktx, 
+                                       ktx->ktx_frags, ktx->ktx_nfrag);
+#endif
+        }
 
-        LASSERT (in_interrupt());      /* not called by thread (not looping) */
+        switch (rc) {
+        case EP_SUCCESS: /* success */
+                return (0);
 
-        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
+                LASSERT (in_interrupt());
+
+                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
-        list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
-        if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
+                list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
                 wake_up (&kqswnal_data.kqn_sched_waitq);
 
-        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+                return (0);
 
-        return (0);
+        default: /* fatal error */
+                CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
+                kqswnal_notify_peer_down(ktx);
+                return (-EHOSTUNREACH);
+        }
 }
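
A summary sketch of the contract the switch above implements:

        /* EP_SUCCESS -> 0: kqswnal_txhandler() completes the tx later.
         * EP_ENOMEM  -> 0: no ep txd was available; the ktx is queued on
         *                  kqn_delayedtxds and the scheduler thread, woken
         *                  via kqn_sched_waitq, relaunches it from a
         *                  context that may block.
         * other      -> -EHOSTUNREACH, after kqswnal_notify_peer_down()
         *                  has told the router the peer went down at
         *                  launch time. */
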
 
-
 static char *
 hdr_type_string (ptl_hdr_t *hdr)
 {
@@ -496,8 +660,9 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
 {
         char *type_str = hdr_type_string (hdr);
 
-        CERROR("P3 Header at %p of type %s\n", hdr, type_str);
-        CERROR("    From nid/pid "LPU64"/%u", NTOH__u64(hdr->src_nid),
+        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
+               NTOH__u32(hdr->payload_length));
+        CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
                NTOH__u32(hdr->src_pid));
         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
                NTOH__u32(hdr->dest_pid));
@@ -510,8 +675,7 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
                        hdr->msg.put.ack_wmd.wh_interface_cookie,
                        hdr->msg.put.ack_wmd.wh_object_cookie,
                        NTOH__u64 (hdr->msg.put.match_bits));
-                CERROR("    Length %d, offset %d, hdr data "LPX64"\n",
-                       NTOH__u32(PTL_HDR_LENGTH(hdr)),
+                CERROR("    offset %d, hdr data "LPX64"\n",
                        NTOH__u32(hdr->msg.put.offset),
                        hdr->msg.put.hdr_data);
                 break;
@@ -536,18 +700,183 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
                 break;
 
         case PTL_MSG_REPLY:
-                CERROR("    dst md "LPX64"."LPX64", length %d\n",
+                CERROR("    dst md "LPX64"."LPX64"\n",
                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
-                       hdr->msg.reply.dst_wmd.wh_object_cookie,
-                       NTOH__u32 (PTL_HDR_LENGTH(hdr)));
+                       hdr->msg.reply.dst_wmd.wh_object_cookie);
         }
 
 }                               /* end of print_hdr() */
 
-static int
+#if !MULTIRAIL_EKC
+void
+kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
+{
+        int          i;
+
+        CDEBUG (how, "%s: %d\n", str, n);
+        for (i = 0; i < n; i++) {
+                CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
+        }
+}
+
+int
+kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
+                     int nsrc, EP_IOVEC *src,
+                     int ndst, EP_IOVEC *dst) 
+{
+        int        count;
+        int        nob;
+
+        LASSERT (ndv > 0);
+        LASSERT (nsrc > 0);
+        LASSERT (ndst > 0);
+
+        for (count = 0; count < ndv; count++, dv++) {
+
+                if (nsrc == 0 || ndst == 0) {
+                        if (nsrc != ndst) {
+                                /* For now I'll barf on any left over entries */
+                                CERROR ("mismatched src and dst iovs\n");
+                                return (-EINVAL);
+                        }
+                        return (count);
+                }
+
+                nob = (src->Len < dst->Len) ? src->Len : dst->Len;
+                dv->Len    = nob;
+                dv->Source = src->Base;
+                dv->Dest   = dst->Base;
+
+                if (nob >= src->Len) {
+                        src++;
+                        nsrc--;
+                } else {
+                        src->Len -= nob;
+                        src->Base += nob;
+                }
+                
+                if (nob >= dst->Len) {
+                        dst++;
+                        ndst--;
+                } else {
+                        dst->Len -= nob;
+                        dst->Base += nob;
+                }
+        }
+
+        CERROR ("DATAVEC too small\n");
+        return (-E2BIG);
+}
+#endif
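
A worked example of the datavec construction above (Len fields in bytes):
src = {4096, 4096} and dst = {2048, 6144} zip into

        dv[0] = { Len 2048, Source src[0],        Dest dst[0]        }
        dv[1] = { Len 2048, Source src[0] + 2048, Dest dst[1]        }
        dv[2] = { Len 4096, Source src[1],        Dest dst[1] + 2048 }

Each descriptor covers min(src->Len, dst->Len), and whichever side has bytes
left over is trimmed in place before the next round.
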
+
+int
+kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
+                   struct iovec *iov, ptl_kiov_t *kiov, 
+                   int offset, int nob)
+{
+        kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
+        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
+        int                 rc;
+#if MULTIRAIL_EKC
+        int                 i;
+#else
+        EP_DATAVEC          datav[EP_MAXFRAG];
+        int                 ndatav;
+#endif
+        LASSERT (krx->krx_rpc_reply_needed);
+        LASSERT ((iov == NULL) != (kiov == NULL));
+
+        /* see kqswnal_sendmsg comment regarding endian-ness */
+        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
+                /* msg too small to discover rmd size */
+                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
+                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
+                return (-EINVAL);
+        }
+        
+        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
+                /* rmd doesn't fit in the incoming message */
+                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
+                        krx->krx_nob, rmd->kqrmd_nfrag,
+                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
+                return (-EINVAL);
+        }
+
+        /* Map the source data... */
+        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
+        if (kiov != NULL)
+                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
+        else
+                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
+
+        if (rc != 0) {
+                CERROR ("Can't map source data: %d\n", rc);
+                return (rc);
+        }
+
+#if MULTIRAIL_EKC
+        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
+                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
+                return (-EINVAL);
+        }
+        
+        for (i = 0; i < rmd->kqrmd_nfrag; i++)
+                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
+                        CERROR("Can't cope with unequal frags %d(%d):"
+                               " %d local %d remote\n",
+                               i, rmd->kqrmd_nfrag, 
+                               ktx->ktx_frags[i].nmd_len, 
+                               rmd->kqrmd_frag[i].nmd_len);
+                        return (-EINVAL);
+                }
+#else
+        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
+                                      ktx->ktx_nfrag, ktx->ktx_frags,
+                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        if (ndatav < 0) {
+                CERROR ("Can't create datavec: %d\n", ndatav);
+                return (ndatav);
+        }
+#endif
+
+        /* Our caller will start to race with kqswnal_dma_reply_complete... */
+        LASSERT (atomic_read (&krx->krx_refcount) == 1);
+        atomic_set (&krx->krx_refcount, 2);
+
+#if MULTIRAIL_EKC
+        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
+                             &kqswnal_rpc_success,
+                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
+        if (rc == EP_SUCCESS)
+                return (0);
+
+        /* Well we tried... */
+        krx->krx_rpc_reply_needed = 0;
+#else
+        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
+                              &kqswnal_rpc_success, datav, ndatav);
+        if (rc == EP_SUCCESS)
+                return (0);
+
+        /* "old" EKC destroys rxd on failed completion */
+        krx->krx_rxd = NULL;
+#endif
+
+        CERROR("can't complete RPC: %d\n", rc);
+
+        /* reset refcount back to 1: we're not going to be racing with
+         * kqswnal_dma_reply_complete. */
+        atomic_set (&krx->krx_refcount, 1);
+
+        return (-ECONNABORTED);
+}
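
The two length checks above defend against a truncated remote memory
descriptor; the bounds they enforce, spelled out (assuming kqrmd_frag[] is the
trailing array of kqswnal_remotemd_t):

        /* rmd sits at buffer + KQSW_HDR_SIZE, so kqrmd_nfrag is only
         * readable if
         *      krx_nob >= KQSW_HDR_SIZE + sizeof (kqswnal_remotemd_t)
         * and the fragment array is only trustworthy if
         *      krx_nob >= KQSW_HDR_SIZE +
         *                 offsetof (kqswnal_remotemd_t,
         *                           kqrmd_frag[rmd->kqrmd_nfrag]) */
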
+
+static ptl_err_t
 kqswnal_sendmsg (nal_cb_t     *nal,
                  void         *private,
-                 lib_msg_t    *cookie,
+                 lib_msg_t    *libmsg,
                  ptl_hdr_t    *hdr,
                  int           type,
                  ptl_nid_t     nid,
@@ -555,14 +884,16 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  unsigned int  payload_niov,
                  struct iovec *payload_iov,
                  ptl_kiov_t   *payload_kiov,
+                 size_t        payload_offset,
                  size_t        payload_nob)
 {
         kqswnal_tx_t      *ktx;
         int                rc;
-        ptl_nid_t          gatewaynid;
+        ptl_nid_t          targetnid;
 #if KQSW_CHECKSUM
         int                i;
         kqsw_csum_t        csum;
+        int                sumoff;
         int                sumnob;
 #endif
         
@@ -583,19 +914,20 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 return (PTL_FAIL);
         }
 
+        targetnid = nid;
         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
-                rc = kpr_lookup (&kqswnal_data.kqn_router, nid, &gatewaynid);
+                rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
+                                 sizeof (ptl_hdr_t) + payload_nob, &targetnid);
                 if (rc != 0) {
                         CERROR("Can't route to "LPX64": router error %d\n",
                                nid, rc);
                         return (PTL_FAIL);
                 }
-                if (kqswnal_nid2elanid (gatewaynid) < 0) {
+                if (kqswnal_nid2elanid (targetnid) < 0) {
                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
-                               gatewaynid, nid);
+                               targetnid, nid);
                         return (PTL_FAIL);
                 }
-                nid = gatewaynid;
         }
 
         /* I may not block for a transmit descriptor if I might block the
@@ -608,120 +940,234 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 return (PTL_NOSPACE);
         }
 
+        ktx->ktx_nid     = targetnid;
+        ktx->ktx_args[0] = private;
+        ktx->ktx_args[1] = libmsg;
+
+        if (type == PTL_MSG_REPLY &&
+            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
+                if (nid != targetnid ||
+                    kqswnal_nid2elanid(nid) != 
+                    ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
+                        CERROR("Optimized reply nid conflict: "
+                               "nid "LPX64" via "LPX64" elanID %d\n",
+                               nid, targetnid,
+                               ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
+                        kqswnal_put_idle_tx (ktx);
+                        return (PTL_FAIL);
+                }
+
+                /* peer expects RPC completion with GET data */
+                rc = kqswnal_dma_reply (ktx, payload_niov, 
+                                        payload_iov, payload_kiov, 
+                                        payload_offset, payload_nob);
+                if (rc == 0)
+                        return (PTL_OK);
+                
+                CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
+                kqswnal_put_idle_tx (ktx);
+                return (PTL_FAIL);
+        }
+
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
 
 #if KQSW_CHECKSUM
         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
-        for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
+        for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
+                LASSERT(i < payload_niov);
                 if (payload_kiov != NULL) {
                         ptl_kiov_t *kiov = &payload_kiov[i];
-                        char       *addr = ((char *)kmap (kiov->kiov_page)) +
-                                           kiov->kiov_offset;
-                        
-                        csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
-                        sumnob -= kiov->kiov_len;
+
+                        if (sumoff >= kiov->kiov_len) {
+                                sumoff -= kiov->kiov_len;
+                        } else {
+                                char *addr = ((char *)kmap (kiov->kiov_page)) +
+                                             kiov->kiov_offset + sumoff;
+                                int   fragnob = kiov->kiov_len - sumoff;
+
+                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+                                sumnob -= fragnob;
+                                sumoff = 0;
+                                kunmap(kiov->kiov_page);
+                        }
                 } else {
                         struct iovec *iov = &payload_iov[i];
 
-                        csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
-                        sumnob -= iov->iov_len;
+                        if (sumoff >= iov->iov_len) {
+                                sumoff -= iov->iov_len;
+                        } else {
+                                char *addr = iov->iov_base + sumoff;
+                                int   fragnob = iov->iov_len - sumoff;
+                                
+                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+                                sumnob -= fragnob;
+                                sumoff = 0;
+                        }
                 }
         }
-        memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
+        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
 #endif
 
-        /* Set up first frag from pre-mapped buffer (it's at least the
-         * portals header) */
-        ktx->ktx_iov[0].Base = ktx->ktx_ebuffer;
-        ktx->ktx_iov[0].Len = KQSW_HDR_SIZE;
-        ktx->ktx_niov = 1;
+        if (kqswnal_data.kqn_optimized_gets &&
+            type == PTL_MSG_GET &&              /* doing a GET */
+            nid == targetnid) {                 /* not forwarding */
+                lib_md_t           *md = libmsg->md;
+                kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
+                
+                /* Optimised path: I send over the Elan vaddrs of the get
+                 * sink buffers, and my peer DMAs directly into them.
+                 *
+                 * First I set up ktx as if it was going to send this
+                 * payload, (it needs to map it anyway).  This fills
+                 * ktx_frags[1] and onward with the network addresses
+                 * of the GET sink frags.  I copy these into ktx_buffer,
+                 * immediately after the header, and send that as my GET
+                 * message.
+                 *
+                 * Note that the addresses are sent in native endian-ness.
+                 * When EKC copes with different endian nodes, I'll fix
+                 * this (and eat my hat :) */
+
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_GETTING;
+
+                if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
+                        rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
+                                                  md->md_niov, md->md_iov.kiov);
+                else
+                        rc = kqswnal_map_tx_iov (ktx, 0, md->length,
+                                                 md->md_niov, md->md_iov.iov);
+
+                if (rc < 0) {
+                        kqswnal_put_idle_tx (ktx);
+                        return (PTL_FAIL);
+                }
+
+                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
 
-        if (payload_nob > 0) { /* got some payload (something more to do) */
-                /* make a single contiguous message? */
-                if (payload_nob <= KQSW_TX_MAXCONTIG) {
-                        /* copy payload to ktx_buffer, immediately after hdr */
+                payload_nob = offsetof(kqswnal_remotemd_t,
+                                       kqrmd_frag[rmd->kqrmd_nfrag]);
+                LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
+
+#if MULTIRAIL_EKC
+                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+                       rmd->kqrmd_nfrag * sizeof(EP_NMD));
+
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + payload_nob);
+#else
+                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
+                
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
+#endif
+        } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
+
+                /* small message: single frag copied into the pre-mapped buffer */
+
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + payload_nob);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
+#endif
+                if (payload_nob > 0) {
                         if (payload_kiov != NULL)
                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                   payload_niov, payload_kiov, payload_nob);
+                                                   payload_niov, payload_kiov, 
+                                                   payload_offset, payload_nob);
                         else
                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                  payload_niov, payload_iov, payload_nob);
-                        /* first frag includes payload */
-                        ktx->ktx_iov[0].Len += payload_nob;
-                } else {
-                        if (payload_kiov != NULL)
-                                rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
-                                                          payload_niov, payload_kiov);
-                        else
-                                rc = kqswnal_map_tx_iov (ktx, payload_nob,
-                                                         payload_niov, payload_iov);
-                        if (rc != 0) {
-                                kqswnal_put_idle_tx (ktx);
-                                return (PTL_FAIL);
-                        }
-                } 
+                                                  payload_niov, payload_iov, 
+                                                  payload_offset, payload_nob);
+                }
+        } else {
+
+                /* large message: multiple frags: first is hdr in pre-mapped buffer */
+
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+                if (payload_kiov != NULL)
+                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
+                                                  payload_niov, payload_kiov);
+                else
+                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
+                                                 payload_niov, payload_iov);
+                if (rc != 0) {
+                        kqswnal_put_idle_tx (ktx);
+                        return (PTL_FAIL);
+                }
         }
-
-        ktx->ktx_port       = (payload_nob <= KQSW_SMALLPAYLOAD) ?
-                              EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
-        ktx->ktx_nid        = nid;
-        ktx->ktx_forwarding = 0;   /* => lib_finalize() on completion */
-        ktx->ktx_args[0]    = private;
-        ktx->ktx_args[1]    = cookie;
+        
+        ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
+                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
 
         rc = kqswnal_launch (ktx);
         if (rc != 0) {                    /* failed? */
-                CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc);
+                CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
                 kqswnal_put_idle_tx (ktx);
                 return (PTL_FAIL);
         }
 
-        CDEBUG(D_NET, "send to "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
+        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
+               payload_nob, nid, targetnid);
         return (PTL_OK);
 }
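
A sketch of the optimized GET message assembled above in ktx_buffer (the sink
pages themselves never travel; only their Elan network addresses do):

        +-----------------------+---------------------------------------+
        | ptl_hdr_t (plus csum  | kqswnal_remotemd_t:                   |
        | space, if enabled)    |   kqrmd_nfrag, kqrmd_frag[0..nfrag-1] |
        +-----------------------+---------------------------------------+
        |<--- KQSW_HDR_SIZE --->|

ktx_frags[1..] map the GET sink described by libmsg->md; the peer's reply path
(kqswnal_dma_reply) DMAs the reply payload straight into those frags and
completes the EKC RPC, and the matching REPLY event is simulated locally via
lib_fake_reply_msg() in the KTX_GETTING arm of kqswnal_tx_done().
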
 
-static int
+static ptl_err_t
 kqswnal_send (nal_cb_t     *nal,
               void         *private,
-              lib_msg_t    *cookie,
+              lib_msg_t    *libmsg,
               ptl_hdr_t    *hdr,
               int           type,
               ptl_nid_t     nid,
               ptl_pid_t     pid,
               unsigned int  payload_niov,
               struct iovec *payload_iov,
+              size_t        payload_offset,
               size_t        payload_nob)
 {
-        return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
-                                 payload_niov, payload_iov, NULL, payload_nob));
+        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
+                                 payload_niov, payload_iov, NULL, 
+                                 payload_offset, payload_nob));
 }
 
-static int
+static ptl_err_t
 kqswnal_send_pages (nal_cb_t     *nal,
                     void         *private,
-                    lib_msg_t    *cookie,
+                    lib_msg_t    *libmsg,
                     ptl_hdr_t    *hdr,
                     int           type,
                     ptl_nid_t     nid,
                     ptl_pid_t     pid,
                     unsigned int  payload_niov,
                     ptl_kiov_t   *payload_kiov,
+                    size_t        payload_offset,
                     size_t        payload_nob)
 {
-        return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
-                                 payload_niov, NULL, payload_kiov, payload_nob));
+        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
+                                 payload_niov, NULL, payload_kiov, 
+                                 payload_offset, payload_nob));
 }
 
-int kqswnal_fwd_copy_contig = 0;
-
 void
 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
         int             rc;
         kqswnal_tx_t   *ktx;
-        struct iovec   *iov = fwd->kprfd_iov;
+        ptl_kiov_t     *kiov = fwd->kprfd_kiov;
         int             niov = fwd->kprfd_niov;
         int             nob = fwd->kprfd_nob;
         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
@@ -731,12 +1177,10 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         LBUG ();
 #endif
         /* The router wants this NAL to forward a packet */
-        CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
+        CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
                 fwd, nid, niov, nob);
 
-        LASSERT (niov > 0);
-        
-        ktx = kqswnal_get_idle_tx (fwd, FALSE);
+        ktx = kqswnal_get_idle_tx (fwd, 0);
         if (ktx == NULL)        /* can't get txd right now */
                 return;         /* fwd will be scheduled when tx desc freed */
 
@@ -749,42 +1193,46 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 goto failed;
         }
 
-        if (nob > KQSW_NRXMSGBYTES_LARGE) {
-                CERROR ("Can't forward [%p] to "LPX64
-                        ": size %d bigger than max packet size %ld\n",
-                        fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
-                rc = -EMSGSIZE;
-                goto failed;
-        }
+        /* copy hdr into pre-mapped buffer */
+        memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
+        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
 
-        if ((kqswnal_fwd_copy_contig || niov > 1) &&
-            nob <= KQSW_TX_BUFFER_SIZE) 
-        {
-                /* send from ktx's pre-allocated/mapped contiguous buffer? */
-                lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
-                ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
-                ktx->ktx_iov[0].Len = nob;
-                ktx->ktx_niov = 1;
+        ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
+                           EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
+        ktx->ktx_nid     = nid;
+        ktx->ktx_state   = KTX_FORWARDING;
+        ktx->ktx_args[0] = fwd;
+        ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
 
-                ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
+        if (nob <= KQSW_TX_MAXCONTIG) 
+        {
+                /* send payload from ktx's pre-mapped contiguous buffer */
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + nob);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
+#endif
+                if (nob > 0)
+                        lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
+                                          niov, kiov, 0, nob);
         }
         else
         {
-                /* zero copy */
-                ktx->ktx_niov = 0;        /* no frags mapped yet */
-                rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
+                /* zero copy payload */
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+                rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                 if (rc != 0)
                         goto failed;
-
-                ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
         }
 
-        ktx->ktx_port       = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
-                              EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
-        ktx->ktx_nid        = nid;
-        ktx->ktx_forwarding = 1;
-        ktx->ktx_args[0]    = fwd;
-
         rc = kqswnal_launch (ktx);
         if (rc == 0)
                 return;
@@ -807,7 +1255,7 @@ kqswnal_fwd_callback (void *arg, int error)
 
         if (error != 0)
         {
-                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
 
                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
@@ -817,16 +1265,122 @@ kqswnal_fwd_callback (void *arg, int error)
 }
 
 void
+kqswnal_dma_reply_complete (EP_RXD *rxd) 
+{
+        int           status = ep_rxd_status(rxd);
+        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+        LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
+
+        krx->krx_rpc_reply_needed = 0;
+        kqswnal_rx_done (krx);
+
+        lib_finalize (&kqswnal_lib, NULL, msg,
+                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
+        kqswnal_put_idle_tx (ktx);
+}
+
+void
+kqswnal_rpc_complete (EP_RXD *rxd)
+{
+        int           status = ep_rxd_status(rxd);
+        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, krx %p, status %d\n", rxd, krx, status);
+
+        LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
+        
+        krx->krx_rpc_reply_needed = 0;
+        kqswnal_requeue_rx (krx);
+}
+
+void
+kqswnal_requeue_rx (kqswnal_rx_t *krx) 
+{
+        int   rc;
+
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+
+        if (krx->krx_rpc_reply_needed) {
+
+                /* We failed to complete the peer's optimized GET (e.g. we
+                 * couldn't map the source buffers).  We complete the
+                 * peer's EKC rpc now with failure. */
+#if MULTIRAIL_EKC
+                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
+                                     &kqswnal_rpc_failed, NULL, NULL, 0);
+                if (rc == EP_SUCCESS)
+                        return;
+                
+                CERROR("can't complete RPC: %d\n", rc);
+#else
+                if (krx->krx_rxd != NULL) {
+                        /* We didn't try (and fail) to complete earlier... */
+                        rc = ep_complete_rpc(krx->krx_rxd, 
+                                             kqswnal_rpc_complete, krx,
+                                             &kqswnal_rpc_failed, NULL, 0);
+                        if (rc == EP_SUCCESS)
+                                return;
+
+                        CERROR("can't complete RPC: %d\n", rc);
+                }
+                
+                /* NB the old ep_complete_rpc() frees rxd on failure, so we
+                 * have to requeue from scratch here, unless we're shutting
+                 * down */
+                if (kqswnal_data.kqn_shuttingdown)
+                        return;
+
+                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
+                                      krx->krx_elanbuffer, 
+                                      krx->krx_npages * PAGE_SIZE, 0);
+                LASSERT (rc == EP_SUCCESS);
+                /* We don't handle failure here; it's incredibly rare
+                 * (never reported?) and only happens with "old" EKC */
+                return;
+#endif
+        }
+
+#if MULTIRAIL_EKC
+        if (kqswnal_data.kqn_shuttingdown) {
+                /* free EKC rxd on shutdown */
+                ep_complete_receive(krx->krx_rxd);
+        } else {
+                /* repost receive */
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   &krx->krx_elanbuffer, 0);
+        }
+#else                
+        /* don't actually requeue on shutdown */
+        if (!kqswnal_data.kqn_shuttingdown) 
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
+#endif
+}
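
A summary sketch of the requeue outcomes above:

        /* pending RPC, ep_complete_rpc() OK -> krx is recycled later by
         *                                      kqswnal_rpc_complete()
         * pending RPC, completion fails     -> MULTIRAIL falls through to
         *                                      the normal requeue below;
         *                                      old EKC has lost the rxd
         *                                      and must ep_queue_receive()
         *                                      a fresh one
         * no pending RPC, shutting down     -> rxd released (MULTIRAIL) or
         *                                      simply not re-posted
         * no pending RPC, running           -> rxd re-posted for the next
         *                                      receive */
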
+        
+void
 kqswnal_rx (kqswnal_rx_t *krx)
 {
-        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
+        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
+        int             payload_nob;
         int             nob;
         int             niov;
 
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+
         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
-                /* NB krx requeued when lib_parse() calls back kqswnal_recv */
+                atomic_set(&krx->krx_refcount, 1);
                 lib_parse (&kqswnal_lib, hdr, krx);
+                kqswnal_rx_done(krx);
                 return;
         }
 
@@ -838,20 +1392,31 @@ kqswnal_rx (kqswnal_rx_t *krx)
         {
                 CERROR("dropping packet from "LPX64" for "LPX64
                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
+
                 kqswnal_requeue_rx (krx);
                 return;
         }
 
-        /* NB forwarding may destroy iov; rebuild every time */
-        for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
-        {
-                LASSERT (niov < krx->krx_npages);
-                krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
-                krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
+        nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
+        niov = 0;
+        if (nob > 0) {
+                krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
+                krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
+                niov = 1;
+                nob -= PAGE_SIZE - KQSW_HDR_SIZE;
+                
+                while (nob > 0) {
+                        LASSERT (niov < krx->krx_npages);
+                        
+                        krx->krx_kiov[niov].kiov_offset = 0;
+                        krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
+                        niov++;
+                        nob -= PAGE_SIZE;
+                }
         }
 
-        kpr_fwd_init (&krx->krx_fwd, dest_nid,
-                      krx->krx_nob, niov, krx->krx_iov,
+        kpr_fwd_init (&krx->krx_fwd, dest_nid, 
+                      hdr, payload_nob, niov, krx->krx_kiov,
                       kqswnal_fwd_callback, krx);
 
         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
@@ -873,27 +1438,40 @@ kqswnal_rxhandler(EP_RXD *rxd)
 
         krx->krx_rxd = rxd;
         krx->krx_nob = nob;
-
+#if MULTIRAIL_EKC
+        krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
+#else
+        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
+#endif
+        
         /* must receive a whole header to be able to parse */
         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
         {
                 /* receives complete with failure when receiver is removed */
-                if (kqswnal_data.kqn_shuttingdown)
-                        return;
-
-                CERROR("receive status failed with status %d nob %d\n",
-                       ep_rxd_status(rxd), nob);
+#if MULTIRAIL_EKC
+                if (status == EP_SHUTDOWN)
+                        LASSERT (kqswnal_data.kqn_shuttingdown);
+                else
+                        CERROR("receive status failed with status %d nob %d\n",
+                               ep_rxd_status(rxd), nob);
+#else
+                if (!kqswnal_data.kqn_shuttingdown)
+                        CERROR("receive status failed with status %d nob %d\n",
+                               ep_rxd_status(rxd), nob);
+#endif
                 kqswnal_requeue_rx (krx);
                 return;
         }
 
-        atomic_inc (&kqswnal_packets_received);
+        if (!in_interrupt()) {
+                kqswnal_rx (krx);
+                return;
+        }
 
         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
-        if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
-                wake_up (&kqswnal_data.kqn_sched_waitq);
+        wake_up (&kqswnal_data.kqn_sched_waitq);
 
         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
 }
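
/* Illustrative sketch, not part of the patch: the dispatch rule added to
 * kqswnal_rxhandler() above.  EP completion callbacks may run in interrupt
 * context, where blocking and kmap() are forbidden, so only non-interrupt
 * completions are handled inline; the rest are queued for the scheduler
 * thread.  in_irq_context(), handle_rx() and queue_rx() are hypothetical
 * stand-ins for in_interrupt(), kqswnal_rx() and the list/wakeup code. */
typedef struct rx_desc rx_desc_t;

extern int  in_irq_context (void);
extern void handle_rx (rx_desc_t *rx);
extern void queue_rx (rx_desc_t *rx);

static void
dispatch_rx (rx_desc_t *rx)
{
        if (!in_irq_context ()) {
                handle_rx (rx);         /* may block/kmap() safely */
                return;
        }
        queue_rx (rx);                  /* defer to the scheduler thread */
}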
@@ -902,7 +1480,7 @@ kqswnal_rxhandler(EP_RXD *rxd)
 void
 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 {
-        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
 
         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                 ", dpid %d, spid %d, type %d\n",
@@ -945,17 +1523,19 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 }
 #endif
 
-static int
+static ptl_err_t
 kqswnal_recvmsg (nal_cb_t     *nal,
                  void         *private,
-                 lib_msg_t    *cookie,
+                 lib_msg_t    *libmsg,
                  unsigned int  niov,
                  struct iovec *iov,
                  ptl_kiov_t   *kiov,
+                 size_t        offset,
                  size_t        mlen,
                  size_t        rlen)
 {
         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
+        char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
         int           page;
         char         *page_ptr;
         int           page_nob;
@@ -965,8 +1545,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         kqsw_csum_t   senders_csum;
         kqsw_csum_t   payload_csum = 0;
-        kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
-                                           sizeof(ptl_hdr_t));
+        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
         size_t        csum_len = mlen;
         int           csum_frags = 0;
         int           csum_nob = 0;
@@ -975,45 +1554,63 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 
         atomic_inc (&csum_counter);
 
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                                sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
+        memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
         if (senders_csum != hdr_csum)
                 kqswnal_csum_error (krx, 1);
 #endif
         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
 
-        /* What was actually received must be >= payload.
-         * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
-        LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
+        /* What was actually received must cover the payload; now that
+         * lib_finalize() takes a completion status we can fail gracefully
+         * instead of asserting */
         LASSERT (mlen <= rlen);
+        if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
+                CERROR("Bad message size: have %d, need %d + %d\n",
+                       krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
+                return (PTL_FAIL);
+        }
 
         /* It must be OK to kmap() if required */
         LASSERT (kiov == NULL || !in_interrupt ());
         /* Either all pages or all vaddrs */
         LASSERT (!(kiov != NULL && iov != NULL));
-        
-        if (mlen != 0)
-        {
+
+        if (mlen != 0) {
                 page     = 0;
-                page_ptr = ((char *) page_address(krx->krx_pages[0])) +
-                        KQSW_HDR_SIZE;
+                page_ptr = buffer + KQSW_HDR_SIZE;
                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
 
                 LASSERT (niov > 0);
+
                 if (kiov != NULL) {
-                        iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
-                        iov_nob = kiov->kiov_len;
+                        /* skip complete frags */
+                        while (offset >= kiov->kiov_len) {
+                                offset -= kiov->kiov_len;
+                                kiov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        iov_ptr = ((char *)kmap (kiov->kiov_page)) +
+                                kiov->kiov_offset + offset;
+                        iov_nob = kiov->kiov_len - offset;
                 } else {
-                        iov_ptr = iov->iov_base;
-                        iov_nob = iov->iov_len;
+                        /* skip complete frags */
+                        while (offset >= iov->iov_len) {
+                                offset -= iov->iov_len;
+                                iov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        iov_ptr = iov->iov_base + offset;
+                        iov_nob = iov->iov_len - offset;
                 }
-
+                
                 for (;;)
                 {
-                        /* We expect the iov to exactly match mlen */
-                        LASSERT (iov_nob <= mlen);
-                        
-                        frag = MIN (page_nob, iov_nob);
+                        frag = mlen;
+                        if (frag > page_nob)
+                                frag = page_nob;
+                        if (frag > iov_nob)
+                                frag = iov_nob;
+
                         memcpy (iov_ptr, page_ptr, frag);
 #if KQSW_CHECKSUM
                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
@@ -1031,7 +1628,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                         {
                                 page++;
                                 LASSERT (page < krx->krx_npages);
-                                page_ptr = page_address(krx->krx_pages[page]);
+                                page_ptr = page_address(krx->krx_kiov[page].kiov_page);
                                 page_nob = PAGE_SIZE;
                         }
 
@@ -1059,8 +1656,8 @@ kqswnal_recvmsg (nal_cb_t     *nal,
         }
 
 #if KQSW_CHECKSUM
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
+        memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
+                sizeof(kqsw_csum_t));
 
         if (csum_len != rlen)
                 CERROR("Unable to checksum data in user's buffer\n");
@@ -1072,35 +1669,39 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                        "csum_nob %d\n",
                         hdr_csum, payload_csum, csum_frags, csum_nob);
 #endif
-        lib_finalize(nal, private, cookie);
-
-        kqswnal_requeue_rx (krx);
+        lib_finalize(nal, private, libmsg, PTL_OK);
 
-        return (rlen);
+        return (PTL_OK);
 }
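
/* Illustrative sketch, not part of the patch: the copy loop in
 * kqswnal_recvmsg() above, reduced to flat memory fragments.  Each pass
 * copies min(bytes remaining, source fragment left, destination fragment
 * left), then advances whichever side ran dry.  Assumes both fragment
 * lists describe at least mlen bytes; all names are stand-ins. */
#include <string.h>

struct mem_frag { char *base; size_t len; };

static size_t
gather_copy (struct mem_frag *dst, struct mem_frag *src, size_t mlen)
{
        char   *d = dst->base, *s = src->base;
        size_t  dleft = dst->len, sleft = src->len;
        size_t  copied = 0;

        while (copied < mlen) {
                size_t frag = mlen - copied;

                if (frag > dleft) frag = dleft;
                if (frag > sleft) frag = sleft;

                memcpy (d, s, frag);
                copied += frag;
                d += frag; dleft -= frag;
                s += frag; sleft -= frag;

                if (copied == mlen)
                        break;
                if (dleft == 0) { dst++; d = dst->base; dleft = dst->len; }
                if (sleft == 0) { src++; s = src->base; sleft = src->len; }
        }
        return (copied);
}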
 
-static int
+static ptl_err_t
 kqswnal_recv(nal_cb_t     *nal,
              void         *private,
-             lib_msg_t    *cookie,
+             lib_msg_t    *libmsg,
              unsigned int  niov,
              struct iovec *iov,
+             size_t        offset,
              size_t        mlen,
              size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, cookie, niov, iov, NULL, mlen, rlen));
+        return (kqswnal_recvmsg(nal, private, libmsg, 
+                                niov, iov, NULL, 
+                                offset, mlen, rlen));
 }
 
-static int
+static ptl_err_t
 kqswnal_recv_pages (nal_cb_t     *nal,
                     void         *private,
-                    lib_msg_t    *cookie,
+                    lib_msg_t    *libmsg,
                     unsigned int  niov,
                     ptl_kiov_t   *kiov,
+                    size_t        offset,
                     size_t        mlen,
                     size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, cookie, niov, NULL, kiov, mlen, rlen));
+        return (kqswnal_recvmsg(nal, private, libmsg, 
+                                niov, NULL, kiov, 
+                                offset, mlen, rlen));
 }
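
/* Illustrative sketch, not part of the patch: how the new offset
 * parameter threaded through kqswnal_recv() and kqswnal_recv_pages() is
 * consumed.  Fragments lying wholly before the offset are stepped over,
 * and the copy then starts partway into the first surviving fragment.
 * Assumes offset is strictly less than the total length described;
 * names are stand-ins for the iov/kiov walks in kqswnal_recvmsg(). */
struct io_frag { char *base; unsigned int len; };

static struct io_frag *
skip_to_offset (struct io_frag *v, unsigned int *niov, unsigned int *offset)
{
        while (*offset >= v->len) {     /* fragment wholly before offset */
                *offset -= v->len;
                v++;
                (*niov)--;
        }
        /* first copy runs from v->base + *offset
         * for v->len - *offset bytes */
        return (v);
}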
 
 int
@@ -1112,6 +1713,7 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg)
                 return ((int)pid);
 
         atomic_inc (&kqswnal_data.kqn_nthreads);
+        atomic_inc (&kqswnal_data.kqn_nthreads_running);
         return (0);
 }
 
@@ -1130,6 +1732,7 @@ kqswnal_scheduler (void *arg)
         long             flags;
         int              rc;
         int              counter = 0;
+        int              shuttingdown = 0;
         int              did_something;
 
         kportal_daemonize ("kqswnal_sched");
@@ -1137,9 +1740,21 @@ kqswnal_scheduler (void *arg)
         
         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
-        while (!kqswnal_data.kqn_shuttingdown)
+        for (;;)
         {
-                did_something = FALSE;
+                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
+                        if (kqswnal_data.kqn_shuttingdown == 2)
+                                break;
+                
+                        /* During stage 1 of shutdown we are still responsive
+                         * to receives */
+
+                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
+                        shuttingdown = kqswnal_data.kqn_shuttingdown;
+                }
+
+                did_something = 0;
 
                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
                 {
@@ -1151,11 +1766,12 @@ kqswnal_scheduler (void *arg)
 
                         kqswnal_rx (krx);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedtxds))
                 {
                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                          kqswnal_tx_t, ktx_list);
@@ -1171,11 +1787,12 @@ kqswnal_scheduler (void *arg)
                                 kqswnal_tx_done (ktx, rc);
                         }
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedfwds))
                 {
                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                         list_del (&fwd->kprfd_list);
@@ -1183,7 +1800,7 @@ kqswnal_scheduler (void *arg)
 
                         kqswnal_fwd_packet (NULL, fwd);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
@@ -1196,7 +1813,7 @@ kqswnal_scheduler (void *arg)
 
                         if (!did_something) {
                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                               kqswnal_data.kqn_shuttingdown ||
+                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
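
/* Illustrative sketch, not part of the patch: the two-stage shutdown the
 * scheduler loop above observes.  Stage 1 (kqn_shuttingdown == 1) stops
 * the thread taking transmits and forwards while it keeps servicing
 * receives; stage 2 (== 2) makes it exit.  The enum and function names
 * are hypothetical stand-ins. */
enum sched_state { SCHED_RUNNING = 0, SCHED_DRAINING = 1, SCHED_EXIT = 2 };

/* returns non-zero when the thread should leave its main loop */
static int
note_state_change (int global, int *local, int *may_tx)
{
        if (global != *local) {
                if (global == SCHED_EXIT)
                        return (1);             /* stage 2: exit */
                *local = global;                /* stage 1: remember it */
        }
        *may_tx = (*local == SCHED_RUNNING);
        return (0);
}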