Whamcloud - gitweb
smash the HEAD with the contents of b_cmd. HEAD_PRE_CMD_SMASH and
[fs/lustre-release.git] / lustre / portals / knals / qswnal / qswnal_cb.c
index 43926c9..61c88f6 100644 (file)
 
 #include "qswnal.h"
 
+EP_STATUSBLK  kqswnal_rpc_success;
+EP_STATUSBLK  kqswnal_rpc_failed;
+
 /*
  *  LIB functions follow
  *
  */
-static int
+static ptl_err_t
 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
              size_t len)
 {
@@ -38,10 +41,10 @@ kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
                 nal->ni.nid, len, src_addr, dst_addr );
         memcpy( dst_addr, src_addr, len );
 
-        return (0);
+        return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
               size_t len)
 {
@@ -49,7 +52,7 @@ kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
                 nal->ni.nid, len, src_addr, dst_addr );
         memcpy( dst_addr, src_addr, len );
 
-        return (0);
+        return (PTL_OK);
 }
 
 static void *
@@ -82,6 +85,9 @@ kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
         CDEBUG (D_NET, "%s", msg);
 }
 
+#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
+# error "Can't save/restore irq contexts in different procedures"
+#endif
 
 static void
 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
@@ -100,6 +106,17 @@ kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
 }
 
+static void
+kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
+{
+        /* holding kqn_statelock */
+
+        if (eq->event_callback != NULL)
+                eq->event_callback(ev);
+
+        if (waitqueue_active(&kqswnal_data.kqn_yield_waitq))
+                wake_up_all(&kqswnal_data.kqn_yield_waitq);
+}
 
 static int
 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
@@ -128,9 +145,22 @@ kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
 void
 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
 {
+#if MULTIRAIL_EKC
+        int      i;
+#endif
+
         if (ktx->ktx_nmappedpages == 0)
                 return;
-
+        
+#if MULTIRAIL_EKC
+        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
+               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
+
+        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
+                ep_dvma_unload(kqswnal_data.kqn_ep,
+                               kqswnal_data.kqn_ep_tx_nmh,
+                               &ktx->ktx_frags[i]);
+#else
         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
 
@@ -138,28 +168,49 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx)
         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
 
-        elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                           kqswnal_data.kqn_eptxdmahandle,
                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
+#endif
         ktx->ktx_nmappedpages = 0;
 }
 
 int
-kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
+kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
 {
         int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
         char     *ptr;
+#if MULTIRAIL_EKC
+        EP_RAILMASK railmask;
+        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                            EP_RAILMASK_ALL,
+                                            kqswnal_nid2elanid(ktx->ktx_nid));
         
+        if (rail < 0) {
+                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+#endif
         LASSERT (nmapped <= maxmapped);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
         LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
-        
+
+        /* skip complete frags before 'offset' */
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                kiov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+
         do {
-                int  fraglen = kiov->kiov_len;
+                int  fraglen = kiov->kiov_len - offset;
 
                 /* nob exactly spans the iovs */
                 LASSERT (fraglen <= nob);
@@ -182,36 +233,52 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                 /* XXX this is really crap, but we'll have to kmap until
                  * EKC has a page (rather than vaddr) mapping interface */
 
-                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
+                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, page %d, %d total\n",
                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
 
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             ptr, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
+#else
+                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
                                        ptr, fraglen,
-                                       basepage, &ktx->ktx_frags.iov[nfrags].Base);
-
-                kunmap (kiov->kiov_page);
-                
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
+                                       basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_frags.iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
+#endif
+
+                kunmap (kiov->kiov_page);
+                
+                /* keep in loop for failure case */
+                ktx->ktx_nmappedpages = nmapped;
 
                 basepage++;
                 kiov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
@@ -226,20 +293,41 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
 }
 
 int
-kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
+kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
+                    int niov, struct iovec *iov)
 {
         int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
-
+#if MULTIRAIL_EKC
+        EP_RAILMASK railmask;
+        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                            EP_RAILMASK_ALL,
+                                            kqswnal_nid2elanid(ktx->ktx_nid));
+        
+        if (rail < 0) {
+                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+#endif
         LASSERT (nmapped <= maxmapped);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
         LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
 
+        /* skip complete frags before offset */
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                iov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+        
         do {
-                int  fraglen = iov->iov_len;
+                int  fraglen = iov->iov_len - offset;
                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
 
                 /* nob exactly spans the iovs */
@@ -260,30 +348,47 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
-                        ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
-                        nmapped);
-
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+                       ktx, nfrags, iov->iov_base + offset, fraglen, 
+                       basepage, npages, nmapped);
+
+#if MULTIRAIL_EKC
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             iov->iov_base + offset, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
+#else
+                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
-                                       iov->iov_base, fraglen,
-                                       basepage, &ktx->ktx_frags.iov[nfrags].Base);
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
+                                       iov->iov_base + offset, fraglen,
+                                       basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_frags.iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
+#endif
+
+                /* keep in loop for failure case */
+                ktx->ktx_nmappedpages = nmapped;
 
                 basepage += npages;
                 iov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
@@ -410,7 +515,7 @@ void
 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 {
         lib_msg_t     *msg;
-        lib_msg_t     *repmsg;
+        lib_msg_t     *repmsg = NULL;
 
         switch (ktx->ktx_state) {
         case KTX_FORWARDING:       /* router asked me to forward this packet */
@@ -420,22 +525,29 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 
         case KTX_SENDING:          /* packet sourced locally */
                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
-                              (lib_msg_t *)ktx->ktx_args[1]);
+                              (lib_msg_t *)ktx->ktx_args[1],
+                              (error == 0) ? PTL_OK : 
+                              (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
                 break;
 
         case KTX_GETTING:          /* Peer has DMA-ed direct? */
-                LASSERT (KQSW_OPTIMIZE_GETS);
                 msg = (lib_msg_t *)ktx->ktx_args[1];
-                repmsg = NULL;
 
-                if (error == 0) 
-                        repmsg = lib_fake_reply_msg (&kqswnal_lib, 
-                                                     ktx->ktx_nid, msg->md);
+                if (error == 0) {
+                        repmsg = lib_create_reply_msg (&kqswnal_lib, 
+                                                       ktx->ktx_nid, msg);
+                        if (repmsg == NULL)
+                                error = -ENOMEM;
+                }
                 
-                lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg);
-
-                if (repmsg != NULL) 
-                        lib_finalize (&kqswnal_lib, NULL, repmsg);
+                if (error == 0) {
+                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
+                                      msg, PTL_OK);
+                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
+                } else {
+                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
+                                      (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
+                }
                 break;
 
         default:
@@ -455,19 +567,22 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 
         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
 
-        if (status != EP_SUCCESS)
-        {
+        if (status != EP_SUCCESS) {
+
                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
                         ktx->ktx_nid, status);
 
                 kqswnal_notify_peer_down(ktx);
-                status = -EIO;
+                status = -EHOSTDOWN;
 
         } else if (ktx->ktx_state == KTX_GETTING) {
                 /* RPC completed OK; what did our peer put in the status
                  * block? */
-                LASSERT (KQSW_OPTIMIZE_GETS);
+#if MULTIRAIL_EKC
+                status = ep_txd_statusblk(txd)->Data[0];
+#else
                 status = ep_txd_statusblk(txd)->Status;
+#endif
         } else {
                 status = 0;
         }
@@ -488,21 +603,38 @@ kqswnal_launch (kqswnal_tx_t *ktx)
 
         LASSERT (dest >= 0);                    /* must be a peer */
         if (ktx->ktx_state == KTX_GETTING) {
-                LASSERT (KQSW_OPTIMIZE_GETS);
+                /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
+                 * other frags are the GET sink which we obviously don't
+                 * send here :) */
+#if MULTIRAIL_EKC
+                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
+                                     ktx->ktx_port, attr,
+                                     kqswnal_txhandler, ktx,
+                                     NULL, ktx->ktx_frags, 1);
+#else
                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                      ktx->ktx_port, attr, kqswnal_txhandler,
-                                     ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+                                     ktx, NULL, ktx->ktx_frags, 1);
+#endif
         } else {
+#if MULTIRAIL_EKC
+                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
+                                         ktx->ktx_port, attr,
+                                         kqswnal_txhandler, ktx,
+                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
-                                       ktx->ktx_port, attr, kqswnal_txhandler,
-                                       ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+                                       ktx->ktx_port, attr, 
+                                       kqswnal_txhandler, ktx, 
+                                       ktx->ktx_frags, ktx->ktx_nfrag);
+#endif
         }
 
         switch (rc) {
-        case ESUCCESS: /* success */
+        case EP_SUCCESS: /* success */
                 return (0);
 
-        case ENOMEM: /* can't allocate ep txd => queue for later */
+        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                 LASSERT (in_interrupt());
 
                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
@@ -516,7 +648,7 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         default: /* fatal error */
                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
                 kqswnal_notify_peer_down(ktx);
-                return (rc);
+                return (-EHOSTUNREACH);
         }
 }
 
@@ -589,6 +721,7 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
 
 }                               /* end of print_hdr() */
 
+#if !MULTIRAIL_EKC
 void
 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
 {
@@ -648,22 +781,27 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
         CERROR ("DATAVEC too small\n");
         return (-E2BIG);
 }
+#endif
 
 int
 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
-                   struct iovec *iov, ptl_kiov_t *kiov, int nob)
+                   struct iovec *iov, ptl_kiov_t *kiov, 
+                   int offset, int nob)
 {
         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
-        char               *buffer = (char *)page_address(krx->krx_pages[0]);
+        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
-        EP_IOVEC            eiov[EP_MAXFRAG];
-        EP_STATUSBLK        blk;
         int                 rc;
-
-        LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed);
+#if MULTIRAIL_EKC
+        int                 i;
+#else
+        EP_DATAVEC          datav[EP_MAXFRAG];
+        int                 ndatav;
+#endif
+        LASSERT (krx->krx_rpc_reply_needed);
         LASSERT ((iov == NULL) != (kiov == NULL));
 
-        /* see .*_pack_k?iov comment regarding endian-ness */
+        /* see kqswnal_sendmsg comment regarding endian-ness */
         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                 /* msg too small to discover rmd size */
                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
@@ -671,56 +809,85 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                 return (-EINVAL);
         }
         
-        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) {
+        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                 /* rmd doesn't fit in the incoming message */
                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
-                        krx->krx_nob, rmd->kqrmd_neiov,
-                        (int)(((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer));
+                        krx->krx_nob, rmd->kqrmd_nfrag,
+                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                 return (-EINVAL);
         }
 
-        /* Ghastly hack part 1, uses the existing procedures to map the source data... */
-        ktx->ktx_nfrag = 0;
+        /* Map the source data... */
+        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
         if (kiov != NULL)
-                rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
+                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
         else
-                rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov);
+                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
 
         if (rc != 0) {
                 CERROR ("Can't map source data: %d\n", rc);
                 return (rc);
         }
 
-        /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... */
-        memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0]));
-
-        rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav,
-                                  ktx->ktx_nfrag, eiov,
-                                  rmd->kqrmd_neiov, rmd->kqrmd_eiov);
-        if (rc < 0) {
-                CERROR ("Can't create datavec: %d\n", rc);
-                return (rc);
+#if MULTIRAIL_EKC
+        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
+                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
+                return (-EINVAL);
         }
-        ktx->ktx_nfrag = rc;
-
-        memset (&blk, 0, sizeof (blk));         /* zero blk.Status */
+        
+        for (i = 0; i < rmd->kqrmd_nfrag; i++)
+                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
+                        CERROR("Can't cope with unequal frags %d(%d):"
+                               " %d local %d remote\n",
+                               i, rmd->kqrmd_nfrag, 
+                               ktx->ktx_frags[i].nmd_len, 
+                               rmd->kqrmd_frag[i].nmd_len);
+                        return (-EINVAL);
+                }
+#else
+        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
+                                      ktx->ktx_nfrag, ktx->ktx_frags,
+                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        if (ndatav < 0) {
+                CERROR ("Can't create datavec: %d\n", ndatav);
+                return (ndatav);
+        }
+#endif
 
-        /* Our caller will start to race with kqswnal_rpc_complete... */
+        /* Our caller will start to race with kqswnal_dma_reply_complete... */
         LASSERT (atomic_read (&krx->krx_refcount) == 1);
         atomic_set (&krx->krx_refcount, 2);
 
-        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx,
-                              &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag);
-        if (rc == ESUCCESS)
+#if MULTIRAIL_EKC
+        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
+                             &kqswnal_rpc_success,
+                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
+        if (rc == EP_SUCCESS)
+                return (0);
+
+        /* Well we tried... */
+        krx->krx_rpc_reply_needed = 0;
+#else
+        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
+                              &kqswnal_rpc_success, datav, ndatav);
+        if (rc == EP_SUCCESS)
                 return (0);
 
+        /* "old" EKC destroys rxd on failed completion */
+        krx->krx_rxd = NULL;
+#endif
+
+        CERROR("can't complete RPC: %d\n", rc);
+
         /* reset refcount back to 1: we're not going to be racing with
-         * kqswnal_rely_complete. */
+         * kqswnal_dma_reply_complete. */
         atomic_set (&krx->krx_refcount, 1);
+
         return (-ECONNABORTED);
 }
 
-static int
+static ptl_err_t
 kqswnal_sendmsg (nal_cb_t     *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
@@ -731,6 +898,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  unsigned int  payload_niov,
                  struct iovec *payload_iov,
                  ptl_kiov_t   *payload_kiov,
+                 size_t        payload_offset,
                  size_t        payload_nob)
 {
         kqswnal_tx_t      *ktx;
@@ -739,6 +907,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         int                i;
         kqsw_csum_t        csum;
+        int                sumoff;
         int                sumnob;
 #endif
         
@@ -782,15 +951,15 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                           in_interrupt()));
         if (ktx == NULL) {
                 kqswnal_cerror_hdr (hdr);
-                return (PTL_NOSPACE);
+                return (PTL_NO_SPACE);
         }
 
+        ktx->ktx_nid     = targetnid;
         ktx->ktx_args[0] = private;
         ktx->ktx_args[1] = libmsg;
 
-#if KQSW_OPTIMIZE_GETS
         if (type == PTL_MSG_REPLY &&
-            ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) {
+            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
                 if (nid != targetnid ||
                     kqswnal_nid2elanid(nid) != 
                     ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
@@ -798,21 +967,20 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                "nid "LPX64" via "LPX64" elanID %d\n",
                                nid, targetnid,
                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
-                        return(PTL_FAIL);
+                        return (PTL_FAIL);
                 }
 
                 /* peer expects RPC completion with GET data */
-                rc = kqswnal_dma_reply (ktx,
-                                        payload_niov, payload_iov, 
-                                        payload_kiov, payload_nob);
+                rc = kqswnal_dma_reply (ktx, payload_niov, 
+                                        payload_iov, payload_kiov, 
+                                        payload_offset, payload_nob);
                 if (rc == 0)
-                        return (0);
+                        return (PTL_OK);
                 
                 CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
                 kqswnal_put_idle_tx (ktx);
                 return (PTL_FAIL);
         }
-#endif
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
@@ -820,33 +988,43 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
-        for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
+        for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
+                LASSERT(i < niov);
                 if (payload_kiov != NULL) {
                         ptl_kiov_t *kiov = &payload_kiov[i];
-                        char       *addr = ((char *)kmap (kiov->kiov_page)) +
-                                           kiov->kiov_offset;
-                        
-                        csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
-                        sumnob -= kiov->kiov_len;
+
+                        if (sumoff >= kiov->kiov_len) {
+                                sumoff -= kiov->kiov_len;
+                        } else {
+                                char *addr = ((char *)kmap (kiov->kiov_page)) +
+                                             kiov->kiov_offset + sumoff;
+                                int   fragnob = kiov->kiov_len - sumoff;
+
+                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+                                sumnob -= fragnob;
+                                sumoff = 0;
+                                kunmap(kiov->kiov_page);
+                        }
                 } else {
                         struct iovec *iov = &payload_iov[i];
 
-                        csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
-                        sumnob -= iov->iov_len;
+                        if (sumoff > iov->iov_len) {
+                                sumoff -= iov->iov_len;
+                        } else {
+                                char *addr = iov->iov_base + sumoff;
+                                int   fragnob = iov->iov_len - sumoff;
+                                
+                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+                                sumnob -= fragnob;
+                                sumoff = 0;
+                        }
                 }
         }
-        memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
+        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
 #endif
-        
-        /* Set up first frag from pre-mapped buffer (it's at least the
-         * portals header) */
-        ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer;
-        ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE;
-        ktx->ktx_nfrag = 1;
-        ktx->ktx_state = KTX_SENDING;   /* => lib_finalize() on completion */
-
-#if KQSW_OPTIMIZE_GETS
-        if (type == PTL_MSG_GET &&              /* doing a GET */
+
+        if (kqswnal_data.kqn_optimized_gets &&
+            type == PTL_MSG_GET &&              /* doing a GET */
             nid == targetnid) {                 /* not forwarding */
                 lib_md_t           *md = libmsg->md;
                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
@@ -856,8 +1034,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  *
                  * First I set up ktx as if it was going to send this
                  * payload, (it needs to map it anyway).  This fills
-                 * ktx_frags.iov[1] and onward with the network addresses
-                 * of the get sink frags.  I copy these into ktx_buffer,
+                 * ktx_frags[1] and onward with the network addresses
+                 * of the GET sink frags.  I copy these into ktx_buffer,
                  * immediately after the header, and send that as my GET
                  * message.
                  *
@@ -865,11 +1043,14 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  * When EKC copes with different endian nodes, I'll fix
                  * this (and eat my hat :) */
 
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_GETTING;
+
                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
-                        rc = kqswnal_map_tx_kiov (ktx, md->length,
+                        rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
                                                   md->md_niov, md->md_iov.kiov);
                 else
-                        rc = kqswnal_map_tx_iov (ktx, md->length,
+                        rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.iov);
 
                 if (rc < 0) {
@@ -877,46 +1058,75 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                         return (PTL_FAIL);
                 }
 
-                rmd->kqrmd_neiov = ktx->ktx_nfrag - 1;
-                memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1],
-                        rmd->kqrmd_neiov * sizeof (EP_IOVEC));
+                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
 
-                ktx->ktx_nfrag = 1;
-                ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t,
-                                                       kqrmd_eiov[rmd->kqrmd_neiov]);
-                payload_nob = ktx->ktx_frags.iov[0].Len;
-                ktx->ktx_state = KTX_GETTING;
-        } else 
+                payload_nob = offsetof(kqswnal_remotemd_t,
+                                       kqrmd_frag[rmd->kqrmd_nfrag]);
+                LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
+
+#if MULTIRAIL_EKC
+                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+                       rmd->kqrmd_nfrag * sizeof(EP_NMD));
+
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + payload_nob);
+#else
+                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
+                
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
+#endif
+        } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
+
+                /* small message: single frag copied into the pre-mapped buffer */
+
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + payload_nob);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
 #endif
-        if (payload_nob > 0) { /* got some payload (something more to do) */
-                /* make a single contiguous message? */
-                if (payload_nob <= KQSW_TX_MAXCONTIG) {
-                        /* copy payload to ktx_buffer, immediately after hdr */
+                if (payload_nob > 0) {
                         if (payload_kiov != NULL)
                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                   payload_niov, payload_kiov, payload_nob);
+                                                   payload_niov, payload_kiov, 
+                                                   payload_offset, payload_nob);
                         else
                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                  payload_niov, payload_iov, payload_nob);
-                        /* first frag includes payload */
-                        ktx->ktx_frags.iov[0].Len += payload_nob;
-                } else {
-                        if (payload_kiov != NULL)
-                                rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
-                                                          payload_niov, payload_kiov);
-                        else
-                                rc = kqswnal_map_tx_iov (ktx, payload_nob,
-                                                         payload_niov, payload_iov);
-                        if (rc != 0) {
-                                kqswnal_put_idle_tx (ktx);
-                                return (PTL_FAIL);
-                        }
-                } 
-        }
+                                                  payload_niov, payload_iov, 
+                                                  payload_offset, payload_nob);
+                }
+        } else {
+
+                /* large message: multiple frags: first is hdr in pre-mapped buffer */
 
-        ktx->ktx_nid  = targetnid;
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+                if (payload_kiov != NULL)
+                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
+                                                  payload_niov, payload_kiov);
+                else
+                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
+                                                 payload_niov, payload_iov);
+                if (rc != 0) {
+                        kqswnal_put_idle_tx (ktx);
+                        return (PTL_FAIL);
+                }
+        }
+        
         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
-                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
+                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
 
         rc = kqswnal_launch (ktx);
         if (rc != 0) {                    /* failed? */
@@ -930,7 +1140,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_send (nal_cb_t     *nal,
               void         *private,
               lib_msg_t    *libmsg,
@@ -940,13 +1150,15 @@ kqswnal_send (nal_cb_t     *nal,
               ptl_pid_t     pid,
               unsigned int  payload_niov,
               struct iovec *payload_iov,
+              size_t        payload_offset,
               size_t        payload_nob)
 {
         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
-                                 payload_niov, payload_iov, NULL, payload_nob));
+                                 payload_niov, payload_iov, NULL, 
+                                 payload_offset, payload_nob));
 }
 
-static int
+static ptl_err_t
 kqswnal_send_pages (nal_cb_t     *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
@@ -956,20 +1168,20 @@ kqswnal_send_pages (nal_cb_t     *nal,
                     ptl_pid_t     pid,
                     unsigned int  payload_niov,
                     ptl_kiov_t   *payload_kiov,
+                    size_t        payload_offset,
                     size_t        payload_nob)
 {
         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
-                                 payload_niov, NULL, payload_kiov, payload_nob));
+                                 payload_niov, NULL, payload_kiov, 
+                                 payload_offset, payload_nob));
 }
 
-int kqswnal_fwd_copy_contig = 0;
-
 void
 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
         int             rc;
         kqswnal_tx_t   *ktx;
-        struct iovec   *iov = fwd->kprfd_iov;
+        ptl_kiov_t     *kiov = fwd->kprfd_kiov;
         int             niov = fwd->kprfd_niov;
         int             nob = fwd->kprfd_nob;
         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
@@ -979,12 +1191,10 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         LBUG ();
 #endif
         /* The router wants this NAL to forward a packet */
-        CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
+        CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
                 fwd, nid, niov, nob);
 
-        LASSERT (niov > 0);
-        
-        ktx = kqswnal_get_idle_tx (fwd, FALSE);
+        ktx = kqswnal_get_idle_tx (fwd, 0);
         if (ktx == NULL)        /* can't get txd right now */
                 return;         /* fwd will be scheduled when tx desc freed */
 
@@ -997,41 +1207,46 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 goto failed;
         }
 
-        if (nob > KQSW_NRXMSGBYTES_LARGE) {
-                CERROR ("Can't forward [%p] to "LPX64
-                        ": size %d bigger than max packet size %ld\n",
-                        fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
-                rc = -EMSGSIZE;
-                goto failed;
-        }
+        /* copy hdr into pre-mapped buffer */
+        memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
+        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
 
-        if ((kqswnal_fwd_copy_contig || niov > 1) &&
-            nob <= KQSW_TX_BUFFER_SIZE) 
+        ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
+                           EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
+        ktx->ktx_nid     = nid;
+        ktx->ktx_state   = KTX_FORWARDING;
+        ktx->ktx_args[0] = fwd;
+        ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
+
+        if (nob <= KQSW_TX_MAXCONTIG) 
         {
-                /* send from ktx's pre-allocated/mapped contiguous buffer? */
-                lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
-                ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
-                ktx->ktx_frags.iov[0].Len = nob;
-                ktx->ktx_nfrag = 1;
-                ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
+                /* send payload from ktx's pre-mapped contiguous buffer */
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + nob);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
+#endif
+                if (nob > 0)
+                        lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
+                                          niov, kiov, 0, nob);
         }
         else
         {
-                /* zero copy */
-                ktx->ktx_nfrag = 0;       /* no frags mapped yet */
-                rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
+                /* zero copy payload */
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+                rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                 if (rc != 0)
                         goto failed;
-
-                ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
         }
 
-        ktx->ktx_port    = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
-                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
-        ktx->ktx_nid     = nid;
-        ktx->ktx_state   = KTX_FORWARDING; /* kpr_put_packet() on completion */
-        ktx->ktx_args[0] = fwd;
-
         rc = kqswnal_launch (ktx);
         if (rc == 0)
                 return;
@@ -1054,7 +1269,7 @@ kqswnal_fwd_callback (void *arg, int error)
 
         if (error != 0)
         {
-                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
 
                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
@@ -1064,7 +1279,7 @@ kqswnal_fwd_callback (void *arg, int error)
 }
 
 void
-kqswnal_reply_complete (EP_RXD *rxd) 
+kqswnal_dma_reply_complete (EP_RXD *rxd) 
 {
         int           status = ep_rxd_status(rxd);
         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
@@ -1075,11 +1290,13 @@ kqswnal_reply_complete (EP_RXD *rxd)
                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
 
         LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
 
-        krx->krx_rpc_completed = 1;
-        kqswnal_requeue_rx (krx);
+        krx->krx_rpc_reply_needed = 0;
+        kqswnal_rx_done (krx);
 
-        lib_finalize (&kqswnal_lib, NULL, msg);
+        lib_finalize (&kqswnal_lib, NULL, msg,
+                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
         kqswnal_put_idle_tx (ktx);
 }
 
@@ -1093,78 +1310,91 @@ kqswnal_rpc_complete (EP_RXD *rxd)
                "rxd %p, krx %p, status %d\n", rxd, krx, status);
 
         LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
         
-        krx->krx_rpc_completed = 1;
+        krx->krx_rpc_reply_needed = 0;
         kqswnal_requeue_rx (krx);
 }
 
 void
-kqswnal_requeue_rx (kqswnal_rx_t *krx)
+kqswnal_requeue_rx (kqswnal_rx_t *krx) 
 {
-        EP_STATUSBLK  blk;
-        int           rc;
+        int   rc;
 
-        LASSERT (atomic_read (&krx->krx_refcount) > 0);
-        if (!atomic_dec_and_test (&krx->krx_refcount))
-                return;
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
 
-        if (!ep_rxd_isrpc(krx->krx_rxd) ||
-            krx->krx_rpc_completed) {
+        if (krx->krx_rpc_reply_needed) {
 
-                /* don't actually requeue on shutdown */
-                if (kqswnal_data.kqn_shuttingdown)
+                /* We failed to complete the peer's optimized GET (e.g. we
+                 * couldn't map the source buffers).  We complete the
+                 * peer's EKC rpc now with failure. */
+#if MULTIRAIL_EKC
+                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
+                                     &kqswnal_rpc_failed, NULL, NULL, 0);
+                if (rc == EP_SUCCESS)
                         return;
                 
-                ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx,
-                                    krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE);
-                return;
-        }
-
-        /* Sender wanted an RPC, but we didn't complete it (we must have
-         * dropped the sender's message).  We complete it now with
-         * failure... */
-        memset (&blk, 0, sizeof (blk));
-        blk.Status = -ECONNREFUSED;
-
-        atomic_set (&krx->krx_refcount, 1);
+                CERROR("can't complete RPC: %d\n", rc);
+#else
+                if (krx->krx_rxd != NULL) {
+                        /* We didn't try (and fail) to complete earlier... */
+                        rc = ep_complete_rpc(krx->krx_rxd, 
+                                             kqswnal_rpc_complete, krx,
+                                             &kqswnal_rpc_failed, NULL, 0);
+                        if (rc == EP_SUCCESS)
+                                return;
+
+                        CERROR("can't complete RPC: %d\n", rc);
+                }
+                
+                /* NB the old ep_complete_rpc() frees rxd on failure, so we
+                 * have to requeue from scratch here, unless we're shutting
+                 * down */
+                if (kqswnal_data.kqn_shuttingdown)
+                        return;
 
-        rc = ep_complete_rpc (krx->krx_rxd, 
-                              kqswnal_rpc_complete, krx,
-                              &blk, NULL, 0);
-        if (rc == ESUCCESS) {
-                /* callback will call me again to requeue, having set
-                 * krx_rpc_completed... */
+                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
+                                      krx->krx_elanbuffer, 
+                                      krx->krx_npages * PAGE_SIZE, 0);
+                LASSERT (rc == EP_SUCCESS);
+                /* We don't handle failure here; it's incredibly rare
+                 * (never reported?) and only happens with "old" EKC */
                 return;
+#endif
         }
 
-        CERROR("can't complete RPC: %d\n", rc);
-
-        /* we don't actually requeue on shutdown */
-        if (kqswnal_data.kqn_shuttingdown)
-                return;
-
-        /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue
-         * from scratch here... */
-        rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
-                              krx->krx_elanaddr, 
-                              krx->krx_npages * PAGE_SIZE, 0);
-
-        LASSERT (rc == ESUCCESS);
-        /* This needs to be fixed by ep_complete_rpc NOT freeing
-         * krx->krx_rxd on failure so we can just ep_requeue_receive() */
+#if MULTIRAIL_EKC
+        if (kqswnal_data.kqn_shuttingdown) {
+                /* free EKC rxd on shutdown */
+                ep_complete_receive(krx->krx_rxd);
+        } else {
+                /* repost receive */
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   &krx->krx_elanbuffer, 0);
+        }
+#else                
+        /* don't actually requeue on shutdown */
+        if (!kqswnal_data.kqn_shuttingdown) 
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
+#endif
 }
-
+        
 void
 kqswnal_rx (kqswnal_rx_t *krx)
 {
-        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
+        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
+        int             payload_nob;
         int             nob;
         int             niov;
 
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+
         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
-                /* NB krx requeued when lib_parse() calls back kqswnal_recv */
+                atomic_set(&krx->krx_refcount, 1);
                 lib_parse (&kqswnal_lib, hdr, krx);
+                kqswnal_rx_done(krx);
                 return;
         }
 
@@ -1181,16 +1411,26 @@ kqswnal_rx (kqswnal_rx_t *krx)
                 return;
         }
 
-        /* NB forwarding may destroy iov; rebuild every time */
-        for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
-        {
-                LASSERT (niov < krx->krx_npages);
-                krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
-                krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
+        nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
+        niov = 0;
+        if (nob > 0) {
+                krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
+                krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
+                niov = 1;
+                nob -= PAGE_SIZE - KQSW_HDR_SIZE;
+                
+                while (nob > 0) {
+                        LASSERT (niov < krx->krx_npages);
+                        
+                        krx->krx_kiov[niov].kiov_offset = 0;
+                        krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
+                        niov++;
+                        nob -= PAGE_SIZE;
+                }
         }
 
-        kpr_fwd_init (&krx->krx_fwd, dest_nid,
-                      krx->krx_nob, niov, krx->krx_iov,
+        kpr_fwd_init (&krx->krx_fwd, dest_nid, 
+                      hdr, payload_nob, niov, krx->krx_kiov,
                       kqswnal_fwd_callback, krx);
 
         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
@@ -1212,18 +1452,27 @@ kqswnal_rxhandler(EP_RXD *rxd)
 
         krx->krx_rxd = rxd;
         krx->krx_nob = nob;
-        LASSERT (atomic_read (&krx->krx_refcount) == 0);
-        atomic_set (&krx->krx_refcount, 1);
-        krx->krx_rpc_completed = 0;
+#if MULTIRAIL_EKC
+        krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
+#else
+        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
+#endif
         
         /* must receive a whole header to be able to parse */
         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
         {
                 /* receives complete with failure when receiver is removed */
+#if MULTIRAIL_EKC
+                if (status == EP_SHUTDOWN)
+                        LASSERT (kqswnal_data.kqn_shuttingdown);
+                else
+                        CERROR("receive status failed with status %d nob %d\n",
+                               ep_rxd_status(rxd), nob);
+#else
                 if (!kqswnal_data.kqn_shuttingdown)
                         CERROR("receive status failed with status %d nob %d\n",
                                ep_rxd_status(rxd), nob);
-
+#endif
                 kqswnal_requeue_rx (krx);
                 return;
         }
@@ -1245,7 +1494,7 @@ kqswnal_rxhandler(EP_RXD *rxd)
 void
 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 {
-        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
 
         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                 ", dpid %d, spid %d, type %d\n",
@@ -1288,17 +1537,19 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 }
 #endif
 
-static int
+static ptl_err_t
 kqswnal_recvmsg (nal_cb_t     *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
                  unsigned int  niov,
                  struct iovec *iov,
                  ptl_kiov_t   *kiov,
+                 size_t        offset,
                  size_t        mlen,
                  size_t        rlen)
 {
         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
+        char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
         int           page;
         char         *page_ptr;
         int           page_nob;
@@ -1308,8 +1559,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         kqsw_csum_t   senders_csum;
         kqsw_csum_t   payload_csum = 0;
-        kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
-                                           sizeof(ptl_hdr_t));
+        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
         size_t        csum_len = mlen;
         int           csum_frags = 0;
         int           csum_nob = 0;
@@ -1318,45 +1568,63 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 
         atomic_inc (&csum_counter);
 
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                                sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
+        memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
         if (senders_csum != hdr_csum)
                 kqswnal_csum_error (krx, 1);
 #endif
         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
 
-        /* What was actually received must be >= payload.
-         * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
-        LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
+        /* What was actually received must be >= payload. */
         LASSERT (mlen <= rlen);
+        if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
+                CERROR("Bad message size: have %d, need %d + %d\n",
+                       krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
+                return (PTL_FAIL);
+        }
 
         /* It must be OK to kmap() if required */
         LASSERT (kiov == NULL || !in_interrupt ());
         /* Either all pages or all vaddrs */
         LASSERT (!(kiov != NULL && iov != NULL));
-        
-        if (mlen != 0)
-        {
+
+        if (mlen != 0) {
                 page     = 0;
-                page_ptr = ((char *) page_address(krx->krx_pages[0])) +
-                        KQSW_HDR_SIZE;
+                page_ptr = buffer + KQSW_HDR_SIZE;
                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
 
                 LASSERT (niov > 0);
+
                 if (kiov != NULL) {
-                        iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
-                        iov_nob = kiov->kiov_len;
+                        /* skip complete frags */
+                        while (offset >= kiov->kiov_len) {
+                                offset -= kiov->kiov_len;
+                                kiov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        iov_ptr = ((char *)kmap (kiov->kiov_page)) +
+                                kiov->kiov_offset + offset;
+                        iov_nob = kiov->kiov_len - offset;
                 } else {
-                        iov_ptr = iov->iov_base;
-                        iov_nob = iov->iov_len;
+                        /* skip complete frags */
+                        while (offset >= iov->iov_len) {
+                                offset -= iov->iov_len;
+                                iov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        iov_ptr = iov->iov_base + offset;
+                        iov_nob = iov->iov_len - offset;
                 }
-
+                
                 for (;;)
                 {
-                        /* We expect the iov to exactly match mlen */
-                        LASSERT (iov_nob <= mlen);
-                        
-                        frag = MIN (page_nob, iov_nob);
+                        frag = mlen;
+                        if (frag > page_nob)
+                                frag = page_nob;
+                        if (frag > iov_nob)
+                                frag = iov_nob;
+
                         memcpy (iov_ptr, page_ptr, frag);
 #if KQSW_CHECKSUM
                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
@@ -1374,7 +1642,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                         {
                                 page++;
                                 LASSERT (page < krx->krx_npages);
-                                page_ptr = page_address(krx->krx_pages[page]);
+                                page_ptr = page_address(krx->krx_kiov[page].kiov_page);
                                 page_nob = PAGE_SIZE;
                         }
 
@@ -1402,8 +1670,8 @@ kqswnal_recvmsg (nal_cb_t     *nal,
         }
 
 #if KQSW_CHECKSUM
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
+        memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
+                sizeof(kqsw_csum_t));
 
         if (csum_len != rlen)
                 CERROR("Unable to checksum data in user's buffer\n");
@@ -1415,35 +1683,39 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                        "csum_nob %d\n",
                         hdr_csum, payload_csum, csum_frags, csum_nob);
 #endif
-        lib_finalize(nal, private, libmsg);
-
-        kqswnal_requeue_rx (krx);
+        lib_finalize(nal, private, libmsg, PTL_OK);
 
-        return (rlen);
+        return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_recv(nal_cb_t     *nal,
              void         *private,
              lib_msg_t    *libmsg,
              unsigned int  niov,
              struct iovec *iov,
+             size_t        offset,
              size_t        mlen,
              size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen));
+        return (kqswnal_recvmsg(nal, private, libmsg, 
+                                niov, iov, NULL, 
+                                offset, mlen, rlen));
 }
 
-static int
+static ptl_err_t
 kqswnal_recv_pages (nal_cb_t     *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
                     unsigned int  niov,
                     ptl_kiov_t   *kiov,
+                    size_t        offset,
                     size_t        mlen,
                     size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen));
+        return (kqswnal_recvmsg(nal, private, libmsg, 
+                                niov, NULL, kiov, 
+                                offset, mlen, rlen));
 }
 
 int
@@ -1455,6 +1727,7 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg)
                 return ((int)pid);
 
         atomic_inc (&kqswnal_data.kqn_nthreads);
+        atomic_inc (&kqswnal_data.kqn_nthreads_running);
         return (0);
 }
 
@@ -1473,6 +1746,7 @@ kqswnal_scheduler (void *arg)
         long             flags;
         int              rc;
         int              counter = 0;
+        int              shuttingdown = 0;
         int              did_something;
 
         kportal_daemonize ("kqswnal_sched");
@@ -1480,9 +1754,21 @@ kqswnal_scheduler (void *arg)
         
         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
-        while (!kqswnal_data.kqn_shuttingdown)
+        for (;;)
         {
-                did_something = FALSE;
+                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
+
+                        if (kqswnal_data.kqn_shuttingdown == 2)
+                                break;
+                
+                        /* During stage 1 of shutdown we are still responsive
+                         * to receives */
+
+                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
+                        shuttingdown = kqswnal_data.kqn_shuttingdown;
+                }
+
+                did_something = 0;
 
                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
                 {
@@ -1494,11 +1780,12 @@ kqswnal_scheduler (void *arg)
 
                         kqswnal_rx (krx);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedtxds))
                 {
                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                          kqswnal_tx_t, ktx_list);
@@ -1514,11 +1801,12 @@ kqswnal_scheduler (void *arg)
                                 kqswnal_tx_done (ktx, rc);
                         }
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedfwds))
                 {
                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                         list_del (&fwd->kprfd_list);
@@ -1526,7 +1814,7 @@ kqswnal_scheduler (void *arg)
 
                         kqswnal_fwd_packet (NULL, fwd);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
@@ -1539,7 +1827,7 @@ kqswnal_scheduler (void *arg)
 
                         if (!did_something) {
                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                               kqswnal_data.kqn_shuttingdown ||
+                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
@@ -1571,5 +1859,6 @@ nal_cb_t kqswnal_lib =
         cb_printf:      kqswnal_printf,
         cb_cli:         kqswnal_cli,
         cb_sti:         kqswnal_sti,
+        cb_callback:    kqswnal_callback,
         cb_dist:        kqswnal_dist
 };