Whamcloud - gitweb
Landing b_elan4.
[fs/lustre-release.git] / lustre / portals / knals / qswnal / qswnal_cb.c
index 43926c9..96749cd 100644 (file)
@@ -26,6 +26,9 @@
 
 #include "qswnal.h"
 
+EP_STATUSBLK  kqswnal_rpc_success;
+EP_STATUSBLK  kqswnal_rpc_failed;
+
 /*
  *  LIB functions follow
  *
@@ -128,9 +131,22 @@ kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
 void
 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
 {
+#if MULTIRAIL_EKC
+        int      i;
+#endif
+
         if (ktx->ktx_nmappedpages == 0)
                 return;
-
+        
+#if MULTIRAIL_EKC
+        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
+               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
+
+        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
+                ep_dvma_unload(kqswnal_data.kqn_ep,
+                               kqswnal_data.kqn_ep_tx_nmh,
+                               &ktx->ktx_frags[i]);
+#else
         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
 
@@ -138,9 +154,11 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx)
         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
 
-        elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
+        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                           kqswnal_data.kqn_eptxdmahandle,
                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
+
+#endif
         ktx->ktx_nmappedpages = 0;
 }
 
@@ -152,12 +170,24 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
         char     *ptr;
+#if MULTIRAIL_EKC
+        EP_RAILMASK railmask;
+        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                            EP_RAILMASK_ALL,
+                                            kqswnal_nid2elanid(ktx->ktx_nid));
         
+        if (rail < 0) {
+                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+#endif
         LASSERT (nmapped <= maxmapped);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
         LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
-        
+
         do {
                 int  fraglen = kiov->kiov_len;
 
@@ -188,25 +218,40 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                        "%p[%d] loading %p for %d, page %d, %d total\n",
                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
 
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             ptr, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
+#else
+                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
                                        ptr, fraglen,
-                                       basepage, &ktx->ktx_frags.iov[nfrags].Base);
-
-                kunmap (kiov->kiov_page);
-                
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
+                                       basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_frags.iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
+#endif
+
+                kunmap (kiov->kiov_page);
+                
+                /* keep in loop for failure case */
+                ktx->ktx_nmappedpages = nmapped;
 
                 basepage++;
                 kiov++;
@@ -232,8 +277,20 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
         int       nmapped   = ktx->ktx_nmappedpages;
         int       maxmapped = ktx->ktx_npages;
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
-
+#if MULTIRAIL_EKC
+        EP_RAILMASK railmask;
+        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                            EP_RAILMASK_ALL,
+                                            kqswnal_nid2elanid(ktx->ktx_nid));
+        
+        if (rail < 0) {
+                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+#endif
         LASSERT (nmapped <= maxmapped);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
         LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
@@ -263,22 +320,38 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
                         ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
                         nmapped);
 
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
+#if MULTIRAIL_EKC
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             iov->iov_base, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
+#else
+                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
                                        iov->iov_base, fraglen,
-                                       basepage, &ktx->ktx_frags.iov[nfrags].Base);
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
+                                       basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
+                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
+                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                         /* just extend previous */
-                        ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
+                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                 else {
-                        ktx->ktx_frags.iov[nfrags].Len = fraglen;
+                        ktx->ktx_frags[nfrags].Len = fraglen;
                         nfrags++;                /* new frag */
                 }
+#endif
+
+                /* keep in loop for failure case */
+                ktx->ktx_nmappedpages = nmapped;
 
                 basepage += npages;
                 iov++;
@@ -424,7 +497,6 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
                 break;
 
         case KTX_GETTING:          /* Peer has DMA-ed direct? */
-                LASSERT (KQSW_OPTIMIZE_GETS);
                 msg = (lib_msg_t *)ktx->ktx_args[1];
                 repmsg = NULL;
 
@@ -455,8 +527,8 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 
         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
 
-        if (status != EP_SUCCESS)
-        {
+        if (status != EP_SUCCESS) {
+
                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
                         ktx->ktx_nid, status);
 
@@ -466,8 +538,11 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
         } else if (ktx->ktx_state == KTX_GETTING) {
                 /* RPC completed OK; what did our peer put in the status
                  * block? */
-                LASSERT (KQSW_OPTIMIZE_GETS);
+#if MULTIRAIL_EKC
+                status = ep_txd_statusblk(txd)->Data[0];
+#else
                 status = ep_txd_statusblk(txd)->Status;
+#endif
         } else {
                 status = 0;
         }
@@ -488,21 +563,38 @@ kqswnal_launch (kqswnal_tx_t *ktx)
 
         LASSERT (dest >= 0);                    /* must be a peer */
         if (ktx->ktx_state == KTX_GETTING) {
-                LASSERT (KQSW_OPTIMIZE_GETS);
+                /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
+                 * other frags are the GET sink which we obviously don't
+                 * send here :) */
+#if MULTIRAIL_EKC
+                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
+                                     ktx->ktx_port, attr,
+                                     kqswnal_txhandler, ktx,
+                                     NULL, ktx->ktx_frags, 1);
+#else
                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                      ktx->ktx_port, attr, kqswnal_txhandler,
-                                     ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+                                     ktx, NULL, ktx->ktx_frags, 1);
+#endif
         } else {
+#if MULTIRAIL_EKC
+                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
+                                         ktx->ktx_port, attr,
+                                         kqswnal_txhandler, ktx,
+                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
-                                       ktx->ktx_port, attr, kqswnal_txhandler,
-                                       ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag);
+                                       ktx->ktx_port, attr, 
+                                       kqswnal_txhandler, ktx, 
+                                       ktx->ktx_frags, ktx->ktx_nfrag);
+#endif
         }
 
         switch (rc) {
-        case ESUCCESS: /* success */
+        case EP_SUCCESS: /* success */
                 return (0);
 
-        case ENOMEM: /* can't allocate ep txd => queue for later */
+        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                 LASSERT (in_interrupt());
 
                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
@@ -516,7 +608,7 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         default: /* fatal error */
                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
                 kqswnal_notify_peer_down(ktx);
-                return (rc);
+                return (-EHOSTUNREACH);
         }
 }
 
@@ -589,6 +681,7 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
 
 }                               /* end of print_hdr() */
 
+#if !MULTIRAIL_EKC
 void
 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
 {
@@ -648,6 +741,7 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
         CERROR ("DATAVEC too small\n");
         return (-E2BIG);
 }
+#endif
 
 int
 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
@@ -656,14 +750,17 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
         char               *buffer = (char *)page_address(krx->krx_pages[0]);
         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
-        EP_IOVEC            eiov[EP_MAXFRAG];
-        EP_STATUSBLK        blk;
         int                 rc;
-
-        LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed);
+#if MULTIRAIL_EKC
+        int                 i;
+#else
+        EP_DATAVEC          datav[EP_MAXFRAG];
+        int                 ndatav;
+#endif
+        LASSERT (krx->krx_rpc_reply_needed);
         LASSERT ((iov == NULL) != (kiov == NULL));
 
-        /* see .*_pack_k?iov comment regarding endian-ness */
+        /* see kqswnal_sendmsg comment regarding endian-ness */
         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                 /* msg too small to discover rmd size */
                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
@@ -671,16 +768,16 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                 return (-EINVAL);
         }
         
-        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) {
+        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                 /* rmd doesn't fit in the incoming message */
                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
-                        krx->krx_nob, rmd->kqrmd_neiov,
-                        (int)(((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer));
+                        krx->krx_nob, rmd->kqrmd_nfrag,
+                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                 return (-EINVAL);
         }
 
-        /* Ghastly hack part 1, uses the existing procedures to map the source data... */
-        ktx->ktx_nfrag = 0;
+        /* Map the source data... */
+        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
         if (kiov != NULL)
                 rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
         else
@@ -691,32 +788,61 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                 return (rc);
         }
 
-        /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... */
-        memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0]));
-
-        rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav,
-                                  ktx->ktx_nfrag, eiov,
-                                  rmd->kqrmd_neiov, rmd->kqrmd_eiov);
-        if (rc < 0) {
-                CERROR ("Can't create datavec: %d\n", rc);
-                return (rc);
+#if MULTIRAIL_EKC
+        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
+                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
+                return (-EINVAL);
         }
-        ktx->ktx_nfrag = rc;
-
-        memset (&blk, 0, sizeof (blk));         /* zero blk.Status */
+        
+        for (i = 0; i < rmd->kqrmd_nfrag; i++)
+                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
+                        CERROR("Can't cope with unequal frags %d(%d):"
+                               " %d local %d remote\n",
+                               i, rmd->kqrmd_nfrag, 
+                               ktx->ktx_frags[i].nmd_len, 
+                               rmd->kqrmd_frag[i].nmd_len);
+                        return (-EINVAL);
+                }
+#else
+        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
+                                      ktx->ktx_nfrag, ktx->ktx_frags,
+                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        if (ndatav < 0) {
+                CERROR ("Can't create datavec: %d\n", ndatav);
+                return (ndatav);
+        }
+#endif
 
-        /* Our caller will start to race with kqswnal_rpc_complete... */
+        /* Our caller will start to race with kqswnal_dma_reply_complete... */
         LASSERT (atomic_read (&krx->krx_refcount) == 1);
         atomic_set (&krx->krx_refcount, 2);
 
-        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx,
-                              &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag);
-        if (rc == ESUCCESS)
+#if MULTIRAIL_EKC
+        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
+                             &kqswnal_rpc_success,
+                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
+        if (rc == EP_SUCCESS)
+                return (0);
+
+        /* Well we tried... */
+        krx->krx_rpc_reply_needed = 0;
+#else
+        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
+                              &kqswnal_rpc_success, datav, ndatav);
+        if (rc == EP_SUCCESS)
                 return (0);
 
+        /* "old" EKC destroys rxd on failed completion */
+        krx->krx_rxd = NULL;
+#endif
+
+        CERROR("can't complete RPC: %d\n", rc);
+
         /* reset refcount back to 1: we're not going to be racing with
-         * kqswnal_rely_complete. */
+         * kqswnal_dma_reply_complete. */
         atomic_set (&krx->krx_refcount, 1);
+
         return (-ECONNABORTED);
 }
 
@@ -785,12 +911,12 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 return (PTL_NOSPACE);
         }
 
+        ktx->ktx_nid     = targetnid;
         ktx->ktx_args[0] = private;
         ktx->ktx_args[1] = libmsg;
 
-#if KQSW_OPTIMIZE_GETS
         if (type == PTL_MSG_REPLY &&
-            ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) {
+            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
                 if (nid != targetnid ||
                     kqswnal_nid2elanid(nid) != 
                     ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
@@ -798,7 +924,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                "nid "LPX64" via "LPX64" elanID %d\n",
                                nid, targetnid,
                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
-                        return(PTL_FAIL);
+                        return (PTL_FAIL);
                 }
 
                 /* peer expects RPC completion with GET data */
@@ -806,13 +932,12 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                         payload_niov, payload_iov, 
                                         payload_kiov, payload_nob);
                 if (rc == 0)
-                        return (0);
+                        return (PTL_OK);
                 
                 CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
                 kqswnal_put_idle_tx (ktx);
                 return (PTL_FAIL);
         }
-#endif
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
@@ -838,15 +963,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
 #endif
         
-        /* Set up first frag from pre-mapped buffer (it's at least the
-         * portals header) */
-        ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer;
-        ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE;
-        ktx->ktx_nfrag = 1;
-        ktx->ktx_state = KTX_SENDING;   /* => lib_finalize() on completion */
-
-#if KQSW_OPTIMIZE_GETS
-        if (type == PTL_MSG_GET &&              /* doing a GET */
+        if (kqswnal_data.kqn_optimized_gets &&
+            type == PTL_MSG_GET &&              /* doing a GET */
             nid == targetnid) {                 /* not forwarding */
                 lib_md_t           *md = libmsg->md;
                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
@@ -856,8 +974,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  *
                  * First I set up ktx as if it was going to send this
                  * payload, (it needs to map it anyway).  This fills
-                 * ktx_frags.iov[1] and onward with the network addresses
-                 * of the get sink frags.  I copy these into ktx_buffer,
+                 * ktx_frags[1] and onward with the network addresses
+                 * of the GET sink frags.  I copy these into ktx_buffer,
                  * immediately after the header, and send that as my GET
                  * message.
                  *
@@ -865,6 +983,9 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  * When EKC copes with different endian nodes, I'll fix
                  * this (and eat my hat :) */
 
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_GETTING;
+
                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
                         rc = kqswnal_map_tx_kiov (ktx, md->length,
                                                   md->md_niov, md->md_iov.kiov);
@@ -877,46 +998,73 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                         return (PTL_FAIL);
                 }
 
-                rmd->kqrmd_neiov = ktx->ktx_nfrag - 1;
-                memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1],
-                        rmd->kqrmd_neiov * sizeof (EP_IOVEC));
+                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
 
-                ktx->ktx_nfrag = 1;
-                ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t,
-                                                       kqrmd_eiov[rmd->kqrmd_neiov]);
-                payload_nob = ktx->ktx_frags.iov[0].Len;
-                ktx->ktx_state = KTX_GETTING;
-        } else 
+                payload_nob = offsetof(kqswnal_remotemd_t,
+                                       kqrmd_frag[rmd->kqrmd_nfrag]);
+                LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
+
+#if MULTIRAIL_EKC
+                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+                       rmd->kqrmd_nfrag * sizeof(EP_NMD));
+
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + payload_nob);
+#else
+                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
+                
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
+#endif
+        } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
+
+                /* small message: single frag copied into the pre-mapped buffer */
+
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE + payload_nob);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
 #endif
-        if (payload_nob > 0) { /* got some payload (something more to do) */
-                /* make a single contiguous message? */
-                if (payload_nob <= KQSW_TX_MAXCONTIG) {
-                        /* copy payload to ktx_buffer, immediately after hdr */
+                if (payload_nob > 0) {
                         if (payload_kiov != NULL)
                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                    payload_niov, payload_kiov, payload_nob);
                         else
                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                   payload_niov, payload_iov, payload_nob);
-                        /* first frag includes payload */
-                        ktx->ktx_frags.iov[0].Len += payload_nob;
-                } else {
-                        if (payload_kiov != NULL)
-                                rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
-                                                          payload_niov, payload_kiov);
-                        else
-                                rc = kqswnal_map_tx_iov (ktx, payload_nob,
-                                                         payload_niov, payload_iov);
-                        if (rc != 0) {
-                                kqswnal_put_idle_tx (ktx);
-                                return (PTL_FAIL);
-                        }
-                } 
-        }
+                }
+        } else {
 
-        ktx->ktx_nid  = targetnid;
+                /* large message: multiple frags: first is hdr in pre-mapped buffer */
+
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+                ktx->ktx_state = KTX_SENDING;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, KQSW_HDR_SIZE);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
+#endif
+                if (payload_kiov != NULL)
+                        rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
+                                                  payload_niov, payload_kiov);
+                else
+                        rc = kqswnal_map_tx_iov (ktx, payload_nob,
+                                                 payload_niov, payload_iov);
+                if (rc != 0) {
+                        kqswnal_put_idle_tx (ktx);
+                        return (PTL_FAIL);
+                }
+        }
+        
         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
-                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
+                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
 
         rc = kqswnal_launch (ktx);
         if (rc != 0) {                    /* failed? */
@@ -962,8 +1110,6 @@ kqswnal_send_pages (nal_cb_t     *nal,
                                  payload_niov, NULL, payload_kiov, payload_nob));
 }
 
-int kqswnal_fwd_copy_contig = 0;
-
 void
 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
@@ -984,7 +1130,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 
         LASSERT (niov > 0);
         
-        ktx = kqswnal_get_idle_tx (fwd, FALSE);
+        ktx = kqswnal_get_idle_tx (fwd, 0);
         if (ktx == NULL)        /* can't get txd right now */
                 return;         /* fwd will be scheduled when tx desc freed */
 
@@ -1005,20 +1151,31 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 goto failed;
         }
 
-        if ((kqswnal_fwd_copy_contig || niov > 1) &&
+        ktx->ktx_port    = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
+                           EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
+        ktx->ktx_nid     = nid;
+        ktx->ktx_state   = KTX_FORWARDING;
+        ktx->ktx_args[0] = fwd;
+
+        if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
             nob <= KQSW_TX_BUFFER_SIZE) 
         {
-                /* send from ktx's pre-allocated/mapped contiguous buffer? */
+                /* send from ktx's pre-mapped contiguous buffer? */
                 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
-                ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
-                ktx->ktx_frags.iov[0].Len = nob;
-                ktx->ktx_nfrag = 1;
+#if MULTIRAIL_EKC
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
+                              0, nob);
+#else
+                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
+                ktx->ktx_frags[0].Len = nob;
+#endif
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
         }
         else
         {
                 /* zero copy */
-                ktx->ktx_nfrag = 0;       /* no frags mapped yet */
+                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
                 rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
                 if (rc != 0)
                         goto failed;
@@ -1026,12 +1183,6 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
         }
 
-        ktx->ktx_port    = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
-                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
-        ktx->ktx_nid     = nid;
-        ktx->ktx_state   = KTX_FORWARDING; /* kpr_put_packet() on completion */
-        ktx->ktx_args[0] = fwd;
-
         rc = kqswnal_launch (ktx);
         if (rc == 0)
                 return;
@@ -1064,7 +1215,7 @@ kqswnal_fwd_callback (void *arg, int error)
 }
 
 void
-kqswnal_reply_complete (EP_RXD *rxd) 
+kqswnal_dma_reply_complete (EP_RXD *rxd) 
 {
         int           status = ep_rxd_status(rxd);
         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
@@ -1075,9 +1226,10 @@ kqswnal_reply_complete (EP_RXD *rxd)
                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
 
         LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
 
-        krx->krx_rpc_completed = 1;
-        kqswnal_requeue_rx (krx);
+        krx->krx_rpc_reply_needed = 0;
+        kqswnal_rx_done (krx);
 
         lib_finalize (&kqswnal_lib, NULL, msg);
         kqswnal_put_idle_tx (ktx);
@@ -1093,67 +1245,76 @@ kqswnal_rpc_complete (EP_RXD *rxd)
                "rxd %p, krx %p, status %d\n", rxd, krx, status);
 
         LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
         
-        krx->krx_rpc_completed = 1;
+        krx->krx_rpc_reply_needed = 0;
         kqswnal_requeue_rx (krx);
 }
 
 void
-kqswnal_requeue_rx (kqswnal_rx_t *krx)
+kqswnal_requeue_rx (kqswnal_rx_t *krx) 
 {
-        EP_STATUSBLK  blk;
-        int           rc;
+        int   rc;
 
-        LASSERT (atomic_read (&krx->krx_refcount) > 0);
-        if (!atomic_dec_and_test (&krx->krx_refcount))
-                return;
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
 
-        if (!ep_rxd_isrpc(krx->krx_rxd) ||
-            krx->krx_rpc_completed) {
+        if (krx->krx_rpc_reply_needed) {
 
-                /* don't actually requeue on shutdown */
-                if (kqswnal_data.kqn_shuttingdown)
+                /* We failed to complete the peer's optimized GET (e.g. we
+                 * couldn't map the source buffers).  We complete the
+                 * peer's EKC rpc now with failure. */
+#if MULTIRAIL_EKC
+                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
+                                     &kqswnal_rpc_failed, NULL, NULL, 0);
+                if (rc == EP_SUCCESS)
                         return;
                 
-                ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx,
-                                    krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE);
-                return;
-        }
-
-        /* Sender wanted an RPC, but we didn't complete it (we must have
-         * dropped the sender's message).  We complete it now with
-         * failure... */
-        memset (&blk, 0, sizeof (blk));
-        blk.Status = -ECONNREFUSED;
-
-        atomic_set (&krx->krx_refcount, 1);
+                CERROR("can't complete RPC: %d\n", rc);
+#else
+                if (krx->krx_rxd != NULL) {
+                        /* We didn't try (and fail) to complete earlier... */
+                        rc = ep_complete_rpc(krx->krx_rxd, 
+                                             kqswnal_rpc_complete, krx,
+                                             &kqswnal_rpc_failed, NULL, 0);
+                        if (rc == EP_SUCCESS)
+                                return;
+
+                        CERROR("can't complete RPC: %d\n", rc);
+                }
+                
+                /* NB the old ep_complete_rpc() frees rxd on failure, so we
+                 * have to requeue from scratch here, unless we're shutting
+                 * down */
+                if (kqswnal_data.kqn_shuttingdown)
+                        return;
 
-        rc = ep_complete_rpc (krx->krx_rxd, 
-                              kqswnal_rpc_complete, krx,
-                              &blk, NULL, 0);
-        if (rc == ESUCCESS) {
-                /* callback will call me again to requeue, having set
-                 * krx_rpc_completed... */
+                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
+                                      krx->krx_elanbuffer, 
+                                      krx->krx_npages * PAGE_SIZE, 0);
+                LASSERT (rc == EP_SUCCESS);
+                /* We don't handle failure here; it's incredibly rare
+                 * (never reported?) and only happens with "old" EKC */
                 return;
+#endif
         }
 
-        CERROR("can't complete RPC: %d\n", rc);
-
-        /* we don't actually requeue on shutdown */
-        if (kqswnal_data.kqn_shuttingdown)
-                return;
-
-        /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue
-         * from scratch here... */
-        rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
-                              krx->krx_elanaddr, 
-                              krx->krx_npages * PAGE_SIZE, 0);
-
-        LASSERT (rc == ESUCCESS);
-        /* This needs to be fixed by ep_complete_rpc NOT freeing
-         * krx->krx_rxd on failure so we can just ep_requeue_receive() */
+#if MULTIRAIL_EKC
+        if (kqswnal_data.kqn_shuttingdown) {
+                /* free EKC rxd on shutdown */
+                ep_complete_receive(krx->krx_rxd);
+        } else {
+                /* repost receive */
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   &krx->krx_elanbuffer, 0);
+        }
+#else                
+        /* don't actually requeue on shutdown */
+        if (!kqswnal_data.kqn_shuttingdown) 
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
+#endif
 }
-
+        
 void
 kqswnal_rx (kqswnal_rx_t *krx)
 {
@@ -1162,9 +1323,12 @@ kqswnal_rx (kqswnal_rx_t *krx)
         int             nob;
         int             niov;
 
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+
         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
-                /* NB krx requeued when lib_parse() calls back kqswnal_recv */
+                atomic_set(&krx->krx_refcount, 1);
                 lib_parse (&kqswnal_lib, hdr, krx);
+                kqswnal_rx_done(krx);
                 return;
         }
 
@@ -1212,18 +1376,27 @@ kqswnal_rxhandler(EP_RXD *rxd)
 
         krx->krx_rxd = rxd;
         krx->krx_nob = nob;
-        LASSERT (atomic_read (&krx->krx_refcount) == 0);
-        atomic_set (&krx->krx_refcount, 1);
-        krx->krx_rpc_completed = 0;
+#if MULTIRAIL_EKC
+        krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
+#else
+        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
+#endif
         
         /* must receive a whole header to be able to parse */
         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
         {
                 /* receives complete with failure when receiver is removed */
+#if MULTIRAIL_EKC
+                if (status == EP_SHUTDOWN)
+                        LASSERT (kqswnal_data.kqn_shuttingdown);
+                else
+                        CERROR("receive status failed with status %d nob %d\n",
+                               ep_rxd_status(rxd), nob);
+#else
                 if (!kqswnal_data.kqn_shuttingdown)
                         CERROR("receive status failed with status %d nob %d\n",
                                ep_rxd_status(rxd), nob);
-
+#endif
                 kqswnal_requeue_rx (krx);
                 return;
         }
@@ -1417,8 +1590,6 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 #endif
         lib_finalize(nal, private, libmsg);
 
-        kqswnal_requeue_rx (krx);
-
         return (rlen);
 }
 
@@ -1455,6 +1626,7 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg)
                 return ((int)pid);
 
         atomic_inc (&kqswnal_data.kqn_nthreads);
+        atomic_inc (&kqswnal_data.kqn_nthreads_running);
         return (0);
 }
 
@@ -1473,6 +1645,7 @@ kqswnal_scheduler (void *arg)
         long             flags;
         int              rc;
         int              counter = 0;
+        int              shuttingdown = 0;
         int              did_something;
 
         kportal_daemonize ("kqswnal_sched");
@@ -1480,9 +1653,21 @@ kqswnal_scheduler (void *arg)
         
         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
-        while (!kqswnal_data.kqn_shuttingdown)
+        for (;;)
         {
-                did_something = FALSE;
+                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
+
+                        if (kqswnal_data.kqn_shuttingdown == 2)
+                                break;
+                
+                        /* During stage 1 of shutdown we are still responsive
+                         * to receives */
+
+                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
+                        shuttingdown = kqswnal_data.kqn_shuttingdown;
+                }
+
+                did_something = 0;
 
                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
                 {
@@ -1494,11 +1679,12 @@ kqswnal_scheduler (void *arg)
 
                         kqswnal_rx (krx);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedtxds))
                 {
                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                          kqswnal_tx_t, ktx_list);
@@ -1514,11 +1700,12 @@ kqswnal_scheduler (void *arg)
                                 kqswnal_tx_done (ktx, rc);
                         }
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!shuttingdown &&
+                    !list_empty (&kqswnal_data.kqn_delayedfwds))
                 {
                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                         list_del (&fwd->kprfd_list);
@@ -1526,7 +1713,7 @@ kqswnal_scheduler (void *arg)
 
                         kqswnal_fwd_packet (NULL, fwd);
 
-                        did_something = TRUE;
+                        did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
@@ -1539,7 +1726,7 @@ kqswnal_scheduler (void *arg)
 
                         if (!did_something) {
                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                               kqswnal_data.kqn_shuttingdown ||
+                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));