LU-6209 lnet: Delete all obsolete LND drivers
diff --git a/lnet/klnds/ralnd/ralnd_cb.c b/lnet/klnds/ralnd/ralnd_cb.c
deleted file mode 100644
index f53be8d..0000000
--- a/lnet/klnds/ralnd/ralnd_cb.c
+++ /dev/null
@@ -1,2078 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2012, 2014, Intel Corporation.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lnet/klnds/ralnd/ralnd_cb.c
- *
- * Author: Eric Barton <eric@bartonsoftware.com>
- */
-
-#include <asm/page.h>
-#include "ralnd.h"
-
-void
-kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
-{
-        kra_device_t *dev;
-        int           i;
-        unsigned long flags;
-
-        CDEBUG(D_NET, "callback for device %d\n", devid);
-
-        for (i = 0; i < kranal_data.kra_ndevs; i++) {
-
-                dev = &kranal_data.kra_devices[i];
-                if (dev->rad_id != devid)
-                        continue;
-
-               spin_lock_irqsave(&dev->rad_lock, flags);
-
-               if (!dev->rad_ready) {
-                       dev->rad_ready = 1;
-                       wake_up(&dev->rad_waitq);
-               }
-
-               spin_unlock_irqrestore(&dev->rad_lock, flags);
-                return;
-        }
-
-        CWARN("callback for unknown device %d\n", devid);
-}
-
-void
-kranal_schedule_conn(kra_conn_t *conn)
-{
-        kra_device_t    *dev = conn->rac_device;
-        unsigned long    flags;
-
-       spin_lock_irqsave(&dev->rad_lock, flags);
-
-       if (!conn->rac_scheduled) {
-               kranal_conn_addref(conn);       /* +1 ref for scheduler */
-               conn->rac_scheduled = 1;
-               cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
-               wake_up(&dev->rad_waitq);
-       }
-
-       spin_unlock_irqrestore(&dev->rad_lock, flags);
-}
-
-kra_tx_t *
-kranal_get_idle_tx (void)
-{
-        unsigned long  flags;
-        kra_tx_t      *tx;
-
-       spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
-
-        if (cfs_list_empty(&kranal_data.kra_idle_txs)) {
-               spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
-                return NULL;
-        }
-
-        tx = cfs_list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
-        cfs_list_del(&tx->tx_list);
-
-        /* Allocate a new completion cookie.  It might not be needed, but we've
-         * got a lock right now... */
-        tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
-
-       spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
-
-        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
-        LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
-        LASSERT (tx->tx_conn == NULL);
-        LASSERT (tx->tx_lntmsg[0] == NULL);
-        LASSERT (tx->tx_lntmsg[1] == NULL);
-
-        return tx;
-}
-
-void
-kranal_init_msg(kra_msg_t *msg, int type)
-{
-        msg->ram_magic = RANAL_MSG_MAGIC;
-        msg->ram_version = RANAL_MSG_VERSION;
-        msg->ram_type = type;
-        msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
-        /* ram_connstamp gets set when FMA is sent */
-}
-
-kra_tx_t *
-kranal_new_tx_msg (int type)
-{
-        kra_tx_t *tx = kranal_get_idle_tx();
-
-        if (tx != NULL)
-                kranal_init_msg(&tx->tx_msg, type);
-
-        return tx;
-}
-
-int
-kranal_setup_immediate_buffer (kra_tx_t *tx, 
-                               unsigned int niov, struct iovec *iov,
-                               int offset, int nob)
-
-{
-        /* For now this is almost identical to kranal_setup_virt_buffer, but we
-         * could "flatten" the payload into a single contiguous buffer ready
-         * for sending direct over an FMA if we ever needed to. */
-
-        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
-        LASSERT (nob >= 0);
-
-        if (nob == 0) {
-                tx->tx_buffer = NULL;
-        } else {
-                LASSERT (niov > 0);
-
-                while (offset >= iov->iov_len) {
-                        offset -= iov->iov_len;
-                        niov--;
-                        iov++;
-                        LASSERT (niov > 0);
-                }
-
-                if (nob > iov->iov_len - offset) {
-                        CERROR("Can't handle multiple vaddr fragments\n");
-                        return -EMSGSIZE;
-                }
-
-                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
-        }
-
-        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
-        tx->tx_nob = nob;
-        return 0;
-}
-
-int
-kranal_setup_virt_buffer (kra_tx_t *tx, 
-                          unsigned int niov, struct iovec *iov,
-                          int offset, int nob)
-
-{
-        LASSERT (nob > 0);
-        LASSERT (niov > 0);
-        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
-
-        while (offset >= iov->iov_len) {
-                offset -= iov->iov_len;
-                niov--;
-                iov++;
-                LASSERT (niov > 0);
-        }
-
-        if (nob > iov->iov_len - offset) {
-                CERROR("Can't handle multiple vaddr fragments\n");
-                return -EMSGSIZE;
-        }
-
-        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
-        tx->tx_nob = nob;
-        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
-        return 0;
-}
-
-int
-kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
-                          int offset, int nob)
-{
-        RAP_PHYS_REGION *phys = tx->tx_phys;
-        int              resid;
-
-        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
-
-        LASSERT (nob > 0);
-        LASSERT (nkiov > 0);
-        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
-
-        while (offset >= kiov->kiov_len) {
-                offset -= kiov->kiov_len;
-                nkiov--;
-                kiov++;
-                LASSERT (nkiov > 0);
-        }
-
-        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
-        tx->tx_nob = nob;
-        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
-
-       phys->Address = page_to_phys(kiov->kiov_page);
-        phys++;
-
-        resid = nob - (kiov->kiov_len - offset);
-        while (resid > 0) {
-                kiov++;
-                nkiov--;
-                LASSERT (nkiov > 0);
-
-                if (kiov->kiov_offset != 0 ||
-                    ((resid > PAGE_SIZE) &&
-                     kiov->kiov_len < PAGE_SIZE)) {
-                        /* Can't have gaps */
-                        CERROR("Can't make payload contiguous in I/O VM: "
-                               "page %d, offset %d, len %d\n",
-                               (int)(phys - tx->tx_phys),
-                               kiov->kiov_offset, kiov->kiov_len);
-                        return -EINVAL;
-                }
-
-                if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
-                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
-                        return -EMSGSIZE;
-                }
-
-               phys->Address = page_to_phys(kiov->kiov_page);
-                phys++;
-
-                resid -= PAGE_SIZE;
-        }
-
-        tx->tx_phys_npages = phys - tx->tx_phys;
-        return 0;
-}
-
-static inline int
-kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
-                          struct iovec *iov, lnet_kiov_t *kiov,
-                          int offset, int nob)
-{
-        LASSERT ((iov == NULL) != (kiov == NULL));
-
-        if (kiov != NULL)
-                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
-
-        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
-}
-
-int
-kranal_map_buffer (kra_tx_t *tx)
-{
-        kra_conn_t     *conn = tx->tx_conn;
-        kra_device_t   *dev = conn->rac_device;
-        RAP_RETURN      rrc;
-
-        LASSERT (current == dev->rad_scheduler);
-
-        switch (tx->tx_buftype) {
-        default:
-                LBUG();
-
-        case RANAL_BUF_NONE:
-        case RANAL_BUF_IMMEDIATE:
-        case RANAL_BUF_PHYS_MAPPED:
-        case RANAL_BUF_VIRT_MAPPED:
-                return 0;
-
-        case RANAL_BUF_PHYS_UNMAPPED:
-                rrc = RapkRegisterPhys(dev->rad_handle,
-                                       tx->tx_phys, tx->tx_phys_npages,
-                                       &tx->tx_map_key);
-                if (rrc != RAP_SUCCESS) {
-                        CERROR ("Can't map %d pages: dev %d "
-                                "phys %u pp %u, virt %u nob %lu\n",
-                                tx->tx_phys_npages, dev->rad_id, 
-                                dev->rad_nphysmap, dev->rad_nppphysmap,
-                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
-                        return -ENOMEM; /* assume insufficient resources */
-                }
-
-                dev->rad_nphysmap++;
-                dev->rad_nppphysmap += tx->tx_phys_npages;
-
-                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
-                return 0;
-
-        case RANAL_BUF_VIRT_UNMAPPED:
-                rrc = RapkRegisterMemory(dev->rad_handle,
-                                         tx->tx_buffer, tx->tx_nob,
-                                         &tx->tx_map_key);
-                if (rrc != RAP_SUCCESS) {
-                        CERROR ("Can't map %d bytes: dev %d "
-                                "phys %u pp %u, virt %u nob %lu\n",
-                                tx->tx_nob, dev->rad_id, 
-                                dev->rad_nphysmap, dev->rad_nppphysmap,
-                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
-                        return -ENOMEM; /* assume insufficient resources */
-                }
-
-                dev->rad_nvirtmap++;
-                dev->rad_nobvirtmap += tx->tx_nob;
-
-                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
-                return 0;
-        }
-}
-
-void
-kranal_unmap_buffer (kra_tx_t *tx)
-{
-        kra_device_t   *dev;
-        RAP_RETURN      rrc;
-
-        switch (tx->tx_buftype) {
-        default:
-                LBUG();
-
-        case RANAL_BUF_NONE:
-        case RANAL_BUF_IMMEDIATE:
-        case RANAL_BUF_PHYS_UNMAPPED:
-        case RANAL_BUF_VIRT_UNMAPPED:
-                break;
-
-        case RANAL_BUF_PHYS_MAPPED:
-                LASSERT (tx->tx_conn != NULL);
-                dev = tx->tx_conn->rac_device;
-                LASSERT (current == dev->rad_scheduler);
-                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
-                                           &tx->tx_map_key);
-                LASSERT (rrc == RAP_SUCCESS);
-
-                dev->rad_nphysmap--;
-                dev->rad_nppphysmap -= tx->tx_phys_npages;
-
-                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
-                break;
-
-        case RANAL_BUF_VIRT_MAPPED:
-                LASSERT (tx->tx_conn != NULL);
-                dev = tx->tx_conn->rac_device;
-                LASSERT (current == dev->rad_scheduler);
-                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
-                                           &tx->tx_map_key);
-                LASSERT (rrc == RAP_SUCCESS);
-
-                dev->rad_nvirtmap--;
-                dev->rad_nobvirtmap -= tx->tx_nob;
-
-                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
-                break;
-        }
-}
-
-void
-kranal_tx_done (kra_tx_t *tx, int completion)
-{
-       lnet_msg_t      *lnetmsg[2];
-       unsigned long    flags;
-       int              i;
-
-       LASSERT (!in_interrupt());
-
-       kranal_unmap_buffer(tx);
-
-       lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
-       lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
-
-       tx->tx_buftype = RANAL_BUF_NONE;
-       tx->tx_msg.ram_type = RANAL_MSG_NONE;
-       tx->tx_conn = NULL;
-
-       spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
-
-       cfs_list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
-
-       spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
-
-       /* finalize AFTER freeing lnet msgs */
-       for (i = 0; i < 2; i++) {
-               if (lnetmsg[i] == NULL)
-                       continue;
-
-               lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
-       }
-}
-
-kra_conn_t *
-kranal_find_conn_locked (kra_peer_t *peer)
-{
-        cfs_list_t *tmp;
-
-        /* just return the first connection */
-        cfs_list_for_each (tmp, &peer->rap_conns) {
-                return cfs_list_entry(tmp, kra_conn_t, rac_list);
-        }
-
-        return NULL;
-}
-
-void
-kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
-{
-        unsigned long    flags;
-
-        tx->tx_conn = conn;
-
-       spin_lock_irqsave(&conn->rac_lock, flags);
-        cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
-        tx->tx_qtime = jiffies;
-       spin_unlock_irqrestore(&conn->rac_lock, flags);
-
-        kranal_schedule_conn(conn);
-}
-
-void
-kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
-{
-        unsigned long    flags;
-        kra_peer_t      *peer;
-        kra_conn_t      *conn;
-        int              rc;
-        int              retry;
-       rwlock_t    *g_lock = &kranal_data.kra_global_lock;
-
-        /* If I get here, I've committed to send, so I complete the tx with
-         * failure on any problems */
-
-        LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */
-
-        for (retry = 0; ; retry = 1) {
-
-               read_lock(g_lock);
-
-                peer = kranal_find_peer_locked(nid);
-                if (peer != NULL) {
-                        conn = kranal_find_conn_locked(peer);
-                        if (conn != NULL) {
-                                kranal_post_fma(conn, tx);
-                               read_unlock(g_lock);
-                                return;
-                        }
-                }
-                
-                /* Making connections; I'll need a write lock... */
-               read_unlock(g_lock);
-               write_lock_irqsave(g_lock, flags);
-
-                peer = kranal_find_peer_locked(nid);
-                if (peer != NULL)
-                        break;
-                
-               write_unlock_irqrestore(g_lock, flags);
-                
-                if (retry) {
-                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
-                        kranal_tx_done(tx, -EHOSTUNREACH);
-                        return;
-                }
-
-                rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
-                                                lnet_acceptor_port());
-                if (rc != 0) {
-                        CERROR("Can't add peer %s: %d\n",
-                               libcfs_nid2str(nid), rc);
-                        kranal_tx_done(tx, rc);
-                        return;
-                }
-        }
-        
-        conn = kranal_find_conn_locked(peer);
-        if (conn != NULL) {
-                /* Connection exists; queue message on it */
-                kranal_post_fma(conn, tx);
-               write_unlock_irqrestore(g_lock, flags);
-                return;
-        }
-                        
-        LASSERT (peer->rap_persistence > 0);
-
-        if (!peer->rap_connecting) {
-                LASSERT (cfs_list_empty(&peer->rap_tx_queue));
-
-                if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
-                      cfs_time_aftereq(jiffies, peer->rap_reconnect_time))) {
-                       write_unlock_irqrestore(g_lock, flags);
-                        kranal_tx_done(tx, -EHOSTUNREACH);
-                        return;
-                }
-
-                peer->rap_connecting = 1;
-                kranal_peer_addref(peer); /* extra ref for connd */
-
-               spin_lock(&kranal_data.kra_connd_lock);
-
-               cfs_list_add_tail(&peer->rap_connd_list,
-                             &kranal_data.kra_connd_peers);
-               wake_up(&kranal_data.kra_connd_waitq);
-
-               spin_unlock(&kranal_data.kra_connd_lock);
-        }
-
-        /* A connection is being established; queue the message... */
-        cfs_list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
-
-       write_unlock_irqrestore(g_lock, flags);
-}
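
Editorial aside (not part of the original file): kranal_launch_tx() above follows an optimistic locking shape -- look the peer up under the shared global lock, and only fall back to the exclusive lock, re-checking the state after reacquisition, when a peer or connection may have to be created. Below is a minimal user-space sketch of that shape; g_lock, peer_present and find_peer_locked() are illustrative stand-ins, not LNet/libcfs APIs.

#include <pthread.h>
#include <stdio.h>

static pthread_rwlock_t g_lock = PTHREAD_RWLOCK_INITIALIZER;
static int peer_present;                    /* stand-in for the peer table */

static int find_peer_locked(void)
{
        return peer_present;                /* caller must hold g_lock */
}

static void launch(void)
{
        pthread_rwlock_rdlock(&g_lock);
        if (find_peer_locked()) {           /* fast path: shared lock only */
                printf("queued on existing peer\n");
                pthread_rwlock_unlock(&g_lock);
                return;
        }
        pthread_rwlock_unlock(&g_lock);

        pthread_rwlock_wrlock(&g_lock);     /* slow path: may need to create */
        if (!find_peer_locked())            /* re-check: state may have changed
                                             * while the lock was dropped */
                peer_present = 1;
        printf("queued on (possibly new) peer\n");
        pthread_rwlock_unlock(&g_lock);
}

int main(void)
{
        launch();                           /* exercises the slow path */
        launch();                           /* exercises the fast path */
        return 0;
}
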
-
-void
-kranal_rdma(kra_tx_t *tx, int type,
-            kra_rdma_desc_t *sink, int nob, __u64 cookie)
-{
-        kra_conn_t   *conn = tx->tx_conn;
-        RAP_RETURN    rrc;
-        unsigned long flags;
-
-        LASSERT (kranal_tx_mapped(tx));
-        LASSERT (nob <= sink->rard_nob);
-        LASSERT (nob <= tx->tx_nob);
-
-        /* No actual race with scheduler sending CLOSE (I'm she!) */
-        LASSERT (current == conn->rac_device->rad_scheduler);
-
-        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
-        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
-        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
-        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
-        tx->tx_rdma_desc.DstKey = sink->rard_key;
-        tx->tx_rdma_desc.Length = nob;
-        tx->tx_rdma_desc.AppPtr = tx;
-
-        /* prep final completion message */
-        kranal_init_msg(&tx->tx_msg, type);
-        tx->tx_msg.ram_u.completion.racm_cookie = cookie;
-
-        if (nob == 0) { /* Immediate completion */
-                kranal_post_fma(conn, tx);
-                return;
-        }
-
-        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
-
-        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
-        LASSERT (rrc == RAP_SUCCESS);
-
-       spin_lock_irqsave(&conn->rac_lock, flags);
-        cfs_list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
-        tx->tx_qtime = jiffies;
-       spin_unlock_irqrestore(&conn->rac_lock, flags);
-}
-
-int
-kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
-{
-        __u32      nob_received = nob;
-        RAP_RETURN rrc;
-
-        LASSERT (conn->rac_rxmsg != NULL);
-        CDEBUG(D_NET, "Consuming %p\n", conn);
-
-        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
-                             &nob_received, sizeof(kra_msg_t));
-        LASSERT (rrc == RAP_SUCCESS);
-
-        conn->rac_rxmsg = NULL;
-
-        if (nob_received < nob) {
-                CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
-                      libcfs_nid2str(conn->rac_peer->rap_nid), 
-                      nob, nob_received);
-                return -EPROTO;
-        }
-
-        return 0;
-}
-
-int
-kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
-{
-        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
-        int               type = lntmsg->msg_type;
-        lnet_process_id_t target = lntmsg->msg_target;
-        int               target_is_router = lntmsg->msg_target_is_router;
-        int               routing = lntmsg->msg_routing;
-        unsigned int      niov = lntmsg->msg_niov;
-        struct iovec     *iov = lntmsg->msg_iov;
-        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
-        unsigned int      offset = lntmsg->msg_offset;
-        unsigned int      nob = lntmsg->msg_len;
-        kra_tx_t         *tx;
-        int               rc;
-
-        /* NB 'private' is different depending on what we're sending.... */
-
-       CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
-              nob, niov, libcfs_id2str(target));
-
-       LASSERT (nob == 0 || niov > 0);
-       LASSERT (niov <= LNET_MAX_IOV);
-
-       LASSERT (!in_interrupt());
-       /* payload is either all vaddrs or all pages */
-       LASSERT (!(kiov != NULL && iov != NULL));
-
-       if (routing) {
-               CERROR ("Can't route\n");
-               return -EIO;
-       }
-
-        switch(type) {
-        default:
-                LBUG();
-
-        case LNET_MSG_ACK:
-                LASSERT (nob == 0);
-                break;
-
-        case LNET_MSG_GET:
-                LASSERT (niov == 0);
-                LASSERT (nob == 0);
-                /* We have to consider the eventual sink buffer rather than any
-                 * payload passed here (there isn't any, and strictly, looking
-                 * inside lntmsg is a layering violation).  We send a simple
-                 * IMMEDIATE GET if the sink buffer is mapped already and small
-                 * enough for FMA */
-
-                if (routing || target_is_router)
-                        break;                  /* send IMMEDIATE */
-
-                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
-                    lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
-                    lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
-                        break;                  /* send IMMEDIATE */
-
-                tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
-                if (tx == NULL)
-                        return -ENOMEM;
-
-                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
-                        rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
-                                                      lntmsg->msg_md->md_iov.iov,
-                                                      0, lntmsg->msg_md->md_length);
-                else
-                        rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
-                                                      lntmsg->msg_md->md_iov.kiov,
-                                                      0, lntmsg->msg_md->md_length);
-                if (rc != 0) {
-                        kranal_tx_done(tx, rc);
-                        return -EIO;
-                }
-
-                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
-                if (tx->tx_lntmsg[1] == NULL) {
-                        CERROR("Can't create reply for GET to %s\n", 
-                               libcfs_nid2str(target.nid));
-                        kranal_tx_done(tx, rc);
-                        return -EIO;
-                }
-
-                tx->tx_lntmsg[0] = lntmsg;
-                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
-                /* rest of tx_msg is setup just before it is sent */
-                kranal_launch_tx(tx, target.nid);
-                return 0;
-
-        case LNET_MSG_REPLY:
-        case LNET_MSG_PUT:
-                if (kiov == NULL &&             /* not paged */
-                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
-                    nob <= *kranal_tunables.kra_max_immediate)
-                        break;                  /* send IMMEDIATE */
-
-                tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
-                if (tx == NULL)
-                        return -ENOMEM;
-
-                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
-                if (rc != 0) {
-                        kranal_tx_done(tx, rc);
-                        return -EIO;
-                }
-
-                tx->tx_lntmsg[0] = lntmsg;
-                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
-                /* rest of tx_msg is setup just before it is sent */
-                kranal_launch_tx(tx, target.nid);
-                return 0;
-        }
-
-        /* send IMMEDIATE */
-
-        LASSERT (kiov == NULL);
-        LASSERT (nob <= RANAL_FMA_MAX_DATA);
-
-        tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
-        if (tx == NULL)
-                return -ENOMEM;
-
-        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
-        if (rc != 0) {
-                kranal_tx_done(tx, rc);
-                return -EIO;
-        }
-
-        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
-        tx->tx_lntmsg[0] = lntmsg;
-        kranal_launch_tx(tx, target.nid);
-        return 0;
-}
-
-void
-kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
-{
-        kra_msg_t     *rxmsg = conn->rac_rxmsg;
-        unsigned int   niov = lntmsg->msg_niov;
-        struct iovec  *iov = lntmsg->msg_iov;
-        lnet_kiov_t   *kiov = lntmsg->msg_kiov;
-        unsigned int   offset = lntmsg->msg_offset;
-        unsigned int   nob = lntmsg->msg_len;
-        kra_tx_t      *tx;
-        int            rc;
-
-        tx = kranal_get_idle_tx();
-        if (tx == NULL)
-                goto failed_0;
-
-        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
-        if (rc != 0)
-                goto failed_1;
-
-        tx->tx_conn = conn;
-
-        rc = kranal_map_buffer(tx);
-        if (rc != 0)
-                goto failed_1;
-
-        tx->tx_lntmsg[0] = lntmsg;
-
-        kranal_rdma(tx, RANAL_MSG_GET_DONE,
-                    &rxmsg->ram_u.get.ragm_desc, nob,
-                    rxmsg->ram_u.get.ragm_cookie);
-        return;
-
- failed_1:
-        kranal_tx_done(tx, -EIO);
- failed_0:
-        lnet_finalize(ni, lntmsg, -EIO);
-}
-
-int
-kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
-                   void **new_private)
-{
-        kra_conn_t *conn = (kra_conn_t *)private;
-
-        LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",
-                           libcfs_nid2str(conn->rac_peer->rap_nid));
-
-        return -EDEADLK;
-}
-
-int
-kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
-             int delayed, unsigned int niov, 
-             struct iovec *iov, lnet_kiov_t *kiov,
-             unsigned int offset, unsigned int mlen, unsigned int rlen)
-{
-       kra_conn_t  *conn = private;
-       kra_msg_t   *rxmsg = conn->rac_rxmsg;
-       kra_tx_t    *tx;
-       void        *buffer;
-       int          rc;
-
-       LASSERT (mlen <= rlen);
-       LASSERT (!in_interrupt());
-       /* Either all pages or all vaddrs */
-       LASSERT (!(kiov != NULL && iov != NULL));
-
-       CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);
-
-        switch(rxmsg->ram_type) {
-        default:
-                LBUG();
-
-        case RANAL_MSG_IMMEDIATE:
-                if (mlen == 0) {
-                        buffer = NULL;
-                } else if (kiov != NULL) {
-                        CERROR("Can't recv immediate into paged buffer\n");
-                        return -EIO;
-                } else {
-                        LASSERT (niov > 0);
-                        while (offset >= iov->iov_len) {
-                                offset -= iov->iov_len;
-                                iov++;
-                                niov--;
-                                LASSERT (niov > 0);
-                        }
-                        if (mlen > iov->iov_len - offset) {
-                                CERROR("Can't handle immediate frags\n");
-                                return -EIO;
-                        }
-                        buffer = ((char *)iov->iov_base) + offset;
-                }
-                rc = kranal_consume_rxmsg(conn, buffer, mlen);
-                lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
-                return 0;
-
-        case RANAL_MSG_PUT_REQ:
-                tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
-                if (tx == NULL) {
-                        kranal_consume_rxmsg(conn, NULL, 0);
-                        return -ENOMEM;
-                }
-                
-                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
-                if (rc != 0) {
-                        kranal_tx_done(tx, rc);
-                        kranal_consume_rxmsg(conn, NULL, 0);
-                        return -EIO;
-                }
-
-                tx->tx_conn = conn;
-                rc = kranal_map_buffer(tx);
-                if (rc != 0) {
-                        kranal_tx_done(tx, rc);
-                        kranal_consume_rxmsg(conn, NULL, 0);
-                        return -EIO;
-                }
-
-                tx->tx_msg.ram_u.putack.rapam_src_cookie =
-                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
-                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
-                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
-                        (__u64)((unsigned long)tx->tx_buffer);
-                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
-
-                tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */
-
-                kranal_post_fma(conn, tx);
-                kranal_consume_rxmsg(conn, NULL, 0);
-                return 0;
-
-        case RANAL_MSG_GET_REQ:
-                if (lntmsg != NULL) {
-                        /* Matched! */
-                        kranal_reply(ni, conn, lntmsg);
-                } else {
-                        /* No match */
-                        tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
-                        if (tx != NULL) {
-                                tx->tx_msg.ram_u.completion.racm_cookie =
-                                        rxmsg->ram_u.get.ragm_cookie;
-                                kranal_post_fma(conn, tx);
-                        }
-                }
-                kranal_consume_rxmsg(conn, NULL, 0);
-                return 0;
-        }
-}
-
-int
-kranal_thread_start(int(*fn)(void *arg), void *arg, char *name)
-{
-       struct task_struct *task = cfs_thread_run(fn, arg, name);
-
-       if (!IS_ERR(task))
-               atomic_inc(&kranal_data.kra_nthreads);
-       return PTR_ERR(task);
-}
-
-void
-kranal_thread_fini (void)
-{
-       atomic_dec(&kranal_data.kra_nthreads);
-}
-
-int
-kranal_check_conn_timeouts (kra_conn_t *conn)
-{
-        kra_tx_t          *tx;
-        cfs_list_t        *ttmp;
-        unsigned long      flags;
-        long               timeout;
-        unsigned long      now = jiffies;
-
-        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
-                 conn->rac_state == RANAL_CONN_CLOSING);
-
-       if (!conn->rac_close_sent &&
-           cfs_time_aftereq(now, conn->rac_last_tx +
-                            msecs_to_jiffies(conn->rac_keepalive *
-                                             MSEC_PER_SEC))) {
-               /* not sent in a while; schedule conn so scheduler sends a keepalive */
-               CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
-                      conn, libcfs_nid2str(conn->rac_peer->rap_nid));
-               kranal_schedule_conn(conn);
-       }
-
-       timeout = msecs_to_jiffies(conn->rac_timeout * MSEC_PER_SEC);
-
-       if (!conn->rac_close_recvd &&
-           cfs_time_aftereq(now, conn->rac_last_rx + timeout)) {
-               CERROR("%s received from %s within %lu seconds\n",
-                      (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
-                      "Nothing" : "CLOSE not",
-                      libcfs_nid2str(conn->rac_peer->rap_nid),
-                      jiffies_to_msecs(now - conn->rac_last_rx)/MSEC_PER_SEC);
-               return -ETIMEDOUT;
-       }
-
-        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
-                return 0;
-
-        /* Check the conn's queues are moving.  These are "belt+braces" checks,
-         * in case of hardware/software errors that make this conn seem
-         * responsive even though it isn't progressing its message queues. */
-
-       spin_lock_irqsave(&conn->rac_lock, flags);
-
-       cfs_list_for_each (ttmp, &conn->rac_fmaq) {
-               tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
-
-               if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
-                       spin_unlock_irqrestore(&conn->rac_lock, flags);
-                       CERROR("tx on fmaq for %s blocked %lu seconds\n",
-                              libcfs_nid2str(conn->rac_peer->rap_nid),
-                              jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
-                       return -ETIMEDOUT;
-               }
-       }
-
-       cfs_list_for_each (ttmp, &conn->rac_rdmaq) {
-               tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
-
-               if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
-                       spin_unlock_irqrestore(&conn->rac_lock, flags);
-                       CERROR("tx on rdmaq for %s blocked %lu seconds\n",
-                              libcfs_nid2str(conn->rac_peer->rap_nid),
-                              jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
-                       return -ETIMEDOUT;
-               }
-       }
-
-       cfs_list_for_each (ttmp, &conn->rac_replyq) {
-               tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
-
-               if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
-                       spin_unlock_irqrestore(&conn->rac_lock, flags);
-                       CERROR("tx on replyq for %s blocked %lu seconds\n",
-                              libcfs_nid2str(conn->rac_peer->rap_nid),
-                              jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
-                       return -ETIMEDOUT;
-               }
-       }
-
-       spin_unlock_irqrestore(&conn->rac_lock, flags);
-        return 0;
-}
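
Editorial aside (not part of the original file): every timeout test in kranal_check_conn_timeouts() above goes through cfs_time_aftereq() instead of comparing jiffies values directly. Below is a minimal user-space sketch of the wrap-safe idiom such helpers are built on; after_eq() and jiff_t are illustrative stand-ins, not the libcfs API.

#include <stdio.h>

typedef unsigned long jiff_t;               /* stand-in for kernel jiffies */

/* Same idiom as the kernel's time_after_eq(): the signed difference keeps
 * the comparison correct even after the counter wraps around zero. */
static int after_eq(jiff_t a, jiff_t b)
{
        return (long)(a - b) >= 0;
}

int main(void)
{
        jiff_t deadline = (jiff_t)-10;      /* set just before the wrap */
        jiff_t now      = 5;                /* 15 ticks later, after it */

        /* A naive ">=" claims the deadline is still far away; the
         * wrap-safe test correctly reports that it has passed. */
        printf("naive=%d wrap-safe=%d\n",
               (int)(now >= deadline), after_eq(now, deadline));
        return 0;
}
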
-
-void
-kranal_reaper_check (int idx, unsigned long *min_timeoutp)
-{
-        cfs_list_t        *conns = &kranal_data.kra_conns[idx];
-        cfs_list_t        *ctmp;
-        kra_conn_t        *conn;
-        unsigned long      flags;
-        int                rc;
-
- again:
-        /* NB. We expect to check all the conns and not find any problems, so
-         * we just use a shared lock while we take a look... */
-       read_lock(&kranal_data.kra_global_lock);
-
-        cfs_list_for_each (ctmp, conns) {
-                conn = cfs_list_entry(ctmp, kra_conn_t, rac_hashlist);
-
-                if (conn->rac_timeout < *min_timeoutp )
-                        *min_timeoutp = conn->rac_timeout;
-                if (conn->rac_keepalive < *min_timeoutp )
-                        *min_timeoutp = conn->rac_keepalive;
-
-                rc = kranal_check_conn_timeouts(conn);
-                if (rc == 0)
-                        continue;
-
-                kranal_conn_addref(conn);
-               read_unlock(&kranal_data.kra_global_lock);
-
-                CERROR("Conn to %s, cqid %d timed out\n",
-                       libcfs_nid2str(conn->rac_peer->rap_nid), 
-                       conn->rac_cqid);
-
-               write_lock_irqsave(&kranal_data.kra_global_lock, flags);
-
-                switch (conn->rac_state) {
-                default:
-                        LBUG();
-
-                case RANAL_CONN_ESTABLISHED:
-                        kranal_close_conn_locked(conn, -ETIMEDOUT);
-                        break;
-
-                case RANAL_CONN_CLOSING:
-                        kranal_terminate_conn_locked(conn);
-                        break;
-                }
-
-               write_unlock_irqrestore(&kranal_data.kra_global_lock,
-                                            flags);
-
-                kranal_conn_decref(conn);
-
-                /* start again now I've dropped the lock */
-                goto again;
-        }
-
-       read_unlock(&kranal_data.kra_global_lock);
-}
-
-int
-kranal_connd (void *arg)
-{
-       long               id = (long)arg;
-       wait_queue_t     wait;
-       unsigned long      flags;
-       kra_peer_t        *peer;
-       kra_acceptsock_t  *ras;
-       int                did_something;
-
-       cfs_block_allsigs();
-
-       init_waitqueue_entry_current(&wait);
-
-       spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
-
-       while (!kranal_data.kra_shutdown) {
-               did_something = 0;
-
-               if (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) {
-                       ras = cfs_list_entry(kranal_data.kra_connd_acceptq.next,
-                                            kra_acceptsock_t, ras_list);
-                       cfs_list_del(&ras->ras_list);
-
-                       spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
-                                                  flags);
-
-                       CDEBUG(D_NET,"About to handshake someone\n");
-
-                       kranal_conn_handshake(ras->ras_sock, NULL);
-                       kranal_free_acceptsock(ras);
-
-                       CDEBUG(D_NET,"Finished handshaking someone\n");
-
-                       spin_lock_irqsave(&kranal_data.kra_connd_lock,
-                                             flags);
-                       did_something = 1;
-               }
-
-               if (!cfs_list_empty(&kranal_data.kra_connd_peers)) {
-                       peer = cfs_list_entry(kranal_data.kra_connd_peers.next,
-                                             kra_peer_t, rap_connd_list);
-
-                       cfs_list_del_init(&peer->rap_connd_list);
-                       spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
-                                                  flags);
-
-                       kranal_connect(peer);
-                       kranal_peer_decref(peer);
-
-                       spin_lock_irqsave(&kranal_data.kra_connd_lock,
-                                             flags);
-                       did_something = 1;
-               }
-
-               if (did_something)
-                       continue;
-
-               set_current_state(TASK_INTERRUPTIBLE);
-               add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);
-
-               spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
-
-               waitq_wait(&wait, TASK_INTERRUPTIBLE);
-
-               set_current_state(TASK_RUNNING);
-               remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
-
-               spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
-       }
-
-       spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
-
-       kranal_thread_fini();
-       return 0;
-}
-
-void
-kranal_update_reaper_timeout(long timeout)
-{
-        unsigned long   flags;
-
-        LASSERT (timeout > 0);
-
-       spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
-
-        if (timeout < kranal_data.kra_new_min_timeout)
-                kranal_data.kra_new_min_timeout = timeout;
-
-       spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
-}
-
-int
-kranal_reaper (void *arg)
-{
-       wait_queue_t     wait;
-       unsigned long      flags;
-       long               timeout;
-       int                i;
-       int                conn_entries = kranal_data.kra_conn_hash_size;
-       int                conn_index = 0;
-       int                base_index = conn_entries - 1;
-       unsigned long      next_check_time = jiffies;
-       long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
-       long               current_min_timeout = 1;
-
-       cfs_block_allsigs();
-
-       init_waitqueue_entry_current(&wait);
-
-       spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
-
-       while (!kranal_data.kra_shutdown) {
-               /* I wake up every 'p' seconds to check for timeouts on some
-                * more peers.  I try to check every connection 'n' times
-                * within the global minimum of all keepalive and timeout
-                * intervals, to ensure I attend to every connection within
-                * (n+1)/n times its timeout intervals. */
-               const int     p = 1;
-               const int     n = 3;
-               unsigned long min_timeout;
-               int           chunk;
-
-               /* careful with the jiffy wrap... */
-               timeout = (long)(next_check_time - jiffies);
-               if (timeout > 0) {
-                       set_current_state(TASK_INTERRUPTIBLE);
-                       add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
-
-                       spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
-                                                  flags);
-
-                       waitq_timedwait(&wait, TASK_INTERRUPTIBLE,
-                                           timeout);
-
-                       spin_lock_irqsave(&kranal_data.kra_reaper_lock,
-                                             flags);
-
-                       set_current_state(TASK_RUNNING);
-                       remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
-                       continue;
-               }
-
-               if (kranal_data.kra_new_min_timeout !=
-                   MAX_SCHEDULE_TIMEOUT) {
-                       /* new min timeout set: restart min timeout scan */
-                       next_min_timeout = MAX_SCHEDULE_TIMEOUT;
-                       base_index = conn_index - 1;
-                       if (base_index < 0)
-                               base_index = conn_entries - 1;
-
-                       if (kranal_data.kra_new_min_timeout <
-                           current_min_timeout) {
-                               current_min_timeout =
-                                       kranal_data.kra_new_min_timeout;
-                               CDEBUG(D_NET, "Set new min timeout %ld\n",
-                                      current_min_timeout);
-                       }
-
-                       kranal_data.kra_new_min_timeout =
-                               MAX_SCHEDULE_TIMEOUT;
-               }
-               min_timeout = current_min_timeout;
-
-               spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
-
-               LASSERT (min_timeout > 0);
-
-               /* Compute how many table entries to check now so I get around
-                * the whole table fast enough, given that I do this at fixed
-                * intervals of 'p' seconds */
-               chunk = conn_entries;
-               if (min_timeout > n * p)
-                       chunk = (chunk * n * p) / min_timeout;
-               if (chunk == 0)
-                       chunk = 1;
-
-               for (i = 0; i < chunk; i++) {
-                       kranal_reaper_check(conn_index,
-                                           &next_min_timeout);
-                       conn_index = (conn_index + 1) % conn_entries;
-               }
-
-               next_check_time += msecs_to_jiffies(p * MSEC_PER_SEC);
-
-               spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
-
-               if (((conn_index - chunk <= base_index &&
-                     base_index < conn_index) ||
-                    (conn_index - conn_entries - chunk <= base_index &&
-                     base_index < conn_index - conn_entries))) {
-
-                       /* Scanned all conns: set current_min_timeout... */
-                       if (current_min_timeout != next_min_timeout) {
-                               current_min_timeout = next_min_timeout;
-                               CDEBUG(D_NET, "Set new min timeout %ld\n",
-                                      current_min_timeout);
-                       }
-
-                       /* ...and restart min timeout scan */
-                       next_min_timeout = MAX_SCHEDULE_TIMEOUT;
-                       base_index = conn_index - 1;
-                       if (base_index < 0)
-                               base_index = conn_entries - 1;
-               }
-       }
-
-       kranal_thread_fini();
-       return 0;
-}
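
Editorial aside (not part of the original file): the chunk computation inside kranal_reaper() above sizes each wake-up's work so that the whole connection hash table is swept roughly n times per minimum timeout. Below is a stand-alone sketch of that arithmetic with assumed example numbers (256 hash buckets, a 30 second minimum timeout):

#include <stdio.h>

int main(void)
{
        const int  p            = 1;    /* reaper wakes every p seconds      */
        const int  n            = 3;    /* aim for n full sweeps per timeout */
        const int  conn_entries = 256;  /* assumed hash table size           */
        const long min_timeout  = 30;   /* assumed smallest timeout, seconds */
        int        chunk        = conn_entries;

        /* same sizing rule as the reaper loop above */
        if (min_timeout > n * p)
                chunk = (chunk * n * p) / min_timeout;
        if (chunk == 0)
                chunk = 1;

        /* 256 * 3 * 1 / 30 = 25 buckets per wake-up, so the full table is
         * covered in about 11 wake-ups, i.e. roughly every 11 seconds. */
        printf("check %d of %d buckets every %d second(s)\n",
               chunk, conn_entries, p);
        return 0;
}
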
-
-void
-kranal_check_rdma_cq (kra_device_t *dev)
-{
-        kra_conn_t          *conn;
-        kra_tx_t            *tx;
-        RAP_RETURN           rrc;
-        unsigned long        flags;
-        RAP_RDMA_DESCRIPTOR *desc;
-        __u32                cqid;
-        __u32                event_type;
-
-        for (;;) {
-                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
-                if (rrc == RAP_NOT_DONE) {
-                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
-                        return;
-                }
-
-                LASSERT (rrc == RAP_SUCCESS);
-                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
-
-               read_lock(&kranal_data.kra_global_lock);
-
-                conn = kranal_cqid2conn_locked(cqid);
-                if (conn == NULL) {
-                        /* Conn was destroyed? */
-                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
-                       read_unlock(&kranal_data.kra_global_lock);
-                        continue;
-                }
-
-                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
-                LASSERT (rrc == RAP_SUCCESS);
-
-                CDEBUG(D_NET, "Completed %p\n",
-                       cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));
-
-               spin_lock_irqsave(&conn->rac_lock, flags);
-
-                LASSERT (!cfs_list_empty(&conn->rac_rdmaq));
-                tx = cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
-                cfs_list_del(&tx->tx_list);
-
-                LASSERT(desc->AppPtr == (void *)tx);
-                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
-                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
-
-                cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
-                tx->tx_qtime = jiffies;
-
-               spin_unlock_irqrestore(&conn->rac_lock, flags);
-
-                /* Get conn's fmaq processed, now I've just put something
-                 * there */
-                kranal_schedule_conn(conn);
-
-               read_unlock(&kranal_data.kra_global_lock);
-        }
-}
-
-void
-kranal_check_fma_cq (kra_device_t *dev)
-{
-        kra_conn_t         *conn;
-        RAP_RETURN          rrc;
-        __u32               cqid;
-        __u32               event_type;
-        cfs_list_t         *conns;
-        cfs_list_t         *tmp;
-        int                 i;
-
-        for (;;) {
-                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
-                if (rrc == RAP_NOT_DONE) {
-                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
-                        return;
-                }
-
-                LASSERT (rrc == RAP_SUCCESS);
-
-                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
-
-                       read_lock(&kranal_data.kra_global_lock);
-
-                        conn = kranal_cqid2conn_locked(cqid);
-                        if (conn == NULL) {
-                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
-                                       cqid);
-                        } else {
-                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
-                                       conn, cqid);
-                                kranal_schedule_conn(conn);
-                        }
-
-                       read_unlock(&kranal_data.kra_global_lock);
-                        continue;
-                }
-
-                /* FMA CQ has overflowed: check ALL conns */
-                CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n", 
-                      dev->rad_id);
-
-                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
-
-                       read_lock(&kranal_data.kra_global_lock);
-
-                        conns = &kranal_data.kra_conns[i];
-
-                        cfs_list_for_each (tmp, conns) {
-                                conn = cfs_list_entry(tmp, kra_conn_t,
-                                                      rac_hashlist);
-
-                                if (conn->rac_device == dev)
-                                        kranal_schedule_conn(conn);
-                        }
-
-                        /* don't block write lockers for too long... */
-                       read_unlock(&kranal_data.kra_global_lock);
-                }
-        }
-}
-
-int
-kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
-               void *immediate, int immediatenob)
-{
-        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
-        RAP_RETURN rrc;
-
-        CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
-               conn, msg, msg->ram_type, sync ? "(sync)" : "",
-               immediate, immediatenob);
-
-        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
-        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
-                 immediatenob <= RANAL_FMA_MAX_DATA :
-                 immediatenob == 0);
-
-        msg->ram_connstamp = conn->rac_my_connstamp;
-        msg->ram_seq = conn->rac_tx_seq;
-
-        if (sync)
-                rrc = RapkFmaSyncSend(conn->rac_rihandle,
-                                      immediate, immediatenob,
-                                      msg, sizeof(*msg));
-        else
-                rrc = RapkFmaSend(conn->rac_rihandle,
-                                  immediate, immediatenob,
-                                  msg, sizeof(*msg));
-
-        switch (rrc) {
-        default:
-                LBUG();
-
-        case RAP_SUCCESS:
-                conn->rac_last_tx = jiffies;
-                conn->rac_tx_seq++;
-                return 0;
-
-        case RAP_NOT_DONE:
-               if (cfs_time_aftereq(jiffies,
-                                    conn->rac_last_tx +
-                                    msecs_to_jiffies(conn->rac_keepalive *
-                                                     MSEC_PER_SEC)))
-                       CWARN("EAGAIN sending %02x (idle %lu secs)\n",
-                             msg->ram_type,
-                             jiffies_to_msecs(jiffies - conn->rac_last_tx) /
-                             MSEC_PER_SEC);
-               return -EAGAIN;
-        }
-}
-
-void
-kranal_process_fmaq (kra_conn_t *conn)
-{
-        unsigned long flags;
-        int           more_to_do;
-        kra_tx_t     *tx;
-        int           rc;
-        int           expect_reply;
-
-        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
-         *       However I will be rescheduled by an FMA completion event
-         *       when I eventually get some.
-         * NB 2. Sampling rac_state here races with setting it elsewhere.
-         *       But it doesn't matter if I try to send a "real" message just
-         *       as I start closing because I'll get scheduled to send the
-         *       close anyway. */
-
-        /* Not racing with incoming message processing! */
-        LASSERT (current == conn->rac_device->rad_scheduler);
-
-        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
-                if (!cfs_list_empty(&conn->rac_rdmaq)) {
-                        /* RDMAs in progress */
-                        LASSERT (!conn->rac_close_sent);
-
-                       if (cfs_time_aftereq(jiffies,
-                                            conn->rac_last_tx +
-                                            msecs_to_jiffies(conn->rac_keepalive *
-                                                             MSEC_PER_SEC))) {
-                               CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
-                               kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
-                               kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
-                       }
-                        return;
-                }
-
-                if (conn->rac_close_sent)
-                        return;
-
-                CWARN("sending CLOSE to %s\n", 
-                      libcfs_nid2str(conn->rac_peer->rap_nid));
-                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
-                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
-                if (rc != 0)
-                        return;
-
-                conn->rac_close_sent = 1;
-                if (!conn->rac_close_recvd)
-                        return;
-
-               write_lock_irqsave(&kranal_data.kra_global_lock, flags);
-
-                if (conn->rac_state == RANAL_CONN_CLOSING)
-                        kranal_terminate_conn_locked(conn);
-
-               write_unlock_irqrestore(&kranal_data.kra_global_lock,
-                                            flags);
-                return;
-        }
-
-       spin_lock_irqsave(&conn->rac_lock, flags);
-
-        if (cfs_list_empty(&conn->rac_fmaq)) {
-
-               spin_unlock_irqrestore(&conn->rac_lock, flags);
-
-               if (cfs_time_aftereq(jiffies,
-                                    conn->rac_last_tx +
-                                    msecs_to_jiffies(conn->rac_keepalive *
-                                                     MSEC_PER_SEC))) {
-                       CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
-                              libcfs_nid2str(conn->rac_peer->rap_nid), conn,
-                              jiffies_to_msecs(jiffies - conn->rac_last_tx) /
-                              MSEC_PER_SEC,
-                              conn->rac_keepalive);
-                       kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
-                       kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
-               }
-                return;
-        }
-
-        tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
-        cfs_list_del(&tx->tx_list);
-        more_to_do = !cfs_list_empty(&conn->rac_fmaq);
-
-       spin_unlock_irqrestore(&conn->rac_lock, flags);
-
-        expect_reply = 0;
-        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
-               tx, tx->tx_msg.ram_type, tx->tx_cookie);
-        switch (tx->tx_msg.ram_type) {
-        default:
-                LBUG();
-
-        case RANAL_MSG_IMMEDIATE:
-                rc = kranal_sendmsg(conn, &tx->tx_msg,
-                                    tx->tx_buffer, tx->tx_nob);
-                break;
-
-        case RANAL_MSG_PUT_NAK:
-        case RANAL_MSG_PUT_DONE:
-        case RANAL_MSG_GET_NAK:
-        case RANAL_MSG_GET_DONE:
-                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
-                break;
-
-        case RANAL_MSG_PUT_REQ:
-                rc = kranal_map_buffer(tx);
-                LASSERT (rc != -EAGAIN);
-                if (rc != 0)
-                        break;
-
-                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
-                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
-                expect_reply = 1;
-                break;
-
-        case RANAL_MSG_PUT_ACK:
-                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
-                expect_reply = 1;
-                break;
-
-        case RANAL_MSG_GET_REQ:
-                rc = kranal_map_buffer(tx);
-                LASSERT (rc != -EAGAIN);
-                if (rc != 0)
-                        break;
-
-                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
-                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
-                        (__u64)((unsigned long)tx->tx_buffer);
-                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
-                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
-                expect_reply = 1;
-                break;
-        }
-
-        if (rc == -EAGAIN) {
-                /* I need credits to send this.  Replace tx at the head of the
-                 * fmaq and I'll get rescheduled when credits appear */
-                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
-               spin_lock_irqsave(&conn->rac_lock, flags);
-                cfs_list_add(&tx->tx_list, &conn->rac_fmaq);
-               spin_unlock_irqrestore(&conn->rac_lock, flags);
-                return;
-        }
-
-        if (!expect_reply || rc != 0) {
-                kranal_tx_done(tx, rc);
-        } else {
-                /* LASSERT(current) above ensures this doesn't race with reply
-                 * processing */
-               spin_lock_irqsave(&conn->rac_lock, flags);
-                cfs_list_add_tail(&tx->tx_list, &conn->rac_replyq);
-                tx->tx_qtime = jiffies;
-               spin_unlock_irqrestore(&conn->rac_lock, flags);
-        }
-
-        if (more_to_do) {
-                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
-                kranal_schedule_conn(conn);
-        }
-}
-
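-/* Byte-swap an RDMA descriptor (key, address and length) received from a
- * peer with opposite endianness. */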
-static inline void
-kranal_swab_rdma_desc (kra_rdma_desc_t *d)
-{
-        __swab64s(&d->rard_key.Key);
-        __swab16s(&d->rard_key.Cookie);
-        __swab16s(&d->rard_key.MdHandle);
-        __swab32s(&d->rard_key.Flags);
-        __swab64s(&d->rard_addr.AddressBits);
-        __swab32s(&d->rard_nob);
-}
-
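-/* Find the tx on conn->rac_replyq whose cookie matches an incoming
- * completion.  A match is unlinked and returned; a cookie that matches with
- * the wrong message type, or no match at all, logs a warning and returns
- * NULL. */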
-kra_tx_t *
-kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
-{
-        cfs_list_t       *ttmp;
-        kra_tx_t         *tx;
-        unsigned long     flags;
-
-       spin_lock_irqsave(&conn->rac_lock, flags);
-
-        cfs_list_for_each(ttmp, &conn->rac_replyq) {
-                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
-
-                CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
-                       tx, tx->tx_msg.ram_type, tx->tx_cookie);
-
-                if (tx->tx_cookie != cookie)
-                        continue;
-
-                if (tx->tx_msg.ram_type != type) {
-                       spin_unlock_irqrestore(&conn->rac_lock, flags);
-                        CWARN("Unexpected type %x (%x expected) "
-                              "matched reply from %s\n",
-                              tx->tx_msg.ram_type, type,
-                              libcfs_nid2str(conn->rac_peer->rap_nid));
-                        return NULL;
-                }
-
-                cfs_list_del(&tx->tx_list);
-               spin_unlock_irqrestore(&conn->rac_lock, flags);
-                return tx;
-        }
-
-       spin_unlock_irqrestore(&conn->rac_lock, flags);
-        CWARN("Unmatched reply %02x/"LPX64" from %s\n",
-              type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
-        return NULL;
-}
-
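-/* Receive path: pull the next FMA message prefix for this connection,
- * byte-swap it if required, validate version, source NID, connstamp and
- * sequence number, then dispatch on message type.  Any protocol error
- * closes the connection. */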
-void
-kranal_check_fma_rx (kra_conn_t *conn)
-{
-        unsigned long flags;
-        __u32         seq;
-        kra_tx_t     *tx;
-        kra_msg_t    *msg;
-        void         *prefix;
-        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
-        kra_peer_t   *peer = conn->rac_peer;
-        int           rc = 0;
-        int           repost = 1;
-
-        if (rrc == RAP_NOT_DONE)
-                return;
-
-        CDEBUG(D_NET, "RX on %p\n", conn);
-
-        LASSERT (rrc == RAP_SUCCESS);
-        conn->rac_last_rx = jiffies;
-        seq = conn->rac_rx_seq++;
-        msg = (kra_msg_t *)prefix;
-
-        /* stash message for portals callbacks; they'll NULL
-         * rac_rxmsg if they consume it */
-        LASSERT (conn->rac_rxmsg == NULL);
-        conn->rac_rxmsg = msg;
-
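-        /* A peer with opposite endianness sends a byte-swapped magic; swap
-         * the header (and any embedded RDMA descriptor) into host order
-         * before validating it. */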
-        if (msg->ram_magic != RANAL_MSG_MAGIC) {
-                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
-                        CERROR("Unexpected magic %08x from %s\n",
-                               msg->ram_magic, libcfs_nid2str(peer->rap_nid));
-                        rc = -EPROTO;
-                        goto out;
-                }
-
-                __swab32s(&msg->ram_magic);
-                __swab16s(&msg->ram_version);
-                __swab16s(&msg->ram_type);
-                __swab64s(&msg->ram_srcnid);
-                __swab64s(&msg->ram_connstamp);
-                __swab32s(&msg->ram_seq);
-
-                /* NB message type checked below; NOT here... */
-                switch (msg->ram_type) {
-                case RANAL_MSG_PUT_ACK:
-                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
-                        break;
-
-                case RANAL_MSG_GET_REQ:
-                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
-                        break;
-
-                default:
-                        break;
-                }
-        }
-
-        if (msg->ram_version != RANAL_MSG_VERSION) {
-                CERROR("Unexpected protocol version %d from %s\n",
-                       msg->ram_version, libcfs_nid2str(peer->rap_nid));
-                rc = -EPROTO;
-                goto out;
-        }
-
-        if (msg->ram_srcnid != peer->rap_nid) {
-                CERROR("Unexpected peer %s from %s\n",
-                       libcfs_nid2str(msg->ram_srcnid), 
-                       libcfs_nid2str(peer->rap_nid));
-                rc = -EPROTO;
-                goto out;
-        }
-
-        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
-                CERROR("Unexpected connstamp "LPX64"("LPX64
-                       " expected) from %s\n",
-                       msg->ram_connstamp, conn->rac_peer_connstamp,
-                       libcfs_nid2str(peer->rap_nid));
-                rc = -EPROTO;
-                goto out;
-        }
-
-        if (msg->ram_seq != seq) {
-                CERROR("Unexpected sequence number %d(%d expected) from %s\n",
-                       msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
-                rc = -EPROTO;
-                goto out;
-        }
-
-        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
-                /* This message signals RDMA completion... */
-                rrc = RapkFmaSyncWait(conn->rac_rihandle);
-                if (rrc != RAP_SUCCESS) {
-                        CERROR("RapkFmaSyncWait failed: %d\n", rrc);
-                        rc = -ENETDOWN;
-                        goto out;
-                }
-        }
-
-        if (conn->rac_close_recvd) {
-                CERROR("Unexpected message %d after CLOSE from %s\n",
-                       msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
-                rc = -EPROTO;
-                goto out;
-        }
-
-        if (msg->ram_type == RANAL_MSG_CLOSE) {
-                CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid));
-                conn->rac_close_recvd = 1;
-               write_lock_irqsave(&kranal_data.kra_global_lock, flags);
-
-                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
-                        kranal_close_conn_locked(conn, 0);
-                else if (conn->rac_state == RANAL_CONN_CLOSING &&
-                         conn->rac_close_sent)
-                        kranal_terminate_conn_locked(conn);
-
-               write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
-                goto out;
-        }
-
-        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
-                goto out;
-
-        switch (msg->ram_type) {
-        case RANAL_MSG_NOOP:
-                /* Nothing to do; just a keepalive */
-                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
-                break;
-
-        case RANAL_MSG_IMMEDIATE:
-                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
-                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr, 
-                                msg->ram_srcnid, conn, 0);
-                repost = rc < 0;
-                break;
-
-        case RANAL_MSG_PUT_REQ:
-                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
-                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr, 
-                                msg->ram_srcnid, conn, 1);
-                repost = rc < 0;
-                break;
-
-        case RANAL_MSG_PUT_NAK:
-                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
-                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
-                                        msg->ram_u.completion.racm_cookie);
-                if (tx == NULL)
-                        break;
-
-                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
-                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
-                kranal_tx_done(tx, -ENOENT);    /* no match */
-                break;
-
-        case RANAL_MSG_PUT_ACK:
-                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
-                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
-                                        msg->ram_u.putack.rapam_src_cookie);
-                if (tx == NULL)
-                        break;
-
-                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
-                            &msg->ram_u.putack.rapam_desc,
-                            msg->ram_u.putack.rapam_desc.rard_nob,
-                            msg->ram_u.putack.rapam_dst_cookie);
-                break;
-
-        case RANAL_MSG_PUT_DONE:
-                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
-                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
-                                        msg->ram_u.completion.racm_cookie);
-                if (tx == NULL)
-                        break;
-
-                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
-                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
-                kranal_tx_done(tx, 0);
-                break;
-
-        case RANAL_MSG_GET_REQ:
-                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
-                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr, 
-                                msg->ram_srcnid, conn, 1);
-                repost = rc < 0;
-                break;
-
-        case RANAL_MSG_GET_NAK:
-                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
-                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
-                                        msg->ram_u.completion.racm_cookie);
-                if (tx == NULL)
-                        break;
-
-                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
-                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
-                kranal_tx_done(tx, -ENOENT);    /* no match */
-                break;
-
-        case RANAL_MSG_GET_DONE:
-                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
-                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
-                                        msg->ram_u.completion.racm_cookie);
-                if (tx == NULL)
-                        break;
-
-                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
-                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
-#if 0
-                /* completion message should send rdma length if we ever allow
-                 * GET truncation */
-                lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
-#endif
-                kranal_tx_done(tx, 0);
-                break;
-        }
-
- out:
-        if (rc < 0)                             /* protocol/comms error */
-                kranal_close_conn (conn, rc);
-
-        if (repost && conn->rac_rxmsg != NULL)
-                kranal_consume_rxmsg(conn, NULL, 0);
-
-        /* check again later */
-        kranal_schedule_conn(conn);
-}
-
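-/* Called once a connection has reached RANAL_CONN_CLOSED and been unlinked
- * from the connection lists: complete every tx still queued on the fmaq and
- * replyq with -ECONNABORTED. */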
-void
-kranal_complete_closed_conn (kra_conn_t *conn)
-{
-        kra_tx_t   *tx;
-        int         nfma;
-        int         nreplies;
-
-        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
-        LASSERT (cfs_list_empty(&conn->rac_list));
-        LASSERT (cfs_list_empty(&conn->rac_hashlist));
-
-        for (nfma = 0; !cfs_list_empty(&conn->rac_fmaq); nfma++) {
-                tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
-
-                cfs_list_del(&tx->tx_list);
-                kranal_tx_done(tx, -ECONNABORTED);
-        }
-
-        LASSERT (cfs_list_empty(&conn->rac_rdmaq));
-
-        for (nreplies = 0; !cfs_list_empty(&conn->rac_replyq); nreplies++) {
-                tx = cfs_list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
-
-                cfs_list_del(&tx->tx_list);
-                kranal_tx_done(tx, -ECONNABORTED);
-        }
-
-        CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
-               conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
-}
-
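-/* Poll a newly created connection: returns 0 once RapkCompleteSync()
- * succeeds, -EAGAIN while it is still pending within the connection
- * timeout, and -ETIMEDOUT once rac_timeout has elapsed (the sync is then
- * completed with a zero flag, presumably to abandon the handshake). */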
-int kranal_process_new_conn (kra_conn_t *conn)
-{
-       RAP_RETURN   rrc;
-
-       rrc = RapkCompleteSync(conn->rac_rihandle, 1);
-       if (rrc == RAP_SUCCESS)
-               return 0;
-
-       LASSERT (rrc == RAP_NOT_DONE);
-       if (!cfs_time_aftereq(jiffies, conn->rac_last_tx +
-                             msecs_to_jiffies(conn->rac_timeout*MSEC_PER_SEC)))
-               return -EAGAIN;
-
-       /* Too late */
-       rrc = RapkCompleteSync(conn->rac_rihandle, 0);
-       LASSERT (rrc == RAP_SUCCESS);
-       return -ETIMEDOUT;
-}
-
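-/* Per-device scheduler thread.  Until shutdown it drains the RDMA and FMA
- * completion queues when the device callback has fired, services
- * connections queued on rad_ready_conns (FMA RX then TX), and re-polls
- * connections on rad_new_conns whose deadline has passed.  It only sleeps
- * on rad_waitq when a full pass did no work, waking at the earliest
- * new-conn deadline if one exists. */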
-int
-kranal_scheduler (void *arg)
-{
-       kra_device_t     *dev = (kra_device_t *)arg;
-       wait_queue_t    wait;
-       kra_conn_t       *conn;
-        unsigned long     flags;
-        unsigned long     deadline;
-        unsigned long     soonest;
-        int               nsoonest;
-        long              timeout;
-        cfs_list_t       *tmp;
-        cfs_list_t       *nxt;
-        int               rc;
-        int               dropped_lock;
-        int               busy_loops = 0;
-
-        cfs_block_allsigs();
-
-       dev->rad_scheduler = current;
-       init_waitqueue_entry_current(&wait);
-
-       spin_lock_irqsave(&dev->rad_lock, flags);
-
-        while (!kranal_data.kra_shutdown) {
-                /* Safe: kra_shutdown only set when quiescent */
-
-                if (busy_loops++ >= RANAL_RESCHED) {
-                       spin_unlock_irqrestore(&dev->rad_lock, flags);
-
-                       cond_resched();
-                       busy_loops = 0;
-
-                       spin_lock_irqsave(&dev->rad_lock, flags);
-                }
-
-                dropped_lock = 0;
-
-                if (dev->rad_ready) {
-                        /* Device callback fired since I last checked it */
-                        dev->rad_ready = 0;
-                       spin_unlock_irqrestore(&dev->rad_lock, flags);
-                        dropped_lock = 1;
-
-                        kranal_check_rdma_cq(dev);
-                        kranal_check_fma_cq(dev);
-
-                       spin_lock_irqsave(&dev->rad_lock, flags);
-                }
-
-                cfs_list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
-                        conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
-
-                        cfs_list_del_init(&conn->rac_schedlist);
-                        LASSERT (conn->rac_scheduled);
-                        conn->rac_scheduled = 0;
-                       spin_unlock_irqrestore(&dev->rad_lock, flags);
-                        dropped_lock = 1;
-
-                        kranal_check_fma_rx(conn);
-                        kranal_process_fmaq(conn);
-
-                        if (conn->rac_state == RANAL_CONN_CLOSED)
-                                kranal_complete_closed_conn(conn);
-
-                        kranal_conn_decref(conn);
-                       spin_lock_irqsave(&dev->rad_lock, flags);
-                }
-
-                nsoonest = 0;
-                soonest = jiffies;
-
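-                /* Poll connections still waiting for their handshake to
-                 * complete, and remember the earliest deadline so the sleep
-                 * below can wake in time for it. */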
-                cfs_list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
-                        conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
-
-                        deadline = conn->rac_last_tx + conn->rac_keepalive;
-                        if (cfs_time_aftereq(jiffies, deadline)) {
-                                /* Time to process this new conn */
-                               spin_unlock_irqrestore(&dev->rad_lock, flags);
-                                dropped_lock = 1;
-
-                                rc = kranal_process_new_conn(conn);
-                                if (rc != -EAGAIN) {
-                                        /* All done with this conn */
-                                       spin_lock_irqsave(&dev->rad_lock,
-                                                         flags);
-                                        cfs_list_del_init(&conn->rac_schedlist);
-                                       spin_unlock_irqrestore(&dev->rad_lock,
-                                                              flags);
-
-                                        kranal_conn_decref(conn);
-                                       spin_lock_irqsave(&dev->rad_lock,
-                                                         flags);
-                                        continue;
-                                }
-
-                               /* retry with exponential backoff until HZ */
-                               if (conn->rac_keepalive == 0)
-                                       conn->rac_keepalive = 1;
-                               else if (conn->rac_keepalive <=
-                                        msecs_to_jiffies(MSEC_PER_SEC))
-                                       conn->rac_keepalive *= 2;
-                               else
-                                       conn->rac_keepalive +=
-                                               msecs_to_jiffies(MSEC_PER_SEC);
-
-                               deadline = conn->rac_last_tx + conn->rac_keepalive;
-                               spin_lock_irqsave(&dev->rad_lock, flags);
-                        }
-
-                        /* Does this conn need attention soonest? */
-                        if (nsoonest++ == 0 ||
-                            !cfs_time_aftereq(deadline, soonest))
-                                soonest = deadline;
-                }
-
-                if (dropped_lock)               /* may sleep iff I didn't drop the lock */
-                        continue;
-
-               set_current_state(TASK_INTERRUPTIBLE);
-               add_wait_queue_exclusive(&dev->rad_waitq, &wait);
-               spin_unlock_irqrestore(&dev->rad_lock, flags);
-
-               if (nsoonest == 0) {
-                       busy_loops = 0;
-                       waitq_wait(&wait, TASK_INTERRUPTIBLE);
-               } else {
-                       timeout = (long)(soonest - jiffies);
-                       if (timeout > 0) {
-                               busy_loops = 0;
-                               waitq_timedwait(&wait, TASK_INTERRUPTIBLE,
-                                               timeout);
-                       }
-               }
-
-               remove_wait_queue(&dev->rad_waitq, &wait);
-               set_current_state(TASK_RUNNING);
-               spin_lock_irqsave(&dev->rad_lock, flags);
-       }
-
-       spin_unlock_irqrestore(&dev->rad_lock, flags);
-
-        dev->rad_scheduler = NULL;
-        kranal_thread_fini();
-        return 0;
-}