Whamcloud - gitweb
The first pass of some overdue copyright cleanup:
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
index 61c88f6..fe17e6a 100644 (file)
@@ -4,10 +4,7 @@
  * Copyright (C) 2002 Cluster File Systems, Inc.
  *   Author: Eric Barton <eric@bartonsoftware.com>
  *
- * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
- * W. Marcus Miller - Based on ksocknal
- *
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
+ * This file is part of Portals, http://www.lustre.org
  *
  * Portals is free software; you can redistribute it and/or
  * modify it under the terms of version 2 of the GNU General Public
 
 #include "qswnal.h"
 
-EP_STATUSBLK  kqswnal_rpc_success;
-EP_STATUSBLK  kqswnal_rpc_failed;
-
 /*
  *  LIB functions follow
  *
  */
-static ptl_err_t
-kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
-             size_t len)
-{
-        CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
-                nal->ni.nid, len, src_addr, dst_addr );
-        memcpy( dst_addr, src_addr, len );
-
-        return (PTL_OK);
-}
-
-static ptl_err_t
-kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
-              size_t len)
-{
-        CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
-                nal->ni.nid, len, src_addr, dst_addr );
-        memcpy( dst_addr, src_addr, len );
-
-        return (PTL_OK);
-}
-
-static void *
-kqswnal_malloc(nal_cb_t *nal, size_t len)
-{
-        void *buf;
-
-        PORTAL_ALLOC(buf, len);
-        return (buf);
-}
-
-static void
-kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
-{
-        PORTAL_FREE(buf, len);
-}
-
-static void
-kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
-{
-        va_list ap;
-        char msg[256];
-
-        va_start (ap, fmt);
-        vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
-        va_end (ap);
-
-        msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
-
-        CDEBUG (D_NET, "%s", msg);
-}
-
-#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
-# error "Can't save/restore irq contexts in different procedures"
-#endif
-
-static void
-kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
-{
-        kqswnal_data_t *data= nal->nal_data;
-
-        spin_lock_irqsave(&data->kqn_statelock, *flags);
-}
-
-
-static void
-kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
-{
-        kqswnal_data_t *data= nal->nal_data;
-
-        spin_unlock_irqrestore(&data->kqn_statelock, *flags);
-}
-
-static void
-kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
-{
-        /* holding kqn_statelock */
-
-        if (eq->event_callback != NULL)
-                eq->event_callback(ev);
-
-        if (waitqueue_active(&kqswnal_data.kqn_yield_waitq))
-                wake_up_all(&kqswnal_data.kqn_yield_waitq);
-}
-
 static int
-kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
+kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
 {
-        if (nid == nal->ni.nid)
+        if (nid == nal->libnal_ni.ni_pid.nid)
                 *dist = 0;                      /* it's me */
         else if (kqswnal_nid2elanid (nid) >= 0)
                 *dist = 1;                      /* it's my peer */
@@ -147,6 +56,8 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx)
 {
 #if MULTIRAIL_EKC
         int      i;
+
+        ktx->ktx_rail = -1;                     /* unset rail */
 #endif
 
         if (ktx->ktx_nmappedpages == 0)
@@ -185,10 +96,13 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_
         char     *ptr;
 #if MULTIRAIL_EKC
         EP_RAILMASK railmask;
-        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
-                                            EP_RAILMASK_ALL,
-                                            kqswnal_nid2elanid(ktx->ktx_nid));
-        
+        int         rail;
+
+        if (ktx->ktx_rail < 0)
+                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                                 EP_RAILMASK_ALL,
+                                                 kqswnal_nid2elanid(ktx->ktx_nid));
+        rail = ktx->ktx_rail;
         if (rail < 0) {
                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                 return (-ENETDOWN);
@@ -212,11 +126,12 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_
         do {
                 int  fraglen = kiov->kiov_len - offset;
 
-                /* nob exactly spans the iovs */
-                LASSERT (fraglen <= nob);
-                /* each frag fits in a page */
+                /* each page frag is contained in one page */
                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
 
+                if (fraglen > nob)
+                        fraglen = nob;
+
                 nmapped++;
                 if (nmapped > maxmapped) {
                         CERROR("Can't map message in %d pages (max %d)\n",
@@ -302,10 +217,13 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
 #if MULTIRAIL_EKC
         EP_RAILMASK railmask;
-        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
-                                            EP_RAILMASK_ALL,
-                                            kqswnal_nid2elanid(ktx->ktx_nid));
+        int         rail;
         
+        if (ktx->ktx_rail < 0)
+                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                                 EP_RAILMASK_ALL,
+                                                 kqswnal_nid2elanid(ktx->ktx_nid));
+        rail = ktx->ktx_rail;
         if (rail < 0) {
                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                 return (-ENETDOWN);
@@ -328,11 +246,12 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
         
         do {
                 int  fraglen = iov->iov_len - offset;
-                long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
-
-                /* nob exactly spans the iovs */
-                LASSERT (fraglen <= nob);
+                long npages;
                 
+                if (fraglen > nob)
+                        fraglen = nob;
+                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
+
                 nmapped += npages;
                 if (nmapped > maxmapped) {
                         CERROR("Can't map message in %d pages (max %d)\n",
@@ -426,7 +345,8 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
 
         /* anything blocking for a tx descriptor? */
-        if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
+        if (!kqswnal_data.kqn_shuttingdown &&
+            !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
         {
                 CDEBUG(D_NET,"wakeup fwd\n");
 
@@ -460,6 +380,9 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
         for (;;) {
                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
 
+                if (kqswnal_data.kqn_shuttingdown)
+                        break;
+
                 /* "normal" descriptor is free */
                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
@@ -467,14 +390,8 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
                         break;
                 }
 
-                /* "normal" descriptor pool is empty */
-
-                if (fwd != NULL) { /* forwarded packet => queue for idle txd */
-                        CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
-                        list_add_tail (&fwd->kprfd_list,
-                                       &kqswnal_data.kqn_idletxd_fwdq);
+                if (fwd != NULL)                /* forwarded packet? */
                         break;
-                }
 
                 /* doing a local transmit */
                 if (!may_block) {
@@ -494,13 +411,20 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
 
                 CDEBUG (D_NET, "blocking for tx desc\n");
                 wait_event (kqswnal_data.kqn_idletxd_waitq,
-                            !list_empty (&kqswnal_data.kqn_idletxds));
+                            !list_empty (&kqswnal_data.kqn_idletxds) ||
+                            kqswnal_data.kqn_shuttingdown);
         }
 
         if (ktx != NULL) {
                 list_del (&ktx->ktx_list);
                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
                 ktx->ktx_launcher = current->pid;
+                atomic_inc(&kqswnal_data.kqn_pending_txs);
+        } else if (fwd != NULL) {
+                /* queue forwarded packet until idle txd available */
+                CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
+                list_add_tail (&fwd->kprfd_list,
+                               &kqswnal_data.kqn_idletxd_fwdq);
         }
 
         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
@@ -514,40 +438,29 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
 void
 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 {
-        lib_msg_t     *msg;
-        lib_msg_t     *repmsg = NULL;
-
         switch (ktx->ktx_state) {
         case KTX_FORWARDING:       /* router asked me to forward this packet */
                 kpr_fwd_done (&kqswnal_data.kqn_router,
                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
                 break;
 
-        case KTX_SENDING:          /* packet sourced locally */
-                lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
+        case KTX_RDMAING:          /* optimized GET/PUT handled */
+        case KTX_PUTTING:          /* optimized PUT sent */
+        case KTX_SENDING:          /* normal send */
+                lib_finalize (&kqswnal_lib, NULL,
                               (lib_msg_t *)ktx->ktx_args[1],
-                              (error == 0) ? PTL_OK : 
-                              (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
+                              (error == 0) ? PTL_OK : PTL_FAIL);
                 break;
 
-        case KTX_GETTING:          /* Peer has DMA-ed direct? */
-                msg = (lib_msg_t *)ktx->ktx_args[1];
-
-                if (error == 0) {
-                        repmsg = lib_create_reply_msg (&kqswnal_lib, 
-                                                       ktx->ktx_nid, msg);
-                        if (repmsg == NULL)
-                                error = -ENOMEM;
-                }
-                
-                if (error == 0) {
-                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
-                                      msg, PTL_OK);
-                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
-                } else {
-                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
-                                      (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
-                }
+        case KTX_GETTING:          /* optimized GET sent & REPLY received */
+                /* Complete the GET with success since we can't avoid
+                 * delivering a REPLY event; we committed to it when we
+                 * launched the GET */
+                lib_finalize (&kqswnal_lib, NULL, 
+                              (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
+                lib_finalize (&kqswnal_lib, NULL,
+                              (lib_msg_t *)ktx->ktx_args[2],
+                              (error == 0) ? PTL_OK : PTL_FAIL);
                 break;
 
         default:
@@ -575,16 +488,27 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
                 kqswnal_notify_peer_down(ktx);
                 status = -EHOSTDOWN;
 
-        } else if (ktx->ktx_state == KTX_GETTING) {
-                /* RPC completed OK; what did our peer put in the status
+        } else switch (ktx->ktx_state) {
+
+        case KTX_GETTING:
+        case KTX_PUTTING:
+                /* RPC completed OK; but what did our peer put in the status
                  * block? */
 #if MULTIRAIL_EKC
                 status = ep_txd_statusblk(txd)->Data[0];
 #else
                 status = ep_txd_statusblk(txd)->Status;
 #endif
-        } else {
+                break;
+                
+        case KTX_FORWARDING:
+        case KTX_SENDING:
                 status = 0;
+                break;
+                
+        default:
+                LBUG();
+                break;
         }
 
         kqswnal_tx_done (ktx, status);
@@ -596,27 +520,34 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         /* Don't block for transmit descriptor if we're in interrupt context */
         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
-        long  flags;
+        unsigned long flags;
         int   rc;
 
         ktx->ktx_launchtime = jiffies;
 
+        if (kqswnal_data.kqn_shuttingdown)
+                return (-ESHUTDOWN);
+
         LASSERT (dest >= 0);                    /* must be a peer */
-        if (ktx->ktx_state == KTX_GETTING) {
-                /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
-                 * other frags are the GET sink which we obviously don't
-                 * send here :) */
+
 #if MULTIRAIL_EKC
+        if (ktx->ktx_nmappedpages != 0)
+                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);
+#endif
+
+        switch (ktx->ktx_state) {
+        case KTX_GETTING:
+        case KTX_PUTTING:
+                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
+                 * The other frags are the payload, awaiting RDMA */
                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                      ktx->ktx_port, attr,
                                      kqswnal_txhandler, ktx,
                                      NULL, ktx->ktx_frags, 1);
-#else
-                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
-                                     ktx->ktx_port, attr, kqswnal_txhandler,
-                                     ktx, NULL, ktx->ktx_frags, 1);
-#endif
-        } else {
+                break;
+
+        case KTX_FORWARDING:
+        case KTX_SENDING:
 #if MULTIRAIL_EKC
                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                          ktx->ktx_port, attr,
@@ -628,6 +559,12 @@ kqswnal_launch (kqswnal_tx_t *ktx)
                                        kqswnal_txhandler, ktx, 
                                        ktx->ktx_frags, ktx->ktx_nfrag);
 #endif
+                break;
+                
+        default:
+                LBUG();
+                rc = -EINVAL;                   /* no compiler warning please */
+                break;
         }
 
         switch (rc) {
@@ -635,8 +572,6 @@ kqswnal_launch (kqswnal_tx_t *ktx)
                 return (0);
 
         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
-                LASSERT (in_interrupt());
-
                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
@@ -652,6 +587,7 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         }
 }
 
+#if 0
 static char *
 hdr_type_string (ptl_hdr_t *hdr)
 {
@@ -675,42 +611,42 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
         char *type_str = hdr_type_string (hdr);
 
         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
-               NTOH__u32(hdr->payload_length));
-        CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
-               NTOH__u32(hdr->src_pid));
-        CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
-               NTOH__u32(hdr->dest_pid));
+               le32_to_cpu(hdr->payload_length));
+        CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
+               le32_to_cpu(hdr->src_pid));
+        CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
+               le32_to_cpu(hdr->dest_pid));
 
-        switch (NTOH__u32(hdr->type)) {
+        switch (le32_to_cpu(hdr->type)) {
         case PTL_MSG_PUT:
                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                        "match bits "LPX64"\n",
-                       NTOH__u32 (hdr->msg.put.ptl_index),
+                       le32_to_cpu(hdr->msg.put.ptl_index),
                        hdr->msg.put.ack_wmd.wh_interface_cookie,
                        hdr->msg.put.ack_wmd.wh_object_cookie,
-                       NTOH__u64 (hdr->msg.put.match_bits));
+                       le64_to_cpu(hdr->msg.put.match_bits));
                 CERROR("    offset %d, hdr data "LPX64"\n",
-                       NTOH__u32(hdr->msg.put.offset),
+                       le32_to_cpu(hdr->msg.put.offset),
                        hdr->msg.put.hdr_data);
                 break;
 
         case PTL_MSG_GET:
                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                        "match bits "LPX64"\n",
-                       NTOH__u32 (hdr->msg.get.ptl_index),
+                       le32_to_cpu(hdr->msg.get.ptl_index),
                        hdr->msg.get.return_wmd.wh_interface_cookie,
                        hdr->msg.get.return_wmd.wh_object_cookie,
                        hdr->msg.get.match_bits);
                 CERROR("    Length %d, src offset %d\n",
-                       NTOH__u32 (hdr->msg.get.sink_length),
-                       NTOH__u32 (hdr->msg.get.src_offset));
+                       le32_to_cpu(hdr->msg.get.sink_length),
+                       le32_to_cpu(hdr->msg.get.src_offset));
                 break;
 
         case PTL_MSG_ACK:
                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
                        hdr->msg.ack.dst_wmd.wh_object_cookie,
-                       NTOH__u32 (hdr->msg.ack.mlength));
+                       le32_to_cpu(hdr->msg.ack.mlength));
                 break;
 
         case PTL_MSG_REPLY:
@@ -720,6 +656,7 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
         }
 
 }                               /* end of print_hdr() */
+#endif
 
 #if !MULTIRAIL_EKC
 void
@@ -781,114 +718,306 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
         CERROR ("DATAVEC too small\n");
         return (-E2BIG);
 }
+#else
+int
+kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
+                    int nrfrag, EP_NMD *rfrag)
+{
+        int  i;
+
+        if (nlfrag != nrfrag) {
+                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+                       nlfrag, nrfrag);
+                return (-EINVAL);
+        }
+        
+        for (i = 0; i < nlfrag; i++)
+                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
+                        CERROR("Can't cope with unequal frags %d(%d):"
+                               " %d local %d remote\n",
+                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
+                        return (-EINVAL);
+                }
+        
+        return (0);
+}
 #endif
 
-int
-kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
-                   struct iovec *iov, ptl_kiov_t *kiov, 
-                   int offset, int nob)
+kqswnal_remotemd_t *
+kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
 {
-        kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
         char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
+        ptl_hdr_t          *hdr = (ptl_hdr_t *)buffer;
         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
-        int                 rc;
-#if MULTIRAIL_EKC
-        int                 i;
-#else
-        EP_DATAVEC          datav[EP_MAXFRAG];
-        int                 ndatav;
-#endif
-        LASSERT (krx->krx_rpc_reply_needed);
-        LASSERT ((iov == NULL) != (kiov == NULL));
+        ptl_nid_t           nid = kqswnal_rx_nid(krx);
+
+        /* Note (1) lib_parse has already flipped hdr.
+         *      (2) RDMA addresses are sent in native endian-ness.  When
+         *      EKC copes with different endian nodes, I'll fix this (and
+         *      eat my hat :) */
+
+        LASSERT (krx->krx_nob >= sizeof(*hdr));
+
+        if (hdr->type != type) {
+                CERROR ("Unexpected optimized get/put type %d (%d expected)"
+                        "from "LPX64"\n", hdr->type, type, nid);
+                return (NULL);
+        }
+        
+        if (hdr->src_nid != nid) {
+                CERROR ("Unexpected optimized get/put source NID "
+                        LPX64" from "LPX64"\n", hdr->src_nid, nid);
+                return (NULL);
+        }
+
+        LASSERT (nid == expected_nid);
 
-        /* see kqswnal_sendmsg comment regarding endian-ness */
         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                 /* msg too small to discover rmd size */
                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
-                return (-EINVAL);
+                return (NULL);
         }
-        
+
         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                 /* rmd doesn't fit in the incoming message */
                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                         krx->krx_nob, rmd->kqrmd_nfrag,
                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
-                return (-EINVAL);
+                return (NULL);
+        }
+
+        return (rmd);
+}
+
+void
+kqswnal_rdma_store_complete (EP_RXD *rxd) 
+{
+        int           status = ep_rxd_status(rxd);
+        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+        LASSERT (ktx->ktx_state == KTX_RDMAING);
+        LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
+
+        krx->krx_rpc_reply_needed = 0;
+        kqswnal_rx_decref (krx);
+
+        /* free ktx & finalize() its lib_msg_t */
+        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
+}
+
+void
+kqswnal_rdma_fetch_complete (EP_RXD *rxd) 
+{
+        /* Completed fetching the PUT data */
+        int           status = ep_rxd_status(rxd);
+        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        unsigned long flags;
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+        LASSERT (ktx->ktx_state == KTX_RDMAING);
+        LASSERT (krx->krx_rxd == rxd);
+        /* RPC completes with failure by default */
+        LASSERT (krx->krx_rpc_reply_needed);
+        LASSERT (krx->krx_rpc_reply_status != 0);
+
+        if (status == EP_SUCCESS) {
+                status = krx->krx_rpc_reply_status = 0;
+        } else {
+                /* Abandon RPC since get failed */
+                krx->krx_rpc_reply_needed = 0;
+                status = -ECONNABORTED;
         }
 
-        /* Map the source data... */
+        /* free ktx & finalize() its lib_msg_t */
+        kqswnal_tx_done(ktx, status);
+
+        if (!in_interrupt()) {
+                /* OK to complete the RPC now (iff I had the last ref) */
+                kqswnal_rx_decref (krx);
+                return;
+        }
+
+        LASSERT (krx->krx_state == KRX_PARSE);
+        krx->krx_state = KRX_COMPLETING;
+
+        /* Complete the RPC in thread context */
+        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+
+        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
+        wake_up (&kqswnal_data.kqn_sched_waitq);
+
+        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+}
+
+int
+kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
+              int niov, struct iovec *iov, ptl_kiov_t *kiov,
+              size_t offset, size_t len)
+{
+        kqswnal_remotemd_t *rmd;
+        kqswnal_tx_t       *ktx;
+        int                 eprc;
+        int                 rc;
+#if !MULTIRAIL_EKC
+        EP_DATAVEC          datav[EP_MAXFRAG];
+        int                 ndatav;
+#endif
+
+        LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
+        /* Not both mapped and paged payload */
+        LASSERT (iov == NULL || kiov == NULL);
+        /* RPC completes with failure by default */
+        LASSERT (krx->krx_rpc_reply_needed);
+        LASSERT (krx->krx_rpc_reply_status != 0);
+
+        rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
+        if (rmd == NULL)
+                return (-EPROTO);
+
+        if (len == 0) {
+                /* data got truncated to nothing. */
+                lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
+                /* Let kqswnal_rx_done() complete the RPC with success */
+                krx->krx_rpc_reply_status = 0;
+                return (0);
+        }
+        
+        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
+           actually sending a portals message with it */
+        ktx = kqswnal_get_idle_tx(NULL, 0);
+        if (ktx == NULL) {
+                CERROR ("Can't get txd for RDMA with "LPX64"\n",
+                        libmsg->ev.initiator.nid);
+                return (-ENOMEM);
+        }
+
+        ktx->ktx_state   = KTX_RDMAING;
+        ktx->ktx_nid     = libmsg->ev.initiator.nid;
+        ktx->ktx_args[0] = krx;
+        ktx->ktx_args[1] = libmsg;
+
+#if MULTIRAIL_EKC
+        /* Map on the rail the RPC prefers */
+        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
+                                         ep_rxd_railmask(krx->krx_rxd));
+#endif
+
+        /* Start mapping at offset 0 (we're not mapping any headers) */
         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
+        
         if (kiov != NULL)
-                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
+                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
         else
-                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
+                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
 
         if (rc != 0) {
-                CERROR ("Can't map source data: %d\n", rc);
-                return (rc);
+                CERROR ("Can't map local RDMA data: %d\n", rc);
+                goto out;
         }
 
 #if MULTIRAIL_EKC
-        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
-                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
-                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
-                return (-EINVAL);
+        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
+                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        if (rc != 0) {
+                CERROR ("Incompatible RDMA descriptors\n");
+                goto out;
         }
-        
-        for (i = 0; i < rmd->kqrmd_nfrag; i++)
-                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
-                        CERROR("Can't cope with unequal frags %d(%d):"
-                               " %d local %d remote\n",
-                               i, rmd->kqrmd_nfrag, 
-                               ktx->ktx_frags[i].nmd_len, 
-                               rmd->kqrmd_frag[i].nmd_len);
-                        return (-EINVAL);
-                }
 #else
-        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
-                                      ktx->ktx_nfrag, ktx->ktx_frags,
-                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        switch (type) {
+        default:
+                LBUG();
+
+        case PTL_MSG_GET:
+                ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+                                             ktx->ktx_nfrag, ktx->ktx_frags,
+                                             rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+                break;
+
+        case PTL_MSG_PUT:
+                ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+                                             rmd->kqrmd_nfrag, rmd->kqrmd_frag,
+                                             ktx->ktx_nfrag, ktx->ktx_frags);
+                break;
+        }
+                
         if (ndatav < 0) {
                 CERROR ("Can't create datavec: %d\n", ndatav);
-                return (ndatav);
+                rc = ndatav;
+                goto out;
         }
 #endif
 
-        /* Our caller will start to race with kqswnal_dma_reply_complete... */
-        LASSERT (atomic_read (&krx->krx_refcount) == 1);
-        atomic_set (&krx->krx_refcount, 2);
+        LASSERT (atomic_read(&krx->krx_refcount) > 0);
+        /* Take an extra ref for the completion callback */
+        atomic_inc(&krx->krx_refcount);
 
-#if MULTIRAIL_EKC
-        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
-                             &kqswnal_rpc_success,
-                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
-        if (rc == EP_SUCCESS)
-                return (0);
+        switch (type) {
+        default:
+                LBUG();
 
-        /* Well we tried... */
-        krx->krx_rpc_reply_needed = 0;
+        case PTL_MSG_GET:
+#if MULTIRAIL_EKC
+                eprc = ep_complete_rpc(krx->krx_rxd, 
+                                       kqswnal_rdma_store_complete, ktx, 
+                                       &kqswnal_data.kqn_rpc_success,
+                                       ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
 #else
-        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
-                              &kqswnal_rpc_success, datav, ndatav);
-        if (rc == EP_SUCCESS)
-                return (0);
-
-        /* "old" EKC destroys rxd on failed completion */
-        krx->krx_rxd = NULL;
+                eprc = ep_complete_rpc (krx->krx_rxd, 
+                                        kqswnal_rdma_store_complete, ktx,
+                                        &kqswnal_data.kqn_rpc_success, 
+                                        datav, ndatav);
+                if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
+                        krx->krx_rxd = NULL;
 #endif
+                if (eprc != EP_SUCCESS) {
+                        CERROR("can't complete RPC: %d\n", eprc);
+                        /* don't re-attempt RPC completion */
+                        krx->krx_rpc_reply_needed = 0;
+                        rc = -ECONNABORTED;
+                }
+                break;
+                
+        case PTL_MSG_PUT:
+#if MULTIRAIL_EKC
+                eprc = ep_rpc_get (krx->krx_rxd, 
+                                   kqswnal_rdma_fetch_complete, ktx,
+                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
+                eprc = ep_rpc_get (krx->krx_rxd,
+                                   kqswnal_rdma_fetch_complete, ktx,
+                                   datav, ndatav);
+#endif
+                if (eprc != EP_SUCCESS) {
+                        CERROR("ep_rpc_get failed: %d\n", eprc);
+                        /* Don't attempt RPC completion: 
+                         * EKC nuked it when the get failed */
+                        krx->krx_rpc_reply_needed = 0;
+                        rc = -ECONNABORTED;
+                }
+                break;
+        }
 
-        CERROR("can't complete RPC: %d\n", rc);
-
-        /* reset refcount back to 1: we're not going to be racing with
-         * kqswnal_dma_reply_complete. */
-        atomic_set (&krx->krx_refcount, 1);
+ out:
+        if (rc != 0) {
+                kqswnal_rx_decref(krx);                 /* drop callback's ref */
+                kqswnal_put_idle_tx (ktx);
+        }
 
-        return (-ECONNABORTED);
+        atomic_dec(&kqswnal_data.kqn_pending_txs);
+        return (rc);
 }
 
 static ptl_err_t
-kqswnal_sendmsg (nal_cb_t     *nal,
+kqswnal_sendmsg (lib_nal_t    *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
                  ptl_hdr_t    *hdr,
@@ -910,6 +1039,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         int                sumoff;
         int                sumnob;
 #endif
+        /* NB 1. hdr is in network byte order */
+        /*    2. 'private' depends on the message type */
         
         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
                " pid %u\n", payload_nob, payload_niov, nid, pid);
@@ -921,13 +1052,22 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         LASSERT (payload_kiov == NULL || !in_interrupt ());
         /* payload is either all vaddrs or all pages */
         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
-        
+
         if (payload_nob > KQSW_MAXPAYLOAD) {
                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                         payload_nob, KQSW_MAXPAYLOAD);
                 return (PTL_FAIL);
         }
 
+        if (type == PTL_MSG_REPLY &&            /* can I look in 'private' */
+            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
+                /* Must be a REPLY for an optimized GET */
+                rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
+                                   payload_niov, payload_iov, payload_kiov, 
+                                   payload_offset, payload_nob);
+                return ((rc == 0) ? PTL_OK : PTL_FAIL);
+        }
+
         targetnid = nid;
         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
@@ -950,40 +1090,18 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                           type == PTL_MSG_REPLY ||
                                           in_interrupt()));
         if (ktx == NULL) {
-                kqswnal_cerror_hdr (hdr);
+                CERROR ("Can't get txd for msg type %d for "LPX64"\n",
+                        type, libmsg->ev.initiator.nid);
                 return (PTL_NO_SPACE);
         }
 
+        ktx->ktx_state   = KTX_SENDING;
         ktx->ktx_nid     = targetnid;
         ktx->ktx_args[0] = private;
         ktx->ktx_args[1] = libmsg;
-
-        if (type == PTL_MSG_REPLY &&
-            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
-                if (nid != targetnid ||
-                    kqswnal_nid2elanid(nid) != 
-                    ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
-                        CERROR("Optimized reply nid conflict: "
-                               "nid "LPX64" via "LPX64" elanID %d\n",
-                               nid, targetnid,
-                               ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
-                        return (PTL_FAIL);
-                }
-
-                /* peer expects RPC completion with GET data */
-                rc = kqswnal_dma_reply (ktx, payload_niov, 
-                                        payload_iov, payload_kiov, 
-                                        payload_offset, payload_nob);
-                if (rc == 0)
-                        return (PTL_OK);
-                
-                CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
-                kqswnal_put_idle_tx (ktx);
-                return (PTL_FAIL);
-        }
+        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
-        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
 
 #if KQSW_CHECKSUM
         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
@@ -1023,28 +1141,31 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
 #endif
 
-        if (kqswnal_data.kqn_optimized_gets &&
-            type == PTL_MSG_GET &&              /* doing a GET */
-            nid == targetnid) {                 /* not forwarding */
+        /* The first frag will be the pre-mapped buffer for (at least) the
+         * portals header. */
+        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+
+        if (nid == targetnid &&                 /* not forwarding */
+            ((type == PTL_MSG_GET &&            /* optimize GET? */
+              kqswnal_tunables.kqn_optimized_gets != 0 &&
+              le32_to_cpu(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
+             (type == PTL_MSG_PUT &&            /* optimize PUT? */
+              kqswnal_tunables.kqn_optimized_puts != 0 &&
+              payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
                 lib_md_t           *md = libmsg->md;
                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
                 
-                /* Optimised path: I send over the Elan vaddrs of the get
-                 * sink buffers, and my peer DMAs directly into them.
+                /* Optimised path: I send over the Elan vaddrs of the local
+                 * buffers, and my peer DMAs directly to/from them.
                  *
                  * First I set up ktx as if it was going to send this
                  * payload, (it needs to map it anyway).  This fills
                  * ktx_frags[1] and onward with the network addresses
                  * of the GET sink frags.  I copy these into ktx_buffer,
-                 * immediately after the header, and send that as my GET
-                 * message.
-                 *
-                 * Note that the addresses are sent in native endian-ness.
-                 * When EKC copes with different endian nodes, I'll fix
-                 * this (and eat my hat :) */
+                 * immediately after the header, and send that as my
+                 * message. */
 
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_state = KTX_GETTING;
+                ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;
 
                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
                         rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
@@ -1052,11 +1173,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 else
                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.iov);
-
-                if (rc < 0) {
-                        kqswnal_put_idle_tx (ktx);
-                        return (PTL_FAIL);
-                }
+                if (rc != 0)
+                        goto out;
 
                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
 
@@ -1077,12 +1195,21 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
 #endif
+                if (type == PTL_MSG_GET) {
+                        /* Allocate reply message now while I'm in thread context */
+                        ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,
+                                                                 nid, libmsg);
+                        if (ktx->ktx_args[2] == NULL)
+                                goto out;
+
+                        /* NB finalizing the REPLY message is my
+                         * responsibility now, whatever happens. */
+                }
+                
         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
 
                 /* small message: single frag copied into the pre-mapped buffer */
 
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_state = KTX_SENDING;
 #if MULTIRAIL_EKC
                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                               0, KQSW_HDR_SIZE + payload_nob);
@@ -1104,8 +1231,6 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 
                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
 
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_state = KTX_SENDING;
 #if MULTIRAIL_EKC
                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                               0, KQSW_HDR_SIZE);
@@ -1119,29 +1244,44 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 else
                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_iov);
-                if (rc != 0) {
-                        kqswnal_put_idle_tx (ktx);
-                        return (PTL_FAIL);
-                }
+                if (rc != 0)
+                        goto out;
         }
         
         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
 
         rc = kqswnal_launch (ktx);
-        if (rc != 0) {                    /* failed? */
-                CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
+
+ out:
+        CDEBUG(rc == 0 ? D_NET : D_ERROR, 
+               "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", 
+               rc == 0 ? "Sent" : "Failed to send",
+               payload_nob, nid, targetnid, rc);
+
+        if (rc != 0) {
+                if (ktx->ktx_state == KTX_GETTING &&
+                    ktx->ktx_args[2] != NULL) {
+                        /* We committed to reply, but there was a problem
+                         * launching the GET.  We can't avoid delivering a
+                         * REPLY event since we committed above, so we
+                         * pretend the GET succeeded but the REPLY
+                         * failed. */
+                        rc = 0;
+                        lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
+                        lib_finalize (&kqswnal_lib, private,
+                                      (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);
+                }
+                
                 kqswnal_put_idle_tx (ktx);
-                return (PTL_FAIL);
         }
-
-        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
-               payload_nob, nid, targetnid);
-        return (PTL_OK);
+        
+        atomic_dec(&kqswnal_data.kqn_pending_txs);
+        return (rc == 0 ? PTL_OK : PTL_FAIL);
 }
 
 static ptl_err_t
-kqswnal_send (nal_cb_t     *nal,
+kqswnal_send (lib_nal_t    *nal,
               void         *private,
               lib_msg_t    *libmsg,
               ptl_hdr_t    *hdr,
@@ -1159,7 +1299,7 @@ kqswnal_send (nal_cb_t     *nal,
 }
 
 static ptl_err_t
-kqswnal_send_pages (nal_cb_t     *nal,
+kqswnal_send_pages (lib_nal_t    *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
                     ptl_hdr_t    *hdr,
@@ -1198,18 +1338,17 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (ktx == NULL)        /* can't get txd right now */
                 return;         /* fwd will be scheduled when tx desc freed */
 
-        if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
+        if (nid == kqswnal_lib.libnal_ni.ni_pid.nid) /* gateway is me */
                 nid = fwd->kprfd_target_nid;    /* target is final dest */
 
         if (kqswnal_nid2elanid (nid) < 0) {
                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
                 rc = -EHOSTUNREACH;
-                goto failed;
+                goto out;
         }
 
         /* copy hdr into pre-mapped buffer */
         memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
-        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
 
         ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
@@ -1244,20 +1383,19 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 #endif
                 rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                 if (rc != 0)
-                        goto failed;
+                        goto out;
         }
 
         rc = kqswnal_launch (ktx);
-        if (rc == 0)
-                return;
+ out:
+        if (rc != 0) {
+                CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
 
- failed:
-        LASSERT (rc != 0);
-        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
+                /* complete now (with failure) */
+                kqswnal_tx_done (ktx, rc);
+        }
 
-        kqswnal_put_idle_tx (ktx);
-        /* complete now (with failure) */
-        kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+        atomic_dec(&kqswnal_data.kqn_pending_txs);
 }
 
 void
@@ -1272,32 +1410,51 @@ kqswnal_fwd_callback (void *arg, int error)
                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
 
                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
-                       NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
+                       le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid),error);
         }
 
-        kqswnal_requeue_rx (krx);
+        LASSERT (atomic_read(&krx->krx_refcount) == 1);
+        kqswnal_rx_decref (krx);
 }
 
 void
-kqswnal_dma_reply_complete (EP_RXD *rxd) 
+kqswnal_requeue_rx (kqswnal_rx_t *krx)
 {
-        int           status = ep_rxd_status(rxd);
-        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
-        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
-        lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
-        
-        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
-               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+        LASSERT (!krx->krx_rpc_reply_needed);
 
-        LASSERT (krx->krx_rxd == rxd);
-        LASSERT (krx->krx_rpc_reply_needed);
+        krx->krx_state = KRX_POSTED;
 
-        krx->krx_rpc_reply_needed = 0;
-        kqswnal_rx_done (krx);
+#if MULTIRAIL_EKC
+        if (kqswnal_data.kqn_shuttingdown) {
+                /* free EKC rxd on shutdown */
+                ep_complete_receive(krx->krx_rxd);
+        } else {
+                /* repost receive */
+                ep_requeue_receive(krx->krx_rxd, 
+                                   kqswnal_rxhandler, krx,
+                                   &krx->krx_elanbuffer, 0);
+        }
+#else                
+        if (kqswnal_data.kqn_shuttingdown)
+                return;
 
-        lib_finalize (&kqswnal_lib, NULL, msg,
-                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
-        kqswnal_put_idle_tx (ktx);
+        if (krx->krx_rxd == NULL) {
+                /* We had a failed ep_complete_rpc() which nukes the
+                 * descriptor in "old" EKC */
+                int eprc = ep_queue_receive(krx->krx_eprx, 
+                                            kqswnal_rxhandler, krx,
+                                            krx->krx_elanbuffer, 
+                                            krx->krx_npages * PAGE_SIZE, 0);
+                LASSERT (eprc == EP_SUCCESS);
+                /* We don't handle failure here; it's incredibly rare
+                 * (never reported?) and only happens with "old" EKC */
+        } else {
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   krx->krx_elanbuffer, 
+                                   krx->krx_npages * PAGE_SIZE);
+        }
+#endif
 }
 
 void
@@ -1317,97 +1474,74 @@ kqswnal_rpc_complete (EP_RXD *rxd)
 }
 
 void
-kqswnal_requeue_rx (kqswnal_rx_t *krx) 
+kqswnal_rx_done (kqswnal_rx_t *krx) 
 {
-        int   rc;
+        int           rc;
+        EP_STATUSBLK *sblk;
 
         LASSERT (atomic_read(&krx->krx_refcount) == 0);
 
         if (krx->krx_rpc_reply_needed) {
+                /* We've not completed the peer's RPC yet... */
+                sblk = (krx->krx_rpc_reply_status == 0) ? 
+                       &kqswnal_data.kqn_rpc_success : 
+                       &kqswnal_data.kqn_rpc_failed;
 
-                /* We failed to complete the peer's optimized GET (e.g. we
-                 * couldn't map the source buffers).  We complete the
-                 * peer's EKC rpc now with failure. */
+                LASSERT (!in_interrupt());
 #if MULTIRAIL_EKC
-                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
-                                     &kqswnal_rpc_failed, NULL, NULL, 0);
+                rc = ep_complete_rpc(krx->krx_rxd, 
+                                     kqswnal_rpc_complete, krx,
+                                     sblk, NULL, NULL, 0);
                 if (rc == EP_SUCCESS)
                         return;
-                
-                CERROR("can't complete RPC: %d\n", rc);
 #else
-                if (krx->krx_rxd != NULL) {
-                        /* We didn't try (and fail) to complete earlier... */
-                        rc = ep_complete_rpc(krx->krx_rxd, 
-                                             kqswnal_rpc_complete, krx,
-                                             &kqswnal_rpc_failed, NULL, 0);
-                        if (rc == EP_SUCCESS)
-                                return;
-
-                        CERROR("can't complete RPC: %d\n", rc);
-                }
-                
-                /* NB the old ep_complete_rpc() frees rxd on failure, so we
-                 * have to requeue from scratch here, unless we're shutting
-                 * down */
-                if (kqswnal_data.kqn_shuttingdown)
+                rc = ep_complete_rpc(krx->krx_rxd, 
+                                     kqswnal_rpc_complete, krx,
+                                     sblk, NULL, 0);
+                if (rc == EP_SUCCESS)
                         return;
 
-                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
-                                      krx->krx_elanbuffer, 
-                                      krx->krx_npages * PAGE_SIZE, 0);
-                LASSERT (rc == EP_SUCCESS);
-                /* We don't handle failure here; it's incredibly rare
-                 * (never reported?) and only happens with "old" EKC */
-                return;
+                /* "old" EKC destroys rxd on failed completion */
+                krx->krx_rxd = NULL;
 #endif
+                CERROR("can't complete RPC: %d\n", rc);
+                krx->krx_rpc_reply_needed = 0;
         }
 
-#if MULTIRAIL_EKC
-        if (kqswnal_data.kqn_shuttingdown) {
-                /* free EKC rxd on shutdown */
-                ep_complete_receive(krx->krx_rxd);
-        } else {
-                /* repost receive */
-                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
-                                   &krx->krx_elanbuffer, 0);
-        }
-#else                
-        /* don't actually requeue on shutdown */
-        if (!kqswnal_data.kqn_shuttingdown) 
-                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
-                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
-#endif
+        kqswnal_requeue_rx(krx);
 }
         
 void
-kqswnal_rx (kqswnal_rx_t *krx)
+kqswnal_parse (kqswnal_rx_t *krx)
 {
         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
-        ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
+        ptl_nid_t       dest_nid = le64_to_cpu(hdr->dest_nid);
         int             payload_nob;
         int             nob;
         int             niov;
 
-        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+        LASSERT (atomic_read(&krx->krx_refcount) == 1);
 
-        if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
-                atomic_set(&krx->krx_refcount, 1);
-                lib_parse (&kqswnal_lib, hdr, krx);
-                kqswnal_rx_done(krx);
+        if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
+                /* I ignore parse errors since I'm not consuming a byte
+                 * stream */
+                (void)lib_parse (&kqswnal_lib, hdr, krx);
+
+                /* Drop my ref; any RDMA activity takes an additional ref */
+                kqswnal_rx_decref(krx);
                 return;
         }
 
 #if KQSW_CHECKSUM
-        CERROR ("checksums for forwarded packets not implemented\n");
-        LBUG ();
+        LASSERTF (0, "checksums for forwarded packets not implemented\n");
 #endif
+
         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
         {
                 CERROR("dropping packet from "LPX64" for "LPX64
-                       ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
+                       ": target is peer\n", le64_to_cpu(hdr->src_nid), dest_nid);
 
-                kqswnal_requeue_rx (krx);
+                kqswnal_rx_decref (krx);
                 return;
         }
 
@@ -1440,7 +1574,7 @@ kqswnal_rx (kqswnal_rx_t *krx)
 void 
 kqswnal_rxhandler(EP_RXD *rxd)
 {
-        long          flags;
+        unsigned long flags;
         int           nob    = ep_rxd_len (rxd);
         int           status = ep_rxd_status (rxd);
         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
@@ -1449,15 +1583,21 @@ kqswnal_rxhandler(EP_RXD *rxd)
                rxd, krx, nob, status);
 
         LASSERT (krx != NULL);
-
+        LASSERT (krx->krx_state == KRX_POSTED);
+        
+        krx->krx_state = KRX_PARSE;
         krx->krx_rxd = rxd;
         krx->krx_nob = nob;
-#if MULTIRAIL_EKC
-        krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
-#else
-        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
-#endif
-        
+
+        /* RPC reply iff rpc request received without error */
+        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
+                                    (status == EP_SUCCESS ||
+                                     status == EP_MSG_TOO_BIG);
+
+        /* Default to failure if an RPC reply is requested but not handled */
+        krx->krx_rpc_reply_status = -EPROTO;
+        atomic_set (&krx->krx_refcount, 1);
+
         /* must receive a whole header to be able to parse */
         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
         {
@@ -1473,12 +1613,12 @@ kqswnal_rxhandler(EP_RXD *rxd)
                         CERROR("receive status failed with status %d nob %d\n",
                                ep_rxd_status(rxd), nob);
 #endif
-                kqswnal_requeue_rx (krx);
+                kqswnal_rx_decref(krx);
                 return;
         }
 
         if (!in_interrupt()) {
-                kqswnal_rx (krx);
+                kqswnal_parse(krx);
                 return;
         }
 
@@ -1499,30 +1639,30 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                 ", dpid %d, spid %d, type %d\n",
                 ishdr ? "Header" : "Payload", krx,
-                NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
-                NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
-                NTOH__u32(hdr->type));
+                le64_to_cpu(hdr->dest_nid), le64_to_cpu(hdr->src_nid),
+                le32_to_cpu(hdr->dest_pid), le32_to_cpu(hdr->src_pid),
+                le32_to_cpu(hdr->type));
 
-        switch (NTOH__u32 (hdr->type))
+        switch (le32_to_cpu(hdr->type))
         {
         case PTL_MSG_ACK:
                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
                        " len %u\n",
-                       NTOH__u32(hdr->msg.ack.mlength),
+                       le32_to_cpu(hdr->msg.ack.mlength),
                        hdr->msg.ack.dst_wmd.handle_cookie,
                        hdr->msg.ack.dst_wmd.handle_idx,
-                       NTOH__u64(hdr->msg.ack.match_bits),
-                       NTOH__u32(hdr->msg.ack.length));
+                       le64_to_cpu(hdr->msg.ack.match_bits),
+                       le32_to_cpu(hdr->msg.ack.length));
                 break;
         case PTL_MSG_PUT:
                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
                        " len %u off %u data "LPX64"\n",
-                       NTOH__u32(hdr->msg.put.ptl_index),
+                       le32_to_cpu(hdr->msg.put.ptl_index),
                        hdr->msg.put.ack_wmd.handle_cookie,
                        hdr->msg.put.ack_wmd.handle_idx,
-                       NTOH__u64(hdr->msg.put.match_bits),
-                       NTOH__u32(hdr->msg.put.length),
-                       NTOH__u32(hdr->msg.put.offset),
+                       le64_to_cpu(hdr->msg.put.match_bits),
+                       le32_to_cpu(hdr->msg.put.length),
+                       le32_to_cpu(hdr->msg.put.offset),
                        hdr->msg.put.hdr_data);
                 break;
         case PTL_MSG_GET:
@@ -1538,7 +1678,7 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 #endif
 
 static ptl_err_t
-kqswnal_recvmsg (nal_cb_t     *nal,
+kqswnal_recvmsg (lib_nal_t    *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
                  unsigned int  niov,
@@ -1550,16 +1690,18 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 {
         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
         char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
+        ptl_hdr_t    *hdr = (ptl_hdr_t *)buffer;
         int           page;
         char         *page_ptr;
         int           page_nob;
         char         *iov_ptr;
         int           iov_nob;
         int           frag;
+        int           rc;
 #if KQSW_CHECKSUM
         kqsw_csum_t   senders_csum;
         kqsw_csum_t   payload_csum = 0;
-        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
+        kqsw_csum_t   hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
         size_t        csum_len = mlen;
         int           csum_frags = 0;
         int           csum_nob = 0;
@@ -1572,8 +1714,18 @@ kqswnal_recvmsg (nal_cb_t     *nal,
         if (senders_csum != hdr_csum)
                 kqswnal_csum_error (krx, 1);
 #endif
+        /* NB lib_parse() has already flipped *hdr */
+
         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
 
+        if (krx->krx_rpc_reply_needed &&
+            hdr->type == PTL_MSG_PUT) {
+                /* This must be an optimized PUT */
+                rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
+                                   niov, iov, kiov, offset, mlen);
+                return (rc == 0 ? PTL_OK : PTL_FAIL);
+        }
+
         /* What was actually received must be >= payload. */
         LASSERT (mlen <= rlen);
         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
@@ -1689,7 +1841,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 }
 
 static ptl_err_t
-kqswnal_recv(nal_cb_t     *nal,
+kqswnal_recv(lib_nal_t    *nal,
              void         *private,
              lib_msg_t    *libmsg,
              unsigned int  niov,
@@ -1704,7 +1856,7 @@ kqswnal_recv(nal_cb_t     *nal,
 }
 
 static ptl_err_t
-kqswnal_recv_pages (nal_cb_t     *nal,
+kqswnal_recv_pages (lib_nal_t    *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
                     unsigned int  niov,
@@ -1727,7 +1879,6 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg)
                 return ((int)pid);
 
         atomic_inc (&kqswnal_data.kqn_nthreads);
-        atomic_inc (&kqswnal_data.kqn_nthreads_running);
         return (0);
 }
 
@@ -1743,10 +1894,9 @@ kqswnal_scheduler (void *arg)
         kqswnal_rx_t    *krx;
         kqswnal_tx_t    *ktx;
         kpr_fwd_desc_t  *fwd;
-        long             flags;
+        unsigned long    flags;
         int              rc;
         int              counter = 0;
-        int              shuttingdown = 0;
         int              did_something;
 
         kportal_daemonize ("kqswnal_sched");
@@ -1756,18 +1906,6 @@ kqswnal_scheduler (void *arg)
 
         for (;;)
         {
-                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
-
-                        if (kqswnal_data.kqn_shuttingdown == 2)
-                                break;
-                
-                        /* During stage 1 of shutdown we are still responsive
-                         * to receives */
-
-                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
-                        shuttingdown = kqswnal_data.kqn_shuttingdown;
-                }
-
                 did_something = 0;
 
                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
@@ -1778,14 +1916,22 @@ kqswnal_scheduler (void *arg)
                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                                flags);
 
-                        kqswnal_rx (krx);
+                        switch (krx->krx_state) {
+                        case KRX_PARSE:
+                                kqswnal_parse (krx);
+                                break;
+                        case KRX_COMPLETING:
+                                kqswnal_rx_decref (krx);
+                                break;
+                        default:
+                                LBUG();
+                        }
 
                         did_something = 1;
                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!shuttingdown &&
-                    !list_empty (&kqswnal_data.kqn_delayedtxds))
+                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
                 {
                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                          kqswnal_tx_t, ktx_list);
@@ -1794,31 +1940,31 @@ kqswnal_scheduler (void *arg)
                                                flags);
 
                         rc = kqswnal_launch (ktx);
-                        if (rc != 0)          /* failed: ktx_nid down? */
-                        {
+                        if (rc != 0) {
                                 CERROR("Failed delayed transmit to "LPX64
                                        ": %d\n", ktx->ktx_nid, rc);
                                 kqswnal_tx_done (ktx, rc);
                         }
+                        atomic_dec (&kqswnal_data.kqn_pending_txs);
 
                         did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                if (!shuttingdown &
-                    !list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
                 {
                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                         list_del (&fwd->kprfd_list);
                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
 
+                        /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
                         kqswnal_fwd_packet (NULL, fwd);
 
                         did_something = 1;
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
 
-                    /* nothing to do or hogging CPU */
+                /* nothing to do or hogging CPU */
                 if (!did_something || counter++ == KQSW_RESCHED) {
                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                                flags);
@@ -1826,39 +1972,34 @@ kqswnal_scheduler (void *arg)
                         counter = 0;
 
                         if (!did_something) {
+                                if (kqswnal_data.kqn_shuttingdown == 2) {
+                                        /* We only exit in stage 2 of shutdown when 
+                                         * there's nothing left to do */
+                                        break;
+                                }
                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
+                                                               kqswnal_data.kqn_shuttingdown == 2 ||
                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
                                 LASSERT (rc == 0);
-                        } else if (current->need_resched)
+                        } else if (need_resched())
                                 schedule ();
 
                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                 }
         }
 
-        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
-
         kqswnal_thread_fini ();
         return (0);
 }
 
-nal_cb_t kqswnal_lib =
+lib_nal_t kqswnal_lib =
 {
-        nal_data:       &kqswnal_data,         /* NAL private data */
-        cb_send:        kqswnal_send,
-        cb_send_pages:  kqswnal_send_pages,
-        cb_recv:        kqswnal_recv,
-        cb_recv_pages:  kqswnal_recv_pages,
-        cb_read:        kqswnal_read,
-        cb_write:       kqswnal_write,
-        cb_malloc:      kqswnal_malloc,
-        cb_free:        kqswnal_free,
-        cb_printf:      kqswnal_printf,
-        cb_cli:         kqswnal_cli,
-        cb_sti:         kqswnal_sti,
-        cb_callback:    kqswnal_callback,
-        cb_dist:        kqswnal_dist
+        libnal_data:       &kqswnal_data,         /* NAL private data */
+        libnal_send:        kqswnal_send,
+        libnal_send_pages:  kqswnal_send_pages,
+        libnal_recv:        kqswnal_recv,
+        libnal_recv_pages:  kqswnal_recv_pages,
+        libnal_dist:        kqswnal_dist
 };