LU-6209 lnet: Delete all obsolete LND drivers
diff --git a/lnet/klnds/qswlnd/qswlnd_cb.c b/lnet/klnds/qswlnd/qswlnd_cb.c
deleted file mode 100644 (file)
index 99eb1cc..0000000
+++ /dev/null
@@ -1,1778 +0,0 @@
-/*
- * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
- *
- * Copyright (c) 2012, Intel Corporation.
- *
- * Author: Eric Barton <eric@bartonsoftware.com>
- *
- * This file is part of Portals, http://www.lustre.org
- *
- * Portals is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Portals is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Portals; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#include "qswlnd.h"
-
-void
-kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
-{
-        time_t             then;
-
-        then = cfs_time_current_sec() -
-                cfs_duration_sec(cfs_time_current() -
-                                 ktx->ktx_launchtime);
-
-        lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then);
-}
-
-void
-kqswnal_unmap_tx (kqswnal_tx_t *ktx)
-{
-        int      i;
-
-        ktx->ktx_rail = -1;                     /* unset rail */
-
-        if (ktx->ktx_nmappedpages == 0)
-                return;
-        
-        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
-               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
-
-        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
-                ep_dvma_unload(kqswnal_data.kqn_ep,
-                               kqswnal_data.kqn_ep_tx_nmh,
-                               &ktx->ktx_frags[i]);
-
-        ktx->ktx_nmappedpages = 0;
-}
-
-int
-kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, 
-                     unsigned int niov, lnet_kiov_t *kiov)
-{
-        int       nfrags    = ktx->ktx_nfrag;
-        int       nmapped   = ktx->ktx_nmappedpages;
-        int       maxmapped = ktx->ktx_npages;
-        __u32     basepage  = ktx->ktx_basepage + nmapped;
-        char     *ptr;
-
-        EP_RAILMASK railmask;
-        int         rail;
-
-        if (ktx->ktx_rail < 0)
-                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
-                                                 EP_RAILMASK_ALL,
-                                                 kqswnal_nid2elanid(ktx->ktx_nid));
-        rail = ktx->ktx_rail;
-        if (rail < 0) {
-                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
-                return (-ENETDOWN);
-        }
-        railmask = 1 << rail;
-
-        LASSERT (nmapped <= maxmapped);
-        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
-        LASSERT (nfrags <= EP_MAXFRAG);
-        LASSERT (niov > 0);
-        LASSERT (nob > 0);
-
-        /* skip complete frags before 'offset' */
-        while (offset >= kiov->kiov_len) {
-                offset -= kiov->kiov_len;
-                kiov++;
-                niov--;
-                LASSERT (niov > 0);
-        }
-
-        do {
-                int  fraglen = kiov->kiov_len - offset;
-
-                /* each page frag is contained in one page */
-                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
-
-                if (fraglen > nob)
-                        fraglen = nob;
-
-                nmapped++;
-                if (nmapped > maxmapped) {
-                        CERROR("Can't map message in %d pages (max %d)\n",
-                               nmapped, maxmapped);
-                        return (-EMSGSIZE);
-                }
-
-                if (nfrags == EP_MAXFRAG) {
-                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
-                               EP_MAXFRAG);
-                        return (-EMSGSIZE);
-                }
-
-                /* XXX this is really crap, but we'll have to kmap until
-                 * EKC has a page (rather than vaddr) mapping interface */
-
-                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
-
-                CDEBUG(D_NET,
-                       "%p[%d] loading %p for %d, page %d, %d total\n",
-                        ktx, nfrags, ptr, fraglen, basepage, nmapped);
-
-                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
-                             ptr, fraglen,
-                             kqswnal_data.kqn_ep_tx_nmh, basepage,
-                             &railmask, &ktx->ktx_frags[nfrags]);
-
-                if (nfrags == ktx->ktx_firsttmpfrag ||
-                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
-                                  &ktx->ktx_frags[nfrags - 1],
-                                  &ktx->ktx_frags[nfrags])) {
-                        /* new frag if this is the first or can't merge */
-                        nfrags++;
-                }
-
-                kunmap (kiov->kiov_page);
-                
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
-
-                basepage++;
-                kiov++;
-                niov--;
-                nob -= fraglen;
-                offset = 0;
-
-                /* iov must not run out before end of data */
-                LASSERT (nob == 0 || niov > 0);
-
-        } while (nob > 0);
-
-        ktx->ktx_nfrag = nfrags;
-        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
-                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
-
-        return (0);
-}
-
-#if KQSW_CKSUM
-__u32
-kqswnal_csum_kiov (__u32 csum, int offset, int nob, 
-                   unsigned int niov, lnet_kiov_t *kiov)
-{
-        char     *ptr;
-
-        if (nob == 0)
-                return csum;
-
-        LASSERT (niov > 0);
-        LASSERT (nob > 0);
-
-        /* skip complete frags before 'offset' */
-        while (offset >= kiov->kiov_len) {
-                offset -= kiov->kiov_len;
-                kiov++;
-                niov--;
-                LASSERT (niov > 0);
-        }
-
-        do {
-                int  fraglen = kiov->kiov_len - offset;
-
-                /* each page frag is contained in one page */
-                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
-
-                if (fraglen > nob)
-                        fraglen = nob;
-
-                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
-
-                csum = kqswnal_csum(csum, ptr, fraglen);
-
-                kunmap (kiov->kiov_page);
-                
-                kiov++;
-                niov--;
-                nob -= fraglen;
-                offset = 0;
-
-                /* iov must not run out before end of data */
-                LASSERT (nob == 0 || niov > 0);
-
-        } while (nob > 0);
-
-        return csum;
-}
-#endif
-
-int
-kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
-                    unsigned int niov, struct iovec *iov)
-{
-        int       nfrags    = ktx->ktx_nfrag;
-        int       nmapped   = ktx->ktx_nmappedpages;
-        int       maxmapped = ktx->ktx_npages;
-        __u32     basepage  = ktx->ktx_basepage + nmapped;
-
-        EP_RAILMASK railmask;
-        int         rail;
-        
-        if (ktx->ktx_rail < 0)
-                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
-                                                 EP_RAILMASK_ALL,
-                                                 kqswnal_nid2elanid(ktx->ktx_nid));
-        rail = ktx->ktx_rail;
-        if (rail < 0) {
-                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
-                return (-ENETDOWN);
-        }
-        railmask = 1 << rail;
-
-        LASSERT (nmapped <= maxmapped);
-        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
-        LASSERT (nfrags <= EP_MAXFRAG);
-        LASSERT (niov > 0);
-        LASSERT (nob > 0);
-
-        /* skip complete frags before offset */
-        while (offset >= iov->iov_len) {
-                offset -= iov->iov_len;
-                iov++;
-                niov--;
-                LASSERT (niov > 0);
-        }
-        
-        do {
-                int  fraglen = iov->iov_len - offset;
-                long npages;
-                
-                if (fraglen > nob)
-                        fraglen = nob;
-                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
-
-                nmapped += npages;
-                if (nmapped > maxmapped) {
-                        CERROR("Can't map message in %d pages (max %d)\n",
-                               nmapped, maxmapped);
-                        return (-EMSGSIZE);
-                }
-
-                if (nfrags == EP_MAXFRAG) {
-                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
-                               EP_MAXFRAG);
-                        return (-EMSGSIZE);
-                }
-
-                CDEBUG(D_NET,
-                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
-                       ktx, nfrags, iov->iov_base + offset, fraglen, 
-                       basepage, npages, nmapped);
-
-                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
-                             iov->iov_base + offset, fraglen,
-                             kqswnal_data.kqn_ep_tx_nmh, basepage,
-                             &railmask, &ktx->ktx_frags[nfrags]);
-
-                if (nfrags == ktx->ktx_firsttmpfrag ||
-                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
-                                  &ktx->ktx_frags[nfrags - 1],
-                                  &ktx->ktx_frags[nfrags])) {
-                        /* new frag if this is the first or can't merge */
-                        nfrags++;
-                }
-
-                /* keep in loop for failure case */
-                ktx->ktx_nmappedpages = nmapped;
-
-                basepage += npages;
-                iov++;
-                niov--;
-                nob -= fraglen;
-                offset = 0;
-
-                /* iov must not run out before end of data */
-                LASSERT (nob == 0 || niov > 0);
-
-        } while (nob > 0);
-
-        ktx->ktx_nfrag = nfrags;
-        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
-                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
-
-        return (0);
-}
-
-#if KQSW_CKSUM
-__u32
-kqswnal_csum_iov (__u32 csum, int offset, int nob, 
-                  unsigned int niov, struct iovec *iov)
-{
-        if (nob == 0)
-                return csum;
-        
-        LASSERT (niov > 0);
-        LASSERT (nob > 0);
-
-        /* skip complete frags before offset */
-        while (offset >= iov->iov_len) {
-                offset -= iov->iov_len;
-                iov++;
-                niov--;
-                LASSERT (niov > 0);
-        }
-        
-        do {
-                int  fraglen = iov->iov_len - offset;
-                
-                if (fraglen > nob)
-                        fraglen = nob;
-
-                csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen);
-
-                iov++;
-                niov--;
-                nob -= fraglen;
-                offset = 0;
-
-                /* iov must not run out before end of data */
-                LASSERT (nob == 0 || niov > 0);
-
-        } while (nob > 0);
-
-        return csum;
-}
-#endif
-
-void
-kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
-{
-       unsigned long     flags;
-
-       kqswnal_unmap_tx(ktx);                  /* release temporary mappings */
-       ktx->ktx_state = KTX_IDLE;
-
-       spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
-
-       cfs_list_del(&ktx->ktx_list);           /* take off active list */
-       cfs_list_add(&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
-
-       spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
-}
-
-kqswnal_tx_t *
-kqswnal_get_idle_tx (void)
-{
-       unsigned long  flags;
-       kqswnal_tx_t  *ktx;
-
-       spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
-
-       if (kqswnal_data.kqn_shuttingdown ||
-           cfs_list_empty(&kqswnal_data.kqn_idletxds)) {
-               spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
-
-               return NULL;
-       }
-
-        ktx = cfs_list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t,
-                              ktx_list);
-        cfs_list_del (&ktx->ktx_list);
-
-        cfs_list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
-        ktx->ktx_launcher = current->pid;
-       atomic_inc(&kqswnal_data.kqn_pending_txs);
-
-       spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
-
-        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
-        LASSERT (ktx->ktx_nmappedpages == 0);
-        return (ktx);
-}
-
-void
-kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
-{
-       lnet_msg_t    *lnetmsg0 = NULL;
-       lnet_msg_t    *lnetmsg1 = NULL;
-       int            status0  = 0;
-       int            status1  = 0;
-       kqswnal_rx_t  *krx;
-
-       LASSERT (!in_interrupt());
-
-       if (ktx->ktx_status == -EHOSTDOWN)
-               kqswnal_notify_peer_down(ktx);
-
-        switch (ktx->ktx_state) {
-        case KTX_RDMA_FETCH:                    /* optimized PUT/REPLY handled */
-                krx      = (kqswnal_rx_t *)ktx->ktx_args[0];
-                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
-                status0  = ktx->ktx_status;
-#if KQSW_CKSUM
-                if (status0 == 0) {             /* RDMA succeeded */
-                        kqswnal_msg_t *msg;
-                        __u32          csum;
-
-                        msg = (kqswnal_msg_t *)
-                              page_address(krx->krx_kiov[0].kiov_page);
-
-                        csum = (lnetmsg0->msg_kiov != NULL) ?
-                               kqswnal_csum_kiov(krx->krx_cksum,
-                                                 lnetmsg0->msg_offset,
-                                                 lnetmsg0->msg_wanted,
-                                                 lnetmsg0->msg_niov,
-                                                 lnetmsg0->msg_kiov) :
-                               kqswnal_csum_iov(krx->krx_cksum,
-                                                lnetmsg0->msg_offset,
-                                                lnetmsg0->msg_wanted,
-                                                lnetmsg0->msg_niov,
-                                                lnetmsg0->msg_iov);
-
-                        /* Can only check csum if I got it all */
-                        if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
-                            csum != msg->kqm_cksum) {
-                                ktx->ktx_status = -EIO;
-                                krx->krx_rpc_reply.msg.status = -EIO;
-                                CERROR("RDMA checksum failed %u(%u) from %s\n",
-                                       csum, msg->kqm_cksum,
-                                       libcfs_nid2str(kqswnal_rx_nid(krx)));
-                        }
-                }
-#endif       
-                LASSERT (krx->krx_state == KRX_COMPLETING);
-                kqswnal_rx_decref (krx);
-                break;
-
-        case KTX_RDMA_STORE:       /* optimized GET handled */
-        case KTX_PUTTING:          /* optimized PUT sent */
-        case KTX_SENDING:          /* normal send */
-                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
-                status0  = ktx->ktx_status;
-                break;
-
-        case KTX_GETTING:          /* optimized GET sent & payload received */
-                /* Complete the GET with success since we can't avoid
-                 * delivering a REPLY event; we committed to it when we
-                 * launched the GET */
-                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
-                status0  = 0;
-                lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
-                status1  = ktx->ktx_status;
-#if KQSW_CKSUM
-                if (status1 == 0) {             /* RDMA succeeded */
-                        lnet_msg_t   *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
-                        lnet_libmd_t *md = lnetmsg0->msg_md;
-                        __u32         csum;
-                
-                        csum = ((md->md_options & LNET_MD_KIOV) != 0) ? 
-                               kqswnal_csum_kiov(~0, 0,
-                                                 md->md_length,
-                                                 md->md_niov, 
-                                                 md->md_iov.kiov) :
-                               kqswnal_csum_iov(~0, 0,
-                                                md->md_length,
-                                                md->md_niov,
-                                                md->md_iov.iov);
-
-                        if (csum != ktx->ktx_cksum) {
-                                CERROR("RDMA checksum failed %u(%u) from %s\n",
-                                       csum, ktx->ktx_cksum,
-                                       libcfs_nid2str(ktx->ktx_nid));
-                                status1 = -EIO;
-                        }
-                }
-#endif                
-                break;
-
-        default:
-                LASSERT (0);
-        }
-
-        kqswnal_put_idle_tx (ktx);
-
-        lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
-        if (lnetmsg1 != NULL)
-                lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
-}
-
-void
-kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
-{
-       unsigned long      flags;
-
-       ktx->ktx_status = status;
-
-       if (!in_interrupt()) {
-               kqswnal_tx_done_in_thread_context(ktx);
-               return;
-       }
-
-       /* Complete the send in thread context */
-       spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
-
-       cfs_list_add_tail(&ktx->ktx_schedlist,
-                          &kqswnal_data.kqn_donetxds);
-       wake_up(&kqswnal_data.kqn_sched_waitq);
-
-       spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
-}
-
-static void
-kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
-{
-        kqswnal_tx_t         *ktx = (kqswnal_tx_t *)arg;
-        kqswnal_rpc_reply_t  *reply;
-
-        LASSERT (txd != NULL);
-        LASSERT (ktx != NULL);
-
-        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
-
-        if (status != EP_SUCCESS) {
-
-                CNETERR("Tx completion to %s failed: %d\n",
-                        libcfs_nid2str(ktx->ktx_nid), status);
-
-                status = -EHOSTDOWN;
-
-        } else switch (ktx->ktx_state) {
-
-        case KTX_GETTING:
-        case KTX_PUTTING:
-                /* RPC complete! */
-                reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
-                if (reply->msg.magic == 0) {    /* "old" peer */
-                        status = reply->msg.status;
-                        break;
-                }
-                
-                if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
-                        if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
-                                CERROR("%s unexpected rpc reply magic %08x\n",
-                                       libcfs_nid2str(ktx->ktx_nid),
-                                       reply->msg.magic);
-                                status = -EPROTO;
-                                break;
-                        }
-
-                        __swab32s(&reply->msg.status);
-                        __swab32s(&reply->msg.version);
-                        
-                        if (ktx->ktx_state == KTX_GETTING) {
-                                __swab32s(&reply->msg.u.get.len);
-                                __swab32s(&reply->msg.u.get.cksum);
-                        }
-                }
-                        
-                status = reply->msg.status;
-                if (status != 0) {
-                        CERROR("%s RPC status %08x\n",
-                               libcfs_nid2str(ktx->ktx_nid), status);
-                        break;
-                }
-
-                if (ktx->ktx_state == KTX_GETTING) {
-                        lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
-                                               (lnet_msg_t *)ktx->ktx_args[2],
-                                               reply->msg.u.get.len);
-#if KQSW_CKSUM
-                        ktx->ktx_cksum = reply->msg.u.get.cksum;
-#endif
-                }
-                break;
-                
-        case KTX_SENDING:
-                status = 0;
-                break;
-                
-        default:
-                LBUG();
-                break;
-        }
-
-        kqswnal_tx_done(ktx, status);
-}
-
-int
-kqswnal_launch (kqswnal_tx_t *ktx)
-{
-       /* Don't block for transmit descriptor if we're in interrupt context */
-       int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
-       int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
-       unsigned long flags;
-       int   rc;
-
-       ktx->ktx_launchtime = cfs_time_current();
-
-        if (kqswnal_data.kqn_shuttingdown)
-                return (-ESHUTDOWN);
-
-        LASSERT (dest >= 0);                    /* must be a peer */
-
-        if (ktx->ktx_nmappedpages != 0)
-                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);
-
-        switch (ktx->ktx_state) {
-        case KTX_GETTING:
-        case KTX_PUTTING:
-                if (the_lnet.ln_testprotocompat != 0) {
-                        kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
-
-                        /* single-shot proto test:
-                         * Future version queries will use an RPC, so I'll
-                         * co-opt one of the existing ones */
-                        LNET_LOCK();
-                        if ((the_lnet.ln_testprotocompat & 1) != 0) {
-                                msg->kqm_version++;
-                                the_lnet.ln_testprotocompat &= ~1;
-                        }
-                        if ((the_lnet.ln_testprotocompat & 2) != 0) {
-                                msg->kqm_magic = LNET_PROTO_MAGIC;
-                                the_lnet.ln_testprotocompat &= ~2;
-                        }
-                        LNET_UNLOCK();
-                }
-
-                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
-                 * The other frags are the payload, awaiting RDMA */
-                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
-                                     ktx->ktx_port, attr,
-                                     kqswnal_txhandler, ktx,
-                                     NULL, ktx->ktx_frags, 1);
-                break;
-
-        case KTX_SENDING:
-                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
-                                         ktx->ktx_port, attr,
-                                         kqswnal_txhandler, ktx,
-                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
-                break;
-
-        default:
-                LBUG();
-                rc = -EINVAL;                   /* no compiler warning please */
-                break;
-        }
-
-        switch (rc) {
-        case EP_SUCCESS: /* success */
-                return (0);
-
-        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
-               spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
-
-               cfs_list_add_tail(&ktx->ktx_schedlist,
-                                 &kqswnal_data.kqn_delayedtxds);
-               wake_up(&kqswnal_data.kqn_sched_waitq);
-
-               spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
-                                            flags);
-                return (0);
-
-        default: /* fatal error */
-                CNETERR ("Tx to %s failed: %d\n",
-                        libcfs_nid2str(ktx->ktx_nid), rc);
-                kqswnal_notify_peer_down(ktx);
-                return (-EHOSTUNREACH);
-        }
-}
-
-#if 0
-static char *
-hdr_type_string (lnet_hdr_t *hdr)
-{
-        switch (hdr->type) {
-        case LNET_MSG_ACK:
-                return ("ACK");
-        case LNET_MSG_PUT:
-                return ("PUT");
-        case LNET_MSG_GET:
-                return ("GET");
-        case LNET_MSG_REPLY:
-                return ("REPLY");
-        default:
-                return ("<UNKNOWN>");
-        }
-}
-
-static void
-kqswnal_cerror_hdr(lnet_hdr_t * hdr)
-{
-        char *type_str = hdr_type_string (hdr);
-
-        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
-               le32_to_cpu(hdr->payload_length));
-        CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
-               le32_to_cpu(hdr->src_pid));
-        CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
-               le32_to_cpu(hdr->dest_pid));
-
-        switch (le32_to_cpu(hdr->type)) {
-        case LNET_MSG_PUT:
-                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
-                       "match bits "LPX64"\n",
-                       le32_to_cpu(hdr->msg.put.ptl_index),
-                       hdr->msg.put.ack_wmd.wh_interface_cookie,
-                       hdr->msg.put.ack_wmd.wh_object_cookie,
-                       le64_to_cpu(hdr->msg.put.match_bits));
-                CERROR("    offset %d, hdr data "LPX64"\n",
-                       le32_to_cpu(hdr->msg.put.offset),
-                       hdr->msg.put.hdr_data);
-                break;
-
-        case LNET_MSG_GET:
-                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
-                       "match bits "LPX64"\n",
-                       le32_to_cpu(hdr->msg.get.ptl_index),
-                       hdr->msg.get.return_wmd.wh_interface_cookie,
-                       hdr->msg.get.return_wmd.wh_object_cookie,
-                       hdr->msg.get.match_bits);
-                CERROR("    Length %d, src offset %d\n",
-                       le32_to_cpu(hdr->msg.get.sink_length),
-                       le32_to_cpu(hdr->msg.get.src_offset));
-                break;
-
-        case LNET_MSG_ACK:
-                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
-                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
-                       hdr->msg.ack.dst_wmd.wh_object_cookie,
-                       le32_to_cpu(hdr->msg.ack.mlength));
-                break;
-
-        case LNET_MSG_REPLY:
-                CERROR("    dst md "LPX64"."LPX64"\n",
-                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
-                       hdr->msg.reply.dst_wmd.wh_object_cookie);
-        }
-
-        }                               /* end of kqswnal_cerror_hdr() */
-#endif
-
-int
-kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
-                    int nrfrag, EP_NMD *rfrag)
-{
-        int  i;
-
-        if (nlfrag != nrfrag) {
-                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
-                       nlfrag, nrfrag);
-                return (-EINVAL);
-        }
-        
-        for (i = 0; i < nlfrag; i++)
-                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
-                        CERROR("Can't cope with unequal frags %d(%d):"
-                               " %d local %d remote\n",
-                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
-                        return (-EINVAL);
-                }
-        
-        return (0);
-}
-
-kqswnal_remotemd_t *
-kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
-{
-        /* Check that the RMD sent after the "raw" LNET header in a
-         * portals-compatible QSWLND message is OK */
-        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
-        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));
-
-        /* Note RDMA addresses are sent in native endian-ness in the "old"
-         * portals protocol so no swabbing... */
-
-        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
-                /* msg too small to discover rmd size */
-                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
-                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
-                return (NULL);
-        }
-
-        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
-                /* rmd doesn't fit in the incoming message */
-                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
-                        krx->krx_nob, rmd->kqrmd_nfrag,
-                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
-                return (NULL);
-        }
-
-        return (rmd);
-}
-
-void
-kqswnal_rdma_store_complete (EP_RXD *rxd) 
-{
-        int           status = ep_rxd_status(rxd);
-        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
-        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
-        
-        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
-               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
-
-        LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
-        LASSERT (krx->krx_rxd == rxd);
-        LASSERT (krx->krx_rpc_reply_needed);
-
-        krx->krx_rpc_reply_needed = 0;
-        kqswnal_rx_decref (krx);
-
-        /* free ktx & finalize() its lnet_msg_t */
-        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
-}
-
-void
-kqswnal_rdma_fetch_complete (EP_RXD *rxd) 
-{
-        /* Completed fetching the PUT/REPLY data */
-        int           status = ep_rxd_status(rxd);
-        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
-        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
-        
-        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
-               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
-
-        LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
-        LASSERT (krx->krx_rxd == rxd);
-        /* RPC completes with failure by default */
-        LASSERT (krx->krx_rpc_reply_needed);
-        LASSERT (krx->krx_rpc_reply.msg.status != 0);
-
-        if (status == EP_SUCCESS) {
-                krx->krx_rpc_reply.msg.status = 0;
-                status = 0;
-        } else {
-                /* Abandon RPC since get failed */
-                krx->krx_rpc_reply_needed = 0;
-                status = -ECONNABORTED;
-        }
-
-        /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
-        LASSERT (krx->krx_state == KRX_PARSE);
-        krx->krx_state = KRX_COMPLETING;
-
-        /* free ktx & finalize() its lnet_msg_t */
-        kqswnal_tx_done(ktx, status);
-}
-
-int
-kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
-              int type, kqswnal_remotemd_t *rmd,
-              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
-              unsigned int offset, unsigned int len)
-{
-        kqswnal_tx_t       *ktx;
-        int                 eprc;
-        int                 rc;
-
-        /* Not both mapped and paged payload */
-        LASSERT (iov == NULL || kiov == NULL);
-        /* RPC completes with failure by default */
-        LASSERT (krx->krx_rpc_reply_needed);
-        LASSERT (krx->krx_rpc_reply.msg.status != 0);
-
-        if (len == 0) {
-                /* data got truncated to nothing. */
-                lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
-                /* Let kqswnal_rx_done() complete the RPC with success */
-                krx->krx_rpc_reply.msg.status = 0;
-                return (0);
-        }
-        
-        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
-           actually sending a portals message with it */
-        ktx = kqswnal_get_idle_tx();
-        if (ktx == NULL) {
-                CERROR ("Can't get txd for RDMA with %s\n",
-                        libcfs_nid2str(kqswnal_rx_nid(krx)));
-                return (-ENOMEM);
-        }
-
-        ktx->ktx_state   = type;
-        ktx->ktx_nid     = kqswnal_rx_nid(krx);
-        ktx->ktx_args[0] = krx;
-        ktx->ktx_args[1] = lntmsg;
-
-       LASSERT (atomic_read(&krx->krx_refcount) > 0);
-        /* Take an extra ref for the completion callback */
-       atomic_inc(&krx->krx_refcount);
-
-        /* Map on the rail the RPC prefers */
-        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
-                                         ep_rxd_railmask(krx->krx_rxd));
-
-        /* Start mapping at offset 0 (we're not mapping any headers) */
-        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
-        
-        if (kiov != NULL)
-                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
-        else
-                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
-
-        if (rc != 0) {
-                CERROR ("Can't map local RDMA data: %d\n", rc);
-                goto out;
-        }
-
-        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
-                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
-        if (rc != 0) {
-                CERROR ("Incompatible RDMA descriptors\n");
-                goto out;
-        }
-
-        switch (type) {
-        default:
-                LBUG();
-                
-        case KTX_RDMA_STORE:
-                krx->krx_rpc_reply.msg.status    = 0;
-                krx->krx_rpc_reply.msg.magic     = LNET_PROTO_QSW_MAGIC;
-                krx->krx_rpc_reply.msg.version   = QSWLND_PROTO_VERSION;
-                krx->krx_rpc_reply.msg.u.get.len = len;
-#if KQSW_CKSUM
-                krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
-                            kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
-                            kqswnal_csum_iov(~0, offset, len, niov, iov);
-                if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
-                        krx->krx_rpc_reply.msg.u.get.cksum++;
-                        *kqswnal_tunables.kqn_inject_csum_error = 0;
-                }
-#endif
-                eprc = ep_complete_rpc(krx->krx_rxd, 
-                                       kqswnal_rdma_store_complete, ktx, 
-                                       &krx->krx_rpc_reply.ep_statusblk, 
-                                       ktx->ktx_frags, rmd->kqrmd_frag, 
-                                       rmd->kqrmd_nfrag);
-                if (eprc != EP_SUCCESS) {
-                        CERROR("can't complete RPC: %d\n", eprc);
-                        /* don't re-attempt RPC completion */
-                        krx->krx_rpc_reply_needed = 0;
-                        rc = -ECONNABORTED;
-                }
-                break;
-                
-        case KTX_RDMA_FETCH:
-                eprc = ep_rpc_get (krx->krx_rxd, 
-                                   kqswnal_rdma_fetch_complete, ktx,
-                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
-                if (eprc != EP_SUCCESS) {
-                        CERROR("ep_rpc_get failed: %d\n", eprc);
-                        /* Don't attempt RPC completion: 
-                         * EKC nuked it when the get failed */
-                        krx->krx_rpc_reply_needed = 0;
-                        rc = -ECONNABORTED;
-                }
-                break;
-        }
-
- out:
-        if (rc != 0) {
-                kqswnal_rx_decref(krx);                 /* drop callback's ref */
-                kqswnal_put_idle_tx (ktx);
-        }
-
-       atomic_dec(&kqswnal_data.kqn_pending_txs);
-        return (rc);
-}
-
-int
-kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
-{
-        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
-        int               type = lntmsg->msg_type;
-        lnet_process_id_t target = lntmsg->msg_target;
-        int               target_is_router = lntmsg->msg_target_is_router;
-        int               routing = lntmsg->msg_routing;
-        unsigned int      payload_niov = lntmsg->msg_niov;
-        struct iovec     *payload_iov = lntmsg->msg_iov;
-        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
-        unsigned int      payload_offset = lntmsg->msg_offset;
-        unsigned int      payload_nob = lntmsg->msg_len;
-        int               nob;
-        kqswnal_tx_t     *ktx;
-        int               rc;
-
-        /* NB 1. hdr is in network byte order */
-        /*    2. 'private' depends on the message type */
-        
-        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
-               payload_nob, payload_niov, libcfs_id2str(target));
-
-       LASSERT (payload_nob == 0 || payload_niov > 0);
-       LASSERT (payload_niov <= LNET_MAX_IOV);
-
-       /* It must be OK to kmap() if required */
-       LASSERT (payload_kiov == NULL || !in_interrupt ());
-       /* payload is either all vaddrs or all pages */
-       LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
-
-       if (kqswnal_nid2elanid (target.nid) < 0) {
-               CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
-               return -EIO;
-       }
-
-        /* I may not block for a transmit descriptor if I might block the
-         * router, receiver, or an interrupt handler. */
-        ktx = kqswnal_get_idle_tx();
-        if (ktx == NULL) {
-                CERROR ("Can't get txd for msg type %d for %s\n",
-                        type, libcfs_nid2str(target.nid));
-                return (-ENOMEM);
-        }
-
-        ktx->ktx_state   = KTX_SENDING;
-        ktx->ktx_nid     = target.nid;
-        ktx->ktx_args[0] = private;
-        ktx->ktx_args[1] = lntmsg;
-        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */
-
-        /* The first frag will be the pre-mapped buffer. */
-        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-
-        if ((!target_is_router &&               /* target.nid is final dest */
-             !routing &&                        /* I'm the source */
-             type == LNET_MSG_GET &&            /* optimize GET? */
-             *kqswnal_tunables.kqn_optimized_gets != 0 &&
-             lntmsg->msg_md->md_length >= 
-             *kqswnal_tunables.kqn_optimized_gets) ||
-            ((type == LNET_MSG_PUT ||            /* optimize PUT? */
-              type == LNET_MSG_REPLY) &&         /* optimize REPLY? */
-             *kqswnal_tunables.kqn_optimized_puts != 0 &&
-             payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
-                lnet_libmd_t       *md = lntmsg->msg_md;
-                kqswnal_msg_t      *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
-                lnet_hdr_t         *mhdr;
-                kqswnal_remotemd_t *rmd;
-
-                /* Optimised path: I send over the Elan vaddrs of the local
-                 * buffers, and my peer DMAs directly to/from them.
-                 *
-                 * First I set up ktx as if it was going to send this
-                 * payload, (it needs to map it anyway).  This fills
-                 * ktx_frags[1] and onward with the network addresses
-                 * of the buffer frags. */
-
-                /* Send an RDMA message */
-                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
-                msg->kqm_version = QSWLND_PROTO_VERSION;
-                msg->kqm_type = QSWLND_MSG_RDMA;
-
-                mhdr = &msg->kqm_u.rdma.kqrm_hdr;
-                rmd  = &msg->kqm_u.rdma.kqrm_rmd;
-
-                *mhdr = *hdr;
-                nob = (((char *)rmd) - ktx->ktx_buffer);
-
-                if (type == LNET_MSG_GET) {
-                        if ((md->md_options & LNET_MD_KIOV) != 0) 
-                                rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
-                                                          md->md_niov, md->md_iov.kiov);
-                        else
-                                rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
-                                                         md->md_niov, md->md_iov.iov);
-                        ktx->ktx_state = KTX_GETTING;
-                } else {
-                        if (payload_kiov != NULL)
-                                rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
-                                                         payload_niov, payload_kiov);
-                        else
-                                rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
-                                                        payload_niov, payload_iov);
-                        ktx->ktx_state = KTX_PUTTING;
-                }
-
-                if (rc != 0)
-                        goto out;
-
-                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
-                nob += offsetof(kqswnal_remotemd_t,
-                                kqrmd_frag[rmd->kqrmd_nfrag]);
-                LASSERT (nob <= KQSW_TX_BUFFER_SIZE);
-
-                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
-                       rmd->kqrmd_nfrag * sizeof(EP_NMD));
-
-                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
-#if KQSW_CKSUM
-                msg->kqm_nob   = nob + payload_nob;
-                msg->kqm_cksum = 0;
-                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
-#endif
-                if (type == LNET_MSG_GET) {
-                        /* Allocate reply message now while I'm in thread context */
-                        ktx->ktx_args[2] = lnet_create_reply_msg (
-                                kqswnal_data.kqn_ni, lntmsg);
-                        if (ktx->ktx_args[2] == NULL)
-                                goto out;
-
-                        /* NB finalizing the REPLY message is my
-                         * responsibility now, whatever happens. */
-#if KQSW_CKSUM
-                        if (*kqswnal_tunables.kqn_inject_csum_error ==  3) {
-                                msg->kqm_cksum++;
-                                *kqswnal_tunables.kqn_inject_csum_error = 0;
-                        }
-
-                } else if (payload_kiov != NULL) {
-                        /* must checksum payload after header so receiver can
-                         * compute partial header cksum before swab.  Sadly
-                         * this causes 2 rounds of kmap */
-                        msg->kqm_cksum =
-                                kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,
-                                                  payload_niov, payload_kiov);
-                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
-                                msg->kqm_cksum++;
-                                *kqswnal_tunables.kqn_inject_csum_error = 0;
-                        }
-                } else {
-                        msg->kqm_cksum =
-                                kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,
-                                                 payload_niov, payload_iov);
-                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
-                                msg->kqm_cksum++;
-                                *kqswnal_tunables.kqn_inject_csum_error = 0;
-                        }
-#endif
-                }
-                
-        } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
-                lnet_hdr_t    *mhdr;
-                char          *payload;
-                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
-
-                /* single frag copied into the pre-mapped buffer */
-                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
-                msg->kqm_version = QSWLND_PROTO_VERSION;
-                msg->kqm_type = QSWLND_MSG_IMMEDIATE;
-
-                mhdr = &msg->kqm_u.immediate.kqim_hdr;
-                payload = msg->kqm_u.immediate.kqim_payload;
-
-                *mhdr = *hdr;
-                nob = (payload - ktx->ktx_buffer) + payload_nob;
-
-                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
-
-                if (payload_kiov != NULL)
-                        lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
-                                            payload_niov, payload_kiov, 
-                                            payload_offset, payload_nob);
-                else
-                        lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
-                                           payload_niov, payload_iov, 
-                                           payload_offset, payload_nob);
-#if KQSW_CKSUM
-                msg->kqm_nob   = nob;
-                msg->kqm_cksum = 0;
-                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
-                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
-                        msg->kqm_cksum++;
-                        *kqswnal_tunables.kqn_inject_csum_error = 0;
-                }
-#endif
-        } else {
-                lnet_hdr_t    *mhdr;
-                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
-
-                /* multiple frags: first is hdr in pre-mapped buffer */
-                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
-                msg->kqm_version = QSWLND_PROTO_VERSION;
-                msg->kqm_type = QSWLND_MSG_IMMEDIATE;
-
-                mhdr = &msg->kqm_u.immediate.kqim_hdr;
-                nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
-
-                *mhdr = *hdr;
-
-                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
-
-                if (payload_kiov != NULL)
-                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
-                                                  payload_niov, payload_kiov);
-                else
-                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
-                                                 payload_niov, payload_iov);
-                if (rc != 0)
-                        goto out;
-
-#if KQSW_CKSUM
-                msg->kqm_nob   = nob + payload_nob;
-                msg->kqm_cksum = 0;
-                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
-
-                msg->kqm_cksum = (payload_kiov != NULL) ?
-                                 kqswnal_csum_kiov(msg->kqm_cksum,
-                                                   payload_offset, payload_nob,
-                                                   payload_niov, payload_kiov) :
-                                 kqswnal_csum_iov(msg->kqm_cksum,
-                                                  payload_offset, payload_nob,
-                                                  payload_niov, payload_iov);
-
-                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
-                        msg->kqm_cksum++;
-                        *kqswnal_tunables.kqn_inject_csum_error = 0;
-                }
-#endif
-                nob += payload_nob;
-        }
-
-        ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
-                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
-
-        rc = kqswnal_launch (ktx);
-
- out:
-        CDEBUG_LIMIT(rc == 0? D_NET :D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
-                     routing ? (rc == 0 ? "Routed" : "Failed to route") :
-                               (rc == 0 ? "Sent" : "Failed to send"),
-                     nob, libcfs_nid2str(target.nid),
-                     target_is_router ? "(router)" : "", rc);
-
-        if (rc != 0) {
-                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
-                int         state = ktx->ktx_state;
-
-                kqswnal_put_idle_tx (ktx);
-
-                if (state == KTX_GETTING && repmsg != NULL) {
-                        /* We committed to reply, but there was a problem
-                         * launching the GET.  We can't avoid delivering a
-                         * REPLY event since we committed above, so we
-                         * pretend the GET succeeded but the REPLY
-                         * failed. */
-                        rc = 0;
-                        lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
-                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
-                }
-                
-        }
-        
-       atomic_dec(&kqswnal_data.kqn_pending_txs);
-        return (rc == 0 ? 0 : -EIO);
-}
-
-void
-kqswnal_requeue_rx (kqswnal_rx_t *krx)
-{
-       LASSERT (atomic_read(&krx->krx_refcount) == 0);
-        LASSERT (!krx->krx_rpc_reply_needed);
-
-        krx->krx_state = KRX_POSTED;
-
-        if (kqswnal_data.kqn_shuttingdown) {
-                /* free EKC rxd on shutdown */
-                ep_complete_receive(krx->krx_rxd);
-        } else {
-                /* repost receive */
-                ep_requeue_receive(krx->krx_rxd, 
-                                   kqswnal_rxhandler, krx,
-                                   &krx->krx_elanbuffer, 0);
-        }
-}
-
-void
-kqswnal_rpc_complete (EP_RXD *rxd)
-{
-        int           status = ep_rxd_status(rxd);
-        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
-        
-        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
-               "rxd %p, krx %p, status %d\n", rxd, krx, status);
-
-        LASSERT (krx->krx_rxd == rxd);
-        LASSERT (krx->krx_rpc_reply_needed);
-        
-        krx->krx_rpc_reply_needed = 0;
-        kqswnal_requeue_rx (krx);
-}
-
-void
-kqswnal_rx_done (kqswnal_rx_t *krx) 
-{
-       int           rc;
-
-       LASSERT (atomic_read(&krx->krx_refcount) == 0);
-
-       if (krx->krx_rpc_reply_needed) {
-               /* We've not completed the peer's RPC yet... */
-               krx->krx_rpc_reply.msg.magic   = LNET_PROTO_QSW_MAGIC;
-               krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;
-
-               LASSERT (!in_interrupt());
-
-               rc = ep_complete_rpc(krx->krx_rxd,
-                                    kqswnal_rpc_complete, krx,
-                                    &krx->krx_rpc_reply.ep_statusblk,
-                                    NULL, NULL, 0);
-               if (rc == EP_SUCCESS)
-                       return;
-
-               CERROR("can't complete RPC: %d\n", rc);
-               krx->krx_rpc_reply_needed = 0;
-       }
-
-       kqswnal_requeue_rx(krx);
-}
-        
-void
-kqswnal_parse (kqswnal_rx_t *krx)
-{
-        lnet_ni_t      *ni = kqswnal_data.kqn_ni;
-        kqswnal_msg_t  *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
-        lnet_nid_t      fromnid = kqswnal_rx_nid(krx);
-        int             swab;
-        int             n;
-        int             i;
-        int             nob;
-        int             rc;
-
-       LASSERT (atomic_read(&krx->krx_refcount) == 1);
-
-        if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {
-                CERROR("Short message %d received from %s\n",
-                       krx->krx_nob, libcfs_nid2str(fromnid));
-                goto done;
-        }
-
-        swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);
-
-        if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {
-#if KQSW_CKSUM
-                __u32 csum0;
-                __u32 csum1;
-
-                /* csum byte array before swab */
-                csum1 = msg->kqm_cksum;
-                msg->kqm_cksum = 0;
-                csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,
-                                          krx->krx_npages, krx->krx_kiov);
-                msg->kqm_cksum = csum1;
-#endif
-
-                if (swab) {
-                        __swab16s(&msg->kqm_version);
-                        __swab16s(&msg->kqm_type);
-#if KQSW_CKSUM
-                        __swab32s(&msg->kqm_cksum);
-                        __swab32s(&msg->kqm_nob);
-#endif
-                }
-
-                if (msg->kqm_version != QSWLND_PROTO_VERSION) {
-                        /* Future protocol version compatibility support!
-                         * The next qswlnd-specific protocol rev will first
-                         * send an RPC to check version.
-                         * 1.4.6 and 1.4.7.early reply with a status
-                         * block containing its current version.
-                         * Later versions send a failure (-ve) status +
-                         * magic/version */
-
-                        if (!krx->krx_rpc_reply_needed) {
-                                CERROR("Unexpected version %d from %s\n",
-                                       msg->kqm_version, libcfs_nid2str(fromnid));
-                                goto done;
-                        }
-
-                        LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
-                        goto done;
-                }
-
-                switch (msg->kqm_type) {
-                default:
-                        CERROR("Bad request type %x from %s\n",
-                               msg->kqm_type, libcfs_nid2str(fromnid));
-                        goto done;
-
-                case QSWLND_MSG_IMMEDIATE:
-                        if (krx->krx_rpc_reply_needed) {
-                                /* Should have been a simple message */
-                                CERROR("IMMEDIATE sent as RPC from %s\n",
-                                       libcfs_nid2str(fromnid));
-                                goto done;
-                        }
-
-                        nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
-                        if (krx->krx_nob < nob) {
-                                CERROR("Short IMMEDIATE %d(%d) from %s\n",
-                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
-                                goto done;
-                        }
-
-#if KQSW_CKSUM
-                        if (csum0 != msg->kqm_cksum) {
-                                CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",
-                                       csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));
-                                CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);
-                                goto done;
-                        }
-#endif
-                        rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,
-                                        fromnid, krx, 0);
-                        if (rc < 0)
-                                goto done;
-                        return;
-
-                case QSWLND_MSG_RDMA:
-                        if (!krx->krx_rpc_reply_needed) {
-                                /* Should have been a simple message */
-                                CERROR("RDMA sent as simple message from %s\n",
-                                       libcfs_nid2str(fromnid));
-                                goto done;
-                        }
-
-                        nob = offsetof(kqswnal_msg_t,
-                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);
-                        if (krx->krx_nob < nob) {
-                                CERROR("Short RDMA message %d(%d) from %s\n",
-                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
-                                goto done;
-                        }
-
-                        if (swab)
-                                __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);
-
-                        n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;
-                        nob = offsetof(kqswnal_msg_t,
-                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);
-
-                        if (krx->krx_nob < nob) {
-                                CERROR("short RDMA message %d(%d) from %s\n",
-                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
-                                goto done;
-                        }
-
-                        if (swab) {
-                                for (i = 0; i < n; i++) {
-                                        EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];
-
-                                        __swab32s(&nmd->nmd_addr);
-                                        __swab32s(&nmd->nmd_len);
-                                        __swab32s(&nmd->nmd_attr);
-                                }
-                        }
-
-#if KQSW_CKSUM
-                        krx->krx_cksum = csum0; /* stash checksum so far */
-#endif
-                        rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,
-                                        fromnid, krx, 1);
-                        if (rc < 0)
-                                goto done;
-                        return;
-                }
-                /* Not Reached */
-        }
-
-        if (msg->kqm_magic == LNET_PROTO_MAGIC ||
-            msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {
-                /* Future protocol version compatibility support!
-                 * When LNET unifies protocols over all LNDs, the first thing a
-                 * peer will send will be a version query RPC.
-                 * 1.4.6 and 1.4.7.early reply with a status block containing
-                 * LNET_PROTO_QSW_MAGIC.
-                 * Later versions send a failure (negative) status plus
-                 * magic/version. */
-
-                if (!krx->krx_rpc_reply_needed) {
-                        CERROR("Unexpected magic %08x from %s\n",
-                               msg->kqm_magic, libcfs_nid2str(fromnid));
-                        goto done;
-                }
-
-                LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
-                goto done;
-        }
-
-        CERROR("Unrecognised magic %08x from %s\n",
-               msg->kqm_magic, libcfs_nid2str(fromnid));
- done:
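-        /* Drop the reference taken in kqswnal_rxhandler().  If this is the
-         * last reference and the sender expected an RPC reply, the decref
-         * path completes that RPC with the status stashed in krx_rpc_reply
-         * (still the default -EPROTO on the error paths above). */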
-        kqswnal_rx_decref(krx);
-}
-
-/* Receive Interrupt Handler: posts to schedulers */
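-/*
- * Called by the Elan EP library when a posted receive completes.  It stamps
- * the krx with the receive descriptor, the number of bytes received and
- * whether an RPC reply is owed, then either parses the message inline or,
- * when running in interrupt context, queues the krx on kqn_readyrxds and
- * wakes a scheduler thread: parsing can end up in kqswnal_recv(), which
- * asserts !in_interrupt() because it may need to map buffers for RDMA.
- */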
-void 
-kqswnal_rxhandler(EP_RXD *rxd)
-{
-        unsigned long flags;
-        int           nob    = ep_rxd_len (rxd);
-        int           status = ep_rxd_status (rxd);
-        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
-        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
-               rxd, krx, nob, status);
-
-        LASSERT (krx != NULL);
-        LASSERT (krx->krx_state == KRX_POSTED);
-        
-        krx->krx_state = KRX_PARSE;
-        krx->krx_rxd = rxd;
-        krx->krx_nob = nob;
-
-        /* RPC reply iff rpc request received without error */
-        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
-                                    (status == EP_SUCCESS ||
-                                     status == EP_MSG_TOO_BIG);
-
-        /* Default to failure if an RPC reply is requested but not handled */
-        krx->krx_rpc_reply.msg.status = -EPROTO;
-        atomic_set (&krx->krx_refcount, 1);
-
-        if (status != EP_SUCCESS) {
-                /* receives complete with failure when receiver is removed */
-                if (status == EP_SHUTDOWN)
-                        LASSERT (kqswnal_data.kqn_shuttingdown);
-                else
-                        CERROR("receive failed with status %d nob %d\n",
-                               ep_rxd_status(rxd), nob);
-                kqswnal_rx_decref(krx);
-                return;
-        }
-
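-        /* Process context: it is safe to parse (and potentially map RDMA
-         * buffers) right here rather than handing off to a scheduler */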
-        if (!in_interrupt()) {
-                kqswnal_parse(krx);
-                return;
-        }
-
-        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
-
-        cfs_list_add_tail(&krx->krx_list, &kqswnal_data.kqn_readyrxds);
-        wake_up(&kqswnal_data.kqn_sched_waitq);
-
-        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
-}
-
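-/*
- * lnd_recv() handler: deliver the payload of a message previously handed to
- * LNET by kqswnal_parse() via lnet_parse().  Optimized PUT/REPLY and GET
- * requests arrive as QSWLND_MSG_RDMA RPCs and are completed by RDMAing
- * to/from the buffer described by the peer's remote memory descriptor via
- * kqswnal_rdma(); everything else is a QSWLND_MSG_IMMEDIATE whose payload is
- * simply copied out of the receive buffer pages.  The krx reference is
- * dropped on every path, which also sends the RPC reply when one is owed.
- */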
-int
-kqswnal_recv (lnet_ni_t     *ni,
-              void          *private,
-              lnet_msg_t    *lntmsg,
-              int            delayed,
-              unsigned int   niov,
-              struct iovec  *iov,
-              lnet_kiov_t   *kiov,
-              unsigned int   offset,
-              unsigned int   mlen,
-              unsigned int   rlen)
-{
-       kqswnal_rx_t       *krx = (kqswnal_rx_t *)private;
-       lnet_nid_t          fromnid;
-       kqswnal_msg_t      *msg;
-       lnet_hdr_t         *hdr;
-       kqswnal_remotemd_t *rmd;
-       int                 msg_offset;
-       int                 rc;
-
-       LASSERT (!in_interrupt ());             /* OK to map */
-       /* Either all pages or all vaddrs */
-       LASSERT (!(kiov != NULL && iov != NULL));
-
-        fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
-        msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
-
-        if (krx->krx_rpc_reply_needed) {
-                /* optimized (rdma) request sent as RPC */
-
-                LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);
-                hdr = &msg->kqm_u.rdma.kqrm_hdr;
-                rmd = &msg->kqm_u.rdma.kqrm_rmd;
-
-                /* NB header is still in wire byte order */
-
-                switch (le32_to_cpu(hdr->type)) {
-                        case LNET_MSG_PUT:
-                        case LNET_MSG_REPLY:
-                                /* This is an optimized PUT/REPLY */
-                                rc = kqswnal_rdma(krx, lntmsg, 
-                                                  KTX_RDMA_FETCH, rmd,
-                                                  niov, iov, kiov, offset, mlen);
-                                break;
-
-                        case LNET_MSG_GET:
-#if KQSW_CKSUM
-                                if (krx->krx_cksum != msg->kqm_cksum) {
-                                        CERROR("Bad GET checksum %08x(%08x) from %s\n",
-                                               krx->krx_cksum, msg->kqm_cksum,
-                                               libcfs_nid2str(fromnid));
-                                        rc = -EIO;
-                                        break;
-                                }
-#endif
-                                if (lntmsg == NULL) {
-                                        /* No buffer match: my decref will
-                                         * complete the RPC with failure */
-                                        rc = 0;
-                                } else {
-                                        /* Matched something! */
-                                        rc = kqswnal_rdma(krx, lntmsg,
-                                                          KTX_RDMA_STORE, rmd,
-                                                          lntmsg->msg_niov,
-                                                          lntmsg->msg_iov,
-                                                          lntmsg->msg_kiov,
-                                                          lntmsg->msg_offset,
-                                                          lntmsg->msg_len);
-                                }
-                                break;
-
-                        default:
-                                CERROR("Bad RPC type %d\n",
-                                       le32_to_cpu(hdr->type));
-                                rc = -EPROTO;
-                                break;
-                }
-
-                kqswnal_rx_decref(krx);
-                return rc;
-        }
-
-        LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);
-        msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
-        
-        if (krx->krx_nob < msg_offset + rlen) {
-                CERROR("Bad message size from %s: have %d, need %d + %d\n",
-                       libcfs_nid2str(fromnid), krx->krx_nob,
-                       msg_offset, rlen);
-                kqswnal_rx_decref(krx);
-                return -EPROTO;
-        }
-
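-        /* Plain IMMEDIATE message: copy the payload out of the receive
-         * buffer pages, skipping the message header */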
-        if (kiov != NULL)
-                lnet_copy_kiov2kiov(niov, kiov, offset,
-                                    krx->krx_npages, krx->krx_kiov, 
-                                    msg_offset, mlen);
-        else
-                lnet_copy_kiov2iov(niov, iov, offset,
-                                   krx->krx_npages, krx->krx_kiov, 
-                                   msg_offset, mlen);
-
-        lnet_finalize(ni, lntmsg, 0);
-        kqswnal_rx_decref(krx);
-        return 0;
-}
-
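-/*
- * Thread bookkeeping: kqn_nthreads is incremented for each daemon started
- * here and decremented in kqswnal_thread_fini() when a daemon exits.
- * A typical call from startup code would look like the sketch below (the
- * thread name is illustrative only; the real spawn calls live elsewhere in
- * the driver and may differ):
- *
- *      rc = kqswnal_thread_start(kqswnal_scheduler, NULL, "kqswnal_sd00");
- *      if (rc != 0)
- *              CERROR("Can't spawn scheduler thread: %d\n", rc);
- */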
-int
-kqswnal_thread_start(int (*fn)(void *arg), void *arg, char *name)
-{
-       struct task_struct *task = cfs_thread_run(fn, arg, name);
-
-       if (IS_ERR(task))
-               return PTR_ERR(task);
-
-       atomic_inc(&kqswnal_data.kqn_nthreads);
-       return 0;
-}
-
-void
-kqswnal_thread_fini (void)
-{
-       atomic_dec (&kqswnal_data.kqn_nthreads);
-}
-
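-/*
- * Scheduler daemon: drains three work queues under kqn_sched_lock, dropping
- * the lock while it works on each item:
- *   - kqn_readyrxds:   receives queued from interrupt context, parsed here;
- *   - kqn_donetxds:    transmits whose completion must run in thread context;
- *   - kqn_delayedtxds: transmits that could not be launched earlier and are
- *                      retried with kqswnal_launch().
- * After KQSW_RESCHED consecutive items it yields the CPU if rescheduling is
- * needed; when idle it sleeps on kqn_sched_waitq, and it only exits in
- * stage 2 of shutdown once there is nothing left to do.
- */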
-int
-kqswnal_scheduler (void *arg)
-{
-        kqswnal_rx_t    *krx;
-        kqswnal_tx_t    *ktx;
-        unsigned long    flags;
-        int              rc;
-        int              counter = 0;
-        int              did_something;
-
-        cfs_block_allsigs ();
-
-        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
-
-        for (;;)
-        {
-                did_something = 0;
-
-                if (!cfs_list_empty (&kqswnal_data.kqn_readyrxds))
-                {
-                        krx = cfs_list_entry(kqswnal_data.kqn_readyrxds.next,
-                                             kqswnal_rx_t, krx_list);
-                        cfs_list_del (&krx->krx_list);
-                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
-                                               flags);
-
-                        LASSERT (krx->krx_state == KRX_PARSE);
-                        kqswnal_parse (krx);
-
-                        did_something = 1;
-                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
-                                          flags);
-                }
-
-                if (!cfs_list_empty (&kqswnal_data.kqn_donetxds))
-                {
-                        ktx = cfs_list_entry(kqswnal_data.kqn_donetxds.next,
-                                             kqswnal_tx_t, ktx_schedlist);
-                        cfs_list_del_init (&ktx->ktx_schedlist);
-                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
-                                               flags);
-
-                        kqswnal_tx_done_in_thread_context(ktx);
-
-                        did_something = 1;
-                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
-                                          flags);
-                }
-
-                if (!cfs_list_empty (&kqswnal_data.kqn_delayedtxds))
-                {
-                        ktx = cfs_list_entry(kqswnal_data.kqn_delayedtxds.next,
-                                             kqswnal_tx_t, ktx_schedlist);
-                        cfs_list_del_init (&ktx->ktx_schedlist);
-                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
-                                               flags);
-
-                        rc = kqswnal_launch (ktx);
-                        if (rc != 0) {
-                                CERROR("Failed delayed transmit to %s: %d\n", 
-                                       libcfs_nid2str(ktx->ktx_nid), rc);
-                                kqswnal_tx_done (ktx, rc);
-                        }
-                        atomic_dec (&kqswnal_data.kqn_pending_txs);
-
-                        did_something = 1;
-                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
-                                          flags);
-                }
-
-                /* nothing to do or hogging CPU */
-                if (!did_something || counter++ == KQSW_RESCHED) {
-                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
-                                               flags);
-
-                        counter = 0;
-
-                        if (!did_something) {
-                                if (kqswnal_data.kqn_shuttingdown == 2) {
-                                        /* We only exit in stage 2 of shutdown
-                                         * when there's nothing left to do */
-                                        break;
-                                }
-                                rc = wait_event_interruptible_exclusive(
-                                        kqswnal_data.kqn_sched_waitq,
-                                        kqswnal_data.kqn_shuttingdown == 2 ||
-                                        !cfs_list_empty(&kqswnal_data.kqn_readyrxds) ||
-                                        !cfs_list_empty(&kqswnal_data.kqn_donetxds) ||
-                                        !cfs_list_empty(&kqswnal_data.kqn_delayedtxds));
-                                LASSERT (rc == 0);
-                        } else if (need_resched())
-                                schedule ();
-
-                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
-                                          flags);
-                }
-        }
-
-       kqswnal_thread_fini ();
-       return 0;
-}