LU-1346 gnilnd: remove libcfs abstractions
diff --git a/lnet/klnds/qswlnd/qswlnd_cb.c b/lnet/klnds/qswlnd/qswlnd_cb.c
index a6b4b93..eec1a6b 100644
@@ -1,13 +1,11 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
+/*
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  *
- * Copyright (C) 2002 Cluster File Systems, Inc.
- *   Author: Eric Barton <eric@bartonsoftware.com>
+ * Copyright (c) 2012, Intel Corporation.
  *
- * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
- * W. Marcus Miller - Based on ksocknal
+ * Author: Eric Barton <eric@bartonsoftware.com>
  *
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
+ * This file is part of Portals, http://www.lustre.org
  *
  * Portals is free software; you can redistribute it and/or
  * modify it under the terms of version 2 of the GNU General Public
  * You should have received a copy of the GNU General Public License
  * along with Portals; if not, write to the Free Software
  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- *
  */
 
-#include "qswnal.h"
-
-atomic_t kqswnal_packets_launched;
-atomic_t kqswnal_packets_transmitted;
-atomic_t kqswnal_packets_received;
-
-
-/*
- *  LIB functions follow
- *
- */
-static int
-kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
-             size_t len)
-{
-        CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
-                nal->ni.nid, len, src_addr, dst_addr );
-        memcpy( dst_addr, src_addr, len );
-
-        return (0);
-}
-
-static int
-kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
-              size_t len)
-{
-        CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
-                nal->ni.nid, len, src_addr, dst_addr );
-        memcpy( dst_addr, src_addr, len );
-
-        return (0);
-}
-
-static void *
-kqswnal_malloc(nal_cb_t *nal, size_t len)
-{
-        void *buf;
-
-        PORTAL_ALLOC(buf, len);
-        return (buf);
-}
-
-static void
-kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
-{
-        PORTAL_FREE(buf, len);
-}
-
-static void
-kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
-{
-        va_list ap;
-        char msg[256];
-
-        va_start (ap, fmt);
-        vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
-        va_end (ap);
-
-        msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
+#include "qswlnd.h"
 
-        CDEBUG (D_NET, "%s", msg);
-}
-
-
-static void
-kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
-{
-        kqswnal_data_t *data= nal->nal_data;
-
-        spin_lock_irqsave(&data->kqn_statelock, *flags);
-}
-
-
-static void
-kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
+void
+kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
 {
-        kqswnal_data_t *data= nal->nal_data;
+        time_t             then;
 
-        spin_unlock_irqrestore(&data->kqn_statelock, *flags);
-}
+        then = cfs_time_current_sec() -
+                cfs_duration_sec(cfs_time_current() -
+                                 ktx->ktx_launchtime);
 
-
-static int
-kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
-{
-        if (nid == nal->ni.nid)
-                *dist = 0;                      /* it's me */
-        else if (kqswnal_nid2elanid (nid) >= 0)
-                *dist = 1;                      /* it's my peer */
-        else
-                *dist = 2;                      /* via router */
-        return (0);
+        lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then);
 }
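
The conversion above back-dates the failure: it subtracts the tx's time in flight (measured in jiffies) from the current wall-clock second count, so lnet_notify() is told when the peer was last known alive rather than when the failure was noticed. A minimal sketch of the same arithmetic with raw kernel primitives (the helper name is illustrative, not part of qswlnd):

        #include <linux/jiffies.h>
        #include <linux/time.h>

        static time_t launch_wallclock(unsigned long launch_jiffies)
        {
                /* back-date "now" by how long the tx has been in flight */
                return get_seconds() - (jiffies - launch_jiffies) / HZ;
        }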
 
 void
 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
 {
+        int      i;
+
+        ktx->ktx_rail = -1;                     /* unset rail */
+
         if (ktx->ktx_nmappedpages == 0)
                 return;
+        
+        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
+               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
 
-        CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
-                ktx, ktx->ktx_niov, ktx->ktx_basepage, ktx->ktx_nmappedpages);
-
-        LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
-        LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
-                 kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
+        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
+                ep_dvma_unload(kqswnal_data.kqn_ep,
+                               kqswnal_data.kqn_ep_tx_nmh,
+                               &ktx->ktx_frags[i]);
 
-        elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
-                          kqswnal_data.kqn_eptxdmahandle,
-                          ktx->ktx_basepage, ktx->ktx_nmappedpages);
         ktx->ktx_nmappedpages = 0;
 }
 
 int
-kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
+kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, 
+                     unsigned int niov, lnet_kiov_t *kiov)
 {
-        int       nfrags    = ktx->ktx_niov;
-        const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
+        int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
         int       maxmapped = ktx->ktx_npages;
-        uint32_t  basepage  = ktx->ktx_basepage + nmapped;
+        __u32     basepage  = ktx->ktx_basepage + nmapped;
         char     *ptr;
-        
+
+        EP_RAILMASK railmask;
+        int         rail;
+
+        if (ktx->ktx_rail < 0)
+                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                                 EP_RAILMASK_ALL,
+                                                 kqswnal_nid2elanid(ktx->ktx_nid));
+        rail = ktx->ktx_rail;
+        if (rail < 0) {
+                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
+
         LASSERT (nmapped <= maxmapped);
-        LASSERT (nfrags <= maxfrags);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
+        LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
-        
+
+        /* skip complete frags before 'offset' */
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                kiov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+
         do {
-                int  fraglen = kiov->kiov_len;
+                int  fraglen = kiov->kiov_len - offset;
 
-                /* nob exactly spans the iovs */
-                LASSERT (fraglen <= nob);
-                /* each frag fits in a page */
+                /* each page frag is contained in one page */
                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
 
+                if (fraglen > nob)
+                        fraglen = nob;
+
                 nmapped++;
                 if (nmapped > maxmapped) {
                         CERROR("Can't map message in %d pages (max %d)\n",
@@ -167,79 +110,152 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                         return (-EMSGSIZE);
                 }
 
-                if (nfrags == maxfrags) {
+                if (nfrags == EP_MAXFRAG) {
                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
-                               maxfrags);
+                               EP_MAXFRAG);
                         return (-EMSGSIZE);
                 }
 
                 /* XXX this is really crap, but we'll have to kmap until
                  * EKC has a page (rather than vaddr) mapping interface */
 
-                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
+                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, page %d, %d total\n",
                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
 
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
-                                       kqswnal_data.kqn_eptxdmahandle,
-                                       ptr, fraglen,
-                                       basepage, &ktx->ktx_iov[nfrags].Base);
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             ptr, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
 
                 kunmap (kiov->kiov_page);
                 
                 /* keep in loop for failure case */
                 ktx->ktx_nmappedpages = nmapped;
 
-                if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
-                        /* just extend previous */
-                        ktx->ktx_iov[nfrags - 1].Len += fraglen;
-                else {
-                        ktx->ktx_iov[nfrags].Len = fraglen;
-                        nfrags++;                /* new frag */
-                }
-
                 basepage++;
                 kiov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
 
         } while (nob > 0);
 
-        ktx->ktx_niov = nfrags;
+        ktx->ktx_nfrag = nfrags;
         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
-                ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
+                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
 
         return (0);
 }
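
Both mapping paths now take an (offset, nob) window into the fragment array instead of assuming nob exactly spans the iovs: whole fragments before the offset are discarded, then each fragment is clamped to the bytes still wanted, and only the first fragment is entered partway through. A self-contained sketch of the idiom (the type and the consume step are illustrative stand-ins for lnet_kiov_t and the real mapping work):

        struct win_frag {                       /* stand-in for lnet_kiov_t */
                unsigned int len;
        };

        static void walk_window(struct win_frag *frag, unsigned int nfrag,
                                int offset, int nob)
        {
                while (offset >= (int)frag->len) {  /* skip whole leading frags */
                        offset -= frag->len;
                        frag++;
                        nfrag--;
                }

                do {
                        int fraglen = frag->len - offset;

                        if (fraglen > nob)          /* clamp to end of window */
                                fraglen = nob;

                        /* ... map/checksum fraglen bytes starting at 'offset' ... */

                        frag++;
                        nfrag--;
                        nob -= fraglen;
                        offset = 0;     /* only the first frag starts partway in */
                } while (nob > 0);
        }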
 
+#if KQSW_CKSUM
+__u32
+kqswnal_csum_kiov (__u32 csum, int offset, int nob, 
+                   unsigned int niov, lnet_kiov_t *kiov)
+{
+        char     *ptr;
+
+        if (nob == 0)
+                return csum;
+
+        LASSERT (niov > 0);
+        LASSERT (nob > 0);
+
+        /* skip complete frags before 'offset' */
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                kiov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+
+        do {
+                int  fraglen = kiov->kiov_len - offset;
+
+                /* each page frag is contained in one page */
+                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
+
+                if (fraglen > nob)
+                        fraglen = nob;
+
+                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
+
+                csum = kqswnal_csum(csum, ptr, fraglen);
+
+                kunmap (kiov->kiov_page);
+                
+                kiov++;
+                niov--;
+                nob -= fraglen;
+                offset = 0;
+
+                /* iov must not run out before end of data */
+                LASSERT (nob == 0 || niov > 0);
+
+        } while (nob > 0);
+
+        return csum;
+}
+#endif
+
 int
-kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
+kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
+                    unsigned int niov, struct iovec *iov)
 {
-        int       nfrags    = ktx->ktx_niov;
-        const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
+        int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
         int       maxmapped = ktx->ktx_npages;
-        uint32_t  basepage  = ktx->ktx_basepage + nmapped;
+        __u32     basepage  = ktx->ktx_basepage + nmapped;
+
+        EP_RAILMASK railmask;
+        int         rail;
+        
+        if (ktx->ktx_rail < 0)
+                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
+                                                 EP_RAILMASK_ALL,
+                                                 kqswnal_nid2elanid(ktx->ktx_nid));
+        rail = ktx->ktx_rail;
+        if (rail < 0) {
+                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
+                return (-ENETDOWN);
+        }
+        railmask = 1 << rail;
 
         LASSERT (nmapped <= maxmapped);
-        LASSERT (nfrags <= maxfrags);
+        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
+        LASSERT (nfrags <= EP_MAXFRAG);
         LASSERT (niov > 0);
         LASSERT (nob > 0);
 
+        /* skip complete frags before offset */
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                iov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+        
         do {
-                int  fraglen = iov->iov_len;
-                long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
-
-                /* nob exactly spans the iovs */
-                LASSERT (fraglen <= nob);
+                int  fraglen = iov->iov_len - offset;
+                long npages;
                 
+                if (fraglen > nob)
+                        fraglen = nob;
+                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
+
                 nmapped += npages;
                 if (nmapped > maxmapped) {
                         CERROR("Can't map message in %d pages (max %d)\n",
@@ -247,170 +263,231 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
                         return (-EMSGSIZE);
                 }
 
-                if (nfrags == maxfrags) {
+                if (nfrags == EP_MAXFRAG) {
                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
-                               maxfrags);
+                               EP_MAXFRAG);
                         return (-EMSGSIZE);
                 }
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
-                        ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
-                        nmapped);
+                       ktx, nfrags, iov->iov_base + offset, fraglen, 
+                       basepage, npages, nmapped);
+
+                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
+                             iov->iov_base + offset, fraglen,
+                             kqswnal_data.kqn_ep_tx_nmh, basepage,
+                             &railmask, &ktx->ktx_frags[nfrags]);
+
+                if (nfrags == ktx->ktx_firsttmpfrag ||
+                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags - 1],
+                                  &ktx->ktx_frags[nfrags])) {
+                        /* new frag if this is the first or can't merge */
+                        nfrags++;
+                }
 
-                elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
-                                       kqswnal_data.kqn_eptxdmahandle,
-                                       iov->iov_base, fraglen,
-                                       basepage, &ktx->ktx_iov[nfrags].Base);
                 /* keep in loop for failure case */
                 ktx->ktx_nmappedpages = nmapped;
 
-                if (nfrags > 0 &&                /* previous frag mapped */
-                    ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
-                    (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
-                        /* just extend previous */
-                        ktx->ktx_iov[nfrags - 1].Len += fraglen;
-                else {
-                        ktx->ktx_iov[nfrags].Len = fraglen;
-                        nfrags++;                /* new frag */
-                }
-
                 basepage += npages;
                 iov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
 
         } while (nob > 0);
 
-        ktx->ktx_niov = nfrags;
+        ktx->ktx_nfrag = nfrags;
         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
-                ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
+                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
 
         return (0);
 }
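
Note that ep_dvma_load() plus ep_nmd_merge() replace the old manual "extend the previous fragment if contiguous" logic: the new mapping is loaded into frags[nfrags], and nfrags only advances when this is the first temporary fragment or the merge fails. The merge test amounts to roughly the following (illustrative type; qswlnd's real one is EP_NMD and ep_nmd_merge() also has rail constraints to respect):

        struct extent {
                unsigned long base;
                unsigned long len;
        };

        /* extend 'prev' and return 1 iff 'next' follows it contiguously */
        static int extent_merge(struct extent *prev, const struct extent *next)
        {
                if (prev->base + prev->len != next->base)
                        return 0;
                prev->len += next->len;
                return 1;
        }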
 
-void
-kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
+#if KQSW_CKSUM
+__u32
+kqswnal_csum_iov (__u32 csum, int offset, int nob, 
+                  unsigned int niov, struct iovec *iov)
 {
-        kpr_fwd_desc_t   *fwd = NULL;
-        struct list_head *idle = ktx->ktx_idle;
-        unsigned long     flags;
-
-        kqswnal_unmap_tx (ktx);                /* release temporary mappings */
-        ktx->ktx_state = KTX_IDLE;
+        if (nob == 0)
+                return csum;
+        
+        LASSERT (niov > 0);
+        LASSERT (nob > 0);
 
-        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
+        /* skip complete frags before offset */
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                iov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+        
+        do {
+                int  fraglen = iov->iov_len - offset;
+                
+                if (fraglen > nob)
+                        fraglen = nob;
 
-        list_add (&ktx->ktx_list, idle);
+                csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen);
 
-        /* reserved for non-blocking tx */
-        if (idle == &kqswnal_data.kqn_nblk_idletxds) {
-                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
-                return;
-        }
+                iov++;
+                niov--;
+                nob -= fraglen;
+                offset = 0;
 
-        /* anything blocking for a tx descriptor? */
-        if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
-        {
-                CDEBUG(D_NET,"wakeup fwd\n");
+                /* iov must not run out before end of data */
+                LASSERT (nob == 0 || niov > 0);
 
-                fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
-                                  kpr_fwd_desc_t, kprfd_list);
-                list_del (&fwd->kprfd_list);
-        }
+        } while (nob > 0);
 
-        if (waitqueue_active (&kqswnal_data.kqn_idletxd_waitq))  /* process? */
-        {
-                /* local sender waiting for tx desc */
-                CDEBUG(D_NET,"wakeup process\n");
-                wake_up (&kqswnal_data.kqn_idletxd_waitq);
-        }
+        return csum;
+}
+#endif
 
-        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
+void
+kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
+{
+       unsigned long     flags;
 
-        if (fwd == NULL)
-                return;
+       kqswnal_unmap_tx(ktx);                  /* release temporary mappings */
+       ktx->ktx_state = KTX_IDLE;
 
-        /* schedule packet for forwarding again */
-        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+       spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
 
-        list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
-        if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
-                wake_up (&kqswnal_data.kqn_sched_waitq);
+       cfs_list_del(&ktx->ktx_list);           /* take off active list */
+       cfs_list_add(&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
 
-        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+       spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
 }
 
 kqswnal_tx_t *
-kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
+kqswnal_get_idle_tx (void)
 {
-        unsigned long  flags;
-        kqswnal_tx_t  *ktx = NULL;
-
-        for (;;) {
-                spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
-
-                /* "normal" descriptor is free */
-                if (!list_empty (&kqswnal_data.kqn_idletxds)) {
-                        ktx = list_entry (kqswnal_data.kqn_idletxds.next,
-                                          kqswnal_tx_t, ktx_list);
-                        list_del (&ktx->ktx_list);
-                        break;
-                }
-
-                /* "normal" descriptor pool is empty */
-
-                if (fwd != NULL) { /* forwarded packet => queue for idle txd */
-                        CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
-                        list_add_tail (&fwd->kprfd_list,
-                                       &kqswnal_data.kqn_idletxd_fwdq);
-                        break;
-                }
+       unsigned long  flags;
+       kqswnal_tx_t  *ktx;
 
-                /* doing a local transmit */
-                if (!may_block) {
-                        if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
-                                CERROR ("intr tx desc pool exhausted\n");
-                                break;
-                        }
+       spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
 
-                        ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
-                                          kqswnal_tx_t, ktx_list);
-                        list_del (&ktx->ktx_list);
-                        break;
-                }
+       if (kqswnal_data.kqn_shuttingdown ||
+           cfs_list_empty(&kqswnal_data.kqn_idletxds)) {
+               spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
 
-                /* block for idle tx */
+               return NULL;
+       }
 
-                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
+        ktx = cfs_list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t,
+                              ktx_list);
+        cfs_list_del (&ktx->ktx_list);
 
-                CDEBUG (D_NET, "blocking for tx desc\n");
-                wait_event (kqswnal_data.kqn_idletxd_waitq,
-                            !list_empty (&kqswnal_data.kqn_idletxds));
-        }
+        cfs_list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
+        ktx->ktx_launcher = current->pid;
+        cfs_atomic_inc(&kqswnal_data.kqn_pending_txs);
 
-        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
+       spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
 
         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
-        LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
+        LASSERT (ktx->ktx_nmappedpages == 0);
         return (ktx);
 }
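
kqswnal_get_idle_tx() no longer blocks or keeps a non-blocking reserve pool; it fails fast when shutting down or empty, and it moves the descriptor onto kqn_activetxds under the same lock so in-flight txs stay findable. The pattern in miniature, with illustrative names:

        #include <linux/list.h>
        #include <linux/spinlock.h>

        struct item { struct list_head i_list; };

        static LIST_HEAD(idle_list);
        static LIST_HEAD(active_list);
        static DEFINE_SPINLOCK(pool_lock);
        static int shutting_down;

        static struct item *get_idle(void)
        {
                struct item *it = NULL;
                unsigned long flags;

                spin_lock_irqsave(&pool_lock, flags);
                if (!shutting_down && !list_empty(&idle_list)) {
                        it = list_entry(idle_list.next, struct item, i_list);
                        /* keep in-flight items findable, e.g. for shutdown */
                        list_move(&it->i_list, &active_list);
                }
                spin_unlock_irqrestore(&pool_lock, flags);

                return it;
        }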
 
 void
-kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
+kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
 {
+        lnet_msg_t    *lnetmsg0 = NULL;
+        lnet_msg_t    *lnetmsg1 = NULL;
+        int            status0  = 0;
+        int            status1  = 0;
+        kqswnal_rx_t  *krx;
+
+        LASSERT (!cfs_in_interrupt());
+
+        if (ktx->ktx_status == -EHOSTDOWN)
+                kqswnal_notify_peer_down(ktx);
+
         switch (ktx->ktx_state) {
-        case KTX_FORWARDING:       /* router asked me to forward this packet */
-                kpr_fwd_done (&kqswnal_data.kqn_router,
-                              (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
+        case KTX_RDMA_FETCH:                    /* optimized PUT/REPLY handled */
+                krx      = (kqswnal_rx_t *)ktx->ktx_args[0];
+                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
+                status0  = ktx->ktx_status;
+#if KQSW_CKSUM
+                if (status0 == 0) {             /* RDMA succeeded */
+                        kqswnal_msg_t *msg;
+                        __u32          csum;
+
+                        msg = (kqswnal_msg_t *)
+                              page_address(krx->krx_kiov[0].kiov_page);
+
+                        csum = (lnetmsg0->msg_kiov != NULL) ?
+                               kqswnal_csum_kiov(krx->krx_cksum,
+                                                 lnetmsg0->msg_offset,
+                                                 lnetmsg0->msg_wanted,
+                                                 lnetmsg0->msg_niov,
+                                                 lnetmsg0->msg_kiov) :
+                               kqswnal_csum_iov(krx->krx_cksum,
+                                                lnetmsg0->msg_offset,
+                                                lnetmsg0->msg_wanted,
+                                                lnetmsg0->msg_niov,
+                                                lnetmsg0->msg_iov);
+
+                        /* Can only check csum if I got it all */
+                        if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
+                            csum != msg->kqm_cksum) {
+                                ktx->ktx_status = -EIO;
+                                krx->krx_rpc_reply.msg.status = -EIO;
+                                CERROR("RDMA checksum failed %u(%u) from %s\n",
+                                       csum, msg->kqm_cksum,
+                                       libcfs_nid2str(kqswnal_rx_nid(krx)));
+                        }
+                }
+#endif       
+                LASSERT (krx->krx_state == KRX_COMPLETING);
+                kqswnal_rx_decref (krx);
+                break;
+
+        case KTX_RDMA_STORE:       /* optimized GET handled */
+        case KTX_PUTTING:          /* optimized PUT sent */
+        case KTX_SENDING:          /* normal send */
+                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
+                status0  = ktx->ktx_status;
                 break;
 
-        case KTX_SENDING:          /* packet sourced locally */
-                lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
-                              (lib_msg_t *)ktx->ktx_args[1]);
+        case KTX_GETTING:          /* optimized GET sent & payload received */
+                /* Complete the GET with success since we can't avoid
+                 * delivering a REPLY event; we committed to it when we
+                 * launched the GET */
+                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
+                status0  = 0;
+                lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
+                status1  = ktx->ktx_status;
+#if KQSW_CKSUM
+                if (status1 == 0) {             /* RDMA succeeded */
+                        lnet_msg_t   *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
+                        lnet_libmd_t *md = lnetmsg0->msg_md;
+                        __u32         csum;
+                
+                        csum = ((md->md_options & LNET_MD_KIOV) != 0) ? 
+                               kqswnal_csum_kiov(~0, 0,
+                                                 md->md_length,
+                                                 md->md_niov, 
+                                                 md->md_iov.kiov) :
+                               kqswnal_csum_iov(~0, 0,
+                                                md->md_length,
+                                                md->md_niov,
+                                                md->md_iov.iov);
+
+                        if (csum != ktx->ktx_cksum) {
+                                CERROR("RDMA checksum failed %u(%u) from %s\n",
+                                       csum, ktx->ktx_cksum,
+                                       libcfs_nid2str(ktx->ktx_nid));
+                                status1 = -EIO;
+                        }
+                }
+#endif                
                 break;
 
         default:
@@ -418,76 +495,206 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
         }
 
         kqswnal_put_idle_tx (ktx);
+
+        lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
+        if (lnetmsg1 != NULL)
+                lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
+}
+
+void
+kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
+{
+        unsigned long      flags;
+
+        ktx->ktx_status = status;
+
+        if (!cfs_in_interrupt()) {
+                kqswnal_tx_done_in_thread_context(ktx);
+                return;
+        }
+
+        /* Complete the send in thread context */
+       spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
+
+       cfs_list_add_tail(&ktx->ktx_schedlist,
+                          &kqswnal_data.kqn_donetxds);
+       wake_up(&kqswnal_data.kqn_sched_waitq);
+
+       spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
 }
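
The ep transmit callbacks can fire in interrupt context, where lnet_finalize() must not be called, so kqswnal_tx_done() defers completion to the scheduler thread via kqn_donetxds. The deferral pattern, reduced to essentials (names illustrative):

        #include <linux/interrupt.h>
        #include <linux/list.h>
        #include <linux/spinlock.h>
        #include <linux/wait.h>

        struct work { struct list_head w_list; };

        static LIST_HEAD(done_queue);
        static DEFINE_SPINLOCK(sched_lock);
        static DECLARE_WAIT_QUEUE_HEAD(sched_waitq);

        static void complete_or_defer(struct work *w)
        {
                unsigned long flags;

                if (!in_interrupt()) {
                        /* safe to run the full completion (may block) here */
                        return;
                }

                spin_lock_irqsave(&sched_lock, flags);
                list_add_tail(&w->w_list, &done_queue);
                wake_up(&sched_waitq);  /* scheduler thread drains the queue */
                spin_unlock_irqrestore(&sched_lock, flags);
        }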
 
 static void
 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 {
-        kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
+        kqswnal_tx_t         *ktx = (kqswnal_tx_t *)arg;
+        kqswnal_rpc_reply_t  *reply;
 
         LASSERT (txd != NULL);
         LASSERT (ktx != NULL);
 
         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
 
-        if (status == EP_SUCCESS)
-                atomic_inc (&kqswnal_packets_transmitted);
+        if (status != EP_SUCCESS) {
 
-        if (status != EP_SUCCESS)
-        {
-                CERROR ("kqswnal: Transmit failed with %d\n", status);
-                status = -EIO;
+                CNETERR("Tx completion to %s failed: %d\n",
+                        libcfs_nid2str(ktx->ktx_nid), status);
+
+                status = -EHOSTDOWN;
+
+        } else switch (ktx->ktx_state) {
+
+        case KTX_GETTING:
+        case KTX_PUTTING:
+                /* RPC complete! */
+                reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
+                if (reply->msg.magic == 0) {    /* "old" peer */
+                        status = reply->msg.status;
+                        break;
+                }
+                
+                if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
+                        if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
+                                CERROR("%s unexpected rpc reply magic %08x\n",
+                                       libcfs_nid2str(ktx->ktx_nid),
+                                       reply->msg.magic);
+                                status = -EPROTO;
+                                break;
+                        }
+
+                        __swab32s(&reply->msg.status);
+                        __swab32s(&reply->msg.version);
+                        
+                        if (ktx->ktx_state == KTX_GETTING) {
+                                __swab32s(&reply->msg.u.get.len);
+                                __swab32s(&reply->msg.u.get.cksum);
+                        }
+                }
+                        
+                status = reply->msg.status;
+                if (status != 0) {
+                        CERROR("%s RPC status %08x\n",
+                               libcfs_nid2str(ktx->ktx_nid), status);
+                        break;
+                }
+
+                if (ktx->ktx_state == KTX_GETTING) {
+                        lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
+                                               (lnet_msg_t *)ktx->ktx_args[2],
+                                               reply->msg.u.get.len);
+#if KQSW_CKSUM
+                        ktx->ktx_cksum = reply->msg.u.get.cksum;
+#endif
+                }
+                break;
+                
+        case KTX_SENDING:
+                status = 0;
+                break;
+                
+        default:
+                LBUG();
+                break;
         }
 
-        kqswnal_tx_done (ktx, status);
+        kqswnal_tx_done(ktx, status);
 }
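
The RPC reply's magic doubles as an endianness probe: a byte-swapped magic means the peer wrote the status block opposite-endian, so every field consumed afterwards must be swabbed first, exactly as the handler above does for status, version, and the GET fields. The check in isolation (the magic value here is illustrative, not LNET_PROTO_QSW_MAGIC):

        #include <linux/errno.h>
        #include <linux/swab.h>
        #include <linux/types.h>

        #define EXAMPLE_MAGIC 0x51535721u       /* illustrative value only */

        static int fixup_endianness(__u32 *magicp, __u32 *statusp)
        {
                if (*magicp == EXAMPLE_MAGIC)
                        return 0;               /* same-endian peer */

                if (*magicp == swab32(EXAMPLE_MAGIC)) {
                        __swab32s(statusp);     /* swab every field used */
                        return 0;
                }

                return -EPROTO;                 /* not this protocol */
        }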
 
 int
 kqswnal_launch (kqswnal_tx_t *ktx)
 {
         /* Don't block for transmit descriptor if we're in interrupt context */
-        int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
+        int   attr = cfs_in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
-        long  flags;
+        unsigned long flags;
         int   rc;
-        
+
+        ktx->ktx_launchtime = cfs_time_current();
+
+        if (kqswnal_data.kqn_shuttingdown)
+                return (-ESHUTDOWN);
+
         LASSERT (dest >= 0);                    /* must be a peer */
-        rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
-                               ktx->ktx_port, attr, kqswnal_txhandler,
-                               ktx, ktx->ktx_iov, ktx->ktx_niov);
-        if (rc == 0)
-                atomic_inc (&kqswnal_packets_launched);
 
-        if (rc != ENOMEM)
-                return (rc);
+        if (ktx->ktx_nmappedpages != 0)
+                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);
+
+        switch (ktx->ktx_state) {
+        case KTX_GETTING:
+        case KTX_PUTTING:
+                if (the_lnet.ln_testprotocompat != 0) {
+                        kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
+
+                        /* single-shot proto test:
+                         * Future version queries will use an RPC, so I'll
+                         * co-opt one of the existing ones */
+                        LNET_LOCK();
+                        if ((the_lnet.ln_testprotocompat & 1) != 0) {
+                                msg->kqm_version++;
+                                the_lnet.ln_testprotocompat &= ~1;
+                        }
+                        if ((the_lnet.ln_testprotocompat & 2) != 0) {
+                                msg->kqm_magic = LNET_PROTO_MAGIC;
+                                the_lnet.ln_testprotocompat &= ~2;
+                        }
+                        LNET_UNLOCK();
+                }
+
+                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
+                 * The other frags are the payload, awaiting RDMA */
+                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
+                                     ktx->ktx_port, attr,
+                                     kqswnal_txhandler, ktx,
+                                     NULL, ktx->ktx_frags, 1);
+                break;
 
-        /* can't allocate ep txd => queue for later */
+        case KTX_SENDING:
+                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
+                                         ktx->ktx_port, attr,
+                                         kqswnal_txhandler, ktx,
+                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
+                break;
 
-        LASSERT (in_interrupt());      /* not called by thread (not looping) */
+        default:
+                LBUG();
+                rc = -EINVAL;                   /* no compiler warning please */
+                break;
+        }
 
-        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+        switch (rc) {
+        case EP_SUCCESS: /* success */
+                return (0);
 
-        list_add_tail (&ktx->ktx_list, &kqswnal_data.kqn_delayedtxds);
-        if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
-                wake_up (&kqswnal_data.kqn_sched_waitq);
+        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
+               spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
 
-        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+               cfs_list_add_tail(&ktx->ktx_schedlist,
+                                 &kqswnal_data.kqn_delayedtxds);
+               wake_up(&kqswnal_data.kqn_sched_waitq);
 
-        return (0);
-}
+               spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
+                                            flags);
+                return (0);
 
+        default: /* fatal error */
+                CNETERR ("Tx to %s failed: %d\n",
+                        libcfs_nid2str(ktx->ktx_nid), rc);
+                kqswnal_notify_peer_down(ktx);
+                return (-EHOSTUNREACH);
+        }
+}
 
+#if 0
 static char *
-hdr_type_string (ptl_hdr_t *hdr)
+hdr_type_string (lnet_hdr_t *hdr)
 {
         switch (hdr->type) {
-        case PTL_MSG_ACK:
+        case LNET_MSG_ACK:
                 return ("ACK");
-        case PTL_MSG_PUT:
+        case LNET_MSG_PUT:
                 return ("PUT");
-        case PTL_MSG_GET:
+        case LNET_MSG_GET:
                 return ("GET");
-        case PTL_MSG_REPLY:
+        case LNET_MSG_REPLY:
                 return ("REPLY");
         default:
                 return ("<UNKNOWN>");
@@ -495,627 +702,974 @@ hdr_type_string (ptl_hdr_t *hdr)
 }
 
 static void
-kqswnal_cerror_hdr(ptl_hdr_t * hdr)
+kqswnal_cerror_hdr(lnet_hdr_t * hdr)
 {
         char *type_str = hdr_type_string (hdr);
 
-        CERROR("P3 Header at %p of type %s\n", hdr, type_str);
-        CERROR("    From nid/pid "LPU64"/%u", NTOH__u64(hdr->src_nid),
-               NTOH__u32(hdr->src_pid));
-        CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
-               NTOH__u32(hdr->dest_pid));
+        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
+               le32_to_cpu(hdr->payload_length));
+        CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
+               le32_to_cpu(hdr->src_pid));
+        CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
+               le32_to_cpu(hdr->dest_pid));
 
-        switch (NTOH__u32(hdr->type)) {
-        case PTL_MSG_PUT:
+        switch (le32_to_cpu(hdr->type)) {
+        case LNET_MSG_PUT:
                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                        "match bits "LPX64"\n",
-                       NTOH__u32 (hdr->msg.put.ptl_index),
+                       le32_to_cpu(hdr->msg.put.ptl_index),
                        hdr->msg.put.ack_wmd.wh_interface_cookie,
                        hdr->msg.put.ack_wmd.wh_object_cookie,
-                       NTOH__u64 (hdr->msg.put.match_bits));
-                CERROR("    Length %d, offset %d, hdr data "LPX64"\n",
-                       NTOH__u32(PTL_HDR_LENGTH(hdr)),
-                       NTOH__u32(hdr->msg.put.offset),
+                       le64_to_cpu(hdr->msg.put.match_bits));
+                CERROR("    offset %d, hdr data "LPX64"\n",
+                       le32_to_cpu(hdr->msg.put.offset),
                        hdr->msg.put.hdr_data);
                 break;
 
-        case PTL_MSG_GET:
+        case LNET_MSG_GET:
                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                        "match bits "LPX64"\n",
-                       NTOH__u32 (hdr->msg.get.ptl_index),
+                       le32_to_cpu(hdr->msg.get.ptl_index),
                        hdr->msg.get.return_wmd.wh_interface_cookie,
                        hdr->msg.get.return_wmd.wh_object_cookie,
                        hdr->msg.get.match_bits);
                 CERROR("    Length %d, src offset %d\n",
-                       NTOH__u32 (hdr->msg.get.sink_length),
-                       NTOH__u32 (hdr->msg.get.src_offset));
+                       le32_to_cpu(hdr->msg.get.sink_length),
+                       le32_to_cpu(hdr->msg.get.src_offset));
                 break;
 
-        case PTL_MSG_ACK:
+        case LNET_MSG_ACK:
                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
                        hdr->msg.ack.dst_wmd.wh_object_cookie,
-                       NTOH__u32 (hdr->msg.ack.mlength));
+                       le32_to_cpu(hdr->msg.ack.mlength));
                 break;
 
-        case PTL_MSG_REPLY:
-                CERROR("    dst md "LPX64"."LPX64", length %d\n",
+        case LNET_MSG_REPLY:
+                CERROR("    dst md "LPX64"."LPX64"\n",
                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
-                       hdr->msg.reply.dst_wmd.wh_object_cookie,
-                       NTOH__u32 (PTL_HDR_LENGTH(hdr)));
+                       hdr->msg.reply.dst_wmd.wh_object_cookie);
         }
 
 }                               /* end of print_hdr() */
+#endif
 
-static int
-kqswnal_sendmsg (nal_cb_t     *nal,
-                 void         *private,
-                 lib_msg_t    *cookie,
-                 ptl_hdr_t    *hdr,
-                 int           type,
-                 ptl_nid_t     nid,
-                 ptl_pid_t     pid,
-                 unsigned int  payload_niov,
-                 struct iovec *payload_iov,
-                 ptl_kiov_t   *payload_kiov,
-                 size_t        payload_nob)
+int
+kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
+                    int nrfrag, EP_NMD *rfrag)
 {
-        kqswnal_tx_t      *ktx;
-        int                rc;
-        ptl_nid_t          gatewaynid;
-#if KQSW_CHECKSUM
-        int                i;
-        kqsw_csum_t        csum;
-        int                sumnob;
-#endif
+        int  i;
+
+        if (nlfrag != nrfrag) {
+                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+                       nlfrag, nrfrag);
+                return (-EINVAL);
+        }
+        
+        for (i = 0; i < nlfrag; i++)
+                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
+                        CERROR("Can't cope with unequal frags %d(%d):"
+                               " %d local %d remote\n",
+                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
+                        return (-EINVAL);
+                }
         
-        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
-               " pid %u\n", payload_nob, payload_niov, nid, pid);
+        return (0);
+}
 
-        LASSERT (payload_nob == 0 || payload_niov > 0);
-        LASSERT (payload_niov <= PTL_MD_MAX_IOV);
+kqswnal_remotemd_t *
+kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
+{
+        /* Check that the RMD sent after the "raw" LNET header in a
+         * portals-compatible QSWLND message is OK */
+        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
+        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));
+
+        /* Note RDMA addresses are sent in native endian-ness in the "old"
+         * portals protocol so no swabbing... */
+
+        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
+                /* msg too small to discover rmd size */
+                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
+                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
+                return (NULL);
+        }
 
-        /* It must be OK to kmap() if required */
-        LASSERT (payload_kiov == NULL || !in_interrupt ());
-        /* payload is either all vaddrs or all pages */
-        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
+        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
+                /* rmd doesn't fit in the incoming message */
+                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
+                        krx->krx_nob, rmd->kqrmd_nfrag,
+                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
+                return (NULL);
+        }
+
+        return (rmd);
+}
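
Validating a variable-length descriptor received off the wire takes two bounds checks, exactly as above: first that the fixed header (through kqrmd_nfrag) lies inside the message, then that the fragment array it declares does too. A compact sketch with an illustrative stand-in for kqswnal_remotemd_t:

        #include <linux/stddef.h>
        #include <linux/types.h>

        struct rdesc {
                __u32 nfrag;
                struct { __u32 base; __u32 len; } frag[0];
        };

        static struct rdesc *validate_rdesc(char *buf, int nob)
        {
                struct rdesc *d = (struct rdesc *)buf;

                if (nob < (int)sizeof(*d))
                        return NULL;    /* too small to read nfrag */

                if (nob < (int)offsetof(struct rdesc, frag[d->nfrag]))
                        return NULL;    /* declared frags overrun the buffer */

                return d;
        }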
+
+void
+kqswnal_rdma_store_complete (EP_RXD *rxd) 
+{
+        int           status = ep_rxd_status(rxd);
+        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+        LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
+        LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
+
+        krx->krx_rpc_reply_needed = 0;
+        kqswnal_rx_decref (krx);
+
+        /* free ktx & finalize() its lnet_msg_t */
+        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
+}
+
+void
+kqswnal_rdma_fetch_complete (EP_RXD *rxd) 
+{
+        /* Completed fetching the PUT/REPLY data */
+        int           status = ep_rxd_status(rxd);
+        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+        LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
+        LASSERT (krx->krx_rxd == rxd);
+        /* RPC completes with failure by default */
+        LASSERT (krx->krx_rpc_reply_needed);
+        LASSERT (krx->krx_rpc_reply.msg.status != 0);
+
+        if (status == EP_SUCCESS) {
+                krx->krx_rpc_reply.msg.status = 0;
+                status = 0;
+        } else {
+                /* Abandon RPC since get failed */
+                krx->krx_rpc_reply_needed = 0;
+                status = -ECONNABORTED;
+        }
+
+        /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
+        LASSERT (krx->krx_state == KRX_PARSE);
+        krx->krx_state = KRX_COMPLETING;
+
+        /* free ktx & finalize() its lnet_msg_t */
+        kqswnal_tx_done(ktx, status);
+}
+
+int
+kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
+              int type, kqswnal_remotemd_t *rmd,
+              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
+              unsigned int offset, unsigned int len)
+{
+        kqswnal_tx_t       *ktx;
+        int                 eprc;
+        int                 rc;
+
+        /* Not both mapped and paged payload */
+        LASSERT (iov == NULL || kiov == NULL);
+        /* RPC completes with failure by default */
+        LASSERT (krx->krx_rpc_reply_needed);
+        LASSERT (krx->krx_rpc_reply.msg.status != 0);
+
+        if (len == 0) {
+                /* data got truncated to nothing. */
+                lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
+                /* Let kqswnal_rx_done() complete the RPC with success */
+                krx->krx_rpc_reply.msg.status = 0;
+                return (0);
+        }
         
-        if (payload_nob > KQSW_MAXPAYLOAD) {
-                CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
-                        payload_nob, KQSW_MAXPAYLOAD);
-                return (PTL_FAIL);
+        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
+           actually sending a portals message with it */
+        ktx = kqswnal_get_idle_tx();
+        if (ktx == NULL) {
+                CERROR ("Can't get txd for RDMA with %s\n",
+                        libcfs_nid2str(kqswnal_rx_nid(krx)));
+                return (-ENOMEM);
+        }
+
+        ktx->ktx_state   = type;
+        ktx->ktx_nid     = kqswnal_rx_nid(krx);
+        ktx->ktx_args[0] = krx;
+        ktx->ktx_args[1] = lntmsg;
+
+        LASSERT (cfs_atomic_read(&krx->krx_refcount) > 0);
+        /* Take an extra ref for the completion callback */
+        cfs_atomic_inc(&krx->krx_refcount);
+
+        /* Map on the rail the RPC prefers */
+        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
+                                         ep_rxd_railmask(krx->krx_rxd));
+
+        /* Start mapping at offset 0 (we're not mapping any headers) */
+        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
+        
+        if (kiov != NULL)
+                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
+        else
+                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
+
+        if (rc != 0) {
+                CERROR ("Can't map local RDMA data: %d\n", rc);
+                goto out;
+        }
+
+        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
+                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        if (rc != 0) {
+                CERROR ("Incompatible RDMA descriptors\n");
+                goto out;
         }
 
-        if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
-                rc = kpr_lookup (&kqswnal_data.kqn_router, nid, &gatewaynid);
-                if (rc != 0) {
-                        CERROR("Can't route to "LPX64": router error %d\n",
-                               nid, rc);
-                        return (PTL_FAIL);
+        switch (type) {
+        default:
+                LBUG();
+                
+        case KTX_RDMA_STORE:
+                krx->krx_rpc_reply.msg.status    = 0;
+                krx->krx_rpc_reply.msg.magic     = LNET_PROTO_QSW_MAGIC;
+                krx->krx_rpc_reply.msg.version   = QSWLND_PROTO_VERSION;
+                krx->krx_rpc_reply.msg.u.get.len = len;
+#if KQSW_CKSUM
+                krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
+                            kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
+                            kqswnal_csum_iov(~0, offset, len, niov, iov);
+                if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
+                        krx->krx_rpc_reply.msg.u.get.cksum++;
+                        *kqswnal_tunables.kqn_inject_csum_error = 0;
+                }
+#endif
+                eprc = ep_complete_rpc(krx->krx_rxd, 
+                                       kqswnal_rdma_store_complete, ktx, 
+                                       &krx->krx_rpc_reply.ep_statusblk, 
+                                       ktx->ktx_frags, rmd->kqrmd_frag, 
+                                       rmd->kqrmd_nfrag);
+                if (eprc != EP_SUCCESS) {
+                        CERROR("can't complete RPC: %d\n", eprc);
+                        /* don't re-attempt RPC completion */
+                        krx->krx_rpc_reply_needed = 0;
+                        rc = -ECONNABORTED;
                 }
-                if (kqswnal_nid2elanid (gatewaynid) < 0) {
-                        CERROR("Bad gateway "LPX64" for "LPX64"\n",
-                               gatewaynid, nid);
-                        return (PTL_FAIL);
+                break;
+                
+        case KTX_RDMA_FETCH:
+                eprc = ep_rpc_get (krx->krx_rxd, 
+                                   kqswnal_rdma_fetch_complete, ktx,
+                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
+                if (eprc != EP_SUCCESS) {
+                        CERROR("ep_rpc_get failed: %d\n", eprc);
+                        /* Don't attempt RPC completion: 
+                         * EKC nuked it when the get failed */
+                        krx->krx_rpc_reply_needed = 0;
+                        rc = -ECONNABORTED;
                 }
-                nid = gatewaynid;
+                break;
         }
 
-        /* I may not block for a transmit descriptor if I might block the
-         * receiver, or an interrupt handler. */
-        ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
-                                          type == PTL_MSG_REPLY ||
-                                          in_interrupt()));
-        if (ktx == NULL) {
-                kqswnal_cerror_hdr (hdr);
-                return (PTL_NOSPACE);
+ out:
+        if (rc != 0) {
+                kqswnal_rx_decref(krx);                 /* drop callback's ref */
+                kqswnal_put_idle_tx (ktx);
         }
 
-        memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
+        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
+        return (rc);
+}
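
The extra krx reference taken before arming the RDMA belongs to the completion callback; when ep_complete_rpc() or ep_rpc_get() fails, that callback will never run, so the error path at 'out' must drop the reference itself. The discipline in miniature (the arming function is hypothetical; the refcount calls are the ones used above):

        /* sketch only: 'start' stands in for ep_complete_rpc()/ep_rpc_get() */
        static int arm_with_ref(kqswnal_rx_t *krx, int (*start)(kqswnal_rx_t *))
        {
                cfs_atomic_inc(&krx->krx_refcount);     /* ref owned by callback */

                if (start(krx) != 0) {
                        kqswnal_rx_decref(krx);         /* callback won't fire */
                        return -ECONNABORTED;
                }

                return 0;       /* completion callback drops the ref later */
        }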
 
-#if KQSW_CHECKSUM
-        csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
-        memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
-        for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
-                if (payload_kiov != NULL) {
-                        ptl_kiov_t *kiov = &payload_kiov[i];
-                        char       *addr = ((char *)kmap (kiov->kiov_page)) +
-                                           kiov->kiov_offset;
-                        
-                        csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
-                        sumnob -= kiov->kiov_len;
-                } else {
-                        struct iovec *iov = &payload_iov[i];
+int
+kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
+{
+        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
+        int               type = lntmsg->msg_type;
+        lnet_process_id_t target = lntmsg->msg_target;
+        int               target_is_router = lntmsg->msg_target_is_router;
+        int               routing = lntmsg->msg_routing;
+        unsigned int      payload_niov = lntmsg->msg_niov;
+        struct iovec     *payload_iov = lntmsg->msg_iov;
+        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
+        unsigned int      payload_offset = lntmsg->msg_offset;
+        unsigned int      payload_nob = lntmsg->msg_len;
+        int               nob;
+        kqswnal_tx_t     *ktx;
+        int               rc;
+
+        /* NB 1. hdr is in network byte order */
+        /*    2. 'private' depends on the message type */
+        
+        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
+               payload_nob, payload_niov, libcfs_id2str(target));
 
-                        csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
-                        sumnob -= iov->iov_len;
-                }
+        LASSERT (payload_nob == 0 || payload_niov > 0);
+        LASSERT (payload_niov <= LNET_MAX_IOV);
+
+        /* It must be OK to kmap() if required */
+        LASSERT (payload_kiov == NULL || !cfs_in_interrupt ());
+        /* payload is either all vaddrs or all pages */
+        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
+
+        if (kqswnal_nid2elanid (target.nid) < 0) {
+                CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
+                return -EIO;
         }
-        memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
-#endif
 
-        /* Set up first frag from pre-mapped buffer (it's at least the
-         * portals header) */
-        ktx->ktx_iov[0].Base = ktx->ktx_ebuffer;
-        ktx->ktx_iov[0].Len = KQSW_HDR_SIZE;
-        ktx->ktx_niov = 1;
+        /* I may not block for a transmit descriptor if I might block the
+         * router, receiver, or an interrupt handler. */
+        ktx = kqswnal_get_idle_tx();
+        if (ktx == NULL) {
+                CERROR ("Can't get txd for msg type %d for %s\n",
+                        type, libcfs_nid2str(target.nid));
+                return (-ENOMEM);
+        }
 
-        if (payload_nob > 0) { /* got some payload (something more to do) */
-                /* make a single contiguous message? */
-                if (payload_nob <= KQSW_TX_MAXCONTIG) {
-                        /* copy payload to ktx_buffer, immediately after hdr */
-                        if (payload_kiov != NULL)
-                                lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                   payload_niov, payload_kiov, payload_nob);
+        ktx->ktx_state   = KTX_SENDING;
+        ktx->ktx_nid     = target.nid;
+        ktx->ktx_args[0] = private;
+        ktx->ktx_args[1] = lntmsg;
+        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */
+
+        /* The first frag will be the pre-mapped buffer. */
+        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+
+        if ((!target_is_router &&               /* target.nid is final dest */
+             !routing &&                        /* I'm the source */
+             type == LNET_MSG_GET &&            /* optimize GET? */
+             *kqswnal_tunables.kqn_optimized_gets != 0 &&
+             lntmsg->msg_md->md_length >= 
+             *kqswnal_tunables.kqn_optimized_gets) ||
+            ((type == LNET_MSG_PUT ||            /* optimize PUT? */
+              type == LNET_MSG_REPLY) &&         /* optimize REPLY? */
+             *kqswnal_tunables.kqn_optimized_puts != 0 &&
+             payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
+                lnet_libmd_t       *md = lntmsg->msg_md;
+                kqswnal_msg_t      *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
+                lnet_hdr_t         *mhdr;
+                kqswnal_remotemd_t *rmd;
+
+                /* Optimised path: I send over the Elan vaddrs of the local
+                 * buffers, and my peer DMAs directly to/from them.
+                 *
+                 * First I set up ktx as if it was going to send this
+                 * payload, (it needs to map it anyway).  This fills
+                 * ktx_frags[1] and onward with the network addresses
+                 * of the buffer frags. */
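+                /* Roughly, the RDMA message assembled below lays out in
+                 * ktx_buffer as (field names from the structures used
+                 * below; offsets illustrative, not authoritative):
+                 *   kqm_magic/kqm_version/kqm_type       message envelope
+                 *   kqm_u.rdma.kqrm_hdr                  LNet header
+                 *   kqm_u.rdma.kqrm_rmd.kqrmd_nfrag      #mapped frags
+                 *   kqm_u.rdma.kqrm_rmd.kqrmd_frag[]     their EP_NMDs */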
+
+                /* Send an RDMA message */
+                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
+                msg->kqm_version = QSWLND_PROTO_VERSION;
+                msg->kqm_type = QSWLND_MSG_RDMA;
+
+                mhdr = &msg->kqm_u.rdma.kqrm_hdr;
+                rmd  = &msg->kqm_u.rdma.kqrm_rmd;
+
+                *mhdr = *hdr;
+                nob = (((char *)rmd) - ktx->ktx_buffer);
+
+                if (type == LNET_MSG_GET) {
+                        if ((md->md_options & LNET_MD_KIOV) != 0) 
+                                rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
+                                                          md->md_niov, md->md_iov.kiov);
                         else
-                                lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                  payload_niov, payload_iov, payload_nob);
-                        /* first frag includes payload */
-                        ktx->ktx_iov[0].Len += payload_nob;
+                                rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
+                                                         md->md_niov, md->md_iov.iov);
+                        ktx->ktx_state = KTX_GETTING;
                 } else {
                         if (payload_kiov != NULL)
-                                rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
-                                                          payload_niov, payload_kiov);
+                                rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
+                                                         payload_niov, payload_kiov);
                         else
-                                rc = kqswnal_map_tx_iov (ktx, payload_nob,
-                                                         payload_niov, payload_iov);
-                        if (rc != 0) {
-                                kqswnal_put_idle_tx (ktx);
-                                return (PTL_FAIL);
+                                rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
+                                                        payload_niov, payload_iov);
+                        ktx->ktx_state = KTX_PUTTING;
+                }
+
+                if (rc != 0)
+                        goto out;
+
+                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
+                nob += offsetof(kqswnal_remotemd_t,
+                                kqrmd_frag[rmd->kqrmd_nfrag]);
+                LASSERT (nob <= KQSW_TX_BUFFER_SIZE);
+
+                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
+                       rmd->kqrmd_nfrag * sizeof(EP_NMD));
+
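+                /* frag 0 covers just the first 'nob' bytes of the
+                 * pre-mapped buffer: the envelope, LNet header and
+                 * remotemd assembled above.  The payload itself moves by
+                 * RDMA, not in this message. */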
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
+#if KQSW_CKSUM
+                msg->kqm_nob   = nob + payload_nob;
+                msg->kqm_cksum = 0;
+                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
+#endif
+                if (type == LNET_MSG_GET) {
+                        /* Allocate reply message now while I'm in thread context */
+                        ktx->ktx_args[2] = lnet_create_reply_msg (
+                                kqswnal_data.kqn_ni, lntmsg);
+                        if (ktx->ktx_args[2] == NULL) {
+                                /* failed to commit to a REPLY: fail the
+                                 * send rather than returning success with
+                                 * nothing launched */
+                                rc = -ENOMEM;
+                                goto out;
+                        }
+
+                        /* NB finalizing the REPLY message is my
+                         * responsibility now, whatever happens. */
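+
+                        /* NB when KQSW_CKSUM is disabled, the preprocessor
+                         * strips everything from the #if below to its
+                         * matching #endif, including the 'else' clauses:
+                         * without checksums, non-GET messages need no
+                         * further work on this path. */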
+#if KQSW_CKSUM
+                        if (*kqswnal_tunables.kqn_inject_csum_error ==  3) {
+                                msg->kqm_cksum++;
+                                *kqswnal_tunables.kqn_inject_csum_error = 0;
+                        }
+
+                } else if (payload_kiov != NULL) {
+                        /* must checksum payload after header so receiver can
+                         * compute partial header cksum before swab.  Sadly
+                         * this causes 2 rounds of kmap */
+                        msg->kqm_cksum =
+                                kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,
+                                                  payload_niov, payload_kiov);
+                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
+                                msg->kqm_cksum++;
+                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                         }
-                } 
+                } else {
+                        msg->kqm_cksum =
+                                kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,
+                                                 payload_niov, payload_iov);
+                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
+                                msg->kqm_cksum++;
+                                *kqswnal_tunables.kqn_inject_csum_error = 0;
+                        }
+#endif
+                }
+
+        } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
+                lnet_hdr_t    *mhdr;
+                char          *payload;
+                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
+
+                /* single frag copied into the pre-mapped buffer */
+                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
+                msg->kqm_version = QSWLND_PROTO_VERSION;
+                msg->kqm_type = QSWLND_MSG_IMMEDIATE;
+
+                mhdr = &msg->kqm_u.immediate.kqim_hdr;
+                payload = msg->kqm_u.immediate.kqim_payload;
+
+                *mhdr = *hdr;
+                nob = (payload - ktx->ktx_buffer) + payload_nob;
+
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
+
+                if (payload_kiov != NULL)
+                        lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
+                                            payload_niov, payload_kiov, 
+                                            payload_offset, payload_nob);
+                else
+                        lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
+                                           payload_niov, payload_iov, 
+                                           payload_offset, payload_nob);
+#if KQSW_CKSUM
+                msg->kqm_nob   = nob;
+                msg->kqm_cksum = 0;
+                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
+                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
+                        msg->kqm_cksum++;
+                        *kqswnal_tunables.kqn_inject_csum_error = 0;
+                }
+#endif
+        } else {
+                lnet_hdr_t    *mhdr;
+                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
+
+                /* multiple frags: first is hdr in pre-mapped buffer */
+                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
+                msg->kqm_version = QSWLND_PROTO_VERSION;
+                msg->kqm_type = QSWLND_MSG_IMMEDIATE;
+
+                mhdr = &msg->kqm_u.immediate.kqim_hdr;
+                nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
+
+                *mhdr = *hdr;
+
+                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
+
+                if (payload_kiov != NULL)
+                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
+                                                  payload_niov, payload_kiov);
+                else
+                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
+                                                 payload_niov, payload_iov);
+                if (rc != 0)
+                        goto out;
+
+#if KQSW_CKSUM
+                msg->kqm_nob   = nob + payload_nob;
+                msg->kqm_cksum = 0;
+                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
+
+                msg->kqm_cksum = (payload_kiov != NULL) ?
+                                 kqswnal_csum_kiov(msg->kqm_cksum,
+                                                   payload_offset, payload_nob,
+                                                   payload_niov, payload_kiov) :
+                                 kqswnal_csum_iov(msg->kqm_cksum,
+                                                  payload_offset, payload_nob,
+                                                  payload_niov, payload_iov);
+
+                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
+                        msg->kqm_cksum++;
+                        *kqswnal_tunables.kqn_inject_csum_error = 0;
+                }
+#endif
+                nob += payload_nob;
         }
 
-        ktx->ktx_port    = (payload_nob <= KQSW_SMALLPAYLOAD) ?
-                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
-        ktx->ktx_nid     = nid;
-        ktx->ktx_state   = KTX_SENDING;   /* => lib_finalize() on completion */
-        ktx->ktx_args[0] = private;
-        ktx->ktx_args[1] = cookie;
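+        /* NB small messages go to the 'small' EP service so they can land
+         * in the pre-allocated small receive buffers; anything bigger uses
+         * the large service */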
+        ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
+                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
 
         rc = kqswnal_launch (ktx);
-        if (rc != 0) {                    /* failed? */
-                CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc);
-                return (PTL_FAIL);
-        }
 
-        CDEBUG(D_NET, "send to "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
-        return (PTL_OK);
+ out:
+        CDEBUG_LIMIT(rc == 0 ? D_NET : D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
+                     routing ? (rc == 0 ? "Routed" : "Failed to route") :
+                               (rc == 0 ? "Sent" : "Failed to send"),
+                     nob, libcfs_nid2str(target.nid),
+                     target_is_router ? "(router)" : "", rc);
+
+        if (rc != 0) {
+                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
+                int         state = ktx->ktx_state;
+
+                kqswnal_put_idle_tx (ktx);
+
+                if (state == KTX_GETTING && repmsg != NULL) {
+                        /* We committed to reply, but there was a problem
+                         * launching the GET.  We can't avoid delivering a
+                         * REPLY event since we committed above, so we
+                         * pretend the GET succeeded but the REPLY
+                         * failed. */
+                        rc = 0;
+                        lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
+                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
+                }
+        }
+
+        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
+        return (rc == 0 ? 0 : -EIO);
 }
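+
+/* The send path above chooses between three wire formats: an optimized
+ * RDMA message (the peer DMAs the payload directly to/from the mapped
+ * frags), a single-frag IMMEDIATE (payload copied inline into the
+ * pre-mapped buffer), and a multi-frag IMMEDIATE (header inline, payload
+ * frags mapped separately).  A minimal sketch of the choice, ignoring the
+ * routing qualifiers applied above:
+ *
+ *     if (GET with md_length >= *kqn_optimized_gets ||
+ *         PUT/REPLY with payload_nob >= *kqn_optimized_puts)
+ *             send QSWLND_MSG_RDMA (header + remotemd only);
+ *     else if (payload_nob <= *kqn_tx_maxcontig)
+ *             send QSWLND_MSG_IMMEDIATE in a single frag;
+ *     else
+ *             send QSWLND_MSG_IMMEDIATE with mapped payload frags;
+ */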
 
-static int
-kqswnal_send (nal_cb_t     *nal,
-              void         *private,
-              lib_msg_t    *cookie,
-              ptl_hdr_t    *hdr,
-              int           type,
-              ptl_nid_t     nid,
-              ptl_pid_t     pid,
-              unsigned int  payload_niov,
-              struct iovec *payload_iov,
-              size_t        payload_nob)
+void
+kqswnal_requeue_rx (kqswnal_rx_t *krx)
 {
-        return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
-                                 payload_niov, payload_iov, NULL, payload_nob));
+        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);
+        LASSERT (!krx->krx_rpc_reply_needed);
+
+        krx->krx_state = KRX_POSTED;
+
+        if (kqswnal_data.kqn_shuttingdown) {
+                /* free EKC rxd on shutdown */
+                ep_complete_receive(krx->krx_rxd);
+        } else {
+                /* repost receive */
+                ep_requeue_receive(krx->krx_rxd, 
+                                   kqswnal_rxhandler, krx,
+                                   &krx->krx_elanbuffer, 0);
+        }
 }
 
-static int
-kqswnal_send_pages (nal_cb_t     *nal,
-                    void         *private,
-                    lib_msg_t    *cookie,
-                    ptl_hdr_t    *hdr,
-                    int           type,
-                    ptl_nid_t     nid,
-                    ptl_pid_t     pid,
-                    unsigned int  payload_niov,
-                    ptl_kiov_t   *payload_kiov,
-                    size_t        payload_nob)
+void
+kqswnal_rpc_complete (EP_RXD *rxd)
 {
-        return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
-                                 payload_niov, NULL, payload_kiov, payload_nob));
-}
+        int           status = ep_rxd_status(rxd);
+        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
+
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, krx %p, status %d\n", rxd, krx, status);
 
-int kqswnal_fwd_copy_contig = 0;
+        LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
+
+        krx->krx_rpc_reply_needed = 0;
+        kqswnal_requeue_rx (krx);
+}
 
 void
-kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
+kqswnal_rx_done (kqswnal_rx_t *krx)
 {
-        int             rc;
-        kqswnal_tx_t   *ktx;
-        struct iovec   *iov = fwd->kprfd_iov;
-        int             niov = fwd->kprfd_niov;
-        int             nob = fwd->kprfd_nob;
-        ptl_nid_t       nid = fwd->kprfd_gateway_nid;
-
-#if KQSW_CHECKSUM
-        CERROR ("checksums for forwarded packets not implemented\n");
-        LBUG ();
-#endif
-        /* The router wants this NAL to forward a packet */
-        CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
-                fwd, nid, niov, nob);
+        int           rc;
 
-        LASSERT (niov > 0);
-        
-        ktx = kqswnal_get_idle_tx (fwd, FALSE);
-        if (ktx == NULL)        /* can't get txd right now */
-                return;         /* fwd will be scheduled when tx desc freed */
+        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);
 
-        if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
-                nid = fwd->kprfd_target_nid;    /* target is final dest */
+        if (krx->krx_rpc_reply_needed) {
+                /* We've not completed the peer's RPC yet... */
+                krx->krx_rpc_reply.msg.magic   = LNET_PROTO_QSW_MAGIC;
+                krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;
 
-        if (kqswnal_nid2elanid (nid) < 0) {
-                CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
-                rc = -EHOSTUNREACH;
-                goto failed;
-        }
+                LASSERT (!cfs_in_interrupt());
 
-        if (nob > KQSW_NRXMSGBYTES_LARGE) {
-                CERROR ("Can't forward [%p] to "LPX64
-                        ": size %d bigger than max packet size %ld\n",
-                        fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
-                rc = -EMSGSIZE;
-                goto failed;
-        }
+                rc = ep_complete_rpc(krx->krx_rxd, 
+                                     kqswnal_rpc_complete, krx,
+                                     &krx->krx_rpc_reply.ep_statusblk, 
+                                     NULL, NULL, 0);
+                if (rc == EP_SUCCESS)
+                        return;
 
-        if ((kqswnal_fwd_copy_contig || niov > 1) &&
-            nob <= KQSW_TX_BUFFER_SIZE) 
-        {
-                /* send from ktx's pre-allocated/mapped contiguous buffer? */
-                lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
-                ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
-                ktx->ktx_iov[0].Len = nob;
-                ktx->ktx_niov = 1;
+                CERROR("can't complete RPC: %d\n", rc);
+                krx->krx_rpc_reply_needed = 0;
         }
-        else
-        {
-                /* zero copy */
-                ktx->ktx_niov = 0;        /* no frags mapped yet */
-                rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
-                if (rc != 0)
-                        goto failed;
+
+        kqswnal_requeue_rx(krx);
+}
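+
+/* A krx is refcounted: kqswnal_rxhandler() takes the initial reference
+ * and additional ones may be held while the payload is in use (e.g. with
+ * an RDMA in flight); the final decref funnels into kqswnal_rx_done()
+ * above, which completes any outstanding RPC reply and reposts the
+ * receive.  A minimal sketch of the decref idiom, assuming
+ * kqswnal_rx_decref() is the usual dec-and-test helper:
+ *
+ *     if (cfs_atomic_dec_and_test(&krx->krx_refcount))
+ *             kqswnal_rx_done(krx);
+ */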
+
+void
+kqswnal_parse (kqswnal_rx_t *krx)
+{
+        lnet_ni_t      *ni = kqswnal_data.kqn_ni;
+        kqswnal_msg_t  *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
+        lnet_nid_t      fromnid = kqswnal_rx_nid(krx);
+        int             swab;
+        int             n;
+        int             i;
+        int             nob;
+        int             rc;
+
+        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 1);
+
+        if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {
+                CERROR("Short message %d received from %s\n",
+                       krx->krx_nob, libcfs_nid2str(fromnid));
+                goto done;
         }
 
-        ktx->ktx_port    = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
-                        EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
-        ktx->ktx_nid     = nid;
-        ktx->ktx_state   = KTX_FORWARDING; /* kpr_put_packet() on completion */
-        ktx->ktx_args[0] = fwd;
+        swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);
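+        /* NB the sender writes kqm_magic in its native byte order, so
+         * receiving the byte-swapped constant means the peer's endianness
+         * differs and every multi-byte field must be swabbed on receipt */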
 
-        rc = kqswnal_launch (ktx);
-        if (rc == 0)
-                return;
+        if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {
+#if KQSW_CKSUM
+                __u32 csum0;
+                __u32 csum1;
+
+                /* csum byte array before swab */
+                csum1 = msg->kqm_cksum;
+                msg->kqm_cksum = 0;
+                csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,
+                                          krx->krx_npages, krx->krx_kiov);
+                msg->kqm_cksum = csum1;
+#endif
 
- failed:
-        LASSERT (rc != 0);
-        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
+                if (swab) {
+                        __swab16s(&msg->kqm_version);
+                        __swab16s(&msg->kqm_type);
+#if KQSW_CKSUM
+                        __swab32s(&msg->kqm_cksum);
+                        __swab32s(&msg->kqm_nob);
+#endif
+                }
 
-        kqswnal_put_idle_tx (ktx);
-        /* complete now (with failure) */
-        kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
-}
+                if (msg->kqm_version != QSWLND_PROTO_VERSION) {
+                        /* Future protocol version compatibility support!
+                         * The next qswlnd-specific protocol rev will first
+                         * send an RPC to check version.
+                         * 1.4.6 and 1.4.7.early reply with a status
+                         * block containing their current version.
+                         * Later versions send a failure (-ve) status
+                         * plus magic/version. */
+
+                        if (!krx->krx_rpc_reply_needed) {
+                                CERROR("Unexpected version %d from %s\n",
+                                       msg->kqm_version, libcfs_nid2str(fromnid));
+                                goto done;
+                        }
 
-void
-kqswnal_fwd_callback (void *arg, int error)
-{
-        kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
+                        LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
+                        goto done;
+                }
 
-        /* The router has finished forwarding this packet */
+                switch (msg->kqm_type) {
+                default:
+                        CERROR("Bad request type %x from %s\n",
+                               msg->kqm_type, libcfs_nid2str(fromnid));
+                        goto done;
+
+                case QSWLND_MSG_IMMEDIATE:
+                        if (krx->krx_rpc_reply_needed) {
+                                /* Should have been a simple message */
+                                CERROR("IMMEDIATE sent as RPC from %s\n",
+                                       libcfs_nid2str(fromnid));
+                                goto done;
+                        }
 
-        if (error != 0)
-        {
-                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
+                        nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
+                        if (krx->krx_nob < nob) {
+                                CERROR("Short IMMEDIATE %d(%d) from %s\n",
+                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
+                                goto done;
+                        }
 
-                CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
-                       NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
-        }
+#if KQSW_CKSUM
+                        if (csum0 != msg->kqm_cksum) {
+                                CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",
+                                       csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));
+                                CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);
+                                goto done;
+                        }
+#endif
+                        rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,
+                                        fromnid, krx, 0);
+                        if (rc < 0)
+                                goto done;
+                        return;
 
-        kqswnal_requeue_rx (krx);
-}
+                case QSWLND_MSG_RDMA:
+                        if (!krx->krx_rpc_reply_needed) {
+                                /* Should have been a simple message */
+                                CERROR("RDMA sent as simple message from %s\n",
+                                       libcfs_nid2str(fromnid));
+                                goto done;
+                        }
 
-void
-kqswnal_rx (kqswnal_rx_t *krx)
-{
-        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
-        ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
-        int             nob;
-        int             niov;
+                        nob = offsetof(kqswnal_msg_t,
+                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);
+                        if (krx->krx_nob < nob) {
+                                CERROR("Short RDMA message %d(%d) from %s\n",
+                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
+                                goto done;
+                        }
 
-        if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
-                /* NB krx requeued when lib_parse() calls back kqswnal_recv */
-                lib_parse (&kqswnal_lib, hdr, krx);
-                return;
-        }
+                        if (swab)
+                                __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);
+
+                        n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;
+                        nob = offsetof(kqswnal_msg_t,
+                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);
+
+                        if (krx->krx_nob < nob) {
+                                CERROR("short RDMA message %d(%d) from %s\n",
+                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
+                                goto done;
+                        }
+
+                        if (swab) {
+                                for (i = 0; i < n; i++) {
+                                        EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];
+
+                                        __swab32s(&nmd->nmd_addr);
+                                        __swab32s(&nmd->nmd_len);
+                                        __swab32s(&nmd->nmd_attr);
+                                }
+                        }
 
-#if KQSW_CHECKSUM
-        CERROR ("checksums for forwarded packets not implemented\n");
-        LBUG ();
+#if KQSW_CKSUM
+                        krx->krx_cksum = csum0; /* stash checksum so far */
 #endif
-        if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
-        {
-                CERROR("dropping packet from "LPX64" for "LPX64
-                       ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
-                kqswnal_requeue_rx (krx);
-                return;
+                        rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,
+                                        fromnid, krx, 1);
+                        if (rc < 0)
+                                goto done;
+                        return;
+                }
+                /* Not Reached */
         }
 
-        /* NB forwarding may destroy iov; rebuild every time */
-        for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
-        {
-                LASSERT (niov < krx->krx_npages);
-                krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
-                krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
-        }
+        if (msg->kqm_magic == LNET_PROTO_MAGIC ||
+            msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {
+                /* Future protocol version compatibility support!
+                 * When LNET unifies protocols over all LNDs, the first thing a
+                 * peer will send will be a version query RPC.
+                 * 1.4.6 and 1.4.7.early reply with a status block containing
+                 * LNET_PROTO_QSW_MAGIC.
+                 * Later versions send a failure (-ve) status
+                 * plus magic/version. */
+
+                if (!krx->krx_rpc_reply_needed) {
+                        CERROR("Unexpected magic %08x from %s\n",
+                               msg->kqm_magic, libcfs_nid2str(fromnid));
+                        goto done;
+                }
 
-        kpr_fwd_init (&krx->krx_fwd, dest_nid,
-                      krx->krx_nob, niov, krx->krx_iov,
-                      kqswnal_fwd_callback, krx);
+                LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
+                goto done;
+        }
 
-        kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
+        CERROR("Unrecognised magic %08x from %s\n",
+               msg->kqm_magic, libcfs_nid2str(fromnid));
+ done:
+        kqswnal_rx_decref(krx);
 }
 
 /* Receive Interrupt Handler: posts to schedulers */
 void 
 kqswnal_rxhandler(EP_RXD *rxd)
 {
-        long          flags;
+        unsigned long flags;
         int           nob    = ep_rxd_len (rxd);
         int           status = ep_rxd_status (rxd);
         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
-
         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
                rxd, krx, nob, status);
 
         LASSERT (krx != NULL);
-
+        LASSERT (krx->krx_state == KRX_POSTED);
+
+        krx->krx_state = KRX_PARSE;
         krx->krx_rxd = rxd;
         krx->krx_nob = nob;
 
-        /* must receive a whole header to be able to parse */
-        if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
-        {
-                /* receives complete with failure when receiver is removed */
-                if (kqswnal_data.kqn_shuttingdown)
-                        return;
+        /* RPC reply iff rpc request received without error */
+        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
+                                    (status == EP_SUCCESS ||
+                                     status == EP_MSG_TOO_BIG);
 
-                CERROR("receive status failed with status %d nob %d\n",
-                       ep_rxd_status(rxd), nob);
-                kqswnal_requeue_rx (krx);
+        /* Default to failure if an RPC reply is requested but not handled */
+        krx->krx_rpc_reply.msg.status = -EPROTO;
+        cfs_atomic_set (&krx->krx_refcount, 1);
+
+        if (status != EP_SUCCESS) {
+                /* receives complete with failure when receiver is removed */
+                if (status == EP_SHUTDOWN)
+                        LASSERT (kqswnal_data.kqn_shuttingdown);
+                else
+                        CERROR("receive status failed with status %d nob %d\n",
+                               ep_rxd_status(rxd), nob);
+                kqswnal_rx_decref(krx);
                 return;
         }
 
-        atomic_inc (&kqswnal_packets_received);
+        if (!cfs_in_interrupt()) {
+                kqswnal_parse(krx);
+                return;
+        }
 
-        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+       spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
 
-        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
-        if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
-                wake_up (&kqswnal_data.kqn_sched_waitq);
+       cfs_list_add_tail(&krx->krx_list, &kqswnal_data.kqn_readyrxds);
+       wake_up(&kqswnal_data.kqn_sched_waitq);
 
-        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+       spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
 }
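+
+/* NB EKC may invoke kqswnal_rxhandler() in interrupt context, where
+ * kqswnal_parse() must not run (it can kmap() and make blocking LNet
+ * calls), so such receives are queued on kqn_readyrxds for the scheduler
+ * thread; in thread context they are parsed inline. */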
 
-#if KQSW_CHECKSUM
-void
-kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
+int
+kqswnal_recv (lnet_ni_t     *ni,
+              void          *private,
+              lnet_msg_t    *lntmsg,
+              int            delayed,
+              unsigned int   niov,
+              struct iovec  *iov,
+              lnet_kiov_t   *kiov,
+              unsigned int   offset,
+              unsigned int   mlen,
+              unsigned int   rlen)
 {
-        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
-
-        CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
-                ", dpid %d, spid %d, type %d\n",
-                ishdr ? "Header" : "Payload", krx,
-                NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
-                NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
-                NTOH__u32(hdr->type));
-
-        switch (NTOH__u32 (hdr->type))
-        {
-        case PTL_MSG_ACK:
-                CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
-                       " len %u\n",
-                       NTOH__u32(hdr->msg.ack.mlength),
-                       hdr->msg.ack.dst_wmd.handle_cookie,
-                       hdr->msg.ack.dst_wmd.handle_idx,
-                       NTOH__u64(hdr->msg.ack.match_bits),
-                       NTOH__u32(hdr->msg.ack.length));
-                break;
-        case PTL_MSG_PUT:
-                CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
-                       " len %u off %u data "LPX64"\n",
-                       NTOH__u32(hdr->msg.put.ptl_index),
-                       hdr->msg.put.ack_wmd.handle_cookie,
-                       hdr->msg.put.ack_wmd.handle_idx,
-                       NTOH__u64(hdr->msg.put.match_bits),
-                       NTOH__u32(hdr->msg.put.length),
-                       NTOH__u32(hdr->msg.put.offset),
-                       hdr->msg.put.hdr_data);
-                break;
-        case PTL_MSG_GET:
-                CERROR ("GET: <>\n");
-                break;
-        case PTL_MSG_REPLY:
-                CERROR ("REPLY: <>\n");
-                break;
-        default:
-                CERROR ("TYPE?: <>\n");
-        }
-}
-#endif
+        kqswnal_rx_t       *krx = (kqswnal_rx_t *)private;
+        lnet_nid_t          fromnid;
+        kqswnal_msg_t      *msg;
+        lnet_hdr_t         *hdr;
+        kqswnal_remotemd_t *rmd;
+        int                 msg_offset;
+        int                 rc;
+
+        LASSERT (!cfs_in_interrupt ());             /* OK to map */
+        /* Either all pages or all vaddrs */
+        LASSERT (!(kiov != NULL && iov != NULL));
 
-static int
-kqswnal_recvmsg (nal_cb_t     *nal,
-                 void         *private,
-                 lib_msg_t    *cookie,
-                 unsigned int  niov,
-                 struct iovec *iov,
-                 ptl_kiov_t   *kiov,
-                 size_t        mlen,
-                 size_t        rlen)
-{
-        kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
-        int           page;
-        char         *page_ptr;
-        int           page_nob;
-        char         *iov_ptr;
-        int           iov_nob;
-        int           frag;
-#if KQSW_CHECKSUM
-        kqsw_csum_t   senders_csum;
-        kqsw_csum_t   payload_csum = 0;
-        kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
-                                           sizeof(ptl_hdr_t));
-        size_t        csum_len = mlen;
-        int           csum_frags = 0;
-        int           csum_nob = 0;
-        static atomic_t csum_counter;
-        int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
-
-        atomic_inc (&csum_counter);
-
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                                sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
-        if (senders_csum != hdr_csum)
-                kqswnal_csum_error (krx, 1);
-#endif
-        CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
+        fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
+        msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
 
-        /* What was actually received must be >= payload.
-         * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
-        LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
-        LASSERT (mlen <= rlen);
+        if (krx->krx_rpc_reply_needed) {
+                /* optimized (rdma) request sent as RPC */
 
-        /* It must be OK to kmap() if required */
-        LASSERT (kiov == NULL || !in_interrupt ());
-        /* Either all pages or all vaddrs */
-        LASSERT (!(kiov != NULL && iov != NULL));
-        
-        if (mlen != 0)
-        {
-                page     = 0;
-                page_ptr = ((char *) page_address(krx->krx_pages[0])) +
-                        KQSW_HDR_SIZE;
-                page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
+                LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);
+                hdr = &msg->kqm_u.rdma.kqrm_hdr;
+                rmd = &msg->kqm_u.rdma.kqrm_rmd;
 
-                LASSERT (niov > 0);
-                if (kiov != NULL) {
-                        iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
-                        iov_nob = kiov->kiov_len;
-                } else {
-                        iov_ptr = iov->iov_base;
-                        iov_nob = iov->iov_len;
-                }
+                /* NB header is still in wire byte order */
 
-                for (;;)
-                {
-                        /* We expect the iov to exactly match mlen */
-                        LASSERT (iov_nob <= mlen);
-                        
-                        frag = MIN (page_nob, iov_nob);
-                        memcpy (iov_ptr, page_ptr, frag);
-#if KQSW_CHECKSUM
-                        payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
-                        csum_nob += frag;
-                        csum_frags++;
-#endif
-                        mlen -= frag;
-                        if (mlen == 0)
+                switch (le32_to_cpu(hdr->type)) {
+                        case LNET_MSG_PUT:
+                        case LNET_MSG_REPLY:
+                                /* This is an optimized PUT/REPLY */
+                                rc = kqswnal_rdma(krx, lntmsg, 
+                                                  KTX_RDMA_FETCH, rmd,
+                                                  niov, iov, kiov, offset, mlen);
                                 break;
 
-                        page_nob -= frag;
-                        if (page_nob != 0)
-                                page_ptr += frag;
-                        else
-                        {
-                                page++;
-                                LASSERT (page < krx->krx_npages);
-                                page_ptr = page_address(krx->krx_pages[page]);
-                                page_nob = PAGE_SIZE;
-                        }
+                        case LNET_MSG_GET:
+#if KQSW_CKSUM
+                                if (krx->krx_cksum != msg->kqm_cksum) {
+                                        CERROR("Bad GET checksum %08x(%08x) from %s\n",
+                                               krx->krx_cksum, msg->kqm_cksum,
+                                               libcfs_nid2str(fromnid));
+                                        rc = -EIO;
+                                        break;
+                                }
+#endif
+                                if (lntmsg == NULL) {
+                                        /* No buffer match: my decref will
+                                         * complete the RPC with failure */
+                                        rc = 0;
+                                } else {
+                                        /* Matched something! */
+                                        rc = kqswnal_rdma(krx, lntmsg,
+                                                          KTX_RDMA_STORE, rmd,
+                                                          lntmsg->msg_niov,
+                                                          lntmsg->msg_iov,
+                                                          lntmsg->msg_kiov,
+                                                          lntmsg->msg_offset,
+                                                          lntmsg->msg_len);
+                                }
+                                break;
 
-                        iov_nob -= frag;
-                        if (iov_nob != 0)
-                                iov_ptr += frag;
-                        else if (kiov != NULL) {
-                                kunmap (kiov->kiov_page);
-                                kiov++;
-                                niov--;
-                                LASSERT (niov > 0);
-                                iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
-                                iov_nob = kiov->kiov_len;
-                        } else {
-                                iov++;
-                                niov--;
-                                LASSERT (niov > 0);
-                                iov_ptr = iov->iov_base;
-                                iov_nob = iov->iov_len;
-                        }
+                        default:
+                                CERROR("Bad RPC type %d\n",
+                                       le32_to_cpu(hdr->type));
+                                rc = -EPROTO;
+                                break;
                 }
 
-                if (kiov != NULL)
-                        kunmap (kiov->kiov_page);
+                kqswnal_rx_decref(krx);
+                return rc;
         }
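+
+        /* Everything else is an IMMEDIATE message: the payload sits right
+         * after the message header in the receive buffer and is copied
+         * out into the caller's iov/kiov below */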
 
-#if KQSW_CHECKSUM
-        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
-                sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
-
-        if (csum_len != rlen)
-                CERROR("Unable to checksum data in user's buffer\n");
-        else if (senders_csum != payload_csum)
-                kqswnal_csum_error (krx, 0);
-
-        if (csum_verbose)
-                CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
-                       "csum_nob %d\n",
-                        hdr_csum, payload_csum, csum_frags, csum_nob);
-#endif
-        lib_finalize(nal, private, cookie);
-
-        kqswnal_requeue_rx (krx);
-
-        return (rlen);
-}
+        LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);
+        msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
+
+        if (krx->krx_nob < msg_offset + rlen) {
+                CERROR("Bad message size from %s: have %d, need %d + %d\n",
+                       libcfs_nid2str(fromnid), krx->krx_nob,
+                       msg_offset, rlen);
+                kqswnal_rx_decref(krx);
+                return -EPROTO;
+        }
 
-static int
-kqswnal_recv(nal_cb_t     *nal,
-             void         *private,
-             lib_msg_t    *cookie,
-             unsigned int  niov,
-             struct iovec *iov,
-             size_t        mlen,
-             size_t        rlen)
-{
-        return (kqswnal_recvmsg (nal, private, cookie, niov, iov, NULL, mlen, rlen));
-}
+        if (kiov != NULL)
+                lnet_copy_kiov2kiov(niov, kiov, offset,
+                                    krx->krx_npages, krx->krx_kiov, 
+                                    msg_offset, mlen);
+        else
+                lnet_copy_kiov2iov(niov, iov, offset,
+                                   krx->krx_npages, krx->krx_kiov, 
+                                   msg_offset, mlen);
 
-static int
-kqswnal_recv_pages (nal_cb_t     *nal,
-                    void         *private,
-                    lib_msg_t    *cookie,
-                    unsigned int  niov,
-                    ptl_kiov_t   *kiov,
-                    size_t        mlen,
-                    size_t        rlen)
-{
-        return (kqswnal_recvmsg (nal, private, cookie, niov, NULL, kiov, mlen, rlen));
+        lnet_finalize(ni, lntmsg, 0);
+        kqswnal_rx_decref(krx);
+        return 0;
 }
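+
+/* NB the RDMA direction above is named from this node's viewpoint:
+ * KTX_RDMA_FETCH pulls an optimized PUT/REPLY payload from the peer's
+ * mapped buffers, while KTX_RDMA_STORE pushes GET reply data into them;
+ * both use the EP_NMDs the peer supplied in its remotemd. */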
 
 int
-kqswnal_thread_start (int (*fn)(void *arg), void *arg)
+kqswnal_thread_start(int (*fn)(void *arg), void *arg, char *name)
 {
-        long    pid = kernel_thread (fn, arg, 0);
+       struct task_struct *task = cfs_thread_run(fn, arg, name);
 
-        if (pid < 0)
-                return ((int)pid);
+       if (IS_ERR(task))
+               return PTR_ERR(task);
 
-        atomic_inc (&kqswnal_data.kqn_nthreads);
-        return (0);
+       cfs_atomic_inc(&kqswnal_data.kqn_nthreads);
+       return 0;
 }
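+
+/* A hedged usage sketch (the thread name here is illustrative; the real
+ * callers live in the startup path):
+ *
+ *     rc = kqswnal_thread_start(kqswnal_scheduler, NULL, "kqswnal_sd");
+ *     if (rc != 0)
+ *             CERROR("Can't spawn scheduler thread: %d\n", rc);
+ */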
 
 void
 kqswnal_thread_fini (void)
 {
-        atomic_dec (&kqswnal_data.kqn_nthreads);
+        cfs_atomic_dec (&kqswnal_data.kqn_nthreads);
 }
 
 int
@@ -1123,107 +1677,102 @@ kqswnal_scheduler (void *arg)
 {
         kqswnal_rx_t    *krx;
         kqswnal_tx_t    *ktx;
-        kpr_fwd_desc_t  *fwd;
-        long             flags;
+        unsigned long    flags;
         int              rc;
         int              counter = 0;
         int              did_something;
 
-        kportal_daemonize ("kqswnal_sched");
-        kportal_blockallsigs ();
-        
-        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+        cfs_block_allsigs ();
 
-        while (!kqswnal_data.kqn_shuttingdown)
+       spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
+
+        for (;;)
         {
-                did_something = FALSE;
+                did_something = 0;
 
-                if (!list_empty (&kqswnal_data.kqn_readyrxds))
+                if (!cfs_list_empty (&kqswnal_data.kqn_readyrxds))
                 {
-                        krx = list_entry(kqswnal_data.kqn_readyrxds.next,
-                                         kqswnal_rx_t, krx_list);
-                        list_del (&krx->krx_list);
-                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
-                                               flags);
-
-                        kqswnal_rx (krx);
-
-                        did_something = TRUE;
-                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
+                        krx = cfs_list_entry(kqswnal_data.kqn_readyrxds.next,
+                                             kqswnal_rx_t, krx_list);
+                        cfs_list_del (&krx->krx_list);
+                       spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
+                                                   flags);
+
+                        LASSERT (krx->krx_state == KRX_PARSE);
+                        kqswnal_parse (krx);
+
+                        did_something = 1;
+                       spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
+                                              flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
+                if (!cfs_list_empty (&kqswnal_data.kqn_donetxds))
                 {
-                        ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
-                                         kqswnal_tx_t, ktx_list);
-                        list_del (&ktx->ktx_list);
-                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
-                                               flags);
+                        ktx = cfs_list_entry(kqswnal_data.kqn_donetxds.next,
+                                             kqswnal_tx_t, ktx_schedlist);
+                        cfs_list_del_init (&ktx->ktx_schedlist);
+                       spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
+                                                   flags);
 
-                        rc = kqswnal_launch (ktx);
-                        if (rc != 0)          /* failed: ktx_nid down? */
-                        {
-                                CERROR("Failed delayed transmit to "LPX64
-                                       ": %d\n", ktx->ktx_nid, rc);
-                                kqswnal_tx_done (ktx, rc);
-                        }
+                        kqswnal_tx_done_in_thread_context(ktx);
 
-                        did_something = TRUE;
-                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+                        did_something = 1;
+                       spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
+                                               flags);
                 }
 
-                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
+                if (!cfs_list_empty (&kqswnal_data.kqn_delayedtxds))
                 {
-                        fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
-                        list_del (&fwd->kprfd_list);
-                        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+                        ktx = cfs_list_entry(kqswnal_data.kqn_delayedtxds.next,
+                                             kqswnal_tx_t, ktx_schedlist);
+                        cfs_list_del_init (&ktx->ktx_schedlist);
+                       spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
+                                                   flags);
 
-                        kqswnal_fwd_packet (NULL, fwd);
+                        rc = kqswnal_launch (ktx);
+                        if (rc != 0) {
+                                CERROR("Failed delayed transmit to %s: %d\n", 
+                                       libcfs_nid2str(ktx->ktx_nid), rc);
+                                kqswnal_tx_done (ktx, rc);
+                        }
+                        cfs_atomic_dec (&kqswnal_data.kqn_pending_txs);
 
-                        did_something = TRUE;
-                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+                        did_something = 1;
+                       spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
+                                               flags);
                 }
 
-                    /* nothing to do or hogging CPU */
+                /* nothing to do or hogging CPU */
                 if (!did_something || counter++ == KQSW_RESCHED) {
-                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
-                                               flags);
+                       spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
+                                                   flags);
 
                         counter = 0;
 
                         if (!did_something) {
-                                rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                               kqswnal_data.kqn_shuttingdown ||
-                                                               !list_empty(&kqswnal_data.kqn_readyrxds) ||
-                                                               !list_empty(&kqswnal_data.kqn_delayedtxds) ||
-                                                               !list_empty(&kqswnal_data.kqn_delayedfwds));
-                                LASSERT (rc == 0);
-                        } else if (current->need_resched)
-                                schedule ();
-
-                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
-                }
-        }
-
-        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
-
-        kqswnal_thread_fini ();
-        return (0);
+                                if (kqswnal_data.kqn_shuttingdown == 2) {
+                                        /* We only exit in stage 2 of shutdown
+                                         * when there's nothing left to do */
+                                        break;
+                                }
+                               rc = wait_event_interruptible_exclusive (
+                                       kqswnal_data.kqn_sched_waitq,
+                                       kqswnal_data.kqn_shuttingdown == 2 ||
+                                       !cfs_list_empty(&kqswnal_data.kqn_readyrxds) ||
+                                       !cfs_list_empty(&kqswnal_data.kqn_donetxds) ||
+                                       !cfs_list_empty(&kqswnal_data.kqn_delayedtxds));
+                               LASSERT (rc == 0);
+                       } else if (need_resched())
+                               schedule ();
+
+                       spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
+                                              flags);
+               }
+       }
+
+       kqswnal_thread_fini ();
+       return 0;
 }
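+
+/* NB KQSW_RESCHED bounds how many work items the scheduler handles
+ * back-to-back before offering to reschedule, so a busy interface cannot
+ * monopolise the CPU. */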
-
-nal_cb_t kqswnal_lib =
-{
-        nal_data:       &kqswnal_data,         /* NAL private data */
-        cb_send:        kqswnal_send,
-        cb_send_pages:  kqswnal_send_pages,
-        cb_recv:        kqswnal_recv,
-        cb_recv_pages:  kqswnal_recv_pages,
-        cb_read:        kqswnal_read,
-        cb_write:       kqswnal_write,
-        cb_malloc:      kqswnal_malloc,
-        cb_free:        kqswnal_free,
-        cb_printf:      kqswnal_printf,
-        cb_cli:         kqswnal_cli,
-        cb_sti:         kqswnal_sti,
-        cb_dist:        kqswnal_dist
-};