Whamcloud - gitweb
* Added ranal subdir
authoreeb <eeb>
Fri, 17 Dec 2004 14:35:56 +0000 (14:35 +0000)
committereeb <eeb>
Fri, 17 Dec 2004 14:35:56 +0000 (14:35 +0000)
12 files changed:
lnet/klnds/ralnd/.cvsignore [new file with mode: 0644]
lnet/klnds/ralnd/Makefile.in [new file with mode: 0644]
lnet/klnds/ralnd/autoMakefile.am [new file with mode: 0644]
lnet/klnds/ralnd/ralnd.c [new file with mode: 0644]
lnet/klnds/ralnd/ralnd.h [new file with mode: 0644]
lnet/klnds/ralnd/ralnd_cb.c [new file with mode: 0644]
lustre/portals/knals/ranal/.cvsignore [new file with mode: 0644]
lustre/portals/knals/ranal/Makefile.in [new file with mode: 0644]
lustre/portals/knals/ranal/autoMakefile.am [new file with mode: 0644]
lustre/portals/knals/ranal/ranal.c [new file with mode: 0644]
lustre/portals/knals/ranal/ranal.h [new file with mode: 0644]
lustre/portals/knals/ranal/ranal_cb.c [new file with mode: 0644]

diff --git a/lnet/klnds/ralnd/.cvsignore b/lnet/klnds/ralnd/.cvsignore
new file mode 100644 (file)
index 0000000..5ed596b
--- /dev/null
@@ -0,0 +1,10 @@
+.deps
+Makefile
+.*.cmd
+autoMakefile.in
+autoMakefile
+*.ko
+*.mod.c
+.*.flags
+.tmp_versions
+.depend
diff --git a/lnet/klnds/ralnd/Makefile.in b/lnet/klnds/ralnd/Makefile.in
new file mode 100644 (file)
index 0000000..1772cc2
--- /dev/null
@@ -0,0 +1,6 @@
+MODULES := kranal
+kranal-objs := ranal.o ranal_cb.o
+
+EXTRA_POST_CFLAGS := @RACPPFLAGS@
+
+@INCLUDE_RULES@
diff --git a/lnet/klnds/ralnd/autoMakefile.am b/lnet/klnds/ralnd/autoMakefile.am
new file mode 100644 (file)
index 0000000..f136aa5
--- /dev/null
@@ -0,0 +1,15 @@
+# Copyright (C) 2001  Cluster File Systems, Inc.
+#
+# This code is issued under the GNU General Public License.
+# See the file COPYING in this distribution
+
+if MODULES
+if !CRAY_PORTALS
+if BUILD_RANAL
+modulenet_DATA = kranal$(KMODEXT)
+endif
+endif
+endif
+
+MOSTLYCLEANFILES = *.o *.ko *.mod.c
+DIST_SOURCES = $(kranal-objs:%.o=%.c) ranal.h
diff --git a/lnet/klnds/ralnd/ralnd.c b/lnet/klnds/ralnd/ralnd.c
new file mode 100644 (file)
index 0000000..a59757d
--- /dev/null
@@ -0,0 +1,1978 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2004 Cluster File Systems, Inc.
+ *   Author: Eric Barton <eric@bartonsoftware.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+#include "ranal.h"
+
+
+nal_t                   kranal_api;
+ptl_handle_ni_t         kranal_ni;
+kra_data_t              kranal_data;
+kra_tunables_t          kranal_tunables;
+
+#ifdef CONFIG_SYSCTL
+#define RANAL_SYSCTL_TIMEOUT           1
+#define RANAL_SYSCTL_LISTENER_TIMEOUT  2
+#define RANAL_SYSCTL_BACKLOG           3
+#define RANAL_SYSCTL_PORT              4
+#define RANAL_SYSCTL_MAX_IMMEDIATE     5
+
+#define RANAL_SYSCTL                   202
+
+static ctl_table kranal_ctl_table[] = {
+        {RANAL_SYSCTL_TIMEOUT, "timeout", 
+         &kranal_tunables.kra_timeout, sizeof(int),
+         0644, NULL, &proc_dointvec},
+        {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", 
+         &kranal_tunables.kra_listener_timeout, sizeof(int),
+         0644, NULL, &proc_dointvec},
+       {RANAL_SYSCTL_BACKLOG, "backlog",
+        &kranal_tunables.kra_backlog, sizeof(int),
+        0644, NULL, kranal_listener_procint},
+       {RANAL_SYSCTL_PORT, "port",
+        &kranal_tunables.kra_port, sizeof(int),
+        0644, NULL, kranal_listener_procint},
+        {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", 
+         &kranal_tunables.kra_max_immediate, sizeof(int),
+         0644, NULL, &proc_dointvec},
+        { 0 }
+};
+
+static ctl_table kranal_top_ctl_table[] = {
+        {RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table},
+        { 0 }
+};
+#endif
+
+int
+kranal_sock_write (struct socket *sock, void *buffer, int nob)
+{
+        int           rc;
+        mm_segment_t  oldmm = get_fs();
+       struct iovec  iov = {
+               .iov_base = buffer,
+               .iov_len  = nob
+       };
+       struct msghdr msg = {
+               .msg_name       = NULL,
+               .msg_namelen    = 0,
+               .msg_iov        = &iov,
+               .msg_iovlen     = 1,
+               .msg_control    = NULL,
+               .msg_controllen = 0,
+               .msg_flags      = MSG_DONTWAIT
+       };
+
+       /* We've set up the socket's send buffer to be large enough for
+        * everything we send, so a single non-blocking send should
+        * complete without error. */
+
+       set_fs(KERNEL_DS);
+       rc = sock_sendmsg(sock, &msg, iov.iov_len);
+       set_fs(oldmm);
+
+       return rc;
+}
+
+int
+kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout)
+{
+        int            rc;
+        mm_segment_t   oldmm = get_fs();
+       long           ticks = timeout * HZ;
+       unsigned long  then;
+       struct timeval tv;
+
+       LASSERT (nob > 0);
+       LASSERT (ticks > 0);
+
+        for (;;) {
+                struct iovec  iov = {
+                        .iov_base = buffer,
+                        .iov_len  = nob
+                };
+                struct msghdr msg = {
+                        .msg_name       = NULL,
+                        .msg_namelen    = 0,
+                        .msg_iov        = &iov,
+                        .msg_iovlen     = 1,
+                        .msg_control    = NULL,
+                        .msg_controllen = 0,
+                        .msg_flags      = 0
+                };
+
+               /* Set receive timeout to remaining time */
+               tv = (struct timeval) {
+                       .tv_sec = ticks / HZ,
+                       .tv_usec = ((ticks % HZ) * 1000000) / HZ;
+               };
+               set_fs(KERNEL_DS);
+               rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
+                                    (char *)&tv, sizeof(tv));
+               set_fs(oldmm);
+               if (rc != 0) {
+                       CERROR("Can't set socket recv timeout %d: %d\n",
+                              send_timeout, rc);
+                       return rc;
+               }
+
+                set_fs(KERNEL_DS);
+               then = jiffies;
+                rc = sock_recvmsg(sock, &msg, iov.iov_len, 0);
+               ticks -= jiffies - then;
+                set_fs(oldmm);
+
+                if (rc < 0)
+                        return rc;
+
+                if (rc == 0)
+                        return -ECONNABORTED;
+
+                buffer = ((char *)buffer) + rc;
+                nob -= rc;
+
+               if (nob == 0)
+                       return 0;
+
+               if (ticks <= 0)
+                       return -ETIMEDOUT;
+        }
+}
+
+int
+kranal_create_sock(struct socket **sockp)
+{
+       struct socket       *sock;
+       int                  rc;
+        struct timeval       tv;
+       int                  option;
+        mm_segment_t         oldmm = get_fs();
+
+       rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock);
+       if (rc != 0) {
+               CERROR("Can't create socket: %d\n", rc);
+               return rc;
+       }
+
+       /* Ensure sending connection info doesn't block */
+       option = 2 * sizeof(kra_connreq_t);
+       set_fs(KERNEL_DS);
+       rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
+                            (char *)&option, sizeof(option));
+       set_fs(oldmm);
+       if (rc != 0) {
+               CERROR("Can't set send buffer %d: %d\n", option, rc);
+               goto failed;
+       }
+
+       option = 1;
+       set_fs(KERNEL_DS);
+       rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                            (char *)&option, sizeof(option));
+       set_fs(oldmm);
+       if (rc != 0) {
+               CERROR("Can't set SO_REUSEADDR: %d\n", rc);
+               goto failed;
+       }
+
+       *sockp = sock;
+       return 0;
+
+ failed:
+       sock_release(sock);
+       return rc;
+}
+
+void
+kranal_pause(int ticks)
+{
+       set_current_state(TASK_UNINTERRUPTIBLE);
+       schedule_timeout(ticks);
+}
+
+void
+kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
+{
+        memset(connreq, 0, sizeof(*connreq));
+
+        connreq->racr_magic       = RANAL_MSG_MAGIC;
+        connreq->racr_version     = RANAL_MSG_VERSION;
+        connreq->racr_devid       = conn->rac_device->rad_id;
+        connreq->racr_nid         = kranal_lib.libnal_ni.ni_pid.nid;
+        connreq->racr_timeout     = conn->rac_timeout;
+        connreq->racr_incarnation = conn->rac_my_incarnation;
+
+        rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
+        LASSERT(rrc == RAP_SUCCESS);
+}
+
+int
+kranal_recv_connreq(struct sock *sock, kra_connreq_t *connreq, int timeout)
+{
+        int         i;
+       int         rc;
+
+       rc = kranal_sock_read(newsock, connreq, sizeof(*connreq), timeout);
+       if (rc != 0) {
+               CERROR("Read failed: %d\n", rc);
+               return rc;
+       }
+
+       if (connreq->racr_magic != RANAL_MSG_MAGIC) {
+               if (__swab32(connreq->racr_magic) != RANAL_MSG_MAGIC) {
+                       CERROR("Unexpected magic %08x\n", connreq->racr_magic);
+                       return -EPROTO;
+               }
+
+               __swab32s(&connreq->racr_magic);
+               __swab16s(&connreq->racr_version);
+                __swab16s(&connreq->racr_devid);
+               __swab64s(&connreq->racr_nid);
+               __swab64s(&connreq->racr_incarnation);
+                __swab32s(&connreq->racr_timeout);
+
+               __swab32s(&connreq->racr_riparams.FmaDomainHndl);
+               __swab32s(&connreq->racr_riparams.RcvCqHndl);
+               __swab32s(&connreq->racr_riparams.PTag);
+                __swab32s(&connreq->racr_riparams.CompletionCookie);
+       }
+
+       if (connreq->racr_version != RANAL_MSG_VERSION) {
+               CERROR("Unexpected version %d\n", connreq->racr_version);
+               return -EPROTO;
+       }
+
+        if (connreq->racr_nid == PTL_NID_ANY) {
+                CERROR("Received PTL_NID_ANY\n");
+                return -EPROTO;
+        }
+
+        if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
+                CERROR("Received timeout %d < MIN %d\n",
+                       connreq->racr_timeout, RANAL_MIN_TIMEOUT);
+                return -EPROTO;
+        }
+        
+        for (i = 0; i < kranal_data.kra_ndevs; i++)
+                if (connreq->racr_devid == 
+                    kranal_data.kra_devices[i]->rad_id)
+                        break;
+
+        if (i == kranal_data.kra_ndevs) {
+                CERROR("Can't match device %d\n", connreq->racr_devid);
+                return -ENODEV;
+        }
+
+       return 0;
+}
+
+int
+kranal_conn_isdup_locked(kranal_peer_t *peer, __u64 incarnation)
+{
+       kra_conn_t       *conn;
+       struct list_head *tmp;
+        int               loopback = 0;
+
+       list_for_each(tmp, &peer->rap_conns) {
+               conn = list_entry(tmp, kra_conn_t, rac_list);
+
+                if (conn->rac_incarnation < incarnation) {
+                        /* Conns with an older incarnation get culled later */
+                        continue;
+                }
+
+                if (!loopback &&
+                    conn->rac_incarnation == incarnation &&
+                    peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) {
+                        /* loopback creates 2 conns */
+                        loopback = 1;
+                        continue;
+                }
+
+                return 1;
+       }
+
+       return 0;
+}
+
+void
+kranal_set_conn_uniqueness (kra_conn_t *conn)
+{
+        unsigned long  flags;
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+        conn->rac_my_incarnation = kranal_data.kra_next_incarnation++;
+
+        do {    /* allocate a unique cqid */
+                conn->rac_cqid = kranal_data.kra_next_cqid++;
+        } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL)
+        
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+}
+
+int
+kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
+{
+       kra_conn_t    *conn;
+        RAP_RETURN     rrc;
+
+        LASSERT (!in_interrupt());
+       PORTAL_ALLOC(conn, sizeof(*conn));
+
+       if (conn == NULL)
+               return -ENOMEM;
+
+       memset(conn, 0, sizeof(*conn));
+        conn->rac_cqid = cqid;
+       atomic_set(&conn->rac_refcount, 1);
+       INIT_LIST_HEAD(&conn->rac_list);
+       INIT_LIST_HEAD(&conn->rac_hashlist);
+       INIT_LIST_HEAD(&conn->rac_fmaq);
+       INIT_LIST_HEAD(&conn->rac_rdmaq);
+       INIT_LIST_HEAD(&conn->rac_replyq);
+       spin_lock_init(&conn->rac_lock);
+
+        conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
+        kranal_update_reaper_timeout(conn->rac_timeout);
+
+        rrc = RapkCreateRi(dev->rad_handle, cqid, dev->rad_ptag,
+                           dev->rad_rdma_cq, dev->rad_fma_cq,
+                           &conn->rac_rihandle);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("RapkCreateRi failed: %d\n", rrc);
+                PORTAL_FREE(conn, sizeof(*conn));
+                return -ENETDOWN;
+        }
+
+        atomic_inc(&kranal_data.kra_nconns);
+       *connp = conn;
+       return 0;
+}
+
+void
+__kranal_conn_decref(kra_conn_t *conn) 
+{
+        kra_tx_t          *tx;
+        RAP_RETURN         rrc;
+
+        LASSERT (!in_interrupt());
+        LASSERT (!conn->rac_scheduled);
+        LASSERT (list_empty(&conn->rac_list));
+        LASSERT (list_empty(&conn->rac_hashlist));
+        LASSERT (atomic_read(&conn->rac_refcount) == 0);
+
+        while (!list_empty(&conn->rac_fmaq)) {
+                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
+                
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -ECONNABORTED);
+        }
+        
+        /* We may not destroy this connection while it has RDMAs outstanding */
+        LASSERT (list_empty(&conn->rac_rdmaq));
+
+        while (!list_empty(&conn->rac_replyq)) {
+                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
+                
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -ECONNABORTED);
+        }
+        
+        rrc = RapkDestroyRi(conn->rac_device->rad_handle,
+                            conn->rac_rihandle);
+        LASSERT (rrc == RAP_SUCCESS);
+
+        if (conn->rac_peer != NULL)
+                kranal_peer_decref(conn->rac_peer);
+
+       PORTAL_FREE(conn, sizeof(*conn));
+        atomic_dec(&kranal_data.kra_nconns);
+}
+
+void
+kranal_terminate_conn_locked (kra_conn_t *conn)
+{
+        kra_peer_t     *peer - conn->rac_peer;
+
+        LASSERT (!in_interrupt());
+        LASSERT (conn->rac_closing);
+        LASSERT (!list_empty(&conn->rac_hashlist));
+        LASSERT (list_empty(&conn->rac_list));
+
+        /* Remove from conn hash table (no new callbacks) */
+        list_del_init(&conn->rac_hashlist);
+        kranal_conn_decref(conn);
+
+        /* Conn is now just waiting for remaining refs to go */
+}
+
+void
+kranal_close_conn_locked (kra_conn_t *conn, int error)
+{
+        kra_peer_t        *peer = conn->rac_peer;
+
+        CDEBUG(error == 0 ? D_NET : D_ERROR,
+              "closing conn to "LPX64": error %d\n", peer->rap_nid, error);
+
+        LASSERT (!in_interrupt());
+        LASSERT (!conn->rac_closing);
+        LASSERT (!list_empty(&conn->rac_hashlist));
+        LASSERT (!list_empty(&conn->rac_list));
+
+        list_del_init(&conn->rac_list);
+
+        if (list_empty(&peer->rap_conns) &&
+            peer->rap_persistence == 0) {
+                /* Non-persistent peer with no more conns... */
+                kranal_unlink_peer_locked(peer);
+        }
+
+        conn->rac_closing = 1;
+        kranal_schedule_conn(conn);
+
+        kranal_conn_decref(conn);               /* lose peer's ref */
+}
+
+void
+kranal_close_conn (kra_conn_t *conn, int error)
+{
+        unsigned long    flags;
+        
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+        
+        if (!conn->rac_closing)
+                kranal_close_conn_locked(conn, error);
+        
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+}
+
+int
+kranal_passive_conn_handshake (struct socket *sock, 
+                               ptl_nid_t **peer_nidp, kra_conn_t **connp)
+{
+       struct sockaddr_in   addr;
+       __u32                peer_ip;
+        unsigned int         peer_port;
+       kra_connreq_t        connreq;
+       ptl_nid_t            peer_nid;
+        kra_conn_t          *conn;
+        kra_device_t        *dev;
+       RAP_RETURN           rrc;
+       int                  rc;
+        int                  i;
+
+       rc = sock->ops->getname(newsock, (struct sockaddr *)addr, &len, 2);
+        if (rc != 0) {
+                CERROR("Can't get peer's IP: %d\n", rc);
+                return rc;
+        }
+
+        peer_ip = ntohl(sin.sin_addr.s_addr);
+        peer_port = ntohs(sin.sin_port);
+
+        if (peer_port >= 1024) {
+                CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n",
+                       HIPQUAD(peer_ip), peer_port);
+                return -ECONNREFUSED;
+        }
+
+        rc = kranal_recv_connreq(sock, &connreq, 
+                                 kranal_data.kra_listener_timeout);
+        if (rc != 0) {
+                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
+                       HIPQUAD(peer_ip), peer_port, rc);
+                return rc;
+        }
+
+        peer_nid = connreq.racr_nid;
+        LASSERT (peer_nid != PTL_NID_ANY);
+
+        for (i = 0;;i++) {
+                LASSERT(i < kranal_data.kra_ndevs);
+                dev = &kranal_data.kra_devices[i];
+                if (dev->rad_id == connreq->racr_devid)
+                        break;
+        }
+
+        rc = kranal_alloc_conn(&conn, dev,(__u32)(peer_nid & 0xffffffff));
+        if (rc != 0)
+                return rc;
+
+        conn->rac_peer_incarnation = connreq.racr_incarnation;
+        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
+        kranal_update_reaper_timeout(conn->rac_keepalive);
+        
+        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc);
+                kranal_conn_decref(conn);
+                return -EPROTO;
+        }
+
+        kranal_pack_connreq(&connreq, conn);
+
+        rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
+        if (rc != 0) {
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%p: %d\n", 
+                       HIPQUAD(peer_ip), peer_port, rc);
+                kranal_conn_decref(conn);
+                return rc;
+        }
+
+        *connp = conn;
+        *peer_nidp = peer_nid;
+        return 0;
+}
+
+int
+ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
+{
+        struct sockaddr_in  locaddr;
+        struct sockaddr_in  srvaddr;
+        struct socket      *sock;
+        unsigned int        port;
+        int                 rc;
+        int                 option;
+        mm_segment_t        oldmm = get_fs();
+        struct timeval      tv;
+
+        for (port = 1023; port >= 512; port--) {
+
+                memset(&locaddr, 0, sizeof(locaddr)); 
+                locaddr.sin_family      = AF_INET; 
+                locaddr.sin_port        = htons(port);
+                locaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+                memset (&srvaddr, 0, sizeof (srvaddr));
+                srvaddr.sin_family      = AF_INET;
+                srvaddr.sin_port        = htons (peer->rap_port);
+                srvaddr.sin_addr.s_addr = htonl (peer->rap_ip);
+
+                rc = kranal_create_sock(&sock);
+                if (rc != 0)
+                        return rc;
+
+                rc = sock->ops->bind(sock,
+                                     (struct sockaddr *)&locaddr, sizeof(locaddr));
+                if (rc != 0) {
+                        sock_release(sock);
+                        
+                        if (rc == -EADDRINUSE) {
+                                CDEBUG(D_NET, "Port %d already in use\n", port);
+                                continue;
+                        }
+
+                        CERROR("Can't bind to reserved port %d: %d\n", port, rc);
+                        return rc;
+                }
+
+                rc = sock->ops->connect(sock,
+                                        (struct sockaddr *)&srvaddr, sizeof(srvaddr),
+                                        0);
+                if (rc == 0) {
+                        *sockp = sock;
+                        return 0;
+                }
+                
+                sock_release(sock);
+
+                if (rc != -EADDRNOTAVAIL) {
+                        CERROR("Can't connect port %d to %u.%u.%u.%u/%d: %d\n",
+                               port, HIPQUAD(peer->rap_ip), peer->rap_port, rc);
+                        return rc;
+                }
+                
+                CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", 
+                       port, HIPQUAD(peer->rap_ip), peer->rap_port);
+        }
+}
+
+
+int
+kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
+{
+       kra_connreq_t       connreq;
+        kra_conn_t         *conn;
+        kra_device_t       *dev;
+        struct socket      *sock;
+        __u32               id32;
+       RAP_RETURN          rrc;
+       int                 rc;
+
+        id32 = (peer_nid & 0xffffffff);
+        dev = &kranal_data.kra_devices[id32 % kranal_data.kra_ndevs];
+
+        rc = kranal_alloc_conn(&conn, dev, id32);
+        if (rc != 0)
+                return rc;
+
+        kranal_pack_connreq(&connreq, conn);
+        
+        memset(&dstaddr, 0, sizeof(addr));
+        dstaddr.sin_family      = AF_INET;
+        dstaddr.sin_port        = htons(peer->rap_port);
+        dstaddr.sin_addr.s_addr = htonl(peer->rap_ip);
+
+        memset(&srcaddr, 0, sizeof(addr));
+
+        rc = ranal_connect_sock(peer, &sock);
+        if (rc != 0)
+                goto failed_0;
+
+        /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
+         * immediately after accepting a connection, so we connect and then
+         * send immediately. */
+
+        rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
+        if (rc != 0) {
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", 
+                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
+                goto failed_1;
+        }
+
+        rc = kranal_recv_connreq(sock, &connreq, kranal_data.kra_timeout);
+        if (rc != 0) {
+                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
+                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
+                goto failed_1;
+        }
+
+        sock_release(sock);
+        rc = -EPROTO;
+
+        if (connreq.racr_nid != peer->rap_nid) {
+                CERROR("Unexpected nid from %u.%u.%u.%u/%d: "
+                       "received "LPX64" expected "LPX64"\n",
+                       HIPQUAD(peer->rap_ip), peer->rap_port, 
+                       connreq.racr_nid, peer->rap_nid);
+                goto failed_0;
+        }
+
+        if (connreq.racr_devid != dev->rad_id) {
+                CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
+                       "received %d expected %d\n",
+                       HIPQUAD(peer->rap_ip), peer->rap_port, 
+                       connreq.racr_devid, dev->rad_id);
+                goto failed_0;
+        }
+
+        conn->rac_peer_incarnation = connreq.racr_incarnation; 
+        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
+        kranal_update_reaper_timeout(conn->rac_keepalive);
+
+        rc = -ENETDOWN;
+        rrc = RapkSetRiParams(conn->rac_rihandle,
+                              &connreq->racr_riparams);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't set riparams for "LPX64": %d\n",
+                       peer_nid, rrc);
+                goto failed_0;
+        }
+
+        *connp = conn;
+       return 0;
+
+ failed_1:
+        release_sock(sock);
+ failed_0:
+        kranal_conn_decref(conn);
+        return rc;
+}
+
+int
+kranal_conn_handshake (struct socket *sock, kranal_peer_t *peer)
+{
+        kranal_peer_t     *peer2;
+       ptl_nid_t          peer_nid;
+       unsigned long      flags;
+        unsigned long      timeout;
+       kra_conn_t        *conn;
+       int                rc;
+        int                nstale;
+
+        if (sock != NULL) {
+                /* passive: listener accepted sock */
+                LASSERT (peer == NULL);
+
+                rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn);
+                if (rc != 0)
+                        return rc;
+
+               /* assume this is a new peer */
+               peer = kranal_create_peer(peer_nid);
+               if (peer == NULL) {
+                       CERROR("Can't allocate peer for "LPX64"\n", peer_nid);
+                        kranal_conn_decref(conn);
+                        return -ENOMEM;
+               }
+
+               write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+               peer2 = kranal_find_peer_locked(peer_nid);
+               if (peer2 == NULL) {
+                       /* peer table takes my initial ref on peer */
+                       list_add_tail(&peer->rap_list,
+                                     kranal_nid2peerlist(peer_nid));
+               } else {
+                       /* peer_nid already in the peer table */
+                       kranal_peer_decref(peer);
+                       peer = peer2;
+               }
+                /* NB I may now have a non-persistent peer in the peer
+                 * table with no connections: I can't drop the global lock
+                 * until I've given it a connection or removed it, and when
+                 * I do 'peer' can disappear under me. */
+        } else {
+                /* active: connd wants to connect to peer */
+                LASSERT (peer != NULL);
+                LASSERT (peer->rap_connecting);
+                
+                rc = kranal_active_conn_handshake(peer, &conn);
+                if (rc != 0)
+                        return rc;
+
+               write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+               if (!kranal_peer_active(peer)) {
+                       /* raced with peer getting unlinked */
+                        write_unlock_irqrestore(&kranal_data.kra_global_lock, 
+                                                flags);
+                        kranal_conn_decref(conn);
+                       return ESTALE;
+               }
+       }
+
+       LASSERT (kranal_peer_active(peer));     /* peer is in the peer table */
+        peer_nid = peer->rap_nid;
+
+        /* Refuse to duplicate an existing connection (both sides might try
+         * to connect at once).  NB we return success!  We _do_ have a
+         * connection (so we don't need to remove the peer from the peer
+         * table) and we _don't_ have any blocked txs to complete */
+       if (kranal_conn_isdup_locked(peer, conn->rac_incarnation)) {
+                LASSERT (!list_empty(&peer->rap_conns));
+                LASSERT (list_empty(&peer->rap_tx_queue));
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+               CWARN("Not creating duplicate connection to "LPX64"\n",
+                      peer_nid);
+                kranal_conn_decref(conn);
+                return 0;
+       }
+
+       kranal_peer_addref(peer);               /* +1 ref for conn */
+       conn->rac_peer = peer;
+       list_add_tail(&conn->rac_list, &peer->rap_conns);
+
+        kranal_conn_addref(conn);               /* +1 ref for conn table */
+        list_add_tail(&conn->rac_hashlist,
+                      kranal_cqid2connlist(conn->rac_cqid));
+
+        /* Schedule all packets blocking for a connection */
+        while (!list_empty(&peer->rap_tx_queue)) {
+                tx = list_entry(&peer->rap_tx_queue.next,
+                                kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                kranal_queue_tx_locked(tx, conn);
+        }
+
+       nstale = kranal_close_stale_conns_locked(peer, conn->rac_incarnation);
+
+       write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+        /* CAVEAT EMPTOR: passive peer can disappear NOW */
+
+        if (nstale != 0)
+                CWARN("Closed %d stale conns to "LPX64"\n", nstale, peer_nid);
+
+        /* Ensure conn gets checked.  Transmits may have been queued and an
+         * FMA event may have happened before it got in the cq hash table */
+        kranal_schedule_conn(conn);
+       return 0;
+}
+
+void
+kranal_connect (kra_peer_t *peer)
+{
+        kra_tx_t          *tx;
+        unsigned long      flags;
+        struct list_head   zombies;
+        int                rc;
+
+        LASSERT (peer->rap_connecting);
+
+        rc = kranal_conn_handshake(NULL, peer);
+
+        write_lock_irqqsave(&kranal_data.kra_global_lock, flags);
+
+        LASSERT (peer->rap_connecting);
+        peer->rap_connecting = 0;
+
+        if (rc == 0) {
+                /* kranal_conn_handshake() queues blocked txs immediately on
+                 * success to avoid messages jumping the queue */
+                LASSERT (list_empty(&peer->rap_tx_queue));
+
+                /* reset reconnection timeouts */
+                peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
+                peer->rap_reconnect_time = CURRENT_TIME;
+
+                write_unlock_irqrestore(&kranal-data.kra_global_lock, flags);
+                return;
+        }
+
+        LASSERT (peer->rap_reconnect_interval != 0);
+        peer->rap_reconnect_time = CURRENT_TIME + peer->rap_reconnect_interval;
+        peer->rap_reconnect_interval = MAX(RANAL_MAX_RECONNECT_INTERVAL,
+                                           1 * peer->rap_reconnect_interval);
+
+        /* Grab all blocked packets while we have the global lock */
+        list_add(&zombies, &peer->rap_tx_queue);
+        list_del_init(&peer->rap_tx_queue);
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+        if (list_empty(&zombies))
+                return;
+
+        CWARN("Dropping packets for "LPX64": connection failed\n",
+              peer->rap_nid);
+
+        do {
+                tx = list_entry(zombies.next, kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -EHOSTUNREACH);
+
+        } while (!list_empty(&zombies));
+}
+
+int
+kranal_listener(void *arg)
+{
+       struct sockaddr_in addr;
+       wait_queue_t       wait;
+       struct socket     *sock;
+       struct socket     *newsock;
+       int                port;
+       int                backlog;
+       int                timeout;
+       kra_connreq_t     *connreqs;
+       char               name[16];
+
+       /* Parent thread holds kra_nid_mutex, and is, or is about to
+        * block on kra_listener_signal */
+
+       port = kra_tunables.kra_port;
+       snprintf(name, "kranal_lstn%03d", port);
+       kportal_daemonize(name);
+       kportal_blockallsigs();
+
+       init_waitqueue_entry(&wait, current);
+
+       rc = -ENOMEM;
+       PORTAL_ALLOC(connreqs, 2 * sizeof(*connreqs));
+       if (connreqs == NULL)
+               goto out_0;
+
+       rc = kranal_create_sock(&sock, port);
+       if (rc != 0)
+               goto out_1;
+
+        memset(&addr, 0, sizeof(addr));
+        addr.sin_family      = AF_INET;
+        addr.sin_port        = htons(port);
+        addr.sin_addr.s_addr = INADDR_ANY
+
+       rc = sock->ops->bind(sock, &addr, sizeof(addr));
+       if (rc != 0) {
+               CERROR("Can't bind to port %d\n", port);
+               goto out_2;
+       }
+
+       rc = sock->ops->listen(sock, kra_tunalbes.kra_backlog);
+       if (rc != 0) {
+               CERROR("Can't set listen backlog %d: %d\n", backlog, rc);
+               goto out_2;
+       }
+
+       LASSERT (kranal_data.kra_listener_sock == NULL);
+       kranal_data.kra_listener_sock = sock;
+
+       /* unblock waiting parent */
+       LASSERT (kranal_data.kra_listener_shutdown == 0);
+       up(&kranal_data.kra_listener_signal);
+
+       /* Wake me any time something happens on my socket */
+       add_wait_queue(sock->sk->sk_sleep, &wait);
+
+       while (kranal_data.kra_listener_shutdown == 0) {
+
+               newsock = sock_alloc();
+               if (newsock == NULL) {
+                       CERROR("Can't allocate new socket for accept\n");
+                       kranal_pause(HZ);
+                       continue;
+               }
+
+               set_current_state(TASK_INTERRUPTIBLE);
+
+               rc = sock->ops->accept(sock, newsock, O_NONBLOCK);
+
+               if (rc == -EAGAIN &&
+                   kranal_data.kra_listener_shutdown == 0)
+                       schedule();
+
+               set_current_state(TASK_RUNNING);
+
+               if (rc != 0) {
+                       sock_release(newsock);
+                       if (rc != -EAGAIN) {
+                               CERROR("Accept failed: %d\n", rc);
+                               kranal_pause(HZ);
+                       }
+                       continue;
+               } 
+
+                kranal_conn_handshake(newsock, NULL);
+                sock_release(newsock);
+       }
+
+       rc = 0;
+       remove_wait_queue(sock->sk->sk_sleep, &wait);
+ out_2:
+       sock_release(sock);
+       kranal_data.kra_listener_sock = NULL;
+ out_1:
+       PORTAL_FREE(connreqs, 2 * sizeof(*connreqs));
+ out_0:
+       /* set completion status and unblock thread waiting for me 
+        * (parent on startup failure, executioner on normal shutdown) */
+       kranal_data.kra_listener_shutdown = rc;
+       up(&kranal_data.kra_listener_signal);
+
+       return 0;
+}
+
+int
+kranal_start_listener ()
+{
+       long           pid;
+       int            rc;
+
+        CDEBUG(D_WARNING, "Starting listener\n");
+
+       /* Called holding kra_nid_mutex: listener stopped */
+       LASSERT (kranal_data.kra_listener_sock == NULL);
+
+       kranal_data.kra_listener_shutdown == 0;
+       pid = kernel_thread(kranal_listener, sock, 0);
+       if (pid < 0) {
+               CERROR("Can't spawn listener: %ld\n", pid);
+               return (int)pid;
+       }
+
+       /* Block until listener has started up. */
+       down(&kranal_data.kra_listener_signal);
+
+       rc = kranal_data.kra_listener_shutdown;
+       LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL));
+
+        CDEBUG(D_WARNING, "Listener %ld started OK\n", pid);
+       return rc;
+}
+
+void
+kranal_stop_listener()
+{
+        CDEBUG(D_WARNING, "Stopping listener\n");
+
+       /* Called holding kra_nid_mutex: listener running */
+       LASSERT (kranal_data.kra_listener_sock != NULL);
+
+       kranal_data.kra_listener_shutdown = 1;
+       wake_up_all(kranal_data->kra_listener_sock->sk->sk_sleep);
+
+       /* Block until listener has torn down. */
+       down(&kranal_data.kra_listener_signal);
+
+       LASSERT (kranal_data.kra_listener_sock == NULL);
+        CDEBUG(D_WARNING, "Listener stopped\n");
+}
+
+int 
+kranal_listener_procint(ctl_table *table, int write, struct file *filp,
+                       void *buffer, size_t *lenp)
+{
+       int   *tunable = (int *)table->data;
+       int    old_val;
+       int    rc;
+
+       down(&kranal_data.kra_nid_mutex);
+
+       LASSERT (tunable == &kranal_data.kra_port ||
+                tunable == &kranal_data.kra_backlog);
+       old_val = *tunable;
+
+       rc = proc_dointvec(table, write, filp, buffer, lenp);
+
+       if (write &&
+           (*tunable != old_val ||
+            kranal_data.kra_listener_sock == NULL)) {
+
+               if (kranal_data.kra_listener_sock != NULL)
+                       kranal_stop_listener();
+
+               rc = kranal_start_listener();
+
+               if (rc != 0) {
+                       *tunable = old_val;
+                       kranal_start_listener();
+               }
+       }
+
+       up(&kranal_data.kra_nid_mutex);
+       return rc;
+}
+
+int
+kranal_set_mynid(ptl_nid_t nid)
+{
+        lib_ni_t      *ni = &kranal_lib.libnal_ni;
+        int            rc;
+
+        CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n",
+               nid, ni->ni_pid.nid);
+
+        down(&kranal_data.kra_nid_mutex);
+
+        if (nid == ni->ni_pid.nid) {
+                /* no change of NID */
+                up(&kranal_data.kra_nid_mutex);
+                return 0;
+        }
+
+       if (kranal_data.kra_listener_sock != NULL)
+               kranal_stop_listener();
+
+        ni->ni_pid.nid = nid;
+
+        /* Delete all existing peers and their connections after new
+         * NID/incarnation set to ensure no old connections in our brave
+         * new world. */
+        kranal_del_peer(PTL_NID_ANY, 0);
+
+        if (nid != PTL_NID_ANY)
+                rc = kranal_start_listener();
+
+        up(&kranal_data.kra_nid_mutex);
+        return rc;
+}
+
+kra_peer_t *
+kranal_create_peer (ptl_nid_t nid)
+{
+        kra_peer_t *peer;
+
+        LASSERT (nid != PTL_NID_ANY);
+
+        PORTAL_ALLOC(peer, sizeof(*peer));
+        if (peer == NULL)
+                return NULL;
+
+        memset(peer, 0, sizeof(*peer));         /* zero flags etc */
+
+        peer->rap_nid = nid;
+        atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */
+
+        INIT_LIST_HEAD(&peer->rap_list);        /* not in the peer table yet */
+        INIT_LIST_HEAD(&peer->rap_conns);
+        INIT_LIST_HEAD(&peer->rap_tx_queue);
+
+        peer->rap_reconnect_time = CURRENT_TIME;
+        peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
+
+        atomic_inc(&kranal_data.kra_npeers);
+        return peer;
+}
+
+void
+__kranal_peer_decref (kra_peer_t *peer)
+{
+        CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer);
+
+        LASSERT (atomic_read(&peer->rap_refcount) == 0);
+        LASSERT (peer->rap_persistence == 0);
+        LASSERT (!kranal_peer_active(peer));
+        LASSERT (peer->rap_connecting == 0);
+        LASSERT (list_empty(&peer->rap_conns));
+        LASSERT (list_empty(&peer->rap_tx_queue));
+
+        PORTAL_FREE(peer, sizeof(*peer));
+
+        /* NB a peer's connections keep a reference on their peer until
+         * they are destroyed, so we can be assured that _all_ state to do
+         * with this peer has been cleaned up when its refcount drops to
+         * zero. */
+        atomic_dec(&kranal_data.kra_npeers);
+}
+
+kra_peer_t *
+kranal_find_peer_locked (ptl_nid_t nid)
+{
+        struct list_head *peer_list = kranal_nid2peerlist(nid);
+        struct list_head *tmp;
+        kra_peer_t       *peer;
+
+        list_for_each (tmp, peer_list) {
+
+                peer = list_entry(tmp, kra_peer_t, rap_list);
+
+                LASSERT (peer->rap_persistence > 0 ||     /* persistent peer */
+                         !list_empty(&peer->rap_conns));  /* active conn */
+
+                if (peer->rap_nid != nid)
+                        continue;
+
+                CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
+                       peer, nid, atomic_read(&peer->rap_refcount));
+                return peer;
+        }
+        return NULL;
+}
+
+kra_peer_t *
+kranal_find_peer (ptl_nid_t nid)
+{
+        kra_peer_t     *peer;
+
+        read_lock(&kranal_data.kra_global_lock);
+        peer = kranal_find_peer_locked(nid);
+        if (peer != NULL)                       /* +1 ref for caller? */
+                kranal_peer_addref(peer);
+        read_unlock(&kranal_data.kra_global_lock);
+
+        return peer;
+}
+
+void
+kranal_unlink_peer_locked (kra_peer_t *peer)
+{
+        LASSERT (peer->rap_persistence == 0);
+        LASSERT (list_empty(&peer->rap_conns));
+
+        LASSERT (kranal_peer_active(peer));
+        list_del_init(&peer->rap_list);
+
+        /* lose peerlist's ref */
+        kranal_peer_decref(peer);
+}
+
+int
+kranal_get_peer_info (int index, ptl_nid_t *nidp, int *portp, int *persistencep)
+{
+        kra_peer_t        *peer;
+        struct list_head  *ptmp;
+        int                i;
+
+        read_lock(&kranal_data.kra_global_lock);
+
+        for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
+
+                list_for_each(ptmp, &kranal_data.kra_peers[i]) {
+
+                        peer = list_entry(ptmp, kra_peer_t, rap_list);
+                        LASSERT (peer->rap_persistence > 0 ||
+                                 !list_empty(&peer->rap_conns));
+
+                        if (index-- > 0)
+                                continue;
+
+                        *nidp = peer->rap_nid;
+                        *portp = peer->rap_port;
+                        *persistencep = peer->rap_persistence;
+
+                        read_unlock(&kranal_data.kra_global_lock);
+                        return 0;
+                }
+        }
+
+        read_unlock(&kranal_data.kra_global_lock);
+        return -ENOENT;
+}
+
+int
+kranal_add_persistent_peer (ptl_nid_t nid, __u32 ip, int port)
+{
+        unsigned long      flags;
+        kra_peer_t        *peer;
+        kra_peer_t        *peer2;
+
+        if (nid == PTL_NID_ANY)
+                return -EINVAL;
+
+        peer = kranal_create_peer(nid);
+        if (peer == NULL)
+                return -ENOMEM;
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+        peer2 = kranal_find_peer_locked(nid);
+        if (peer2 != NULL) {
+                kranal_put_peer(peer);
+                peer = peer2;
+        } else {
+                /* peer table takes existing ref on peer */
+                list_add_tail(&peer->rap_list,
+                              kranal_nid2peerlist(nid));
+        }
+
+        peer->rap_ip = ip;
+        peer->rap_port = port;
+        peer->rap_persistence++;
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+        return 0;
+}
+
+void
+kranal_del_peer_locked (kra_peer_t *peer, int single_share)
+{
+        struct list_head *ctmp;
+        struct list_head *cnxt;
+        kra_conn_t       *conn;
+
+        if (!single_share)
+                peer->rap_persistence = 0;
+        else if (peer->rap_persistence > 0)
+                peer->rap_persistence--;
+
+        if (peer->rap_persistence != 0)
+                return;
+
+        if (list_empty(&peer->rap_conns)) {
+                kranal_unlink_peer_locked(peer);
+        } else {
+                list_for_each_safe(ctmp, cnxt, &peer->rap_conns) {
+                        conn = list_entry(ctmp, kra_conn_t, rac_list);
+
+                        kranal_close_conn_locked(conn, 0);
+                }
+                /* peer unlinks itself when last conn is closed */
+        }
+}
+
+int
+kranal_del_peer (ptl_nid_t nid, int single_share)
+{
+        unsigned long      flags;
+        struct list_head  *ptmp;
+        struct list_head  *pnxt;
+        kra_peer_t        *peer;
+        int                lo;
+        int                hi;
+        int                i;
+        int                rc = -ENOENT;
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+        if (nid != PTL_NID_ANY)
+                lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
+        else {
+                lo = 0;
+                hi = kranal_data.kra_peer_hash_size - 1;
+        }
+
+        for (i = lo; i <= hi; i++) {
+                list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
+                        peer = list_entry(ptmp, kra_peer_t, rap_list);
+                        LASSERT (peer->rap_persistence > 0 ||
+                                 !list_empty(&peer->rap_conns));
+
+                        if (!(nid == PTL_NID_ANY || peer->rap_nid == nid))
+                                continue;
+
+                        kranal_del_peer_locked(peer, single_share);
+                        rc = 0;         /* matched something */
+
+                        if (single_share)
+                                goto out;
+                }
+        }
+ out:
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+        return rc;
+}
+
+kra_conn_t *
+kranal_get_conn_by_idx (int index)
+{
+        kra_peer_t        *peer;
+        struct list_head  *ptmp;
+        kra_conn_t        *conn;
+        struct list_head  *ctmp;
+        int                i;
+
+        read_lock (&kranal_data.kra_global_lock);
+
+        for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
+                list_for_each (ptmp, &kranal_data.kra_peers[i]) {
+
+                        peer = list_entry(ptmp, kra_peer_t, rap_list);
+                        LASSERT (peer->rap_persistence > 0 ||
+                                 !list_empty(&peer->rap_conns));
+
+                        list_for_each (ctmp, &peer->rap_conns) {
+                                if (index-- > 0)
+                                        continue;
+
+                                conn = list_entry(ctmp, kra_conn_t, rac_list);
+                                CDEBUG(D_NET, "++conn[%p] -> "LPX64" (%d)\n",
+                                       conn, conn->rac_peer->rap_nid,
+                                       atomic_read(&conn->rac_refcount));
+                                atomic_inc(&conn->rac_refcount);
+                                read_unlock(&kranal_data.kra_global_lock);
+                                return conn;
+                        }
+                }
+        }
+
+        read_unlock(&kranal_data.kra_global_lock);
+        return NULL;
+}
+
+int
+kranal_close_peer_conns_locked (kra_peer_t *peer, int why)
+{
+        kra_conn_t         *conn;
+        struct list_head   *ctmp;
+        struct list_head   *cnxt;
+        int                 count = 0;
+
+        list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
+                conn = list_entry(ctmp, kra_conn_t, rac_list);
+
+                count++;
+                kranal_close_conn_locked(conn, why);
+        }
+
+        return count;
+}
+
+int
+kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation)
+{
+        kra_conn_t         *conn;
+        struct list_head   *ctmp;
+        struct list_head   *cnxt;
+        int                 count = 0;
+
+        list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
+                conn = list_entry(ctmp, kra_conn_t, rac_list);
+
+                if (conn->rac_incarnation == incarnation)
+                        continue;
+
+                CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
+                       peer->rap_nid, conn->rac_incarnation, incarnation);
+                LASSERT (conn->rac_incarnation < incarnation);
+
+                count++;
+                kranal_close_conn_locked(conn, -ESTALE);
+        }
+
+        return count;
+}
+
+int
+kranal_close_matching_conns (ptl_nid_t nid)
+{
+        unsigned long       flags;
+        kra_peer_t         *peer;
+        struct list_head   *ptmp;
+        struct list_head   *pnxt;
+        int                 lo;
+        int                 hi;
+        int                 i;
+        int                 count = 0;
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+        if (nid != PTL_NID_ANY)
+                lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
+        else {
+                lo = 0;
+                hi = kranal_data.kra_peer_hash_size - 1;
+        }
+
+        for (i = lo; i <= hi; i++) {
+                list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
+
+                        peer = list_entry(ptmp, kra_peer_t, rap_list);
+                        LASSERT (peer->rap_persistence > 0 ||
+                                 !list_empty(&peer->rap_conns));
+
+                        if (!(nid == PTL_NID_ANY || nid == peer->rap_nid))
+                                continue;
+
+                        count += kranal_close_peer_conns_locked(peer, 0);
+                }
+        }
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+        /* wildcards always succeed */
+        if (nid == PTL_NID_ANY)
+                return 0;
+
+        return (count == 0) ? -ENOENT : 0;
+}
+
+int
+kranal_cmd(struct portals_cfg *pcfg, void * private)
+{
+        int rc = -EINVAL;
+
+        LASSERT (pcfg != NULL);
+
+        switch(pcfg->pcfg_command) {
+        case NAL_CMD_GET_PEER: {
+                ptl_nid_t   nid = 0;
+                __u32       ip = 0;
+                int         port = 0;
+                int         share_count = 0;
+
+                rc = kranal_get_peer_info(pcfg->pcfg_count,
+                                          &nid, &ip, &port, &share_count);
+                pcfg->pcfg_nid   = nid;
+                pcfg->pcfg_size  = 0;
+                pcfg->pcfg_id    = ip;
+                pcfg->pcfg_misc  = port;
+                pcfg->pcfg_count = 0;
+                pcfg->pcfg_wait  = share_count;
+                break;
+        }
+        case NAL_CMD_ADD_PEER: {
+                rc = kranal_add_persistent_peer(pcfg->pcfg_nid,
+                                                pcfg->pcfg_id, /* IP */
+                                                pcfg->pcfg_misc); /* port */
+                break;
+        }
+        case NAL_CMD_DEL_PEER: {
+                rc = kranal_del_peer(pcfg->pcfg_nid, 
+                                     /* flags == single_share */
+                                     pcfg->pcfg_flags != 0);
+                break;
+        }
+        case NAL_CMD_GET_CONN: {
+                kra_conn_t *conn = kranal_get_conn_by_idx(pcfg->pcfg_count);
+
+                if (conn == NULL)
+                        rc = -ENOENT;
+                else {
+                        rc = 0;
+                        pcfg->pcfg_nid   = conn->rac_peer->rap_nid;
+                        pcfg->pcfg_id    = 0;
+                        pcfg->pcfg_misc  = 0;
+                        pcfg->pcfg_flags = 0;
+                        kranal_put_conn(conn);
+                }
+                break;
+        }
+        case NAL_CMD_CLOSE_CONNECTION: {
+                rc = kranal_close_matching_conns(pcfg->pcfg_nid);
+                break;
+        }
+        case NAL_CMD_REGISTER_MYNID: {
+                if (pcfg->pcfg_nid == PTL_NID_ANY)
+                        rc = -EINVAL;
+                else
+                        rc = kranal_set_mynid(pcfg->pcfg_nid);
+                break;
+        }
+        }
+
+        return rc;
+}
+
+void
+kranal_free_txdescs(struct list_head *freelist)
+{
+        kra_tx_t    *tx;
+
+        while (!list_empty(freelist)) {
+                tx = list_entry(freelist->next, kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                PORTAL_FREE(tx->tx_phys, PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
+                PORTAL_FREE(tx, sizeof(*tx));
+        }
+}
+
+int
+kranal_alloc_txdescs(struct list_head *freelist, int n)
+{
+        int            isnblk = (freelist == &kranal_data.kra_idle_nblk_txs);
+        int            i;
+        kra_tx_t      *tx;
+
+        LASSERT (freelist == &kranal_data.kra_idle_txs ||
+                 freelist == &kranal_data.kra_idle_nblk_txs);
+        LASSERT (list_empty(freelist));
+
+        for (i = 0; i < n; i++) {
+
+                PORTAL_ALLOC(tx, sizeof(*tx));
+                if (tx == NULL) {
+                        CERROR("Can't allocate %stx[%d]\n", 
+                               isnblk ? "nblk ", i);
+                        kranal_free_txdescs();
+                        return -ENOMEM;
+                }
+
+                PORTAL_ALLOC(tx->tx_phys,
+                             PLT_MD_MAX_IOV * sizeof(*tx->tx_phys));
+                if (tx->tx_phys == NULL) {
+                        CERROR("Can't allocate %stx[%d]->tx_phys\n", 
+                               isnblk ? "nblk ", i);
+
+                        PORTAL_FREE(tx, sizeof(*tx));
+                        kranal_free_txdescs(freelist);
+                        return -ENOMEM;
+                }
+
+                tx->tx_isnblk = isnblk
+                tx->tx_buftype = RANAL_BUF_NONE;
+
+                list_add(&tx->tx_list, freelist);
+        }
+
+        return 0;
+}
+
+int
+kranal_device_init(int id, kra_device_t *dev)
+{
+        const int         total_ntx = RANAL_NTX + RANAL_NTX_NBLK;
+        RAP_RETURN        rrc;
+
+        dev->rad_id = id;
+        rrc = RapkGetDeviceByIndex(id, NULL, kranal_device_callback,
+                                   &dev->rad_handle);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't get Rapidarray Device %d: %d\n", idx, rrc);
+                goto failed_0;
+        }
+
+        rrc = RapkReserveRdma(dev->rad_handle, total_ntx);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't reserve %d RDMA descriptors"
+                       " for device[%d]: %d\n", total_ntx, i, rrc);
+                goto failed_1;
+        }
+
+        rrc = RapkCreatePtag(dev->rad_handle,
+                             &dev->rad_ptag);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't create ptag"
+                       " for device[%d]: %d\n", i, rrc);
+                goto failed_1;
+        }
+
+        rrc = RapkCreateCQ(dev->rad_handle, total_ntx, dev->rad_ptag,
+                           &dev->rad_rdma_cq);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't create rdma cq size %d"
+                       " for device[%d]: %d\n", total_ntx, i, rrc);
+                goto failed_2;
+        }
+
+        rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE,
+                           dev->rad_ptag, &dev->rad_fma_cq);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't create fma cq size %d"
+                       " for device[%d]: %d\n", RANAL_RX_CQ_SIZE, i, rrc);
+                goto failed_3;
+        }
+
+        return 0;
+
+ failed_3:
+        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
+ failed_2:
+        RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+ failed_1:
+        RapkReleaseDevice(dev->rad_handle);
+ failed_0:
+        return -ENODEV;
+}
+
+void
+kranal_device_fini(kra_device_t *dev)
+{
+        RapkDestroyCQ(dev->rad_handle, dev->rad_rx_cq, dev->rad_ptag);
+        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
+        RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+        RapkReleaseDevice(dev->rad_handle);
+}
+
+void
+kranal_api_shutdown (nal_t *nal)
+{
+        int           i;
+        int           rc;
+        unsigned long flags;
+        
+        if (nal->nal_refct != 0) {
+                /* This module got the first ref */
+                PORTAL_MODULE_UNUSE;
+                return;
+        }
+
+        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
+               atomic_read(&portal_kmemory));
+
+        LASSERT (nal == &kranal_api);
+
+        switch (kranal_data.kra_init) {
+        default:
+                CERROR("Unexpected state %d\n", kranal_data.kra_init);
+                LBUG();
+
+        case RANAL_INIT_ALL:
+                /* stop calls to nal_cmd */
+                libcfs_nal_cmd_unregister(OPENRANAL);
+                /* No new persistent peers */
+
+                /* resetting my NID to unadvertises me, removes my
+                 * listener and nukes all current peers */
+                kranal_set_mynid(PTL_NID_ANY);
+                /* no new peers or conns */
+
+                /* Wait for all peer/conn state to clean up */
+                i = 2;
+                while (atomic_read(&kranal_data.kra_nconns) != 0 ||
+                       atomic_read(&kranal-data.kra_npeers) != 0) {
+                        i++;
+                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
+                               "waiting for %d peers and %d conns to close down\n",
+                               atomic_read(&kranal_data.kra_npeers),
+                               atomic_read(&kranal_data.kra_nconns));
+                       kranal_pause(HZ);
+                }
+                /* fall through */
+
+        case RANAL_INIT_LIB:
+                lib_fini(&kranal_lib);
+                /* fall through */
+
+        case RANAL_INIT_DATA:
+                break;
+        }
+
+        /* flag threads to terminate; wake and wait for them to die */
+        kranal_data.kra_shutdown = 1;
+
+        for (i = 0; i < kranal_data.kra_ndevs; i++) {
+                kra_device_t *dev = &kranal_data.kra_devices[i];
+
+                LASSERT (list_empty(&dev->rad_connq));
+
+                spin_lock_irqsave(&dev->rad_lock, flags);
+                wake_up(&dev->rad_waitq);
+                spin_unlock_irqrestore(&dev->rad_lock, flags);
+        }
+
+        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+        wake_up_all(&kranal_data.kra_reaper_waitq);
+        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
+
+        LASSERT (list_empty(&kranal_data.kra_connd_peers));
+        spin_lock_irqsave(&kranal-data.kra_connd_lock, flags); 
+        wake_up_all(&kranal_data.kra_connd_waitq);
+        spin_unlock_irqrestore(&kranal-data.kra_connd_lock, flags); 
+
+        i = 2;
+        while (atomic_read(&kranal_data.kra_nthreads) != 0) {
+                i++;
+                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
+                       "Waiting for %d threads to terminate\n",
+                       atomic_read(&kranal_data.kra_nthreads));
+                kranal_pause(HZ);
+        }
+
+        LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
+        if (kranal_data.kra_peers != NULL) {
+                for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
+                        LASSERT (list_empty(&kranal_data.kra_peers[i]));
+
+                PORTAL_FREE(kranal_data.kra_peers,
+                            sizeof (struct list_head) * 
+                            kranal_data.kra_peer_hash_size);
+        }
+
+        LASSERT (atomic_read(&kranal_data.kra_nconns) == 0);
+        if (kranal_data.kra_conns != NULL) {
+                for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
+                        LASSERT (list_empty(&kranal_data.kra_conns[i]));
+
+                PORTAL_FREE(kranal_data.kra_conns,
+                            sizeof (struct list_head) * 
+                            kranal_data.kra_conn_hash_size);
+        }
+
+        for (i = 0; i < kranal_data.kra_ndevs; i++)
+                kranal_device_fini(&kranal_data.kra_devices[i]);
+
+        kranal_free_txdescs(&kranal_data.kra_idle_txs);
+        kranal_free_txdescs(&kranal_data.kra_idle_nblk_txs);
+
+        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
+               atomic_read(&portal_kmemory));
+        printk(KERN_INFO "Lustre: RapidArray NAL unloaded (final mem %d)\n",
+               atomic_read(&portal_kmemory));
+
+        kranal_data.kra_init = RANAL_INIT_NOTHING;
+}
+
+int
+kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
+                    ptl_ni_limits_t *requested_limits,
+                    ptl_ni_limits_t *actual_limits)
+{
+        static int        device_ids[] = {RAPK_MAIN_DEVICE_ID,
+                                          RAPK_EXPANSION_DEVICE_ID};
+        struct timeval    tv;
+        ptl_process_id_t  process_id;
+        int               pkmem = atomic_read(&portal_kmemory);
+        int               rc;
+        int               i;
+        kra_device_t     *dev;
+
+        LASSERT (nal == &kranal_api);
+
+        if (nal->nal_refct != 0) {
+                if (actual_limits != NULL)
+                        *actual_limits = kranal_lib.libnal_ni.ni_actual_limits;
+                /* This module got the first ref */
+                PORTAL_MODULE_USE;
+                return PTL_OK;
+        }
+
+        LASSERT (kranal_data.kra_init == RANAL_INIT_NOTHING);
+
+        memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
+
+        /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
+         * a unique (for all time) incarnation so we can uniquely identify
+         * the sender.  The incarnation is an incrementing counter
+         * initialised with seconds + microseconds at startup time.  So we
+         * rely on NOT creating connections more frequently on average than
+         * 1MHz to ensure we don't use old incarnations when we reboot. */
+        do_gettimeofday(&tv);
+        kranal_data.kra_next_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
+
+        init_MUTEX(&kranal_data.kra_nid_mutex);
+        init_MUTEX_LOCKED(&kranal_data.kra_listener_signal);
+
+        rwlock_init(&kranal_data.kra_global_lock);
+
+        for (i = 0; i < RANAL_MAXDEVS; i++ ) {
+                kra_device_t  *dev = &kranal_data.kra_devices[i];
+
+                dev->rad_idx = i;
+                INIT_LIST_HEAD(&dev->rad_connq);
+                init_waitqueue_head(&dev->rad_waitq);
+                spin_lock_init(&dev->rad_lock);
+        }
+
+        init_waitqueue_head(&kranal_data.kra_reaper_waitq);
+        spin_lock_init(&kranal_data.kra_reaper_lock);
+
+        INIT_LIST_HEAD(&kranal_data.kra_connd_peers);
+        init_waitqueue_head(&kranal_data.kra_connd_waitq);
+        spin_lock_init(&kranal_data.kra_connd_lock);
+
+        INIT_LIST_HEAD(&kranal_data.kra_idle_txs);
+        INIT_LIST_HEAD(&kranal_data.kra_idle_nblk_txs);
+        init_waitqueue_head(&kranal_data.kra_idle_tx_waitq);
+        spin_lock_init(&kranal_data.kra_tx_lock);
+
+        /* OK to call kranal_api_shutdown() to cleanup now */
+        kranal_data.kra_init = RANAL_INIT_DATA;
+        
+        kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
+        PORTAL_ALLOC(kranal_data.kra_peers,
+                     sizeof(struct list_head) * kranal_data.kra_peer_hash_size);
+        if (kranal_data.kra_peers == NULL)
+                goto failed;
+
+        for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
+                INIT_LIST_HEAD(&kranal_data.kra_peers[i]);
+
+        kranal_data.kra_conn_hash_size = RANAL_PEER_HASH_SIZE;
+        PORTAL_ALLOC(kranal_data.kra_conns,
+                     sizeof(struct list_head) * kranal_data.kra_conn_hash_size);
+        if (kranal_data.kra_conns == NULL)
+                goto failed;
+
+        for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
+                INIT_LIST_HEAD(&kranal_data.kra_conns[i]);
+
+        rc = kranal_alloc_txdescs(&kranal_data.kra_idle_txs, RANAL_NTX);
+        if (rc != 0)
+                goto failed;
+
+        rc = kranal_alloc_txdescs(&kranal_data.kra_idle_nblk_txs,RANAL_NTX_NBLK);
+        if (rc != 0)
+                goto failed;
+
+        process_id.pid = requested_pid;
+        process_id.nid = PTL_NID_ANY;           /* don't know my NID yet */
+
+        rc = lib_init(&kranal_lib, nal, process_id,
+                      requested_limits, actual_limits);
+        if (rc != PTL_OK) {
+                CERROR("lib_init failed: error %d\n", rc);
+                goto failed;
+        }
+
+        /* lib interface initialised */
+        kranal_data.kra_init = RANAL_INIT_LIB;
+        /*****************************************************/
+
+        rc = kranal_thread_start(kranal_reaper, NULL);
+        if (rc != 0) {
+                CERROR("Can't spawn ranal reaper: %d\n", rc);
+                goto failed;
+        }
+
+        for (i = 0; i < RANAL_N_CONND; i++) {
+                rc = kranal_thread_start(kranal_connd, (void *)i);
+                if (rc != 0) {
+                        CERROR("Can't spawn ranal connd[%d]: %d\n",
+                               i, rc);
+                        goto failed;
+                }
+        }
+
+        LASSERT(kranal_data.kra_ndevs == 0);
+        for (i = 0; i < sizeof(device_ids)/sizeof(device_ids[0]); i++) {
+                dev = &kranal_data.kra_devices[kranal_data.kra_ndevs];
+
+                rc = kranal_device_init(device_ids[i], dev);
+                if (rc == 0)
+                        kranal_data.kra_ndevs++;
+
+                rc = kranal_thread_start(kranal_scheduler, dev);
+                if (rc != 0) {
+                        CERROR("Can't spawn ranal scheduler[%d]: %d\n",
+                               i, rc);
+                        goto failed;
+                }
+        }
+
+        if (kranal_data.kra_ndevs == 0)
+                goto failed;
+
+        rc = libcfs_nal_cmd_register(OPENRANAL, &kranal_cmd, NULL);
+        if (rc != 0) {
+                CERROR("Can't initialise command interface (rc = %d)\n", rc);
+                goto failed;
+        }
+
+        /* flag everything initialised */
+        kranal_data.kra_init = RANAL_INIT_ALL;
+        /*****************************************************/
+
+        CDEBUG(D_MALLOC, "initial kmem %d\n", atomic_read(&portal_kmemory));
+        printk(KERN_INFO "Lustre: RapidArray NAL loaded "
+               "(initial mem %d)\n", pkmem);
+
+        return PTL_OK;
+
+ failed:
+        kranal_api_shutdown(&kranal_api);    
+        return PTL_FAIL;
+}
+
+void __exit
+kranal_module_fini (void)
+{
+#ifdef CONFIG_SYSCTL
+        if (kranal_tunables.kra_sysctl != NULL)
+                unregister_sysctl_table(kranal_tunables.kra_sysctl);
+#endif
+        PtlNIFini(kranal_ni);
+
+        ptl_unregister_nal(OPENRANAL);
+}
+
+int __init
+kranal_module_init (void)
+{
+        int    rc;
+
+        /* the following must be sizeof(int) for
+         * proc_dointvec/kranal_listener_procint() */
+        LASSERT (sizeof(kranal_tunables.kra_timeout) == sizeof(int));
+        LASSERT (sizeof(kranal_tunables.kra_listener_timeout) == sizeof(int));
+        LASSERT (sizeof(kranal_tunables.kra_backlog) == sizeof(int));
+        LASSERT (sizeof(kranal_tunables.kra_port) == sizeof(int));
+        LASSERT (sizeof(kranal_tunables.kra_max_immediate) == sizeof(int));
+
+        kranal_api.nal_ni_init = kranal_api_startup;
+        kranal_api.nal_ni_fini = kranal_api_shutdown;
+
+        /* Initialise dynamic tunables to defaults once only */
+        kranal_tunables.kra_timeout = RANAL_TIMEOUT;
+
+        rc = ptl_register_nal(OPENRANAL, &kranal_api);
+        if (rc != PTL_OK) {
+                CERROR("Can't register RANAL: %d\n", rc);
+                return -ENOMEM;               /* or something... */
+        }
+
+        /* Pure gateways want the NAL started up at module load time... */
+        rc = PtlNIInit(OPENRANAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kranal_ni);
+        if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
+                ptl_unregister_nal(OPENRANAL);
+                return -ENODEV;
+        }
+
+#ifdef CONFIG_SYSCTL
+        /* Press on regardless even if registering sysctl doesn't work */
+        kranal_tunables.kra_sysctl = 
+                register_sysctl_table(kranal_top_ctl_table, 0);
+#endif
+        return 0;
+}
+
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_DESCRIPTION("Kernel RapidArray NAL v0.01");
+MODULE_LICENSE("GPL");
+
+module_init(kranal_module_init);
+module_exit(kranal_module_fini);
diff --git a/lnet/klnds/ralnd/ralnd.h b/lnet/klnds/ralnd/ralnd.h
new file mode 100644 (file)
index 0000000..c134179
--- /dev/null
@@ -0,0 +1,438 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2004 Cluster File Systems, Inc.
+ *   Author: Eric Barton <eric@bartonsoftware.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/mm.h>
+#include <linux/string.h>
+#include <linux/stat.h>
+#include <linux/errno.h>
+#include <linux/smp_lock.h>
+#include <linux/unistd.h>
+#include <linux/uio.h>
+
+#include <asm/system.h>
+#include <asm/uaccess.h>
+#include <asm/io.h>
+
+#include <linux/init.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/stat.h>
+#include <linux/list.h>
+#include <linux/kmod.h>
+#include <linux/sysctl.h>
+
+#define DEBUG_SUBSYSTEM S_NAL
+
+#include <linux/kp30.h>
+#include <portals/p30.h>
+#include <portals/lib-p30.h>
+#include <portals/nal.h>
+
+#include <rapl.h>
+
+#if CONFIG_SMP
+# define RANAL_N_SCHED      num_online_cpus()   /* # schedulers */
+#else
+# define RANAL_N_SCHED      1                   /* # schedulers */
+#endif
+
+#define RANAL_MAXDEVS       2                   /* max # devices RapidArray supports */
+
+#define RANAL_N_CONND       4                   /* # connection daemons */
+
+#define RANAL_MIN_RECONNECT_INTERVAL 1          /* first failed connection retry (seconds)... */
+#define RANAL_MAX_RECONNECT_INTERVAL 60         /* ...exponentially increasing to this */
+
+#define RANAL_FMA_PREFIX_LEN      232           /* size of FMA "Prefix" */
+#define RANAL_FMA_MAX_DATA_LEN    ((7<<10)-256) /* Max FMA MSG is 7K including prefix */
+
+#define RANAL_PEER_HASH_SIZE  101               /* # peer lists */
+#define RANAL_CONN_HASH_SIZE  101               /* # conn lists */
+
+#define RANAL_NTX             64                /* # tx descs */
+#define RANAL_NTX_NBLK        256               /* # reserved tx descs */
+
+#define RANAL_RX_CQ_SIZE      1024              /* # entries in receive CQ 
+                                                 * (overflow is a performance hit) */
+
+#define RANAL_RESCHED         100               /* # scheduler loops before reschedule */
+
+#define RANAL_MIN_TIMEOUT     5                 /* minimum timeout interval (seconds) */
+#define RANAL_TIMEOUT2KEEPALIVE(t) (((t)+1)/2)  /* timeout -> keepalive interval */
+
+/* default vals for runtime tunables */
+#define RANAL_TIMEOUT           30              /* comms timeout (seconds) */
+#define RANAL_LISTENER_TIMEOUT   5              /* listener timeout (seconds) */
+#define RANAL_MAX_IMMEDIATE    (2<<10)          /* biggest immediate payload */
+
+typedef struct 
+{
+        int               kra_timeout;          /* comms timeout (seconds) */
+        int               kra_listener_timeout; /* max time the listener can block */
+       int               kra_backlog;          /* listener's backlog */
+       int               kra_port;             /* listener's TCP/IP port */
+        int               kra_max_immediate;    /* biggest immediate payload */
+        struct ctl_table_header *kra_sysctl;    /* sysctl interface */
+} kra_tunables_t;
+
+typedef struct
+{
+        RAP_PVOID               rad_handle;     /* device handle */
+        RAP_PROTECTION_HANDLE   rad_ptag;       /* protection tag */
+        RAP_CQ_HANDLE           rad_fma_cq;     /* FMA (small message) completion queue */
+        RAP_CQ_HANDLE           rad_rdma_cq;    /* rdma completion queue */
+        int                     rad_id;         /* device id */
+        int                     rad_idx;        /* index in kra_devices */
+        int                     rad_ready;      /* set by device callback */
+        struct list_head        rad_connq;      /* connections requiring attention */
+        wait_queue_head_t       rad_waitq;      /* scheduler waits here */
+        spinlock_t              rad_lock;       /* serialise */
+} kra_device_t;
+        
+typedef struct 
+{
+        int               kra_init;             /* initialisation state */
+        int               kra_shutdown;         /* shut down? */
+        atomic_t          kra_nthreads;         /* # live threads */
+
+        struct semaphore  kra_nid_mutex;        /* serialise NID/listener ops */
+       struct semaphore  kra_listener_signal;  /* block for listener startup/shutdown */
+       struct socket    *kra_listener_sock;    /* listener's socket */
+       int               kra_listener_shutdown; /* ask listener to close */
+
+        kra_device_t      kra_devices[RANAL_MAXDEVS]; /* device/ptag/cq etc */
+        int               kra_ndevs;            /* # devices */
+
+        rwlock_t          kra_global_lock;      /* stabilize peer/conn ops */
+
+        struct list_head *kra_peers;            /* hash table of all my known peers */
+        int               kra_peer_hash_size;   /* size of kra_peers */
+        atomic_t          kra_npeers;           /* # peers extant */
+
+        struct list_head *kra_conns;            /* conns hashed by cqid */
+        int               kra_conn_hash_size;   /* size of kra_conns */
+        __u64             kra_next_incarnation; /* conn incarnation # generator */
+        int               kra_next_cqid;        /* cqid generator */
+        atomic_t          kra_nconns;           /* # connections extant */
+
+        long              kra_new_min_timeout;  /* minimum timeout on any new conn */
+        wait_queue_head_t kra_reaper_waitq;     /* reaper sleeps here */
+        spinlock_t        kra_reaper_lock;      /* serialise */
+        
+        struct list_head  kra_connd_peers;      /* peers waiting for a connection */
+        wait_queue_head_t kra_connd_waitq;      /* connection daemons sleep here */
+        spinlock_t        kra_connd_lock;       /* serialise */
+
+        struct list_head  kra_idle_txs;         /* idle tx descriptors */
+        struct list_head  kra_idle_nblk_txs;    /* idle reserved tx descriptors */
+        __u64             kra_next_tx_cookie;   /* RDMA completion cookie */
+        wait_queue_head_t kra_idle_tx_waitq;    /* block here for tx descriptor */
+        spinlock_t        kra_tx_lock;          /* serialise */
+} kra_data_t;
+
+#define RANAL_INIT_NOTHING         0
+#define RANAL_INIT_DATA            1
+
+#define RANAL_INIT_ALL             7
+
+/************************************************************************
+ * Wire message structs.  These are sent in sender's byte order
+ * (i.e. receiver checks magic and flips if required).
+ */
+
+typedef struct kra_connreq                     /* connection request/response */
+{                                              /* (sent via socket) */
+        __u32             racr_magic;          /* I'm an ranal connreq */
+        __u16             racr_version;                /* this is my version number */
+        __u16             racr_devid;           /* which device to connect on */
+        __u64             racr_nid;            /* my NID */
+        __u64             racr_incarnation;    /* my incarnation */
+        __u32             racr_timeout;         /* my timeout */
+       RAP_RI_PARAMETERS racr_riparams;        /* my endpoint info */
+} kra_connreq_t;
+
+typedef struct
+{
+        RAP_MEM_KEY       rard_key;
+        RAP_PVOID64       rard_addr;
+        RAP_UINT32        rard_nob;
+} kra_rdma_desc_t;
+
+typedef struct
+{
+        ptl_hdr_t         raim_hdr;             /* portals header */
+        /* Portals payload is in FMA "Message Data" */
+} kra_immediate_msg_t;
+
+typedef struct
+{
+        ptl_hdr_t         raprm_hdr;           /* portals header */
+        __u64             raprm_cookie;                /* opaque completion cookie */
+} kra_putreq_msg_t;
+
+typedef struct
+{
+       __u64             rapam_src_cookie;     /* reflected completion cookie */
+       __u64             rapam_dst_cookie;     /* opaque completion cookie */
+       kra_rdma_desc_t   rapam_desc;           /* sender's sink buffer */
+} kra_putack_msg_t;
+
+typedef struct
+{
+        ptl_hdr_t         ragm_hdr;             /* portals header */
+        __u64             ragm_cookie;          /* opaque completion cookie */
+        kra_rdma_desc_t   ragm_desc;            /* sender's sink buffer */
+} kra_get_msg_t;
+
+typedef struct
+{
+        __u64             racm_cookie;          /* reflected completion cookie */
+} kra_completion_msg_t;
+
+typedef struct                                  /* NB must fit in FMA "Prefix" */
+{
+        __u32             ram_magic;           /* I'm an ranal message */
+        __u16             ram_version;         /* this is my version number */
+        __u16             ram_type;            /* msg type */
+        __u64             ram_srcnid;           /* sender's NID */
+        __u64             ram_incarnation;      /* sender's connection incarnation */
+        union {
+                kra_immediate_msg_t   immediate;
+               kra_putreq_msg_t      putreq;
+               kra_putack_msg_t      putack;
+               kra_get_msg_t         get;
+                kra_completion_msg_t  completion;
+        }                    ram_u;
+        __u32             ram_seq;              /* incrementing sequence number */
+} kra_msg_t;
+
+#define RANAL_MSG_MAGIC       0x0be91b92        /* unique magic */
+#define RANAL_MSG_VERSION              1        /* current protocol version */
+
+#define RANAL_MSG_FENCE             0x80        /* fence RDMA */
+
+#define RANAL_MSG_NONE              0x00        /* illegal message */
+#define RANAL_MSG_NOOP              0x01        /* empty ram_u (keepalive) */
+#define RANAL_MSG_IMMEDIATE         0x02        /* ram_u.immediate */
+#define RANAL_MSG_PUT_REQ           0x03       /* ram_u.putreq (src->sink) */
+#define RANAL_MSG_PUT_NAK           0x04       /* ram_u.completion (no PUT match: sink->src) */
+#define RANAL_MSG_PUT_ACK           0x05       /* ram_u.putack (PUT matched: sink->src) */
+#define RANAL_MSG_PUT_DONE          0x86       /* ram_u.completion (src->sink) */
+#define RANAL_MSG_GET_REQ           0x07               /* ram_u.get (sink->src) */
+#define RANAL_MSG_GET_NAK           0x08        /* ram_u.completion (no GET match: src->sink) */
+#define RANAL_MSG_GET_DONE          0x89       /* ram_u.completion (src->sink) */
+#define RANAL_MSG_CLOSE             0x8a        /* empty ram_u */
+
+/***********************************************************************/
+
+typedef struct kra_tx                           /* message descriptor */
+{
+        struct list_head          tx_list;      /* queue on idle_txs/rac_sendq/rac_waitq */
+        struct kra_conn          *tx_conn;      /* owning conn */
+        lib_msg_t                *tx_libmsg[2]; /* lib msgs to finalize on completion */
+        unsigned long             tx_qtime;     /* when tx started to wait for something */
+        int                       tx_isnblk;    /* I'm reserved for non-blocking sends */
+        int                       tx_nob;       /* # bytes of payload */
+        int                       tx_buftype;   /* payload buffer type */
+        void                     *tx_buffer;    /* source/sink buffer */
+        int                       tx_phys_offset; /* first page offset (if phys) */
+        int                       tx_phys_npages; /* # physical pages */
+        RAP_PHYS_REGION          *tx_phys;      /* page descriptors */
+        RAP_MEM_KEY               tx_map_key;   /* mapping key */
+        RAP_RDMA_DESCRIPTOR       tx_rdma_desc; /* rdma descriptor */
+        __u64                     tx_cookie;    /* identify this tx to peer */
+        kra_msg_t                 tx_msg;       /* FMA message buffer */
+} kra_tx_t;
+
+#define RANAL_BUF_NONE           0              /* buffer type not set */
+#define RANAL_BUF_IMMEDIATE      1              /* immediate data */
+#define RANAL_BUF_PHYS_UNMAPPED  2              /* physical: not mapped yet */
+#define RANAL_BUF_PHYS_MAPPED    3              /* physical: mapped already */
+#define RANAL_BUF_VIRT_UNMAPPED  4              /* virtual: not mapped yet */
+#define RANAL_BUF_VIRT_MAPPED    5              /* virtual: mapped already */
+
+#define RANAL_TX_IDLE            0x00           /* on freelist */
+#define RANAL_TX_SIMPLE          0x10           /* about to send a simple message */
+#define RANAL_TX_PUTI_REQ        0x20           /* PUT initiator about to send PUT_REQ */
+#define RANAL_TX_PUTI_WAIT_ACK   0x21           /* PUT initiator waiting for PUT_ACK */
+#define RANAL_TX_PUTI_RDMA       0x22           /* PUT initiator waiting for RDMA to complete */
+#define RANAL_TX_PUTI_DONE       0x23           /* PUT initiator about to send PUT_DONE */
+#define RANAL_TX_PUTT_NAK        0x30           /* PUT target about to send PUT_NAK */
+#define RANAL_TX_PUTT_ACK        0x30           /* PUT target about to send PUT_ACK */
+#define RANAL_TX_PUTT_WAIT_DONE  0x31           /* PUT target waiting for PUT_DONE */
+#define RANAL_TX_GETI_REQ        0x40           /* GET initiator about to send GET_REQ */
+#define RANAL_TX_GETI_WAIT_DONE  0x41           /* GET initiator waiting for GET_DONE */
+#define RANAL_TX_GETT_NAK        0x50           /* GET target about to send PUT_NAK */
+#define RANAL_TX_GETT_RDMA       0x51           /* GET target waiting for RDMA to complete */
+#define RANAL_TX_GETT_DONE       0x52           /* GET target about to send GET_DONE */
+
+typedef struct kra_conn
+{ 
+        struct kra_peer    *rac_peer;           /* owning peer */
+        struct list_head    rac_list;           /* stash on peer's conn list */
+        struct list_head    rac_hashlist;       /* stash in connection hash table */
+        struct list_head    rac_schedlist;      /* queue for scheduler */
+        struct list_head    rac_fmaq;           /* txs queued for FMA */
+        struct list_head    rac_rdmaq;          /* txs awaiting RDMA completion */
+        struct list_head    rac_replyq;         /* txs awaiting replies */
+        __u64               rac_peer_incarnation; /* peer's unique connection stamp */
+        __u64               rac_my_incarnation; /* my unique connection stamp */
+        unsigned long       rac_last_tx;        /* when I last sent an FMA message */
+        unsigned long       rac_last_rx;        /* when I last received an FMA messages */
+        long                rac_keepalive;      /* keepalive interval */
+        long                rac_timeout;        /* infer peer death on (last_rx + timout > now) */
+        __u32               rac_cqid;           /* my completion callback id (non-unique) */
+        __u32               rac_tx_seq;         /* tx msg sequence number */
+        __u32               rac_rx_seq;         /* rx msg sequence number */
+        atomic_t            rac_refcount;       /* # users */
+        unsigned int        rac_close_sent;     /* I've sent CLOSE */
+        unsigned int        rac_close_recvd;    /* I've received CLOSE */
+        unsigned int        rac_closing;        /* connection being torn down */
+        unsigned int        rac_scheduled;      /* being attented to */
+        spinlock_t          rac_lock;           /* serialise */
+        kra_device_t       *rac_device;         /* which device */
+       RAP_PVOID           rac_rihandle;       /* RA endpoint */
+        kra_msg_t          *rac_rxmsg;          /* incoming message (FMA prefix) */
+        kra_msg_t           rac_msg;            /* keepalive/CLOSE message buffer */
+} kra_conn_t;
+
+typedef struct kra_peer
+{
+        struct list_head    rap_list;           /* stash on global peer list */
+        struct list_head    rap_connd_list;     /* schedule on kra_connd_peers */
+        struct list_head    rap_conns;          /* all active connections */
+        struct list_head    rap_tx_queue;       /* msgs waiting for a conn */
+        ptl_nid_t           rap_nid;            /* who's on the other end(s) */
+        __u32               rap_ip;             /* IP address of peer */
+        int                 rap_port;           /* port on which peer listens */
+        atomic_t            rap_refcount;       /* # users */
+        int                 rap_persistence;    /* "known" peer refs */
+        int                 rap_connecting;     /* connection forming */
+        unsigned long       rap_reconnect_time; /* CURRENT_TIME when reconnect OK */
+        unsigned long       rap_reconnect_interval; /* exponential backoff */
+} kra_peer_t;
+
+
+extern lib_nal_t       kranal_lib;
+extern kra_data_t      kranal_data;
+extern kra_tunables_t  kranal_tunables;
+
+static inline void
+kranal_peer_addref(kra_peer_t *peer)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid);
+       LASSERT(atomic_read(&peer->rap_refcount) > 0);
+       atomic_inc(&peer->rap_refcount);
+}
+
+static inline void
+kranal_peer_decref(kra_peer_t *peer)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid);
+       LASSERT(atomic_read(&peer->rap_refcount) > 0);
+       if (atomic_dec_and_test(&peer->rap_refcount))
+               __kranal_peer_decref(peer);
+}
+
+static inline struct list_head *
+kranal_nid2peerlist (ptl_nid_t nid) 
+{
+        unsigned int hash = ((unsigned int)nid) % kranal_data.kra_peer_hash_size;
+        
+        return (&kranal_data.kra_peers [hash]);
+}
+
+static inline int
+kranal_peer_active(kra_peer_t *peer)
+{
+        /* Am I in the peer hash table? */
+        return (!list_empty(&peer->rap_list));
+}
+
+static inline void
+kranal_conn_addref(kra_conn_t *conn)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid);
+       LASSERT(atomic_read(&conn->rac_refcount) > 0);
+       atomic_inc(&conn->rac_refcount);
+}
+
+static inline void
+kranal_conn_decref(kra_conn_t *conn)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid);
+       LASSERT(atomic_read(&conn->rac_refcount) > 0);
+       if (atomic_dec_and_test(&conn->rac_refcount))
+               __kranal_conn_decref(conn);
+}
+
+static inline struct list_head *
+kranal_cqid2connlist (__u32 cqid) 
+{
+        unsigned int hash = cqid % kranal_data.kra_conn_hash_size;
+        
+        return (&kranal_data.kra_conns [hash]);
+}
+
+static inline kra_conn_t *
+kranal_cqid2conn_locked (__u32 cqid) 
+{
+        struct list_head  conns = kranal_cqid2connlist(cqid);
+        struct list_head *tmp;
+        
+        list_for_each(tmp, conns) {
+                conn = list_entry(tmp, kra_conn_t, rac_hashlist);
+                
+                if (conn->rac_cqid == cqid)
+                        return conn;
+        }
+        
+        return NULL;
+}
+
+static inline int
+kranal_tx_mapped (kra_tx_t *tx)
+{
+        return (tx->tx_buftype == RANAL_BUF_VIRT_MAPPED ||
+                tx->tx_buftype == RANAL_BUF_PHYS_MAPPED);
+}
+
+#if CONFIG_X86
+static inline __u64
+kranal_page2phys (struct page *p)
+{
+        __u64 page_number = p - mem_map;
+        
+        return (page_number << PAGE_SHIFT);
+}
+#else
+# error "no page->phys"
+#endif
+
diff --git a/lnet/klnds/ralnd/ralnd_cb.c b/lnet/klnds/ralnd/ralnd_cb.c
new file mode 100644 (file)
index 0000000..b491d71
--- /dev/null
@@ -0,0 +1,1754 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2004 Cluster File Systems, Inc.
+ *   Author: Eric Barton <eric@bartonsoftware.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include "ranal.h"
+
+int
+kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
+{
+        /* I would guess that if kranal_get_peer (nid) == NULL,
+           and we're not routing, then 'nid' is very distant :) */
+        if ( nal->libnal_ni.ni_pid.nid == nid ) {
+                *dist = 0;
+        } else {
+                *dist = 1;
+        }
+
+        return 0;
+}
+
+void
+kranal_device_callback(RAP_INT32 devid)
+{
+        kra_device_t *dev;
+        int           i;
+        
+        for (i = 0; i < kranal_data.kra_ndevs; i++) {
+
+                dev = &kranal_data.kra_devices[i];
+                if (dev->rad_id != devid)
+                        continue;
+
+                spin_lock_irqsave(&dev->rad_lock, flags);
+
+                if (!dev->rad_ready) {
+                        dev->rad_ready = 1;
+                        wake_up(&dev->rad_waitq);
+                }
+
+                spin_unlock_irqrestore(&dev->rad_lock, flags);
+                return;
+        }
+        
+        CWARN("callback for unknown device %d\n", devid);
+}
+
+void
+kranal_schedule_conn(kra_conn_t *conn)
+{
+        kra_device_t    *dev = conn->rac_device;
+        unsigned long    flags;
+        
+        spin_lock_irqsave(&dev->rad_lock, flags);
+        
+        if (!conn->rac_scheduled) {
+                kranal_conn_addref(conn);       /* +1 ref for scheduler */
+                conn->rac_scheduled = 1;
+                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
+                wake_up(&dev->rad_waitq);
+        }
+
+        spin_unlock_irqrestore(&dev->rad_lock, flags);
+}
+
+void
+kranal_schedule_cqid (__u32 cqid)
+{
+        kra_conn_t         *conn;
+        struct list_head   *conns;
+        struct list_head   *tmp;
+
+        conns = kranal_cqid2connlist(cqid);
+
+        read_lock(&kranal_data.kra_global_lock);
+
+        conn = kranal_cqid2conn_locked(cqid);
+        
+        if (conn == NULL)
+                CWARN("no cqid %x\n", cqid);
+        else
+                kranal_schedule_conn(conn);
+        
+        read_unlock(&kranal_data.kra_global_lock);
+}
+
+void
+kranal_schedule_dev(kra_device_t *dev)
+{
+        kra_conn_t         *conn;
+        struct list_head   *conns;
+        struct list_head   *tmp;
+        int                 i;
+
+        /* Don't do this in IRQ context (servers may have 1000s of clients) */
+        LASSERT (!in_interrupt()); 
+
+        CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
+
+        for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
+
+                /* Drop the lock on each hash bucket to ensure we don't
+                 * block anyone for too long at IRQ priority on another CPU */
+                
+                read_lock(&kranal_data.kra_global_lock);
+        
+                conns = &kranal_data.kra_conns[i];
+
+                list_for_each (tmp, conns) {
+                        conn = list_entry(tmp, kra_conn_t, rac_hashlist);
+                
+                        if (conn->rac_device == dev)
+                                kranal_schedule_conn(conn);
+                }
+                read_unlock(&kranal_data.kra_global_lock);
+        }
+}
+
+void
+kranal_tx_done (kra_tx_t *tx, int completion)
+{
+        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
+        kra_device_t    *dev;
+        unsigned long    flags;
+        int              i;
+        RAP_RETURN       rrc;
+
+        LASSERT (!in_interrupt());
+
+        switch (tx->tx_buftype) {
+        default:
+                LBUG();
+
+        case RANAL_BUF_NONE:
+        case RANAL_BUF_IMMEDIATE:
+        case RANAL_BUF_PHYS_UNMAPPED:
+        case RANAL_BUF_VIRT_UNMAPPED:
+                break;
+
+        case RANAL_BUF_PHYS_MAPPED:
+                LASSERT (tx->tx_conn != NULL);
+                dev = tx->tx_con->rac_device;
+                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
+                                           dev->rad_ptag, &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                break;
+
+        case RANAL_BUF_VIRT_MAPPED:
+                LASSERT (tx->tx_conn != NULL);
+                dev = tx->tx_con->rac_device;
+                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer
+                                           dev->rad_ptag, &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                break;
+        }
+
+        for (i = 0; i < 2; i++) {
+                /* tx may have up to 2 libmsgs to finalise */
+                if (tx->tx_libmsg[i] == NULL)
+                        continue;
+
+                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
+                tx->tx_libmsg[i] = NULL;
+        }
+
+        tx->tx_buftype = RANAL_BUF_NONE;
+        tx->tx_msg.ram_type = RANAL_MSG_NONE;
+        tx->tx_conn = NULL;
+
+        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
+
+        if (tx->tx_isnblk) {
+                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
+        } else {
+                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
+                wake_up(&kranal_data.kra_idle_tx_waitq);
+        }
+
+        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+}
+
+kra_tx_t *
+kranal_get_idle_tx (int may_block) 
+{
+        unsigned long  flags;
+        kra_tx_t      *tx = NULL;
+        
+        for (;;) {
+                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
+
+                /* "normal" descriptor is free */
+                if (!list_empty(&kranal_data.kra_idle_txs)) {
+                        tx = list_entry(kranal_data.kra_idle_txs.next,
+                                       kra_tx_t, tx_list);
+                        break;
+                }
+
+                if (!may_block) {
+                        /* may dip into reserve pool */
+                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
+                                CERROR("reserved tx desc pool exhausted\n");
+                                break;
+                        }
+
+                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
+                                       kra_tx_t, tx_list);
+                        break;
+                }
+
+                /* block for idle tx */
+                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+
+                wait_event(kranal_data.kra_idle_tx_waitq,
+                          !list_empty(&kranal_data.kra_idle_txs));
+        }
+
+        if (tx != NULL) {
+                list_del(&tx->tx_list);
+
+                /* Allocate a new completion cookie.  It might not be
+                 * needed, but we've got a lock right now... */
+                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
+
+                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
+                LASSERT (tx->tx_conn == NULL);
+                LASSERT (tx->tx_libmsg[0] == NULL);
+                LASSERT (tx->tx_libmsg[1] == NULL);
+        }
+
+        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+        
+        return tx;
+}
+
+void
+kranal_init_msg(kra_msg_t *msg, int type)
+{
+        msg->ram_magic = RANAL_MSG_MAGIC;
+        msg->ram_version = RANAL_MSG_VERSION;
+        msg->ram_type = type;
+        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
+        /* ram_incarnation gets set when FMA is sent */
+}
+
+kra_tx_t
+kranal_new_tx_msg (int may_block, int type)
+{
+        kra_tx_t *tx = kranal_get_idle_tx(may_block);
+
+        if (tx == NULL)
+                return NULL;
+
+        kranal_init_msg(&tx->tx_msg, type);
+        return tx;
+}
+
+int
+kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
+                               int offset, int nob)
+                 
+{
+        LASSERT (nob > 0);
+        LASSERT (niov > 0);
+        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                niov--;
+                iov++;
+                LASSERT (niov > 0);
+        }
+
+        if (nob > iov->iov_len - offset) {
+                CERROR("Can't handle multiple vaddr fragments\n");
+                return -EMSGSIZE;
+        }
+
+        tx->tx_bufftype = RANAL_BUF_IMMEDIATE;
+        tx->tx_nob = nob;
+        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
+        return 0;
+}
+
+int
+kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
+                          int offset, int nob)
+                 
+{
+        LASSERT (nob > 0);
+        LASSERT (niov > 0);
+        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                niov--;
+                iov++;
+                LASSERT (niov > 0);
+        }
+
+        if (nob > iov->iov_len - offset) {
+                CERROR("Can't handle multiple vaddr fragments\n");
+                return -EMSGSIZE;
+        }
+
+        tx->tx_bufftype = RANAL_BUF_VIRT_UNMAPPED;
+        tx->tx_nob = nob;
+        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
+        return 0;
+}
+
+int
+kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
+                          int offset, int nob)
+{
+        RAP_PHYS_REGION *phys = tx->tx_phys;
+        int              resid;
+
+        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
+
+        LASSERT (nob > 0);
+        LASSERT (nkiov > 0);
+        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                nkiov--;
+                kiov++;
+                LASSERT (nkiov > 0);
+        }
+
+        tx->tx_bufftype = RANAL_BUF_PHYS_UNMAPPED;
+        tx->tx_nob = nob;
+        tx->tx_buffer = NULL;
+        tx->tx_phys_offset = kiov->kiov_offset + offset;
+        
+        phys->Address = kranal_page2phys(kiov->kiov_page);
+        phys->Length  = PAGE_SIZE;
+        phys++;
+
+        resid = nob - (kiov->kiov_len - offset);
+        while (resid > 0) {
+                kiov++;
+                nkiov--;
+                LASSERT (nkiov > 0);
+
+                if (kiov->kiov_offset != 0 ||
+                    ((resid > PAGE_SIZE) && 
+                     kiov->kiov_len < PAGE_SIZE)) {
+                        int i;
+                        /* Can't have gaps */
+                        CERROR("Can't make payload contiguous in I/O VM:"
+                               "page %d, offset %d, len %d \n", nphys, 
+                               kiov->kiov_offset, kiov->kiov_len);
+
+                        for (i = -nphys; i < nkiov; i++) {
+                                CERROR("kiov[%d] %p +%d for %d\n",
+                                       i, kiov[i].kiov_page, 
+                                       kiov[i].kiov_offset, kiov[i].kiov_len);
+                        }
+                        
+                        return -EINVAL;
+                }
+
+                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
+                        CERROR ("payload too big (%d)\n", phys - tx->tx_phys);
+                        return -EMSGSIZE;
+                }
+
+                phys->Address = kranal_page2phys(kiov->kiov_page);
+                phys->Length  = PAGE_SIZE;
+                phys++;
+
+                resid -= PAGE_SIZE;
+        }
+
+        tx->tx_phys_npages = phys - tx->tx_phys;
+        return 0;
+}
+
+static inline int
+kranal_setup_buffer (kra_tx_t *tx, int niov, 
+                     struct iovec *iov, ptl_kiov_t *kiov,
+                     int offset, int nob)
+{
+        LASSERT ((iov == NULL) != (kiov == NULL));
+        
+        if (kiov != NULL)
+                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
+        
+        return kranal_setup_virt_buffer(tx, niov, kiov, offset, nob);
+}
+
+void
+kranal_map_buffer (kra_tx_t *tx)
+{
+        kra_conn_t     *conn = tx->tx_conn;
+        kra_device_t   *dev = conn->rac_device;
+
+        switch (tx->tx_buftype) {
+        default:
+                
+        case RANAL_BUF_PHYS_UNMAPPED:
+                rrc = RapkRegisterPhys(conn->rac_device->rad_handle,
+                                       tx->tx_phys, tx->tx_phys_npages,
+                                       conn->rac_device->rad_ptag,
+                                       &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
+                return;
+
+        case RANAL_BUF_VIRT_UNMAPPED:
+                rrc = RapkRegisterMemory(conn->rac_device->rad_handle,
+                                         tx->tx_buffer, tx->tx_nob,
+                                         conn->rac_device->rad_ptag,
+                                         &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
+                return;
+        }
+}
+
+kra_conn_t *
+kranal_find_conn_locked (kra_peer_t *peer)
+{
+        struct list_head *tmp;
+
+        /* just return the first connection */
+        list_for_each (tmp, &peer->rap_conns) {
+                return list_entry(tmp, kra_conn_t, rac_list);
+        }
+
+        return NULL;
+}
+
+void
+kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
+{
+        unsigned long    flags;
+
+        tx->tx_conn = conn;
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
+        tx->tx_qtime = jiffies;
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+        kranal_schedule_conn(conn);
+}
+
+void
+kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
+{
+        unsigned long    flags;
+        kra_peer_t      *peer;
+        kra_conn_t      *conn;
+        unsigned long    now;
+        rwlock_t        *g_lock = &kranal_data.kra_global_lock;
+
+        /* If I get here, I've committed to send, so I complete the tx with
+         * failure on any problems */
+        
+        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
+
+        read_lock(g_lock);
+        
+        peer = kranal_find_peer_locked(nid);
+        if (peer == NULL) {
+                read_unlock(g_lock);
+                kranal_tx_done(tx, -EHOSTUNREACH);
+                return;
+        }
+
+        conn = kranal_find_conn_locked(peer);
+        if (conn != NULL) {
+                kranal_post_fma(conn, tx);
+                read_unlock(g_lock);
+                return;
+        }
+        
+        /* Making one or more connections; I'll need a write lock... */
+        read_unlock(g_lock);
+        write_lock_irqsave(g_lock, flags);
+
+        peer = kranal_find_peer_locked(nid);
+        if (peer == NULL) {
+                write_unlock_irqrestore(g_lock, flags);
+                kranal_tx_done(tx -EHOSTUNREACH);
+                return;
+        }
+
+        conn = kranal_find_conn_locked(peer);
+        if (conn != NULL) {
+                /* Connection exists; queue message on it */
+                kranal_post_fma(conn, tx);
+                write_unlock_irqrestore(g_lock, flags);
+                return;
+        }
+
+        LASSERT (peer->rap_persistence > 0);
+
+        if (!peer->rap_connecting) {
+                now = CURRENT_TIME;
+                if (now < peer->rap_reconnect_time) {
+                        write_unlock_irqrestore(g_lock, flags);
+                        kranal_tx_done(tx, -EHOSTUNREACH);
+                        return;
+                }
+        
+                peer->rap_connecting = 1;
+                kranal_peer_addref(peer); /* extra ref for connd */
+        
+                spin_lock(&kranal_data.kra_connd_lock);
+        
+                list_add_tail(&peer->rap_connd_list,
+                             &kranal_data.kra_connd_peers);
+                wake_up(&kranal_data.kra_connd_waitq);
+        
+                spin_unlock(&kranal_data.kra_connd_lock);
+        }
+        
+        /* A connection is being established; queue the message... */
+        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
+
+        write_unlock_irqrestore(g_lock, flags);
+}
+
+static void
+kranal_rdma(kra_tx_t *tx, int type, 
+            kra_rdma_desc_t *rard, int nob, __u64 cookie)
+{
+        kra_conn_t *conn = tx->tx_conn;
+        RAP_RETURN  rrc;
+
+        /* prep final completion message */
+        kranal_init_msg(&tx->tx_msg, type);
+        tx->tx_msg.ram_u.completion.racm_cookie = cookie;
+        
+        LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
+                 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+        LASSERT (nob <= rard->rard_nob);
+
+        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
+        tx->tx_rdma_desc.SrcPtr = tx->tx_buffer;
+        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
+        tx->tx_rdma_desc.DstPtr = rard->rard_addr;
+        tx->tx_rdma_desc.DstKey = rard->rard_key;
+        tx->tx_rdma_desc.Length = nob;
+        tx->tx_rdma_desc.AppPtr = tx;
+
+        if (nob == 0) { /* Immediate completion */
+                kranal_post_fma(conn, tx);
+                return;
+        }
+        
+        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
+        LASSERT (rrc == RAP_SUCCESS);
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
+        tx->tx_qtime = jiffies;
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+}
+
+int
+kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
+{
+        __u32      nob_received = nob;
+        RAP_RETURN rrc;
+
+        LASSERT (conn->rac_rxmsg != NULL);
+
+        rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
+                                &nob_received, sizeof(kra_msg_t));
+        LASSERT (rrc == RAP_SUCCESS);
+
+        conn->rac_rxmsg = NULL;
+
+        if (nob_received != nob) {
+                CWARN("Expected %d immediate bytes but got %d\n",
+                      nob, nob_received);
+                return -EPROTO;
+        }
+        
+        return 0;
+}
+
+ptl_err_t
+kranal_do_send (lib_nal_t    *nal, 
+                void         *private,
+                lib_msg_t    *libmsg,
+                ptl_hdr_t    *hdr, 
+                int           type, 
+                ptl_nid_t     nid, 
+                ptl_pid_t     pid,
+                unsigned int  niov, 
+                struct iovec *iov, 
+                ptl_kiov_t   *kiov,
+                size_t        offset,
+                size_t        nob)
+{
+        kra_conn_t *conn;
+        kra_tx_t   *tx;
+
+        /* NB 'private' is different depending on what we're sending.... */
+
+        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
+               " pid %d\n", nob, niov, nid , pid);
+
+        LASSERT (nob == 0 || niov > 0);
+        LASSERT (niov <= PTL_MD_MAX_IOV);
+
+        LASSERT (!in_interrupt());
+        /* payload is either all vaddrs or all pages */
+        LASSERT (!(kiov != NULL && iov != NULL));
+
+        switch(type) {
+        default:
+                LBUG();
+                
+        case PTL_MSG_REPLY: {
+                /* reply's 'private' is the conn that received the GET_REQ */
+                conn = private;
+                LASSERT (conn->rac_rxmsg != NULL);
+
+                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
+                        if (nob > RANAL_MAX_IMMEDIATE) {
+                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
+                                       nob, nid);
+                                return PTL_FAIL;
+                        }
+                        break;                  /* RDMA not expected */
+                }
+                
+                /* Incoming message consistent with immediate reply? */
+                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
+                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
+                              nid, conn->rac_rxmsg->ram_type);
+                        return PTL_FAIL;
+                }
+
+                tx = kranal_get_idle_tx(0);
+                if (tx == NULL)
+                        return PTL_FAIL;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_conn = conn;
+                tx->tx_libmsg[0] = libmsg;
+
+                kranal_map_buffer(tx);
+                kranal_rdma(tx, RANAL_MSG_GET_DONE,
+                            &conn->rac_rxmsg->ram_u.getreq.ragm_desc, nob,
+                            &conn->rac_rxmsg->ram_u.getreq.ragm_cookie);
+                return PTL_OK;
+        }
+
+        case PTL_MSG_GET:
+                if (kiov == NULL &&             /* not paged */
+                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
+                    nob <= kranal_tunables.kra_max_immediate)
+                        break;                  /* send IMMEDIATE */
+
+                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ);
+                if (tx == NULL)
+                        return PTL_NO_SPACE;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
+                if (tx->tx_libmsg[1] == NULL) {
+                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_libmsg[0] = libmsg;
+                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
+                /* rest of tx_msg is setup just before it is sent */
+                kranal_launch_tx(tx, nid);
+                return PTL_OK
+
+        case PTL_MSG_ACK:
+                LASSERT (nob == 0);
+                break;
+
+        case PTL_MSG_PUT:
+                if (kiov == NULL &&             /* not paged */
+                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
+                    nob <= kranal_tunables.kra_max_immediate)
+                        break;                  /* send IMMEDIATE */
+                
+                tx = kranal_new_tx_msg(!in_interrupt(), RANA_MSG_PUT_REQ);
+                if (tx == NULL)
+                        return PTL_NO_SPACE;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_libmsg[0] = libmsg;
+                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
+                /* rest of tx_msg is setup just before it is sent */
+                kranal_launch_tx(tx, nid);
+                return PTL_OK;
+        }
+
+        LASSERT (kiov == NULL);
+        LASSERT (nob <= RANAL_MAX_IMMEDIATE);
+
+        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
+                                 type == PTL_MSG_REPLY ||
+                                 in_interrupt()), 
+                               RANAL_MSG_IMMEDIATE);
+        if (tx == NULL)
+                return PTL_NO_SPACE;
+
+        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
+        if (rc != 0) {
+                kranal_tx_done(tx, rc);
+                return PTL_FAIL;
+        }
+                
+        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
+        tx->tx_libmsg[0] = libmsg;
+        kranal_launch_tx(tx, nid);
+        return PTL_OK;
+}
+
+ptl_err_t
+kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
+            ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+            unsigned int niov, struct iovec *iov,
+            size_t offset, size_t len)
+{
+        return kranal_do_send(nal, private, cookie,
+                             hdr, type, nid, pid,
+                             niov, iov, NULL,
+                             offset, len);
+}
+
+ptl_err_t
+kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
+                  ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+                  unsigned int niov, ptl_kiov_t *kiov, 
+                  size_t offset, size_t len)
+{
+        return kranal_do_send(nal, private, cookie,
+                             hdr, type, nid, pid,
+                             niov, NULL, kiov,
+                             offset, len);
+}
+
+ptl_err_t
+kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
+               unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
+               size_t offset, size_t mlen, size_t rlen)
+{
+        kra_conn_t  *conn = private;
+        kra_msg_t   *rxmsg = conn->rac_rxmsg;
+        void        *buffer;
+        int          rc;
+        
+        LASSERT (mlen <= rlen);
+        LASSERT (!in_interrupt());
+        /* Either all pages or all vaddrs */
+        LASSERT (!(kiov != NULL && iov != NULL));
+
+        switch(rxmsg->ram_type) {
+        default:
+                LBUG();
+                return PTL_FAIL;
+                
+        case RANAL_MSG_IMMEDIATE:
+                if (mlen == 0) {
+                        buffer = NULL;
+                } else if (kiov != NULL) {
+                        CERROR("Can't recv immediate into paged buffer\n");
+                        return PTL_FAIL;
+                } else {
+                        LASSERT (niov > 0);
+                        while (offset >= iov->iov_len) {
+                                offset -= iov->iov_len;
+                                iov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        if (mlen > iov->iov_len - offset) {
+                                CERROR("Can't handle immediate frags\n");
+                                return PTL_FAIL;
+                        }
+                        buffer = ((char *)iov->iov_base) + offset;
+                }
+                rc = kranal_consume_rxmsg(conn, buffer, mlen);
+                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
+                return PTL_OK;
+
+        case RANAL_MSG_GET_REQ:
+                /* If the GET matched, we've already handled it in
+                 * kranal_do_send which is called to send the REPLY.  We're
+                 * only called here to complete the GET receive (if we needed
+                 * it which we don't, but I digress...) */
+                LASSERT (libmsg == NULL);
+                lib_finalize(nal, NULL, libmsg, PTL_OK);
+                return PTL_OK;
+
+        case RANAL_MSG_PUT_REQ:
+                if (libmsg == NULL) {           /* PUT didn't match... */
+                        lib_finalize(null, NULL, libmsg, PTL_OK);
+                        return PTL_OK;
+                }
+                
+                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
+                if (tx == NULL)
+                        return PTL_NO_SPACE;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                kranal_map_buffer(tx);
+                
+                tx->tx_msg.ram_u.putack.rapam_src_cookie = 
+                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
+                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
+                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_key = tx->tx_map_key;
+                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_addr = tx->tx_buffer;
+                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_nob = mlen;
+
+                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
+
+                kranal_post_fma(conn, tx);
+                
+                /* flag matched by consuming rx message */
+                kranal_consume_rxmsg(conn, NULL, 0);
+                return PTL_OK;
+        }
+}
+
+ptl_err_t
+kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
+            unsigned int niov, struct iovec *iov, 
+            size_t offset, size_t mlen, size_t rlen)
+{
+        return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
+                             offset, mlen, rlen);
+}
+
+ptl_err_t
+kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
+                  unsigned int niov, ptl_kiov_t *kiov, 
+                  size_t offset, size_t mlen, size_t rlen)
+{
+        return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
+                             offset, mlen, rlen);
+}
+
+int
+kranal_thread_start (int(*fn)(void *arg), void *arg)
+{
+        long    pid = kernel_thread(fn, arg, 0);
+
+        if (pid < 0)
+                return(int)pid;
+
+        atomic_inc(&kranal_data.kra_nthreads);
+        return 0;
+}
+
+void
+kranal_thread_fini (void)
+{
+        atomic_dec(&kranal_data.kra_nthreads);
+}
+
+int
+kranal_check_conn (kra_conn_t *conn)
+{
+        kra_tx_t          *tx;
+        struct list_head  *ttmp;
+        unsigned long      flags;
+        long               timeout;
+        unsigned long      now = jiffies;
+
+        if (!conn->rac_closing &&
+            time_after_eq(now, conn->rac_last_sent + conn->rac_keepalive * HZ)) {
+                /* not sent in a while; schedule conn so scheduler sends a keepalive */
+                kranal_schedule_conn(conn);
+        }
+
+        /* wait twice as long for CLOSE to be sure peer is dead */
+        timeout = (conn->rac_closing ? 1 : 2) * conn->rac_timeout * HZ;
+
+        if (!conn->rac_close_recvd &&
+            time_after_eq(now, conn->rac_last_rx + timeout)) {
+                CERROR("Nothing received from "LPX64" within %d seconds\n",
+                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
+                return -ETIMEDOUT;
+        }
+
+        if (conn->rac_closing)
+                return 0;
+        
+        /* Check the conn's queues are moving.  These are "belt+braces" checks,
+         * in case of hardware/software errors that make this conn seem
+         * responsive even though it isn't progressing its message queues. */
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
+        list_for_each (ttmp, &conn->rac_fmaq) {
+                tx = list_entry(ttmp, kra_tx_t, tx_list);
+                
+                if (time_after_eq(now, tx->tx_qtime + timeout)) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
+                        CERROR("tx on fmaq for "LPX64" blocked %d seconds\n",
+                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        return -ETIMEDOUT;
+                }
+        }
+        
+        list_for_each (ttmp, &conn->rac_rdmaq) {
+                tx = list_entry(ttmp, kra_tx_t, tx_list);
+                
+                if (time_after_eq(now, tx->tx_qtime + timeout)) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
+                        CERROR("tx on rdmaq for "LPX64" blocked %d seconds\n",
+                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        return -ETIMEDOUT;
+                }
+        }
+        
+        list_for_each (ttmp, &conn->rac_replyq) {
+                tx = list_entry(ttmp, kra_tx_t, tx_list);
+                
+                if (time_after_eq(now, tx->tx_qtime + timeout)) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
+                        CERROR("tx on replyq for "LPX64" blocked %d seconds\n",
+                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        return -ETIMEDOUT;
+                }
+        }
+        
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+        return 0;
+}
+
+void
+kranal_check_conns (int idx, unsigned long *min_timeoutp)
+{
+        struct list_head  *conns = &kranal_data.kra_conns[idx];
+        struct list_head  *ctmp;
+        kra_conn_t        *conn;
+
+ again:
+        /* NB. We expect to check all the conns and not find any problems, so
+         * we just use a shared lock while we take a look... */
+        read_lock(&kranal_data.kra_global_lock);
+
+        list_for_each (ctmp, conns) {
+                conn = list_entry(ptmp, kra_conn_t, rac_hashlist);
+
+                if (conn->rac_timeout < *min_timeoutp )
+                        *min_timeoutp = conn->rac_timeout;
+                if (conn->rac_keepalive < *min_timeoutp )
+                        *min_timeoutp = conn->rac_keepalive;
+
+                rc = kranal_check_conn(conn);
+                if (rc == 0)
+                        continue;
+
+                kranal_conn_addref(conn);
+                read_unlock(&kranal_data.kra_global_lock);
+
+                CERROR("Check on conn to "LPX64"failed: %d\n",
+                       conn->rac_peer->rap_nid, rc);
+
+                write_lock_irqsave(&kranal_data.kra_global_lock);
+
+                if (!conn->rac_closing)
+                        kranal_close_conn_locked(conn, -ETIMEDOUT);
+                else
+                        kranal_terminate_conn_locked(conn);
+                        
+                kranal_conn_decref(conn);
+
+                /* start again now I've dropped the lock */
+                goto again;
+        }
+
+        read_unlock(&kranal_data.kra_global_lock);
+}
+
+int
+kranal_connd (void *arg)
+{
+       char               name[16];
+        wait_queue_t       wait;
+        unsigned long      flags;
+        kra_peer_t        *peer;
+        int                i;
+
+       snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
+        kportal_daemonize(name);
+        kportal_blockallsigs();
+
+        init_waitqueue_entry(&wait, current);
+
+        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+
+        while (!kranal_data.kra_shutdown) {
+                /* Safe: kra_shutdown only set when quiescent */
+
+                if (!list_empty(&kranal_data.kra_connd_peers)) {
+                        peer = list_entry(kranal_data.kra_connd_peers.next,
+                                         kra_peer_t, rap_connd_list);
+                        
+                        list_del_init(&peer->rap_connd_list);
+                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+                        kranal_connect(peer);
+                        kranal_put_peer(peer);
+
+                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+                       continue;
+                }
+
+                set_current_state(TASK_INTERRUPTIBLE);
+                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
+                
+                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+                schedule ();
+                
+                set_current_state(TASK_RUNNING);
+                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
+
+                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+        }
+
+        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+        kranal_thread_fini();
+        return 0;
+}
+
+void
+kranal_update_reaper_timeout(long timeout) 
+{
+        unsigned long   flags;
+
+        LASSERT (timeout > 0);
+        
+        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+        
+        if (timeout < kranal_data.kra_new_min_timeout)
+                kranal_data.kra_new_min_timeout = timeout;
+
+        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
+}
+
+int
+kranal_reaper (void *arg)
+{
+        wait_queue_t       wait;
+        unsigned long      flags;
+        kra_conn_t        *conn;
+        kra_peer_t        *peer;
+        unsigned long      flags;
+        long               timeout;
+        int                i;
+        int                conn_entries = kranal_data.kra_conn_hash_size;
+        int                conn_index = 0;
+        int                base_index = conn_entries - 1;
+        unsigned long      next_check_time = jiffies;
+        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+        long               current_min_timeout = 1;
+        
+        kportal_daemonize("kranal_reaper");
+        kportal_blockallsigs();
+
+        init_waitqueue_entry(&wait, current);
+
+        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+        kranal_data.kra_new_min_timeout = 1;
+
+        while (!kranal_data.kra_shutdown) {
+
+                /* careful with the jiffy wrap... */
+                timeout = (long)(next_check_time - jiffies);
+                if (timeout <= 0) {
+                
+                        /* I wake up every 'p' seconds to check for
+                         * timeouts on some more peers.  I try to check
+                         * every connection 'n' times within the global
+                         * minimum of all keepalive and timeout intervals,
+                         * to ensure I attend to every connection within
+                         * (n+1)/n times its timeout intervals. */
+                
+                        const int     p = 1;
+                        const int     n = 3;
+                        unsigned long min_timeout;
+                        int           chunk;
+
+                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
+                                /* new min timeout set: restart min timeout scan */
+                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                                base_index = conn_index - 1;
+                                if (base_index < 0)
+                                        base_index = conn_entries - 1;
+
+                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
+                                        current_min_timeout = kranal_data.kra_new_min_timeout;
+                                        CWARN("Set new min timeout %ld\n",
+                                              current_min_timeout);
+                                }
+
+                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                        }
+                        min_timeout = current_min_timeout;
+
+                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
+                                               flags);
+
+                        LASSERT (min_timeout > 0);
+
+                        /* Compute how many table entries to check now so I
+                         * get round the whole table fast enough (NB I do
+                         * this at fixed intervals of 'p' seconds) */
+                       chunk = conn_entries;
+                        if (min_timeout > n * p)
+                                chunk = (chunk * n * p) / min_timeout;
+                        if (chunk == 0)
+                                chunk = 1;
+
+                        for (i = 0; i < chunk; i++) {
+                                kranal_check_conns(conn_index, 
+                                                   &next_min_timeout);
+                                conn_index = (conn_index + 1) % conn_entries;
+                        }
+
+                        next_check_time += p * HZ;
+
+                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+
+                        if (((conn_index - chunk <= base_index &&
+                              base_index < conn_index) ||
+                             (conn_index - conn_entries - chunk <= base_index &&
+                              base_index < conn_index - conn_entries))) {
+
+                                /* Scanned all conns: set current_min_timeout... */
+                                if (current_min_timeout != next_min_timeout) {
+                                        current_min_timeout = next_min_timeout;                                        
+                                        CWARN("Set new min timeout %ld\n",
+                                              current_min_timeout);
+                                }
+
+                                /* ...and restart min timeout scan */
+                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                                base_index = conn_index - 1;
+                                if (base_index < 0)
+                                        base_index = conn_entries - 1;
+                        }
+                }
+
+                set_current_state(TASK_INTERRUPTIBLE);
+                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+
+                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
+
+                busy_loops = 0;
+                schedule_timeout(timeout);
+
+                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+
+                set_current_state(TASK_RUNNING);
+                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+        }
+
+        kranal_thread_fini();
+        return 0;
+}
+
+void
+kranal_process_rdmaq (__u32 cqid)
+{
+        kra_conn_t          *conn;
+        kra_tx_t            *tx;
+        RAP_RETURN           rrc;
+        unsigned long        flags;
+        RAP_RDMA_DESCRIPTOR *desc;
+        
+        read_lock(&kranal_data.kra_global_lock);
+
+        conn = kranal_cqid2conn_locked(cqid);
+        LASSERT (conn != NULL);
+
+        rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
+        LASSERT (rrc == RAP_SUCCESS);
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
+        LASSERT (!list_empty(&conn->rac_rdmaq));
+        tx = list_entry(con->rac_rdmaq.next, kra_tx_t, tx_list);
+        list_del(&tx->tx_list);
+
+        LASSERT(desc->AppPtr == (void *)tx);
+        LASSERT(desc->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
+                desc->tx_msg.ram_type == RANAL_MSG_GET_DONE);
+
+        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
+        tx->tx_qtime = jiffies;
+        
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+        /* Get conn's fmaq processed, now I've just put something there */
+        kranal_schedule_conn(conn);
+
+        read_unlock(&kranal_data.kra_global_lock);
+}
+
+int
+kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
+               void *immediate, int immediatenob)
+{
+        int   sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
+
+        LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
+        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
+                 immediatenob <= RANAL_FMA_MAX_DATA_LEN :
+                 immediatenob == 0);
+
+        msg->ram_incarnation = conn->rac_incarnation;
+        msg->ram_seq = conn->rac_tx_seq;
+
+        if (sync)
+                rrc = RapkFmaSyncSend(conn->rac_device.rad_handle,
+                                      immediate, immediatenob,
+                                      msg, sizeof(*msg));
+        else
+                rrc = RapkFmaSend(conn->rac_device.rad_handle,
+                                  immediate, immediatenob,
+                                  msg, sizeof(*msg));
+
+        switch (rrc) {
+        case RAP_SUCCESS:
+                conn->rac_last_tx = jiffies;
+                conn->rac_tx_seq++;
+                return 0;
+                
+        case RAP_NOT_DONE:
+                return -EAGAIN;
+
+        default:
+                LBUG();
+        }
+}
+
+int
+kranal_process_fmaq (kra_conn_t *conn) 
+{
+        unsigned long flags;
+        int           more_to_do;
+        kra_tx_t     *tx;
+        int           rc;
+        int           expect_reply;
+
+        /* NB I will be rescheduled some via a rad_fma_cq event if my FMA is
+         * out of credits when I try to send right now... */
+
+        if (conn->rac_closing) {
+
+                if (!list_empty(&conn->rac_rdmaq)) {
+                        /* Can't send CLOSE yet; I'm still waiting for RDMAs I
+                         * posted to finish */
+                        LASSERT (!conn->rac_close_sent);
+                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                        return 0;
+                }
+
+                if (conn->rac_close_sent)
+                        return 0;
+                
+                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
+                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                conn->rac_close_sent = (rc == 0);
+                return 0;
+        }
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
+        if (list_empty(&conn->rac_fmaq)) {
+
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+                if (time_after_eq(conn->rac_last_tx + conn->rac_keepalive)) {
+                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                }
+                return 0;
+        }
+        
+        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
+        list_del(&tx->tx_list);
+        more_to_do = !list_empty(&conn->rac_fmaq);
+
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+        expect_reply = 0;
+        switch (tx->tx_msg.ram_type) {
+        default:
+                LBUG();
+                
+        case RANAL_MSG_IMMEDIATE:
+        case RANAL_MSG_PUT_NAK:
+        case RANAL_MSG_PUT_DONE:
+        case RANAL_MSG_GET_NAK:
+        case RANAL_MSG_GET_DONE:
+                rc = kranal_sendmsg(conn, &tx->tx_msg,
+                                    tx->tx_buffer, tx->tx_nob);
+                expect_reply = 0;
+                break;
+                
+        case RANAL_MSG_PUT_REQ:
+                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
+                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
+                kranal_map_buffer(tx);
+                expect_reply = 1;
+                break;
+
+        case RANAL_MSG_PUT_ACK:
+                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
+                expect_reply = 1;
+                break;
+
+        case RANAL_MSG_GET_REQ:
+                kranal_map_buffer(tx);
+                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
+                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
+                tx->tx_msg.ram_u.get.ragm_desc.rard_addr = tx->tx_buffer;
+                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
+                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
+                expect_reply = 1;
+                break;
+        }
+
+        if (rc == -EAGAIN) {
+                /* replace at the head of the list for later */
+                spin_lock_irqsave(&conn->rac_lock, flags);
+                list_add(&tx->tx_list, &conn->rac_fmaq);
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+                return 0;
+        }
+
+        LASSERT (rc == 0);
+        
+        if (!expect_reply) {
+                kranal_tx_done(tx, 0);
+        } else {
+                spin_lock_irqsave(&conn->rac_lock, flags);
+                list_add_tail(&tx->tx_list, &conn->rac_replyq);
+                tx->tx_qtime = jiffies;
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
+        }
+
+        return more_to_do;
+}
+
+static inline void
+kranal_swab_rdma_desc (kra_rdma_desc_t *d)
+{
+        __swab64s(&d->rard_key.Key);
+        __swab16s(&d->rard_key.Cookie);
+        __swab16s(&d->rard_key.MdHandle);
+        __swab32s(&d->rard_key.Flags);
+        __swab64s(&d->rard_addr);
+        __swab32s(&d->rard_nob);
+}
+
+kra_tx_t *
+kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
+{
+        unsigned long     flags;
+        struct list_head *ttmp;
+        kra_tx_t         *tx;
+        
+        list_for_each(ttmp, &conn->rac_replyq) {
+                tx = list_entry(ttmp, kra_tx_t, tx_list);
+                
+                if (tx->tx_cookie != cookie)
+                        continue;
+                
+                if (tx->tx_msg.ram_type != type) {
+                        CWARN("Unexpected type %x (%x expected) "
+                              "matched reply from "LPX64"\n",
+                              tx->tx_msg.ram_type, type,
+                              conn->rac_peer->rap_nid);
+                        return NULL;
+                }
+        }
+        
+        CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
+        return NULL;
+}
+
+int
+kranal_process_receives(kra_conn_t *conn)
+{
+        unsigned long flags;
+        __u32         seq;
+        __u32         nob;
+        kra_msg_t    *msg;
+        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &msg);
+        kra_peer_t   *peer = conn->rac_peer;
+
+        if (rrc == RAP_NOT_DONE)
+                return 0;
+        
+        LASSERT (rrc == RAP_SUCCESS);
+        conn->rac_last_rx = jiffies;
+        seq = conn->rac_seq++;
+
+        if (msg->ram_magic != RANAL_MSG_MAGIC) {
+                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
+                        CERROR("Unexpected magic %08x from "LPX64"\n",
+                               msg->ram_magic, peer->rap_nid);
+                        goto out;
+                }
+
+                __swab32s(&msg->ram_magic);
+                __swab16s(&msg->ram_version);
+                __swab16s(&msg->ram_type);
+                __swab64s(&msg->ram_srcnid);
+                __swab64s(&msg->ram_incarnation);
+                __swab32s(&msg->ram_seq);
+
+                /* NB message type checked below; NOT here... */
+                switch (msg->ram_type) {
+                case RANAL_MSG_PUT_ACK:
+                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
+                        break;
+
+                case RANAL_MSG_GET_REQ:
+                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
+                        break;
+                        
+                default:
+                        break;
+                }
+        }
+
+        if (msg->ram_version != RANAL_MSG_VERSION) {
+                CERROR("Unexpected protocol version %d from "LPX64"\n",
+                       msg->ram_version, peer->rap_nid);
+                goto out;
+        }
+
+        if (msg->ram_srcnid != peer->rap_nid) {
+                CERROR("Unexpected peer "LPX64" from "LPX64"\n",
+                       msg->ram_srcnid, peer->rap_nid);
+                goto out;
+        }
+        
+        if (msg->ram_incarnation != conn->rac_incarnation) {
+                CERROR("Unexpected incarnation "LPX64"("LPX64
+                       " expected) from "LPX64"\n",
+                       msg->ram_incarnation, conn->rac_incarnation,
+                       peer->rap_nid);
+                goto out;
+        }
+        
+        if (msg->ram_seq != seq) {
+                CERROR("Unexpected sequence number %d(%d expected) from "
+                       LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
+                goto out;
+        }
+
+        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
+                /* This message signals RDMA completion: wait now... */
+                rrc = RapkFmaSyncWait(conn->rac_rihandle);
+                LASSERT (rrc == RAP_SUCCESS);
+        }
+        if (msg->ram_type == RANAL_MSG_CLOSE) {
+                conn->rac_close_recvd = 1;
+                write_lock_irqsave(&kranal_data.kra_global_lock);
+
+                if (!conn->rac_closing)
+                        kranal_close_conn_locked(conn, -ETIMEDOUT);
+                else if (conn->rac_close_sent)
+                        kranal_terminate_conn_locked(conn);
+                
+                goto out;
+        }
+
+        if (conn->rac_closing)
+                goto out;
+        
+        conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
+                                                /* they'll NULL rac_rxmsg if they consume it */
+        switch (msg->ram_type) {
+        case RANAL_MSG_NOOP:
+                /* Nothing to do; just a keepalive */
+                break;
+                
+        case RANAL_MSG_IMMEDIATE:
+                lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
+                break;
+                
+        case RANAL_MSG_PUT_REQ:
+                lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
+
+                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
+                        break;
+
+                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
+                if (tx == NULL)
+                        break;
+                
+                tx->tx_msg.ram_u.racm_cookie = msg->msg_u.putreq.raprm_cookie;
+                kranal_post_fma(conn, tx);
+                break;
+
+        case RANAL_MSG_PUT_NAK:
+                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
+                                        msg->ram_u.completion.racm_cookie);
+                if (tx == NULL)
+                        break;
+                
+                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
+                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+                kranal_tx_done(tx, -ENOENT);    /* no match */
+                break;
+                
+        case RANAL_MSG_PUT_ACK:
+                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
+                                        msg->ram_u.putack.rapam_src_cookie);
+                if (tx == NULL)
+                        break;
+
+                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
+                            &msg->ram_u.putack.rapam_desc, 
+                            msg->msg_u.putack.rapam_desc.rard_nob,
+                            msg->ram_u.putack.rapam_dst_cookie);
+                break;
+
+        case RANAL_MSG_PUT_DONE:
+                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
+                                        msg->ram_u.completion.racm_cookie);
+                if (tx == NULL)
+                        break;
+
+                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
+                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+                kranal_tx_done(tx, 0);
+                break;
+
+        case RANAL_MSG_GET_REQ:
+                lib_parse(&kranal_lib, &msg->ram_u.getreq.ragm_hdr, conn);
+                
+                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
+                        break;
+
+                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
+                if (tx == NULL)
+                        break;
+
+                tx->tx_msg.ram_u.racm_cookie = msg->msg_u.getreq.ragm_cookie;
+                kranal_post_fma(conn, tx);
+                break;
+                
+        case RANAL_MSG_GET_NAK:
+                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
+                                        msg->ram_u.completion.racm_cookie);
+                if (tx == NULL)
+                        break;
+                
+                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
+                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+                kranal_tx_done(tx, -ENOENT);    /* no match */
+                break;
+                
+        case RANAL_MSG_GET_DONE:
+                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
+                                        msg->ram_u.completion.racm_cookie);
+                if (tx == NULL)
+                        break;
+                
+                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
+                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+                kranal_tx_done(tx, 0);
+                break;
+        }
+
+ out:
+        if (conn->rac_msg != NULL)
+                kranal_consume_rxmsg(conn, NULL, 0);
+
+        return 1;
+}
+
+int
+kranal_scheduler (void *arg)
+{
+        kra_device_t   *dev = (kra_device_t *)arg;
+        wait_queue_t    wait;
+        char            name[16];
+        kra_conn_t     *conn;
+        unsigned long   flags;
+        int             rc;
+        int             i;
+        __u32           cqid;
+        int             did_something;
+        int             busy_loops = 0;
+
+        snprintf(name, sizeof(name), "kranal_sd_%02ld", dev->rad_idx);
+        kportal_daemonize(name);
+        kportal_blockallsigs();
+
+        init_waitqueue_entry(&wait, current);
+
+        spin_lock_irqsave(&dev->rad_lock, flags);
+
+        while (!kranal_data.kra_shutdown) {
+                /* Safe: kra_shutdown only set when quiescent */
+                
+               if (busy_loops++ >= RANAL_RESCHED) {
+                        spin_unlock_irqrestore(&dev->rad_lock, flags);
+
+                        our_cond_resched();
+                       busy_loops = 0;
+
+                        spin_lock_irqsave(&dev->rad_lock, flags);
+               }
+
+                did_something = 0;
+
+                if (dev->rad_ready) {
+                        dev->rad_ready = 0;
+                        spin_unlock_irqrestore(&dev->rad_lock, flags);
+
+                        rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
+
+                        LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
+                        LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
+                        
+                        if (rrc == RAP_SUCCESS) {
+                                kranal_process_rdmaq(cqid);
+                                did_something = 1;
+                        }
+                        
+                        rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
+                        LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
+                        
+                        if (rrc == RAP_SUCCESS) {
+                                if ((event_type & RAPK_CQ_EVENT_OVERRUN) != 0)
+                                        kranal_schedule_dev(dev);
+                                else
+                                        kranal_schedule_cqid(cqid);
+                                did_something = 1;
+                        }
+                        
+                        spin_lock_irqsave(&dev->rad_lock, flags);
+
+                        /* If there were no completions to handle, I leave
+                         * rad_ready clear.  NB I cleared it BEFORE I checked
+                         * the completion queues since I'm racing with the
+                         * device callback. */
+
+                        if (did_something)
+                                dev->rad_ready = 1;
+                }
+               
+                if (!list_empty(&dev->rad_connq)) {
+                        conn = list_entry(dev->rad_connq.next,
+                                          kra_conn_t, rac_schedlist);
+                        list_del(&conn->rac_schedlist);
+                        spin_unlock_irqrestore(&dev->rad_lock, flags);
+
+                        LASSERT (conn->rac_scheduled);
+
+                        resched  = kranal_process_fmaq(conn);
+                        resched |= kranal_process_receives(conn);
+                        did_something = 1;
+
+                        spin_lock_irqsave(&dev->rad_lock, flags);
+                        if (resched)
+                                list_add_tail(&conn->rac_schedlist,
+                                              &dev->rad_connq);
+                }
+
+                if (did_something)
+                        continue;
+
+                add_wait_queue(&dev->rad_waitq, &wait);
+                set_current_state(TASK_INTERRUPTIBLE);
+
+                spin_unlock_irqrestore(&dev->rad_lock, flags);
+
+                busy_loops = 0;
+                schedule();
+
+                set_current_state(TASK_RUNNING);
+                remove_wait_queue(&dev->rad_waitq, &wait);
+
+                spin_lock_irqsave(&dev->rad_lock, flags);
+        }
+
+        spin_unlock_irqrestore(&dev->rad_lock, flags);
+
+        kranal_thread_fini();
+        return 0;
+}
+
+
+lib_nal_t kranal_lib = {
+        libnal_data:        &kranal_data,      /* NAL private data */
+        libnal_send:         kranal_send,
+        libnal_send_pages:   kranal_send_pages,
+        libnal_recv:         kranal_recv,
+        libnal_recv_pages:   kranal_recv_pages,
+        libnal_dist:         kranal_dist
+};
diff --git a/lustre/portals/knals/ranal/.cvsignore b/lustre/portals/knals/ranal/.cvsignore
new file mode 100644 (file)
index 0000000..5ed596b
--- /dev/null
@@ -0,0 +1,10 @@
+.deps
+Makefile
+.*.cmd
+autoMakefile.in
+autoMakefile
+*.ko
+*.mod.c
+.*.flags
+.tmp_versions
+.depend
diff --git a/lustre/portals/knals/ranal/Makefile.in b/lustre/portals/knals/ranal/Makefile.in
new file mode 100644 (file)
index 0000000..1772cc2
--- /dev/null
@@ -0,0 +1,6 @@
+MODULES := kranal
+kranal-objs := ranal.o ranal_cb.o
+
+EXTRA_POST_CFLAGS := @RACPPFLAGS@
+
+@INCLUDE_RULES@
diff --git a/lustre/portals/knals/ranal/autoMakefile.am b/lustre/portals/knals/ranal/autoMakefile.am
new file mode 100644 (file)
index 0000000..f136aa5
--- /dev/null
@@ -0,0 +1,15 @@
+# Copyright (C) 2001  Cluster File Systems, Inc.
+#
+# This code is issued under the GNU General Public License.
+# See the file COPYING in this distribution
+
+if MODULES
+if !CRAY_PORTALS
+if BUILD_RANAL
+modulenet_DATA = kranal$(KMODEXT)
+endif
+endif
+endif
+
+MOSTLYCLEANFILES = *.o *.ko *.mod.c
+DIST_SOURCES = $(kranal-objs:%.o=%.c) ranal.h
diff --git a/lustre/portals/knals/ranal/ranal.c b/lustre/portals/knals/ranal/ranal.c
new file mode 100644 (file)
index 0000000..a59757d
--- /dev/null
@@ -0,0 +1,1978 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2004 Cluster File Systems, Inc.
+ *   Author: Eric Barton <eric@bartonsoftware.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+#include "ranal.h"
+
+
+nal_t                   kranal_api;
+ptl_handle_ni_t         kranal_ni;
+kra_data_t              kranal_data;
+kra_tunables_t          kranal_tunables;
+
+#ifdef CONFIG_SYSCTL
+#define RANAL_SYSCTL_TIMEOUT           1
+#define RANAL_SYSCTL_LISTENER_TIMEOUT  2
+#define RANAL_SYSCTL_BACKLOG           3
+#define RANAL_SYSCTL_PORT              4
+#define RANAL_SYSCTL_MAX_IMMEDIATE     5
+
+#define RANAL_SYSCTL                   202
+
+static ctl_table kranal_ctl_table[] = {
+        {RANAL_SYSCTL_TIMEOUT, "timeout", 
+         &kranal_tunables.kra_timeout, sizeof(int),
+         0644, NULL, &proc_dointvec},
+        {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", 
+         &kranal_tunables.kra_listener_timeout, sizeof(int),
+         0644, NULL, &proc_dointvec},
+       {RANAL_SYSCTL_BACKLOG, "backlog",
+        &kranal_tunables.kra_backlog, sizeof(int),
+        0644, NULL, kranal_listener_procint},
+       {RANAL_SYSCTL_PORT, "port",
+        &kranal_tunables.kra_port, sizeof(int),
+        0644, NULL, kranal_listener_procint},
+        {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", 
+         &kranal_tunables.kra_max_immediate, sizeof(int),
+         0644, NULL, &proc_dointvec},
+        { 0 }
+};
+
+static ctl_table kranal_top_ctl_table[] = {
+        {RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table},
+        { 0 }
+};
+#endif
+
+int
+kranal_sock_write (struct socket *sock, void *buffer, int nob)
+{
+        int           rc;
+        mm_segment_t  oldmm = get_fs();
+       struct iovec  iov = {
+               .iov_base = buffer,
+               .iov_len  = nob
+       };
+       struct msghdr msg = {
+               .msg_name       = NULL,
+               .msg_namelen    = 0,
+               .msg_iov        = &iov,
+               .msg_iovlen     = 1,
+               .msg_control    = NULL,
+               .msg_controllen = 0,
+               .msg_flags      = MSG_DONTWAIT
+       };
+
+       /* We've set up the socket's send buffer to be large enough for
+        * everything we send, so a single non-blocking send should
+        * complete without error. */
+
+       set_fs(KERNEL_DS);
+       rc = sock_sendmsg(sock, &msg, iov.iov_len);
+       set_fs(oldmm);
+
+       return rc;
+}
+
+int
+kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout)
+{
+        int            rc;
+        mm_segment_t   oldmm = get_fs();
+       long           ticks = timeout * HZ;
+       unsigned long  then;
+       struct timeval tv;
+
+       LASSERT (nob > 0);
+       LASSERT (ticks > 0);
+
+        for (;;) {
+                struct iovec  iov = {
+                        .iov_base = buffer,
+                        .iov_len  = nob
+                };
+                struct msghdr msg = {
+                        .msg_name       = NULL,
+                        .msg_namelen    = 0,
+                        .msg_iov        = &iov,
+                        .msg_iovlen     = 1,
+                        .msg_control    = NULL,
+                        .msg_controllen = 0,
+                        .msg_flags      = 0
+                };
+
+               /* Set receive timeout to remaining time */
+               tv = (struct timeval) {
+                       .tv_sec = ticks / HZ,
+                       .tv_usec = ((ticks % HZ) * 1000000) / HZ;
+               };
+               set_fs(KERNEL_DS);
+               rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
+                                    (char *)&tv, sizeof(tv));
+               set_fs(oldmm);
+               if (rc != 0) {
+                       CERROR("Can't set socket recv timeout %d: %d\n",
+                              send_timeout, rc);
+                       return rc;
+               }
+
+                set_fs(KERNEL_DS);
+               then = jiffies;
+                rc = sock_recvmsg(sock, &msg, iov.iov_len, 0);
+               ticks -= jiffies - then;
+                set_fs(oldmm);
+
+                if (rc < 0)
+                        return rc;
+
+                if (rc == 0)
+                        return -ECONNABORTED;
+
+                buffer = ((char *)buffer) + rc;
+                nob -= rc;
+
+               if (nob == 0)
+                       return 0;
+
+               if (ticks <= 0)
+                       return -ETIMEDOUT;
+        }
+}
+
+int
+kranal_create_sock(struct socket **sockp)
+{
+       struct socket       *sock;
+       int                  rc;
+        struct timeval       tv;
+       int                  option;
+        mm_segment_t         oldmm = get_fs();
+
+       rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock);
+       if (rc != 0) {
+               CERROR("Can't create socket: %d\n", rc);
+               return rc;
+       }
+
+       /* Ensure sending connection info doesn't block */
+       option = 2 * sizeof(kra_connreq_t);
+       set_fs(KERNEL_DS);
+       rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
+                            (char *)&option, sizeof(option));
+       set_fs(oldmm);
+       if (rc != 0) {
+               CERROR("Can't set send buffer %d: %d\n", option, rc);
+               goto failed;
+       }
+
+       option = 1;
+       set_fs(KERNEL_DS);
+       rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                            (char *)&option, sizeof(option));
+       set_fs(oldmm);
+       if (rc != 0) {
+               CERROR("Can't set SO_REUSEADDR: %d\n", rc);
+               goto failed;
+       }
+
+       *sockp = sock;
+       return 0;
+
+ failed:
+       sock_release(sock);
+       return rc;
+}
+
+void
+kranal_pause(int ticks)
+{
+       set_current_state(TASK_UNINTERRUPTIBLE);
+       schedule_timeout(ticks);
+}
+
+void
+kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
+{
+        memset(connreq, 0, sizeof(*connreq));
+
+        connreq->racr_magic       = RANAL_MSG_MAGIC;
+        connreq->racr_version     = RANAL_MSG_VERSION;
+        connreq->racr_devid       = conn->rac_device->rad_id;
+        connreq->racr_nid         = kranal_lib.libnal_ni.ni_pid.nid;
+        connreq->racr_timeout     = conn->rac_timeout;
+        connreq->racr_incarnation = conn->rac_my_incarnation;
+
+        rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
+        LASSERT(rrc == RAP_SUCCESS);
+}
+
+int
+kranal_recv_connreq(struct sock *sock, kra_connreq_t *connreq, int timeout)
+{
+        int         i;
+       int         rc;
+
+       rc = kranal_sock_read(newsock, connreq, sizeof(*connreq), timeout);
+       if (rc != 0) {
+               CERROR("Read failed: %d\n", rc);
+               return rc;
+       }
+
+       if (connreq->racr_magic != RANAL_MSG_MAGIC) {
+               if (__swab32(connreq->racr_magic) != RANAL_MSG_MAGIC) {
+                       CERROR("Unexpected magic %08x\n", connreq->racr_magic);
+                       return -EPROTO;
+               }
+
+               __swab32s(&connreq->racr_magic);
+               __swab16s(&connreq->racr_version);
+                __swab16s(&connreq->racr_devid);
+               __swab64s(&connreq->racr_nid);
+               __swab64s(&connreq->racr_incarnation);
+                __swab32s(&connreq->racr_timeout);
+
+               __swab32s(&connreq->racr_riparams.FmaDomainHndl);
+               __swab32s(&connreq->racr_riparams.RcvCqHndl);
+               __swab32s(&connreq->racr_riparams.PTag);
+                __swab32s(&connreq->racr_riparams.CompletionCookie);
+       }
+
+       if (connreq->racr_version != RANAL_MSG_VERSION) {
+               CERROR("Unexpected version %d\n", connreq->racr_version);
+               return -EPROTO;
+       }
+
+        if (connreq->racr_nid == PTL_NID_ANY) {
+                CERROR("Received PTL_NID_ANY\n");
+                return -EPROTO;
+        }
+
+        if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
+                CERROR("Received timeout %d < MIN %d\n",
+                       connreq->racr_timeout, RANAL_MIN_TIMEOUT);
+                return -EPROTO;
+        }
+        
+        for (i = 0; i < kranal_data.kra_ndevs; i++)
+                if (connreq->racr_devid == 
+                    kranal_data.kra_devices[i]->rad_id)
+                        break;
+
+        if (i == kranal_data.kra_ndevs) {
+                CERROR("Can't match device %d\n", connreq->racr_devid);
+                return -ENODEV;
+        }
+
+       return 0;
+}
+
+int
+kranal_conn_isdup_locked(kranal_peer_t *peer, __u64 incarnation)
+{
+       kra_conn_t       *conn;
+       struct list_head *tmp;
+        int               loopback = 0;
+
+       list_for_each(tmp, &peer->rap_conns) {
+               conn = list_entry(tmp, kra_conn_t, rac_list);
+
+                if (conn->rac_incarnation < incarnation) {
+                        /* Conns with an older incarnation get culled later */
+                        continue;
+                }
+
+                if (!loopback &&
+                    conn->rac_incarnation == incarnation &&
+                    peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) {
+                        /* loopback creates 2 conns */
+                        loopback = 1;
+                        continue;
+                }
+
+                return 1;
+       }
+
+       return 0;
+}
+
+void
+kranal_set_conn_uniqueness (kra_conn_t *conn)
+{
+        unsigned long  flags;
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+        conn->rac_my_incarnation = kranal_data.kra_next_incarnation++;
+
+        do {    /* allocate a unique cqid */
+                conn->rac_cqid = kranal_data.kra_next_cqid++;
+        } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL)
+        
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+}
+
+int
+kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
+{
+       kra_conn_t    *conn;
+        RAP_RETURN     rrc;
+
+        LASSERT (!in_interrupt());
+       PORTAL_ALLOC(conn, sizeof(*conn));
+
+       if (conn == NULL)
+               return -ENOMEM;
+
+       memset(conn, 0, sizeof(*conn));
+        conn->rac_cqid = cqid;
+       atomic_set(&conn->rac_refcount, 1);
+       INIT_LIST_HEAD(&conn->rac_list);
+       INIT_LIST_HEAD(&conn->rac_hashlist);
+       INIT_LIST_HEAD(&conn->rac_fmaq);
+       INIT_LIST_HEAD(&conn->rac_rdmaq);
+       INIT_LIST_HEAD(&conn->rac_replyq);
+       spin_lock_init(&conn->rac_lock);
+
+        conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
+        kranal_update_reaper_timeout(conn->rac_timeout);
+
+        rrc = RapkCreateRi(dev->rad_handle, cqid, dev->rad_ptag,
+                           dev->rad_rdma_cq, dev->rad_fma_cq,
+                           &conn->rac_rihandle);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("RapkCreateRi failed: %d\n", rrc);
+                PORTAL_FREE(conn, sizeof(*conn));
+                return -ENETDOWN;
+        }
+
+        atomic_inc(&kranal_data.kra_nconns);
+       *connp = conn;
+       return 0;
+}
+
+void
+__kranal_conn_decref(kra_conn_t *conn) 
+{
+        kra_tx_t          *tx;
+        RAP_RETURN         rrc;
+
+        LASSERT (!in_interrupt());
+        LASSERT (!conn->rac_scheduled);
+        LASSERT (list_empty(&conn->rac_list));
+        LASSERT (list_empty(&conn->rac_hashlist));
+        LASSERT (atomic_read(&conn->rac_refcount) == 0);
+
+        while (!list_empty(&conn->rac_fmaq)) {
+                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
+                
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -ECONNABORTED);
+        }
+        
+        /* We may not destroy this connection while it has RDMAs outstanding */
+        LASSERT (list_empty(&conn->rac_rdmaq));
+
+        while (!list_empty(&conn->rac_replyq)) {
+                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
+                
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -ECONNABORTED);
+        }
+        
+        rrc = RapkDestroyRi(conn->rac_device->rad_handle,
+                            conn->rac_rihandle);
+        LASSERT (rrc == RAP_SUCCESS);
+
+        if (conn->rac_peer != NULL)
+                kranal_peer_decref(conn->rac_peer);
+
+       PORTAL_FREE(conn, sizeof(*conn));
+        atomic_dec(&kranal_data.kra_nconns);
+}
+
+void
+kranal_terminate_conn_locked (kra_conn_t *conn)
+{
+        kra_peer_t     *peer - conn->rac_peer;
+
+        LASSERT (!in_interrupt());
+        LASSERT (conn->rac_closing);
+        LASSERT (!list_empty(&conn->rac_hashlist));
+        LASSERT (list_empty(&conn->rac_list));
+
+        /* Remove from conn hash table (no new callbacks) */
+        list_del_init(&conn->rac_hashlist);
+        kranal_conn_decref(conn);
+
+        /* Conn is now just waiting for remaining refs to go */
+}
+
+void
+kranal_close_conn_locked (kra_conn_t *conn, int error)
+{
+        kra_peer_t        *peer = conn->rac_peer;
+
+        CDEBUG(error == 0 ? D_NET : D_ERROR,
+              "closing conn to "LPX64": error %d\n", peer->rap_nid, error);
+
+        LASSERT (!in_interrupt());
+        LASSERT (!conn->rac_closing);
+        LASSERT (!list_empty(&conn->rac_hashlist));
+        LASSERT (!list_empty(&conn->rac_list));
+
+        list_del_init(&conn->rac_list);
+
+        if (list_empty(&peer->rap_conns) &&
+            peer->rap_persistence == 0) {
+                /* Non-persistent peer with no more conns... */
+                kranal_unlink_peer_locked(peer);
+        }
+
+        conn->rac_closing = 1;
+        kranal_schedule_conn(conn);
+
+        kranal_conn_decref(conn);               /* lose peer's ref */
+}
+
+void
+kranal_close_conn (kra_conn_t *conn, int error)
+{
+        unsigned long    flags;
+        
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+        
+        if (!conn->rac_closing)
+                kranal_close_conn_locked(conn, error);
+        
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+}
+
+int
+kranal_passive_conn_handshake (struct socket *sock, 
+                               ptl_nid_t **peer_nidp, kra_conn_t **connp)
+{
+       struct sockaddr_in   addr;
+       __u32                peer_ip;
+        unsigned int         peer_port;
+       kra_connreq_t        connreq;
+       ptl_nid_t            peer_nid;
+        kra_conn_t          *conn;
+        kra_device_t        *dev;
+       RAP_RETURN           rrc;
+       int                  rc;
+        int                  i;
+
+       rc = sock->ops->getname(newsock, (struct sockaddr *)addr, &len, 2);
+        if (rc != 0) {
+                CERROR("Can't get peer's IP: %d\n", rc);
+                return rc;
+        }
+
+        peer_ip = ntohl(sin.sin_addr.s_addr);
+        peer_port = ntohs(sin.sin_port);
+
+        if (peer_port >= 1024) {
+                CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n",
+                       HIPQUAD(peer_ip), peer_port);
+                return -ECONNREFUSED;
+        }
+
+        rc = kranal_recv_connreq(sock, &connreq, 
+                                 kranal_data.kra_listener_timeout);
+        if (rc != 0) {
+                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
+                       HIPQUAD(peer_ip), peer_port, rc);
+                return rc;
+        }
+
+        peer_nid = connreq.racr_nid;
+        LASSERT (peer_nid != PTL_NID_ANY);
+
+        for (i = 0;;i++) {
+                LASSERT(i < kranal_data.kra_ndevs);
+                dev = &kranal_data.kra_devices[i];
+                if (dev->rad_id == connreq->racr_devid)
+                        break;
+        }
+
+        rc = kranal_alloc_conn(&conn, dev,(__u32)(peer_nid & 0xffffffff));
+        if (rc != 0)
+                return rc;
+
+        conn->rac_peer_incarnation = connreq.racr_incarnation;
+        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
+        kranal_update_reaper_timeout(conn->rac_keepalive);
+        
+        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc);
+                kranal_conn_decref(conn);
+                return -EPROTO;
+        }
+
+        kranal_pack_connreq(&connreq, conn);
+
+        rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
+        if (rc != 0) {
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%p: %d\n", 
+                       HIPQUAD(peer_ip), peer_port, rc);
+                kranal_conn_decref(conn);
+                return rc;
+        }
+
+        *connp = conn;
+        *peer_nidp = peer_nid;
+        return 0;
+}
+
+int
+ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
+{
+        struct sockaddr_in  locaddr;
+        struct sockaddr_in  srvaddr;
+        struct socket      *sock;
+        unsigned int        port;
+        int                 rc;
+        int                 option;
+        mm_segment_t        oldmm = get_fs();
+        struct timeval      tv;
+
+        for (port = 1023; port >= 512; port--) {
+
+                memset(&locaddr, 0, sizeof(locaddr)); 
+                locaddr.sin_family      = AF_INET; 
+                locaddr.sin_port        = htons(port);
+                locaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+                memset (&srvaddr, 0, sizeof (srvaddr));
+                srvaddr.sin_family      = AF_INET;
+                srvaddr.sin_port        = htons (peer->rap_port);
+                srvaddr.sin_addr.s_addr = htonl (peer->rap_ip);
+
+                rc = kranal_create_sock(&sock);
+                if (rc != 0)
+                        return rc;
+
+                rc = sock->ops->bind(sock,
+                                     (struct sockaddr *)&locaddr, sizeof(locaddr));
+                if (rc != 0) {
+                        sock_release(sock);
+                        
+                        if (rc == -EADDRINUSE) {
+                                CDEBUG(D_NET, "Port %d already in use\n", port);
+                                continue;
+                        }
+
+                        CERROR("Can't bind to reserved port %d: %d\n", port, rc);
+                        return rc;
+                }
+
+                rc = sock->ops->connect(sock,
+                                        (struct sockaddr *)&srvaddr, sizeof(srvaddr),
+                                        0);
+                if (rc == 0) {
+                        *sockp = sock;
+                        return 0;
+                }
+                
+                sock_release(sock);
+
+                if (rc != -EADDRNOTAVAIL) {
+                        CERROR("Can't connect port %d to %u.%u.%u.%u/%d: %d\n",
+                               port, HIPQUAD(peer->rap_ip), peer->rap_port, rc);
+                        return rc;
+                }
+                
+                CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", 
+                       port, HIPQUAD(peer->rap_ip), peer->rap_port);
+        }
+}
+
+
+int
+kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
+{
+       kra_connreq_t       connreq;
+        kra_conn_t         *conn;
+        kra_device_t       *dev;
+        struct socket      *sock;
+        __u32               id32;
+       RAP_RETURN          rrc;
+       int                 rc;
+
+        id32 = (peer_nid & 0xffffffff);
+        dev = &kranal_data.kra_devices[id32 % kranal_data.kra_ndevs];
+
+        rc = kranal_alloc_conn(&conn, dev, id32);
+        if (rc != 0)
+                return rc;
+
+        kranal_pack_connreq(&connreq, conn);
+        
+        memset(&dstaddr, 0, sizeof(addr));
+        dstaddr.sin_family      = AF_INET;
+        dstaddr.sin_port        = htons(peer->rap_port);
+        dstaddr.sin_addr.s_addr = htonl(peer->rap_ip);
+
+        memset(&srcaddr, 0, sizeof(addr));
+
+        rc = ranal_connect_sock(peer, &sock);
+        if (rc != 0)
+                goto failed_0;
+
+        /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
+         * immediately after accepting a connection, so we connect and then
+         * send immediately. */
+
+        rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
+        if (rc != 0) {
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", 
+                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
+                goto failed_1;
+        }
+
+        rc = kranal_recv_connreq(sock, &connreq, kranal_data.kra_timeout);
+        if (rc != 0) {
+                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
+                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
+                goto failed_1;
+        }
+
+        sock_release(sock);
+        rc = -EPROTO;
+
+        if (connreq.racr_nid != peer->rap_nid) {
+                CERROR("Unexpected nid from %u.%u.%u.%u/%d: "
+                       "received "LPX64" expected "LPX64"\n",
+                       HIPQUAD(peer->rap_ip), peer->rap_port, 
+                       connreq.racr_nid, peer->rap_nid);
+                goto failed_0;
+        }
+
+        if (connreq.racr_devid != dev->rad_id) {
+                CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
+                       "received %d expected %d\n",
+                       HIPQUAD(peer->rap_ip), peer->rap_port, 
+                       connreq.racr_devid, dev->rad_id);
+                goto failed_0;
+        }
+
+        conn->rac_peer_incarnation = connreq.racr_incarnation; 
+        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
+        kranal_update_reaper_timeout(conn->rac_keepalive);
+
+        rc = -ENETDOWN;
+        rrc = RapkSetRiParams(conn->rac_rihandle,
+                              &connreq->racr_riparams);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't set riparams for "LPX64": %d\n",
+                       peer_nid, rrc);
+                goto failed_0;
+        }
+
+        *connp = conn;
+       return 0;
+
+ failed_1:
+        release_sock(sock);
+ failed_0:
+        kranal_conn_decref(conn);
+        return rc;
+}
+
+int
+kranal_conn_handshake (struct socket *sock, kranal_peer_t *peer)
+{
+        kranal_peer_t     *peer2;
+       ptl_nid_t          peer_nid;
+       unsigned long      flags;
+        unsigned long      timeout;
+       kra_conn_t        *conn;
+       int                rc;
+        int                nstale;
+
+        if (sock != NULL) {
+                /* passive: listener accepted sock */
+                LASSERT (peer == NULL);
+
+                rc = kranal_passive_conn_handshake(sock, &peer_nid, &conn);
+                if (rc != 0)
+                        return rc;
+
+               /* assume this is a new peer */
+               peer = kranal_create_peer(peer_nid);
+               if (peer == NULL) {
+                       CERROR("Can't allocate peer for "LPX64"\n", peer_nid);
+                        kranal_conn_decref(conn);
+                        return -ENOMEM;
+               }
+
+               write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+               peer2 = kranal_find_peer_locked(peer_nid);
+               if (peer2 == NULL) {
+                       /* peer table takes my initial ref on peer */
+                       list_add_tail(&peer->rap_list,
+                                     kranal_nid2peerlist(peer_nid));
+               } else {
+                       /* peer_nid already in the peer table */
+                       kranal_peer_decref(peer);
+                       peer = peer2;
+               }
+                /* NB I may now have a non-persistent peer in the peer
+                 * table with no connections: I can't drop the global lock
+                 * until I've given it a connection or removed it, and when
+                 * I do 'peer' can disappear under me. */
+        } else {
+                /* active: connd wants to connect to peer */
+                LASSERT (peer != NULL);
+                LASSERT (peer->rap_connecting);
+                
+                rc = kranal_active_conn_handshake(peer, &conn);
+                if (rc != 0)
+                        return rc;
+
+               write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+               if (!kranal_peer_active(peer)) {
+                       /* raced with peer getting unlinked */
+                        write_unlock_irqrestore(&kranal_data.kra_global_lock, 
+                                                flags);
+                        kranal_conn_decref(conn);
+                       return ESTALE;
+               }
+       }
+
+       LASSERT (kranal_peer_active(peer));     /* peer is in the peer table */
+        peer_nid = peer->rap_nid;
+
+        /* Refuse to duplicate an existing connection (both sides might try
+         * to connect at once).  NB we return success!  We _do_ have a
+         * connection (so we don't need to remove the peer from the peer
+         * table) and we _don't_ have any blocked txs to complete */
+       if (kranal_conn_isdup_locked(peer, conn->rac_incarnation)) {
+                LASSERT (!list_empty(&peer->rap_conns));
+                LASSERT (list_empty(&peer->rap_tx_queue));
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+               CWARN("Not creating duplicate connection to "LPX64"\n",
+                      peer_nid);
+                kranal_conn_decref(conn);
+                return 0;
+       }
+
+       kranal_peer_addref(peer);               /* +1 ref for conn */
+       conn->rac_peer = peer;
+       list_add_tail(&conn->rac_list, &peer->rap_conns);
+
+        kranal_conn_addref(conn);               /* +1 ref for conn table */
+        list_add_tail(&conn->rac_hashlist,
+                      kranal_cqid2connlist(conn->rac_cqid));
+
+        /* Schedule all packets blocking for a connection */
+        while (!list_empty(&peer->rap_tx_queue)) {
+                tx = list_entry(&peer->rap_tx_queue.next,
+                                kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                kranal_queue_tx_locked(tx, conn);
+        }
+
+       nstale = kranal_close_stale_conns_locked(peer, conn->rac_incarnation);
+
+       write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+        /* CAVEAT EMPTOR: passive peer can disappear NOW */
+
+        if (nstale != 0)
+                CWARN("Closed %d stale conns to "LPX64"\n", nstale, peer_nid);
+
+        /* Ensure conn gets checked.  Transmits may have been queued and an
+         * FMA event may have happened before it got in the cq hash table */
+        kranal_schedule_conn(conn);
+       return 0;
+}
+
+void
+kranal_connect (kra_peer_t *peer)
+{
+        kra_tx_t          *tx;
+        unsigned long      flags;
+        struct list_head   zombies;
+        int                rc;
+
+        LASSERT (peer->rap_connecting);
+
+        rc = kranal_conn_handshake(NULL, peer);
+
+        write_lock_irqqsave(&kranal_data.kra_global_lock, flags);
+
+        LASSERT (peer->rap_connecting);
+        peer->rap_connecting = 0;
+
+        if (rc == 0) {
+                /* kranal_conn_handshake() queues blocked txs immediately on
+                 * success to avoid messages jumping the queue */
+                LASSERT (list_empty(&peer->rap_tx_queue));
+
+                /* reset reconnection timeouts */
+                peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
+                peer->rap_reconnect_time = CURRENT_TIME;
+
+                write_unlock_irqrestore(&kranal-data.kra_global_lock, flags);
+                return;
+        }
+
+        LASSERT (peer->rap_reconnect_interval != 0);
+        peer->rap_reconnect_time = CURRENT_TIME + peer->rap_reconnect_interval;
+        peer->rap_reconnect_interval = MAX(RANAL_MAX_RECONNECT_INTERVAL,
+                                           1 * peer->rap_reconnect_interval);
+
+        /* Grab all blocked packets while we have the global lock */
+        list_add(&zombies, &peer->rap_tx_queue);
+        list_del_init(&peer->rap_tx_queue);
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+        if (list_empty(&zombies))
+                return;
+
+        CWARN("Dropping packets for "LPX64": connection failed\n",
+              peer->rap_nid);
+
+        do {
+                tx = list_entry(zombies.next, kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -EHOSTUNREACH);
+
+        } while (!list_empty(&zombies));
+}
+
+int
+kranal_listener(void *arg)
+{
+       struct sockaddr_in addr;
+       wait_queue_t       wait;
+       struct socket     *sock;
+       struct socket     *newsock;
+       int                port;
+       int                backlog;
+       int                timeout;
+       kra_connreq_t     *connreqs;
+       char               name[16];
+
+       /* Parent thread holds kra_nid_mutex, and is, or is about to
+        * block on kra_listener_signal */
+
+       port = kra_tunables.kra_port;
+       snprintf(name, "kranal_lstn%03d", port);
+       kportal_daemonize(name);
+       kportal_blockallsigs();
+
+       init_waitqueue_entry(&wait, current);
+
+       rc = -ENOMEM;
+       PORTAL_ALLOC(connreqs, 2 * sizeof(*connreqs));
+       if (connreqs == NULL)
+               goto out_0;
+
+       rc = kranal_create_sock(&sock, port);
+       if (rc != 0)
+               goto out_1;
+
+        memset(&addr, 0, sizeof(addr));
+        addr.sin_family      = AF_INET;
+        addr.sin_port        = htons(port);
+        addr.sin_addr.s_addr = INADDR_ANY
+
+       rc = sock->ops->bind(sock, &addr, sizeof(addr));
+       if (rc != 0) {
+               CERROR("Can't bind to port %d\n", port);
+               goto out_2;
+       }
+
+       rc = sock->ops->listen(sock, kra_tunalbes.kra_backlog);
+       if (rc != 0) {
+               CERROR("Can't set listen backlog %d: %d\n", backlog, rc);
+               goto out_2;
+       }
+
+       LASSERT (kranal_data.kra_listener_sock == NULL);
+       kranal_data.kra_listener_sock = sock;
+
+       /* unblock waiting parent */
+       LASSERT (kranal_data.kra_listener_shutdown == 0);
+       up(&kranal_data.kra_listener_signal);
+
+       /* Wake me any time something happens on my socket */
+       add_wait_queue(sock->sk->sk_sleep, &wait);
+
+       while (kranal_data.kra_listener_shutdown == 0) {
+
+               newsock = sock_alloc();
+               if (newsock == NULL) {
+                       CERROR("Can't allocate new socket for accept\n");
+                       kranal_pause(HZ);
+                       continue;
+               }
+
+               set_current_state(TASK_INTERRUPTIBLE);
+
+               rc = sock->ops->accept(sock, newsock, O_NONBLOCK);
+
+               if (rc == -EAGAIN &&
+                   kranal_data.kra_listener_shutdown == 0)
+                       schedule();
+
+               set_current_state(TASK_RUNNING);
+
+               if (rc != 0) {
+                       sock_release(newsock);
+                       if (rc != -EAGAIN) {
+                               CERROR("Accept failed: %d\n", rc);
+                               kranal_pause(HZ);
+                       }
+                       continue;
+               } 
+
+                kranal_conn_handshake(newsock, NULL);
+                sock_release(newsock);
+       }
+
+       rc = 0;
+       remove_wait_queue(sock->sk->sk_sleep, &wait);
+ out_2:
+       sock_release(sock);
+       kranal_data.kra_listener_sock = NULL;
+ out_1:
+       PORTAL_FREE(connreqs, 2 * sizeof(*connreqs));
+ out_0:
+       /* set completion status and unblock thread waiting for me 
+        * (parent on startup failure, executioner on normal shutdown) */
+       kranal_data.kra_listener_shutdown = rc;
+       up(&kranal_data.kra_listener_signal);
+
+       return 0;
+}
+
+int
+kranal_start_listener ()
+{
+       long           pid;
+       int            rc;
+
+        CDEBUG(D_WARNING, "Starting listener\n");
+
+       /* Called holding kra_nid_mutex: listener stopped */
+       LASSERT (kranal_data.kra_listener_sock == NULL);
+
+       kranal_data.kra_listener_shutdown == 0;
+       pid = kernel_thread(kranal_listener, sock, 0);
+       if (pid < 0) {
+               CERROR("Can't spawn listener: %ld\n", pid);
+               return (int)pid;
+       }
+
+       /* Block until listener has started up. */
+       down(&kranal_data.kra_listener_signal);
+
+       rc = kranal_data.kra_listener_shutdown;
+       LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL));
+
+        CDEBUG(D_WARNING, "Listener %ld started OK\n", pid);
+       return rc;
+}
+
+void
+kranal_stop_listener()
+{
+        CDEBUG(D_WARNING, "Stopping listener\n");
+
+       /* Called holding kra_nid_mutex: listener running */
+       LASSERT (kranal_data.kra_listener_sock != NULL);
+
+       kranal_data.kra_listener_shutdown = 1;
+       wake_up_all(kranal_data->kra_listener_sock->sk->sk_sleep);
+
+       /* Block until listener has torn down. */
+       down(&kranal_data.kra_listener_signal);
+
+       LASSERT (kranal_data.kra_listener_sock == NULL);
+        CDEBUG(D_WARNING, "Listener stopped\n");
+}
+
+int 
+kranal_listener_procint(ctl_table *table, int write, struct file *filp,
+                       void *buffer, size_t *lenp)
+{
+       int   *tunable = (int *)table->data;
+       int    old_val;
+       int    rc;
+
+       down(&kranal_data.kra_nid_mutex);
+
+       LASSERT (tunable == &kranal_data.kra_port ||
+                tunable == &kranal_data.kra_backlog);
+       old_val = *tunable;
+
+       rc = proc_dointvec(table, write, filp, buffer, lenp);
+
+       if (write &&
+           (*tunable != old_val ||
+            kranal_data.kra_listener_sock == NULL)) {
+
+               if (kranal_data.kra_listener_sock != NULL)
+                       kranal_stop_listener();
+
+               rc = kranal_start_listener();
+
+               if (rc != 0) {
+                       *tunable = old_val;
+                       kranal_start_listener();
+               }
+       }
+
+       up(&kranal_data.kra_nid_mutex);
+       return rc;
+}
+
+int
+kranal_set_mynid(ptl_nid_t nid)
+{
+        lib_ni_t      *ni = &kranal_lib.libnal_ni;
+        int            rc;
+
+        CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n",
+               nid, ni->ni_pid.nid);
+
+        down(&kranal_data.kra_nid_mutex);
+
+        if (nid == ni->ni_pid.nid) {
+                /* no change of NID */
+                up(&kranal_data.kra_nid_mutex);
+                return 0;
+        }
+
+       if (kranal_data.kra_listener_sock != NULL)
+               kranal_stop_listener();
+
+        ni->ni_pid.nid = nid;
+
+        /* Delete all existing peers and their connections after new
+         * NID/incarnation set to ensure no old connections in our brave
+         * new world. */
+        kranal_del_peer(PTL_NID_ANY, 0);
+
+        if (nid != PTL_NID_ANY)
+                rc = kranal_start_listener();
+
+        up(&kranal_data.kra_nid_mutex);
+        return rc;
+}
+
+kra_peer_t *
+kranal_create_peer (ptl_nid_t nid)
+{
+        kra_peer_t *peer;
+
+        LASSERT (nid != PTL_NID_ANY);
+
+        PORTAL_ALLOC(peer, sizeof(*peer));
+        if (peer == NULL)
+                return NULL;
+
+        memset(peer, 0, sizeof(*peer));         /* zero flags etc */
+
+        peer->rap_nid = nid;
+        atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */
+
+        INIT_LIST_HEAD(&peer->rap_list);        /* not in the peer table yet */
+        INIT_LIST_HEAD(&peer->rap_conns);
+        INIT_LIST_HEAD(&peer->rap_tx_queue);
+
+        peer->rap_reconnect_time = CURRENT_TIME;
+        peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
+
+        atomic_inc(&kranal_data.kra_npeers);
+        return peer;
+}
+
+void
+__kranal_peer_decref (kra_peer_t *peer)
+{
+        CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer);
+
+        LASSERT (atomic_read(&peer->rap_refcount) == 0);
+        LASSERT (peer->rap_persistence == 0);
+        LASSERT (!kranal_peer_active(peer));
+        LASSERT (peer->rap_connecting == 0);
+        LASSERT (list_empty(&peer->rap_conns));
+        LASSERT (list_empty(&peer->rap_tx_queue));
+
+        PORTAL_FREE(peer, sizeof(*peer));
+
+        /* NB a peer's connections keep a reference on their peer until
+         * they are destroyed, so we can be assured that _all_ state to do
+         * with this peer has been cleaned up when its refcount drops to
+         * zero. */
+        atomic_dec(&kranal_data.kra_npeers);
+}
+
+kra_peer_t *
+kranal_find_peer_locked (ptl_nid_t nid)
+{
+        struct list_head *peer_list = kranal_nid2peerlist(nid);
+        struct list_head *tmp;
+        kra_peer_t       *peer;
+
+        list_for_each (tmp, peer_list) {
+
+                peer = list_entry(tmp, kra_peer_t, rap_list);
+
+                LASSERT (peer->rap_persistence > 0 ||     /* persistent peer */
+                         !list_empty(&peer->rap_conns));  /* active conn */
+
+                if (peer->rap_nid != nid)
+                        continue;
+
+                CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
+                       peer, nid, atomic_read(&peer->rap_refcount));
+                return peer;
+        }
+        return NULL;
+}
+
+kra_peer_t *
+kranal_find_peer (ptl_nid_t nid)
+{
+        kra_peer_t     *peer;
+
+        read_lock(&kranal_data.kra_global_lock);
+        peer = kranal_find_peer_locked(nid);
+        if (peer != NULL)                       /* +1 ref for caller? */
+                kranal_peer_addref(peer);
+        read_unlock(&kranal_data.kra_global_lock);
+
+        return peer;
+}
+
+void
+kranal_unlink_peer_locked (kra_peer_t *peer)
+{
+        LASSERT (peer->rap_persistence == 0);
+        LASSERT (list_empty(&peer->rap_conns));
+
+        LASSERT (kranal_peer_active(peer));
+        list_del_init(&peer->rap_list);
+
+        /* lose peerlist's ref */
+        kranal_peer_decref(peer);
+}
+
+int
+kranal_get_peer_info (int index, ptl_nid_t *nidp, int *portp, int *persistencep)
+{
+        kra_peer_t        *peer;
+        struct list_head  *ptmp;
+        int                i;
+
+        read_lock(&kranal_data.kra_global_lock);
+
+        for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
+
+                list_for_each(ptmp, &kranal_data.kra_peers[i]) {
+
+                        peer = list_entry(ptmp, kra_peer_t, rap_list);
+                        LASSERT (peer->rap_persistence > 0 ||
+                                 !list_empty(&peer->rap_conns));
+
+                        if (index-- > 0)
+                                continue;
+
+                        *nidp = peer->rap_nid;
+                        *portp = peer->rap_port;
+                        *persistencep = peer->rap_persistence;
+
+                        read_unlock(&kranal_data.kra_global_lock);
+                        return 0;
+                }
+        }
+
+        read_unlock(&kranal_data.kra_global_lock);
+        return -ENOENT;
+}
+
+int
+kranal_add_persistent_peer (ptl_nid_t nid, __u32 ip, int port)
+{
+        unsigned long      flags;
+        kra_peer_t        *peer;
+        kra_peer_t        *peer2;
+
+        if (nid == PTL_NID_ANY)
+                return -EINVAL;
+
+        peer = kranal_create_peer(nid);
+        if (peer == NULL)
+                return -ENOMEM;
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+        peer2 = kranal_find_peer_locked(nid);
+        if (peer2 != NULL) {
+                kranal_put_peer(peer);
+                peer = peer2;
+        } else {
+                /* peer table takes existing ref on peer */
+                list_add_tail(&peer->rap_list,
+                              kranal_nid2peerlist(nid));
+        }
+
+        peer->rap_ip = ip;
+        peer->rap_port = port;
+        peer->rap_persistence++;
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+        return 0;
+}
+
+void
+kranal_del_peer_locked (kra_peer_t *peer, int single_share)
+{
+        struct list_head *ctmp;
+        struct list_head *cnxt;
+        kra_conn_t       *conn;
+
+        if (!single_share)
+                peer->rap_persistence = 0;
+        else if (peer->rap_persistence > 0)
+                peer->rap_persistence--;
+
+        if (peer->rap_persistence != 0)
+                return;
+
+        if (list_empty(&peer->rap_conns)) {
+                kranal_unlink_peer_locked(peer);
+        } else {
+                list_for_each_safe(ctmp, cnxt, &peer->rap_conns) {
+                        conn = list_entry(ctmp, kra_conn_t, rac_list);
+
+                        kranal_close_conn_locked(conn, 0);
+                }
+                /* peer unlinks itself when last conn is closed */
+        }
+}
+
+int
+kranal_del_peer (ptl_nid_t nid, int single_share)
+{
+        unsigned long      flags;
+        struct list_head  *ptmp;
+        struct list_head  *pnxt;
+        kra_peer_t        *peer;
+        int                lo;
+        int                hi;
+        int                i;
+        int                rc = -ENOENT;
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+        if (nid != PTL_NID_ANY)
+                lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
+        else {
+                lo = 0;
+                hi = kranal_data.kra_peer_hash_size - 1;
+        }
+
+        for (i = lo; i <= hi; i++) {
+                list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
+                        peer = list_entry(ptmp, kra_peer_t, rap_list);
+                        LASSERT (peer->rap_persistence > 0 ||
+                                 !list_empty(&peer->rap_conns));
+
+                        if (!(nid == PTL_NID_ANY || peer->rap_nid == nid))
+                                continue;
+
+                        kranal_del_peer_locked(peer, single_share);
+                        rc = 0;         /* matched something */
+
+                        if (single_share)
+                                goto out;
+                }
+        }
+ out:
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+        return rc;
+}
+
+kra_conn_t *
+kranal_get_conn_by_idx (int index)
+{
+        kra_peer_t        *peer;
+        struct list_head  *ptmp;
+        kra_conn_t        *conn;
+        struct list_head  *ctmp;
+        int                i;
+
+        read_lock (&kranal_data.kra_global_lock);
+
+        for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
+                list_for_each (ptmp, &kranal_data.kra_peers[i]) {
+
+                        peer = list_entry(ptmp, kra_peer_t, rap_list);
+                        LASSERT (peer->rap_persistence > 0 ||
+                                 !list_empty(&peer->rap_conns));
+
+                        list_for_each (ctmp, &peer->rap_conns) {
+                                if (index-- > 0)
+                                        continue;
+
+                                conn = list_entry(ctmp, kra_conn_t, rac_list);
+                                CDEBUG(D_NET, "++conn[%p] -> "LPX64" (%d)\n",
+                                       conn, conn->rac_peer->rap_nid,
+                                       atomic_read(&conn->rac_refcount));
+                                atomic_inc(&conn->rac_refcount);
+                                read_unlock(&kranal_data.kra_global_lock);
+                                return conn;
+                        }
+                }
+        }
+
+        read_unlock(&kranal_data.kra_global_lock);
+        return NULL;
+}
+
+int
+kranal_close_peer_conns_locked (kra_peer_t *peer, int why)
+{
+        kra_conn_t         *conn;
+        struct list_head   *ctmp;
+        struct list_head   *cnxt;
+        int                 count = 0;
+
+        list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
+                conn = list_entry(ctmp, kra_conn_t, rac_list);
+
+                count++;
+                kranal_close_conn_locked(conn, why);
+        }
+
+        return count;
+}
+
+int
+kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation)
+{
+        kra_conn_t         *conn;
+        struct list_head   *ctmp;
+        struct list_head   *cnxt;
+        int                 count = 0;
+
+        list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
+                conn = list_entry(ctmp, kra_conn_t, rac_list);
+
+                if (conn->rac_incarnation == incarnation)
+                        continue;
+
+                CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
+                       peer->rap_nid, conn->rac_incarnation, incarnation);
+                LASSERT (conn->rac_incarnation < incarnation);
+
+                count++;
+                kranal_close_conn_locked(conn, -ESTALE);
+        }
+
+        return count;
+}
+
+int
+kranal_close_matching_conns (ptl_nid_t nid)
+{
+        unsigned long       flags;
+        kra_peer_t         *peer;
+        struct list_head   *ptmp;
+        struct list_head   *pnxt;
+        int                 lo;
+        int                 hi;
+        int                 i;
+        int                 count = 0;
+
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+        if (nid != PTL_NID_ANY)
+                lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
+        else {
+                lo = 0;
+                hi = kranal_data.kra_peer_hash_size - 1;
+        }
+
+        for (i = lo; i <= hi; i++) {
+                list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
+
+                        peer = list_entry(ptmp, kra_peer_t, rap_list);
+                        LASSERT (peer->rap_persistence > 0 ||
+                                 !list_empty(&peer->rap_conns));
+
+                        if (!(nid == PTL_NID_ANY || nid == peer->rap_nid))
+                                continue;
+
+                        count += kranal_close_peer_conns_locked(peer, 0);
+                }
+        }
+
+        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
+        /* wildcards always succeed */
+        if (nid == PTL_NID_ANY)
+                return 0;
+
+        return (count == 0) ? -ENOENT : 0;
+}
+
+int
+kranal_cmd(struct portals_cfg *pcfg, void * private)
+{
+        int rc = -EINVAL;
+
+        LASSERT (pcfg != NULL);
+
+        switch(pcfg->pcfg_command) {
+        case NAL_CMD_GET_PEER: {
+                ptl_nid_t   nid = 0;
+                __u32       ip = 0;
+                int         port = 0;
+                int         share_count = 0;
+
+                rc = kranal_get_peer_info(pcfg->pcfg_count,
+                                          &nid, &ip, &port, &share_count);
+                pcfg->pcfg_nid   = nid;
+                pcfg->pcfg_size  = 0;
+                pcfg->pcfg_id    = ip;
+                pcfg->pcfg_misc  = port;
+                pcfg->pcfg_count = 0;
+                pcfg->pcfg_wait  = share_count;
+                break;
+        }
+        case NAL_CMD_ADD_PEER: {
+                rc = kranal_add_persistent_peer(pcfg->pcfg_nid,
+                                                pcfg->pcfg_id, /* IP */
+                                                pcfg->pcfg_misc); /* port */
+                break;
+        }
+        case NAL_CMD_DEL_PEER: {
+                rc = kranal_del_peer(pcfg->pcfg_nid, 
+                                     /* flags == single_share */
+                                     pcfg->pcfg_flags != 0);
+                break;
+        }
+        case NAL_CMD_GET_CONN: {
+                kra_conn_t *conn = kranal_get_conn_by_idx(pcfg->pcfg_count);
+
+                if (conn == NULL)
+                        rc = -ENOENT;
+                else {
+                        rc = 0;
+                        pcfg->pcfg_nid   = conn->rac_peer->rap_nid;
+                        pcfg->pcfg_id    = 0;
+                        pcfg->pcfg_misc  = 0;
+                        pcfg->pcfg_flags = 0;
+                        kranal_put_conn(conn);
+                }
+                break;
+        }
+        case NAL_CMD_CLOSE_CONNECTION: {
+                rc = kranal_close_matching_conns(pcfg->pcfg_nid);
+                break;
+        }
+        case NAL_CMD_REGISTER_MYNID: {
+                if (pcfg->pcfg_nid == PTL_NID_ANY)
+                        rc = -EINVAL;
+                else
+                        rc = kranal_set_mynid(pcfg->pcfg_nid);
+                break;
+        }
+        }
+
+        return rc;
+}
+
+void
+kranal_free_txdescs(struct list_head *freelist)
+{
+        kra_tx_t    *tx;
+
+        while (!list_empty(freelist)) {
+                tx = list_entry(freelist->next, kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                PORTAL_FREE(tx->tx_phys, PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
+                PORTAL_FREE(tx, sizeof(*tx));
+        }
+}
+
+int
+kranal_alloc_txdescs(struct list_head *freelist, int n)
+{
+        int            isnblk = (freelist == &kranal_data.kra_idle_nblk_txs);
+        int            i;
+        kra_tx_t      *tx;
+
+        LASSERT (freelist == &kranal_data.kra_idle_txs ||
+                 freelist == &kranal_data.kra_idle_nblk_txs);
+        LASSERT (list_empty(freelist));
+
+        for (i = 0; i < n; i++) {
+
+                PORTAL_ALLOC(tx, sizeof(*tx));
+                if (tx == NULL) {
+                        CERROR("Can't allocate %stx[%d]\n", 
+                               isnblk ? "nblk ", i);
+                        kranal_free_txdescs();
+                        return -ENOMEM;
+                }
+
+                PORTAL_ALLOC(tx->tx_phys,
+                             PLT_MD_MAX_IOV * sizeof(*tx->tx_phys));
+                if (tx->tx_phys == NULL) {
+                        CERROR("Can't allocate %stx[%d]->tx_phys\n", 
+                               isnblk ? "nblk ", i);
+
+                        PORTAL_FREE(tx, sizeof(*tx));
+                        kranal_free_txdescs(freelist);
+                        return -ENOMEM;
+                }
+
+                tx->tx_isnblk = isnblk
+                tx->tx_buftype = RANAL_BUF_NONE;
+
+                list_add(&tx->tx_list, freelist);
+        }
+
+        return 0;
+}
+
+int
+kranal_device_init(int id, kra_device_t *dev)
+{
+        const int         total_ntx = RANAL_NTX + RANAL_NTX_NBLK;
+        RAP_RETURN        rrc;
+
+        dev->rad_id = id;
+        rrc = RapkGetDeviceByIndex(id, NULL, kranal_device_callback,
+                                   &dev->rad_handle);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't get Rapidarray Device %d: %d\n", idx, rrc);
+                goto failed_0;
+        }
+
+        rrc = RapkReserveRdma(dev->rad_handle, total_ntx);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't reserve %d RDMA descriptors"
+                       " for device[%d]: %d\n", total_ntx, i, rrc);
+                goto failed_1;
+        }
+
+        rrc = RapkCreatePtag(dev->rad_handle,
+                             &dev->rad_ptag);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't create ptag"
+                       " for device[%d]: %d\n", i, rrc);
+                goto failed_1;
+        }
+
+        rrc = RapkCreateCQ(dev->rad_handle, total_ntx, dev->rad_ptag,
+                           &dev->rad_rdma_cq);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't create rdma cq size %d"
+                       " for device[%d]: %d\n", total_ntx, i, rrc);
+                goto failed_2;
+        }
+
+        rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE,
+                           dev->rad_ptag, &dev->rad_fma_cq);
+        if (rrc != RAP_SUCCESS) {
+                CERROR("Can't create fma cq size %d"
+                       " for device[%d]: %d\n", RANAL_RX_CQ_SIZE, i, rrc);
+                goto failed_3;
+        }
+
+        return 0;
+
+ failed_3:
+        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
+ failed_2:
+        RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+ failed_1:
+        RapkReleaseDevice(dev->rad_handle);
+ failed_0:
+        return -ENODEV;
+}
+
+void
+kranal_device_fini(kra_device_t *dev)
+{
+        RapkDestroyCQ(dev->rad_handle, dev->rad_rx_cq, dev->rad_ptag);
+        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
+        RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
+        RapkReleaseDevice(dev->rad_handle);
+}
+
+void
+kranal_api_shutdown (nal_t *nal)
+{
+        int           i;
+        int           rc;
+        unsigned long flags;
+        
+        if (nal->nal_refct != 0) {
+                /* This module got the first ref */
+                PORTAL_MODULE_UNUSE;
+                return;
+        }
+
+        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
+               atomic_read(&portal_kmemory));
+
+        LASSERT (nal == &kranal_api);
+
+        switch (kranal_data.kra_init) {
+        default:
+                CERROR("Unexpected state %d\n", kranal_data.kra_init);
+                LBUG();
+
+        case RANAL_INIT_ALL:
+                /* stop calls to nal_cmd */
+                libcfs_nal_cmd_unregister(OPENRANAL);
+                /* No new persistent peers */
+
+                /* resetting my NID to unadvertises me, removes my
+                 * listener and nukes all current peers */
+                kranal_set_mynid(PTL_NID_ANY);
+                /* no new peers or conns */
+
+                /* Wait for all peer/conn state to clean up */
+                i = 2;
+                while (atomic_read(&kranal_data.kra_nconns) != 0 ||
+                       atomic_read(&kranal-data.kra_npeers) != 0) {
+                        i++;
+                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
+                               "waiting for %d peers and %d conns to close down\n",
+                               atomic_read(&kranal_data.kra_npeers),
+                               atomic_read(&kranal_data.kra_nconns));
+                       kranal_pause(HZ);
+                }
+                /* fall through */
+
+        case RANAL_INIT_LIB:
+                lib_fini(&kranal_lib);
+                /* fall through */
+
+        case RANAL_INIT_DATA:
+                break;
+        }
+
+        /* flag threads to terminate; wake and wait for them to die */
+        kranal_data.kra_shutdown = 1;
+
+        for (i = 0; i < kranal_data.kra_ndevs; i++) {
+                kra_device_t *dev = &kranal_data.kra_devices[i];
+
+                LASSERT (list_empty(&dev->rad_connq));
+
+                spin_lock_irqsave(&dev->rad_lock, flags);
+                wake_up(&dev->rad_waitq);
+                spin_unlock_irqrestore(&dev->rad_lock, flags);
+        }
+
+        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+        wake_up_all(&kranal_data.kra_reaper_waitq);
+        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
+
+        LASSERT (list_empty(&kranal_data.kra_connd_peers));
+        spin_lock_irqsave(&kranal-data.kra_connd_lock, flags); 
+        wake_up_all(&kranal_data.kra_connd_waitq);
+        spin_unlock_irqrestore(&kranal-data.kra_connd_lock, flags); 
+
+        i = 2;
+        while (atomic_read(&kranal_data.kra_nthreads) != 0) {
+                i++;
+                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
+                       "Waiting for %d threads to terminate\n",
+                       atomic_read(&kranal_data.kra_nthreads));
+                kranal_pause(HZ);
+        }
+
+        LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
+        if (kranal_data.kra_peers != NULL) {
+                for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
+                        LASSERT (list_empty(&kranal_data.kra_peers[i]));
+
+                PORTAL_FREE(kranal_data.kra_peers,
+                            sizeof (struct list_head) * 
+                            kranal_data.kra_peer_hash_size);
+        }
+
+        LASSERT (atomic_read(&kranal_data.kra_nconns) == 0);
+        if (kranal_data.kra_conns != NULL) {
+                for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
+                        LASSERT (list_empty(&kranal_data.kra_conns[i]));
+
+                PORTAL_FREE(kranal_data.kra_conns,
+                            sizeof (struct list_head) * 
+                            kranal_data.kra_conn_hash_size);
+        }
+
+        for (i = 0; i < kranal_data.kra_ndevs; i++)
+                kranal_device_fini(&kranal_data.kra_devices[i]);
+
+        kranal_free_txdescs(&kranal_data.kra_idle_txs);
+        kranal_free_txdescs(&kranal_data.kra_idle_nblk_txs);
+
+        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
+               atomic_read(&portal_kmemory));
+        printk(KERN_INFO "Lustre: RapidArray NAL unloaded (final mem %d)\n",
+               atomic_read(&portal_kmemory));
+
+        kranal_data.kra_init = RANAL_INIT_NOTHING;
+}
+
+int
+kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
+                    ptl_ni_limits_t *requested_limits,
+                    ptl_ni_limits_t *actual_limits)
+{
+        static int        device_ids[] = {RAPK_MAIN_DEVICE_ID,
+                                          RAPK_EXPANSION_DEVICE_ID};
+        struct timeval    tv;
+        ptl_process_id_t  process_id;
+        int               pkmem = atomic_read(&portal_kmemory);
+        int               rc;
+        int               i;
+        kra_device_t     *dev;
+
+        LASSERT (nal == &kranal_api);
+
+        if (nal->nal_refct != 0) {
+                if (actual_limits != NULL)
+                        *actual_limits = kranal_lib.libnal_ni.ni_actual_limits;
+                /* This module got the first ref */
+                PORTAL_MODULE_USE;
+                return PTL_OK;
+        }
+
+        LASSERT (kranal_data.kra_init == RANAL_INIT_NOTHING);
+
+        memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
+
+        /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
+         * a unique (for all time) incarnation so we can uniquely identify
+         * the sender.  The incarnation is an incrementing counter
+         * initialised with seconds + microseconds at startup time.  So we
+         * rely on NOT creating connections more frequently on average than
+         * 1MHz to ensure we don't use old incarnations when we reboot. */
+        do_gettimeofday(&tv);
+        kranal_data.kra_next_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
+
+        init_MUTEX(&kranal_data.kra_nid_mutex);
+        init_MUTEX_LOCKED(&kranal_data.kra_listener_signal);
+
+        rwlock_init(&kranal_data.kra_global_lock);
+
+        for (i = 0; i < RANAL_MAXDEVS; i++ ) {
+                kra_device_t  *dev = &kranal_data.kra_devices[i];
+
+                dev->rad_idx = i;
+                INIT_LIST_HEAD(&dev->rad_connq);
+                init_waitqueue_head(&dev->rad_waitq);
+                spin_lock_init(&dev->rad_lock);
+        }
+
+        init_waitqueue_head(&kranal_data.kra_reaper_waitq);
+        spin_lock_init(&kranal_data.kra_reaper_lock);
+
+        INIT_LIST_HEAD(&kranal_data.kra_connd_peers);
+        init_waitqueue_head(&kranal_data.kra_connd_waitq);
+        spin_lock_init(&kranal_data.kra_connd_lock);
+
+        INIT_LIST_HEAD(&kranal_data.kra_idle_txs);
+        INIT_LIST_HEAD(&kranal_data.kra_idle_nblk_txs);
+        init_waitqueue_head(&kranal_data.kra_idle_tx_waitq);
+        spin_lock_init(&kranal_data.kra_tx_lock);
+
+        /* OK to call kranal_api_shutdown() to cleanup now */
+        kranal_data.kra_init = RANAL_INIT_DATA;
+        
+        kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
+        PORTAL_ALLOC(kranal_data.kra_peers,
+                     sizeof(struct list_head) * kranal_data.kra_peer_hash_size);
+        if (kranal_data.kra_peers == NULL)
+                goto failed;
+
+        for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
+                INIT_LIST_HEAD(&kranal_data.kra_peers[i]);
+
+        kranal_data.kra_conn_hash_size = RANAL_PEER_HASH_SIZE;
+        PORTAL_ALLOC(kranal_data.kra_conns,
+                     sizeof(struct list_head) * kranal_data.kra_conn_hash_size);
+        if (kranal_data.kra_conns == NULL)
+                goto failed;
+
+        for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
+                INIT_LIST_HEAD(&kranal_data.kra_conns[i]);
+
+        rc = kranal_alloc_txdescs(&kranal_data.kra_idle_txs, RANAL_NTX);
+        if (rc != 0)
+                goto failed;
+
+        rc = kranal_alloc_txdescs(&kranal_data.kra_idle_nblk_txs,RANAL_NTX_NBLK);
+        if (rc != 0)
+                goto failed;
+
+        process_id.pid = requested_pid;
+        process_id.nid = PTL_NID_ANY;           /* don't know my NID yet */
+
+        rc = lib_init(&kranal_lib, nal, process_id,
+                      requested_limits, actual_limits);
+        if (rc != PTL_OK) {
+                CERROR("lib_init failed: error %d\n", rc);
+                goto failed;
+        }
+
+        /* lib interface initialised */
+        kranal_data.kra_init = RANAL_INIT_LIB;
+        /*****************************************************/
+
+        rc = kranal_thread_start(kranal_reaper, NULL);
+        if (rc != 0) {
+                CERROR("Can't spawn ranal reaper: %d\n", rc);
+                goto failed;
+        }
+
+        for (i = 0; i < RANAL_N_CONND; i++) {
+                rc = kranal_thread_start(kranal_connd, (void *)i);
+                if (rc != 0) {
+                        CERROR("Can't spawn ranal connd[%d]: %d\n",
+                               i, rc);
+                        goto failed;
+                }
+        }
+
+        LASSERT(kranal_data.kra_ndevs == 0);
+        for (i = 0; i < sizeof(device_ids)/sizeof(device_ids[0]); i++) {
+                dev = &kranal_data.kra_devices[kranal_data.kra_ndevs];
+
+                rc = kranal_device_init(device_ids[i], dev);
+                if (rc == 0)
+                        kranal_data.kra_ndevs++;
+
+                rc = kranal_thread_start(kranal_scheduler, dev);
+                if (rc != 0) {
+                        CERROR("Can't spawn ranal scheduler[%d]: %d\n",
+                               i, rc);
+                        goto failed;
+                }
+        }
+
+        if (kranal_data.kra_ndevs == 0)
+                goto failed;
+
+        rc = libcfs_nal_cmd_register(OPENRANAL, &kranal_cmd, NULL);
+        if (rc != 0) {
+                CERROR("Can't initialise command interface (rc = %d)\n", rc);
+                goto failed;
+        }
+
+        /* flag everything initialised */
+        kranal_data.kra_init = RANAL_INIT_ALL;
+        /*****************************************************/
+
+        CDEBUG(D_MALLOC, "initial kmem %d\n", atomic_read(&portal_kmemory));
+        printk(KERN_INFO "Lustre: RapidArray NAL loaded "
+               "(initial mem %d)\n", pkmem);
+
+        return PTL_OK;
+
+ failed:
+        kranal_api_shutdown(&kranal_api);    
+        return PTL_FAIL;
+}
+
+void __exit
+kranal_module_fini (void)
+{
+#ifdef CONFIG_SYSCTL
+        if (kranal_tunables.kra_sysctl != NULL)
+                unregister_sysctl_table(kranal_tunables.kra_sysctl);
+#endif
+        PtlNIFini(kranal_ni);
+
+        ptl_unregister_nal(OPENRANAL);
+}
+
+int __init
+kranal_module_init (void)
+{
+        int    rc;
+
+        /* the following must be sizeof(int) for
+         * proc_dointvec/kranal_listener_procint() */
+        LASSERT (sizeof(kranal_tunables.kra_timeout) == sizeof(int));
+        LASSERT (sizeof(kranal_tunables.kra_listener_timeout) == sizeof(int));
+        LASSERT (sizeof(kranal_tunables.kra_backlog) == sizeof(int));
+        LASSERT (sizeof(kranal_tunables.kra_port) == sizeof(int));
+        LASSERT (sizeof(kranal_tunables.kra_max_immediate) == sizeof(int));
+
+        kranal_api.nal_ni_init = kranal_api_startup;
+        kranal_api.nal_ni_fini = kranal_api_shutdown;
+
+        /* Initialise dynamic tunables to defaults once only */
+        kranal_tunables.kra_timeout = RANAL_TIMEOUT;
+
+        rc = ptl_register_nal(OPENRANAL, &kranal_api);
+        if (rc != PTL_OK) {
+                CERROR("Can't register RANAL: %d\n", rc);
+                return -ENOMEM;               /* or something... */
+        }
+
+        /* Pure gateways want the NAL started up at module load time... */
+        rc = PtlNIInit(OPENRANAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kranal_ni);
+        if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
+                ptl_unregister_nal(OPENRANAL);
+                return -ENODEV;
+        }
+
+#ifdef CONFIG_SYSCTL
+        /* Press on regardless even if registering sysctl doesn't work */
+        kranal_tunables.kra_sysctl = 
+                register_sysctl_table(kranal_top_ctl_table, 0);
+#endif
+        return 0;
+}
+
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_DESCRIPTION("Kernel RapidArray NAL v0.01");
+MODULE_LICENSE("GPL");
+
+module_init(kranal_module_init);
+module_exit(kranal_module_fini);
diff --git a/lustre/portals/knals/ranal/ranal.h b/lustre/portals/knals/ranal/ranal.h
new file mode 100644 (file)
index 0000000..c134179
--- /dev/null
@@ -0,0 +1,438 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2004 Cluster File Systems, Inc.
+ *   Author: Eric Barton <eric@bartonsoftware.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/mm.h>
+#include <linux/string.h>
+#include <linux/stat.h>
+#include <linux/errno.h>
+#include <linux/smp_lock.h>
+#include <linux/unistd.h>
+#include <linux/uio.h>
+
+#include <asm/system.h>
+#include <asm/uaccess.h>
+#include <asm/io.h>
+
+#include <linux/init.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/stat.h>
+#include <linux/list.h>
+#include <linux/kmod.h>
+#include <linux/sysctl.h>
+
+#define DEBUG_SUBSYSTEM S_NAL
+
+#include <linux/kp30.h>
+#include <portals/p30.h>
+#include <portals/lib-p30.h>
+#include <portals/nal.h>
+
+#include <rapl.h>
+
+#if CONFIG_SMP
+# define RANAL_N_SCHED      num_online_cpus()   /* # schedulers */
+#else
+# define RANAL_N_SCHED      1                   /* # schedulers */
+#endif
+
+#define RANAL_MAXDEVS       2                   /* max # devices RapidArray supports */
+
+#define RANAL_N_CONND       4                   /* # connection daemons */
+
+#define RANAL_MIN_RECONNECT_INTERVAL 1          /* first failed connection retry (seconds)... */
+#define RANAL_MAX_RECONNECT_INTERVAL 60         /* ...exponentially increasing to this */
+
+#define RANAL_FMA_PREFIX_LEN      232           /* size of FMA "Prefix" */
+#define RANAL_FMA_MAX_DATA_LEN    ((7<<10)-256) /* Max FMA MSG is 7K including prefix */
+
+#define RANAL_PEER_HASH_SIZE  101               /* # peer lists */
+#define RANAL_CONN_HASH_SIZE  101               /* # conn lists */
+
+#define RANAL_NTX             64                /* # tx descs */
+#define RANAL_NTX_NBLK        256               /* # reserved tx descs */
+
+#define RANAL_RX_CQ_SIZE      1024              /* # entries in receive CQ 
+                                                 * (overflow is a performance hit) */
+
+#define RANAL_RESCHED         100               /* # scheduler loops before reschedule */
+
+#define RANAL_MIN_TIMEOUT     5                 /* minimum timeout interval (seconds) */
+#define RANAL_TIMEOUT2KEEPALIVE(t) (((t)+1)/2)  /* timeout -> keepalive interval */
+
+/* default vals for runtime tunables */
+#define RANAL_TIMEOUT           30              /* comms timeout (seconds) */
+#define RANAL_LISTENER_TIMEOUT   5              /* listener timeout (seconds) */
+#define RANAL_MAX_IMMEDIATE    (2<<10)          /* biggest immediate payload */
+
+typedef struct 
+{
+        int               kra_timeout;          /* comms timeout (seconds) */
+        int               kra_listener_timeout; /* max time the listener can block */
+       int               kra_backlog;          /* listener's backlog */
+       int               kra_port;             /* listener's TCP/IP port */
+        int               kra_max_immediate;    /* biggest immediate payload */
+        struct ctl_table_header *kra_sysctl;    /* sysctl interface */
+} kra_tunables_t;
+
+typedef struct
+{
+        RAP_PVOID               rad_handle;     /* device handle */
+        RAP_PROTECTION_HANDLE   rad_ptag;       /* protection tag */
+        RAP_CQ_HANDLE           rad_fma_cq;     /* FMA (small message) completion queue */
+        RAP_CQ_HANDLE           rad_rdma_cq;    /* rdma completion queue */
+        int                     rad_id;         /* device id */
+        int                     rad_idx;        /* index in kra_devices */
+        int                     rad_ready;      /* set by device callback */
+        struct list_head        rad_connq;      /* connections requiring attention */
+        wait_queue_head_t       rad_waitq;      /* scheduler waits here */
+        spinlock_t              rad_lock;       /* serialise */
+} kra_device_t;
+        
+typedef struct 
+{
+        int               kra_init;             /* initialisation state */
+        int               kra_shutdown;         /* shut down? */
+        atomic_t          kra_nthreads;         /* # live threads */
+
+        struct semaphore  kra_nid_mutex;        /* serialise NID/listener ops */
+       struct semaphore  kra_listener_signal;  /* block for listener startup/shutdown */
+       struct socket    *kra_listener_sock;    /* listener's socket */
+       int               kra_listener_shutdown; /* ask listener to close */
+
+        kra_device_t      kra_devices[RANAL_MAXDEVS]; /* device/ptag/cq etc */
+        int               kra_ndevs;            /* # devices */
+
+        rwlock_t          kra_global_lock;      /* stabilize peer/conn ops */
+
+        struct list_head *kra_peers;            /* hash table of all my known peers */
+        int               kra_peer_hash_size;   /* size of kra_peers */
+        atomic_t          kra_npeers;           /* # peers extant */
+
+        struct list_head *kra_conns;            /* conns hashed by cqid */
+        int               kra_conn_hash_size;   /* size of kra_conns */
+        __u64             kra_next_incarnation; /* conn incarnation # generator */
+        int               kra_next_cqid;        /* cqid generator */
+        atomic_t          kra_nconns;           /* # connections extant */
+
+        long              kra_new_min_timeout;  /* minimum timeout on any new conn */
+        wait_queue_head_t kra_reaper_waitq;     /* reaper sleeps here */
+        spinlock_t        kra_reaper_lock;      /* serialise */
+        
+        struct list_head  kra_connd_peers;      /* peers waiting for a connection */
+        wait_queue_head_t kra_connd_waitq;      /* connection daemons sleep here */
+        spinlock_t        kra_connd_lock;       /* serialise */
+
+        struct list_head  kra_idle_txs;         /* idle tx descriptors */
+        struct list_head  kra_idle_nblk_txs;    /* idle reserved tx descriptors */
+        __u64             kra_next_tx_cookie;   /* RDMA completion cookie */
+        wait_queue_head_t kra_idle_tx_waitq;    /* block here for tx descriptor */
+        spinlock_t        kra_tx_lock;          /* serialise */
+} kra_data_t;
+
+#define RANAL_INIT_NOTHING         0
+#define RANAL_INIT_DATA            1
+
+#define RANAL_INIT_ALL             7
+
+/************************************************************************
+ * Wire message structs.  These are sent in sender's byte order
+ * (i.e. receiver checks magic and flips if required).
+ */
+
+typedef struct kra_connreq                     /* connection request/response */
+{                                              /* (sent via socket) */
+        __u32             racr_magic;          /* I'm an ranal connreq */
+        __u16             racr_version;                /* this is my version number */
+        __u16             racr_devid;           /* which device to connect on */
+        __u64             racr_nid;            /* my NID */
+        __u64             racr_incarnation;    /* my incarnation */
+        __u32             racr_timeout;         /* my timeout */
+       RAP_RI_PARAMETERS racr_riparams;        /* my endpoint info */
+} kra_connreq_t;
+
+typedef struct
+{
+        RAP_MEM_KEY       rard_key;
+        RAP_PVOID64       rard_addr;
+        RAP_UINT32        rard_nob;
+} kra_rdma_desc_t;
+
+typedef struct
+{
+        ptl_hdr_t         raim_hdr;             /* portals header */
+        /* Portals payload is in FMA "Message Data" */
+} kra_immediate_msg_t;
+
+typedef struct
+{
+        ptl_hdr_t         raprm_hdr;           /* portals header */
+        __u64             raprm_cookie;                /* opaque completion cookie */
+} kra_putreq_msg_t;
+
+typedef struct
+{
+       __u64             rapam_src_cookie;     /* reflected completion cookie */
+       __u64             rapam_dst_cookie;     /* opaque completion cookie */
+       kra_rdma_desc_t   rapam_desc;           /* sender's sink buffer */
+} kra_putack_msg_t;
+
+typedef struct
+{
+        ptl_hdr_t         ragm_hdr;             /* portals header */
+        __u64             ragm_cookie;          /* opaque completion cookie */
+        kra_rdma_desc_t   ragm_desc;            /* sender's sink buffer */
+} kra_get_msg_t;
+
+typedef struct
+{
+        __u64             racm_cookie;          /* reflected completion cookie */
+} kra_completion_msg_t;
+
+typedef struct                                  /* NB must fit in FMA "Prefix" */
+{
+        __u32             ram_magic;           /* I'm an ranal message */
+        __u16             ram_version;         /* this is my version number */
+        __u16             ram_type;            /* msg type */
+        __u64             ram_srcnid;           /* sender's NID */
+        __u64             ram_incarnation;      /* sender's connection incarnation */
+        union {
+                kra_immediate_msg_t   immediate;
+               kra_putreq_msg_t      putreq;
+               kra_putack_msg_t      putack;
+               kra_get_msg_t         get;
+                kra_completion_msg_t  completion;
+        }                    ram_u;
+        __u32             ram_seq;              /* incrementing sequence number */
+} kra_msg_t;
+
+#define RANAL_MSG_MAGIC       0x0be91b92        /* unique magic */
+#define RANAL_MSG_VERSION              1        /* current protocol version */
+
+#define RANAL_MSG_FENCE             0x80        /* fence RDMA */
+
+#define RANAL_MSG_NONE              0x00        /* illegal message */
+#define RANAL_MSG_NOOP              0x01        /* empty ram_u (keepalive) */
+#define RANAL_MSG_IMMEDIATE         0x02        /* ram_u.immediate */
+#define RANAL_MSG_PUT_REQ           0x03       /* ram_u.putreq (src->sink) */
+#define RANAL_MSG_PUT_NAK           0x04       /* ram_u.completion (no PUT match: sink->src) */
+#define RANAL_MSG_PUT_ACK           0x05       /* ram_u.putack (PUT matched: sink->src) */
+#define RANAL_MSG_PUT_DONE          0x86       /* ram_u.completion (src->sink) */
+#define RANAL_MSG_GET_REQ           0x07               /* ram_u.get (sink->src) */
+#define RANAL_MSG_GET_NAK           0x08        /* ram_u.completion (no GET match: src->sink) */
+#define RANAL_MSG_GET_DONE          0x89       /* ram_u.completion (src->sink) */
+#define RANAL_MSG_CLOSE             0x8a        /* empty ram_u */
+
+/***********************************************************************/
+
+typedef struct kra_tx                           /* message descriptor */
+{
+        struct list_head          tx_list;      /* queue on idle_txs/rac_sendq/rac_waitq */
+        struct kra_conn          *tx_conn;      /* owning conn */
+        lib_msg_t                *tx_libmsg[2]; /* lib msgs to finalize on completion */
+        unsigned long             tx_qtime;     /* when tx started to wait for something */
+        int                       tx_isnblk;    /* I'm reserved for non-blocking sends */
+        int                       tx_nob;       /* # bytes of payload */
+        int                       tx_buftype;   /* payload buffer type */
+        void                     *tx_buffer;    /* source/sink buffer */
+        int                       tx_phys_offset; /* first page offset (if phys) */
+        int                       tx_phys_npages; /* # physical pages */
+        RAP_PHYS_REGION          *tx_phys;      /* page descriptors */
+        RAP_MEM_KEY               tx_map_key;   /* mapping key */
+        RAP_RDMA_DESCRIPTOR       tx_rdma_desc; /* rdma descriptor */
+        __u64                     tx_cookie;    /* identify this tx to peer */
+        kra_msg_t                 tx_msg;       /* FMA message buffer */
+} kra_tx_t;
+
+#define RANAL_BUF_NONE           0              /* buffer type not set */
+#define RANAL_BUF_IMMEDIATE      1              /* immediate data */
+#define RANAL_BUF_PHYS_UNMAPPED  2              /* physical: not mapped yet */
+#define RANAL_BUF_PHYS_MAPPED    3              /* physical: mapped already */
+#define RANAL_BUF_VIRT_UNMAPPED  4              /* virtual: not mapped yet */
+#define RANAL_BUF_VIRT_MAPPED    5              /* virtual: mapped already */
+
+#define RANAL_TX_IDLE            0x00           /* on freelist */
+#define RANAL_TX_SIMPLE          0x10           /* about to send a simple message */
+#define RANAL_TX_PUTI_REQ        0x20           /* PUT initiator about to send PUT_REQ */
+#define RANAL_TX_PUTI_WAIT_ACK   0x21           /* PUT initiator waiting for PUT_ACK */
+#define RANAL_TX_PUTI_RDMA       0x22           /* PUT initiator waiting for RDMA to complete */
+#define RANAL_TX_PUTI_DONE       0x23           /* PUT initiator about to send PUT_DONE */
+#define RANAL_TX_PUTT_NAK        0x30           /* PUT target about to send PUT_NAK */
+#define RANAL_TX_PUTT_ACK        0x30           /* PUT target about to send PUT_ACK */
+#define RANAL_TX_PUTT_WAIT_DONE  0x31           /* PUT target waiting for PUT_DONE */
+#define RANAL_TX_GETI_REQ        0x40           /* GET initiator about to send GET_REQ */
+#define RANAL_TX_GETI_WAIT_DONE  0x41           /* GET initiator waiting for GET_DONE */
+#define RANAL_TX_GETT_NAK        0x50           /* GET target about to send PUT_NAK */
+#define RANAL_TX_GETT_RDMA       0x51           /* GET target waiting for RDMA to complete */
+#define RANAL_TX_GETT_DONE       0x52           /* GET target about to send GET_DONE */
+
+typedef struct kra_conn
+{ 
+        struct kra_peer    *rac_peer;           /* owning peer */
+        struct list_head    rac_list;           /* stash on peer's conn list */
+        struct list_head    rac_hashlist;       /* stash in connection hash table */
+        struct list_head    rac_schedlist;      /* queue for scheduler */
+        struct list_head    rac_fmaq;           /* txs queued for FMA */
+        struct list_head    rac_rdmaq;          /* txs awaiting RDMA completion */
+        struct list_head    rac_replyq;         /* txs awaiting replies */
+        __u64               rac_peer_incarnation; /* peer's unique connection stamp */
+        __u64               rac_my_incarnation; /* my unique connection stamp */
+        unsigned long       rac_last_tx;        /* when I last sent an FMA message */
+        unsigned long       rac_last_rx;        /* when I last received an FMA messages */
+        long                rac_keepalive;      /* keepalive interval */
+        long                rac_timeout;        /* infer peer death on (last_rx + timout > now) */
+        __u32               rac_cqid;           /* my completion callback id (non-unique) */
+        __u32               rac_tx_seq;         /* tx msg sequence number */
+        __u32               rac_rx_seq;         /* rx msg sequence number */
+        atomic_t            rac_refcount;       /* # users */
+        unsigned int        rac_close_sent;     /* I've sent CLOSE */
+        unsigned int        rac_close_recvd;    /* I've received CLOSE */
+        unsigned int        rac_closing;        /* connection being torn down */
+        unsigned int        rac_scheduled;      /* being attented to */
+        spinlock_t          rac_lock;           /* serialise */
+        kra_device_t       *rac_device;         /* which device */
+       RAP_PVOID           rac_rihandle;       /* RA endpoint */
+        kra_msg_t          *rac_rxmsg;          /* incoming message (FMA prefix) */
+        kra_msg_t           rac_msg;            /* keepalive/CLOSE message buffer */
+} kra_conn_t;
+
+typedef struct kra_peer
+{
+        struct list_head    rap_list;           /* stash on global peer list */
+        struct list_head    rap_connd_list;     /* schedule on kra_connd_peers */
+        struct list_head    rap_conns;          /* all active connections */
+        struct list_head    rap_tx_queue;       /* msgs waiting for a conn */
+        ptl_nid_t           rap_nid;            /* who's on the other end(s) */
+        __u32               rap_ip;             /* IP address of peer */
+        int                 rap_port;           /* port on which peer listens */
+        atomic_t            rap_refcount;       /* # users */
+        int                 rap_persistence;    /* "known" peer refs */
+        int                 rap_connecting;     /* connection forming */
+        unsigned long       rap_reconnect_time; /* CURRENT_TIME when reconnect OK */
+        unsigned long       rap_reconnect_interval; /* exponential backoff */
+} kra_peer_t;
+
+
+extern lib_nal_t       kranal_lib;
+extern kra_data_t      kranal_data;
+extern kra_tunables_t  kranal_tunables;
+
+static inline void
+kranal_peer_addref(kra_peer_t *peer)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid);
+       LASSERT(atomic_read(&peer->rap_refcount) > 0);
+       atomic_inc(&peer->rap_refcount);
+}
+
+static inline void
+kranal_peer_decref(kra_peer_t *peer)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", peer, peer->rap_nid);
+       LASSERT(atomic_read(&peer->rap_refcount) > 0);
+       if (atomic_dec_and_test(&peer->rap_refcount))
+               __kranal_peer_decref(peer);
+}
+
+static inline struct list_head *
+kranal_nid2peerlist (ptl_nid_t nid) 
+{
+        unsigned int hash = ((unsigned int)nid) % kranal_data.kra_peer_hash_size;
+        
+        return (&kranal_data.kra_peers [hash]);
+}
+
+static inline int
+kranal_peer_active(kra_peer_t *peer)
+{
+        /* Am I in the peer hash table? */
+        return (!list_empty(&peer->rap_list));
+}
+
+static inline void
+kranal_conn_addref(kra_conn_t *conn)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid);
+       LASSERT(atomic_read(&conn->rac_refcount) > 0);
+       atomic_inc(&conn->rac_refcount);
+}
+
+static inline void
+kranal_conn_decref(kra_conn_t *conn)
+{
+        CDEBUG(D_NET, "%p->"LPX64"\n", conn, conn->rac_peer->rap_nid);
+       LASSERT(atomic_read(&conn->rac_refcount) > 0);
+       if (atomic_dec_and_test(&conn->rac_refcount))
+               __kranal_conn_decref(conn);
+}
+
+static inline struct list_head *
+kranal_cqid2connlist (__u32 cqid) 
+{
+        unsigned int hash = cqid % kranal_data.kra_conn_hash_size;
+        
+        return (&kranal_data.kra_conns [hash]);
+}
+
+static inline kra_conn_t *
+kranal_cqid2conn_locked (__u32 cqid) 
+{
+        struct list_head  conns = kranal_cqid2connlist(cqid);
+        struct list_head *tmp;
+        
+        list_for_each(tmp, conns) {
+                conn = list_entry(tmp, kra_conn_t, rac_hashlist);
+                
+                if (conn->rac_cqid == cqid)
+                        return conn;
+        }
+        
+        return NULL;
+}
+
+static inline int
+kranal_tx_mapped (kra_tx_t *tx)
+{
+        return (tx->tx_buftype == RANAL_BUF_VIRT_MAPPED ||
+                tx->tx_buftype == RANAL_BUF_PHYS_MAPPED);
+}
+
+#if CONFIG_X86
+static inline __u64
+kranal_page2phys (struct page *p)
+{
+        __u64 page_number = p - mem_map;
+        
+        return (page_number << PAGE_SHIFT);
+}
+#else
+# error "no page->phys"
+#endif
+
diff --git a/lustre/portals/knals/ranal/ranal_cb.c b/lustre/portals/knals/ranal/ranal_cb.c
new file mode 100644 (file)
index 0000000..b491d71
--- /dev/null
@@ -0,0 +1,1754 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2004 Cluster File Systems, Inc.
+ *   Author: Eric Barton <eric@bartonsoftware.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include "ranal.h"
+
+int
+kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
+{
+        /* I would guess that if kranal_get_peer (nid) == NULL,
+           and we're not routing, then 'nid' is very distant :) */
+        if ( nal->libnal_ni.ni_pid.nid == nid ) {
+                *dist = 0;
+        } else {
+                *dist = 1;
+        }
+
+        return 0;
+}
+
+void
+kranal_device_callback(RAP_INT32 devid)
+{
+        kra_device_t *dev;
+        int           i;
+        
+        for (i = 0; i < kranal_data.kra_ndevs; i++) {
+
+                dev = &kranal_data.kra_devices[i];
+                if (dev->rad_id != devid)
+                        continue;
+
+                spin_lock_irqsave(&dev->rad_lock, flags);
+
+                if (!dev->rad_ready) {
+                        dev->rad_ready = 1;
+                        wake_up(&dev->rad_waitq);
+                }
+
+                spin_unlock_irqrestore(&dev->rad_lock, flags);
+                return;
+        }
+        
+        CWARN("callback for unknown device %d\n", devid);
+}
+
+void
+kranal_schedule_conn(kra_conn_t *conn)
+{
+        kra_device_t    *dev = conn->rac_device;
+        unsigned long    flags;
+        
+        spin_lock_irqsave(&dev->rad_lock, flags);
+        
+        if (!conn->rac_scheduled) {
+                kranal_conn_addref(conn);       /* +1 ref for scheduler */
+                conn->rac_scheduled = 1;
+                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
+                wake_up(&dev->rad_waitq);
+        }
+
+        spin_unlock_irqrestore(&dev->rad_lock, flags);
+}
+
+void
+kranal_schedule_cqid (__u32 cqid)
+{
+        kra_conn_t         *conn;
+        struct list_head   *conns;
+        struct list_head   *tmp;
+
+        conns = kranal_cqid2connlist(cqid);
+
+        read_lock(&kranal_data.kra_global_lock);
+
+        conn = kranal_cqid2conn_locked(cqid);
+        
+        if (conn == NULL)
+                CWARN("no cqid %x\n", cqid);
+        else
+                kranal_schedule_conn(conn);
+        
+        read_unlock(&kranal_data.kra_global_lock);
+}
+
+void
+kranal_schedule_dev(kra_device_t *dev)
+{
+        kra_conn_t         *conn;
+        struct list_head   *conns;
+        struct list_head   *tmp;
+        int                 i;
+
+        /* Don't do this in IRQ context (servers may have 1000s of clients) */
+        LASSERT (!in_interrupt()); 
+
+        CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
+
+        for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
+
+                /* Drop the lock on each hash bucket to ensure we don't
+                 * block anyone for too long at IRQ priority on another CPU */
+                
+                read_lock(&kranal_data.kra_global_lock);
+        
+                conns = &kranal_data.kra_conns[i];
+
+                list_for_each (tmp, conns) {
+                        conn = list_entry(tmp, kra_conn_t, rac_hashlist);
+                
+                        if (conn->rac_device == dev)
+                                kranal_schedule_conn(conn);
+                }
+                read_unlock(&kranal_data.kra_global_lock);
+        }
+}
+
+void
+kranal_tx_done (kra_tx_t *tx, int completion)
+{
+        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
+        kra_device_t    *dev;
+        unsigned long    flags;
+        int              i;
+        RAP_RETURN       rrc;
+
+        LASSERT (!in_interrupt());
+
+        switch (tx->tx_buftype) {
+        default:
+                LBUG();
+
+        case RANAL_BUF_NONE:
+        case RANAL_BUF_IMMEDIATE:
+        case RANAL_BUF_PHYS_UNMAPPED:
+        case RANAL_BUF_VIRT_UNMAPPED:
+                break;
+
+        case RANAL_BUF_PHYS_MAPPED:
+                LASSERT (tx->tx_conn != NULL);
+                dev = tx->tx_con->rac_device;
+                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
+                                           dev->rad_ptag, &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                break;
+
+        case RANAL_BUF_VIRT_MAPPED:
+                LASSERT (tx->tx_conn != NULL);
+                dev = tx->tx_con->rac_device;
+                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer
+                                           dev->rad_ptag, &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                break;
+        }
+
+        for (i = 0; i < 2; i++) {
+                /* tx may have up to 2 libmsgs to finalise */
+                if (tx->tx_libmsg[i] == NULL)
+                        continue;
+
+                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
+                tx->tx_libmsg[i] = NULL;
+        }
+
+        tx->tx_buftype = RANAL_BUF_NONE;
+        tx->tx_msg.ram_type = RANAL_MSG_NONE;
+        tx->tx_conn = NULL;
+
+        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
+
+        if (tx->tx_isnblk) {
+                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
+        } else {
+                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
+                wake_up(&kranal_data.kra_idle_tx_waitq);
+        }
+
+        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+}
+
+kra_tx_t *
+kranal_get_idle_tx (int may_block) 
+{
+        unsigned long  flags;
+        kra_tx_t      *tx = NULL;
+        
+        for (;;) {
+                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
+
+                /* "normal" descriptor is free */
+                if (!list_empty(&kranal_data.kra_idle_txs)) {
+                        tx = list_entry(kranal_data.kra_idle_txs.next,
+                                       kra_tx_t, tx_list);
+                        break;
+                }
+
+                if (!may_block) {
+                        /* may dip into reserve pool */
+                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
+                                CERROR("reserved tx desc pool exhausted\n");
+                                break;
+                        }
+
+                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
+                                       kra_tx_t, tx_list);
+                        break;
+                }
+
+                /* block for idle tx */
+                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+
+                wait_event(kranal_data.kra_idle_tx_waitq,
+                          !list_empty(&kranal_data.kra_idle_txs));
+        }
+
+        if (tx != NULL) {
+                list_del(&tx->tx_list);
+
+                /* Allocate a new completion cookie.  It might not be
+                 * needed, but we've got a lock right now... */
+                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
+
+                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
+                LASSERT (tx->tx_conn == NULL);
+                LASSERT (tx->tx_libmsg[0] == NULL);
+                LASSERT (tx->tx_libmsg[1] == NULL);
+        }
+
+        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+        
+        return tx;
+}
+
+void
+kranal_init_msg(kra_msg_t *msg, int type)
+{
+        msg->ram_magic = RANAL_MSG_MAGIC;
+        msg->ram_version = RANAL_MSG_VERSION;
+        msg->ram_type = type;
+        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
+        /* ram_incarnation gets set when FMA is sent */
+}
+
+kra_tx_t
+kranal_new_tx_msg (int may_block, int type)
+{
+        kra_tx_t *tx = kranal_get_idle_tx(may_block);
+
+        if (tx == NULL)
+                return NULL;
+
+        kranal_init_msg(&tx->tx_msg, type);
+        return tx;
+}
+
+int
+kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
+                               int offset, int nob)
+                 
+{
+        LASSERT (nob > 0);
+        LASSERT (niov > 0);
+        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                niov--;
+                iov++;
+                LASSERT (niov > 0);
+        }
+
+        if (nob > iov->iov_len - offset) {
+                CERROR("Can't handle multiple vaddr fragments\n");
+                return -EMSGSIZE;
+        }
+
+        tx->tx_bufftype = RANAL_BUF_IMMEDIATE;
+        tx->tx_nob = nob;
+        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
+        return 0;
+}
+
+int
+kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
+                          int offset, int nob)
+                 
+{
+        LASSERT (nob > 0);
+        LASSERT (niov > 0);
+        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                niov--;
+                iov++;
+                LASSERT (niov > 0);
+        }
+
+        if (nob > iov->iov_len - offset) {
+                CERROR("Can't handle multiple vaddr fragments\n");
+                return -EMSGSIZE;
+        }
+
+        tx->tx_bufftype = RANAL_BUF_VIRT_UNMAPPED;
+        tx->tx_nob = nob;
+        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
+        return 0;
+}
+
+int
+kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
+                          int offset, int nob)
+{
+        RAP_PHYS_REGION *phys = tx->tx_phys;
+        int              resid;
+
+        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
+
+        LASSERT (nob > 0);
+        LASSERT (nkiov > 0);
+        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                nkiov--;
+                kiov++;
+                LASSERT (nkiov > 0);
+        }
+
+        tx->tx_bufftype = RANAL_BUF_PHYS_UNMAPPED;
+        tx->tx_nob = nob;
+        tx->tx_buffer = NULL;
+        tx->tx_phys_offset = kiov->kiov_offset + offset;
+        
+        phys->Address = kranal_page2phys(kiov->kiov_page);
+        phys->Length  = PAGE_SIZE;
+        phys++;
+
+        resid = nob - (kiov->kiov_len - offset);
+        while (resid > 0) {
+                kiov++;
+                nkiov--;
+                LASSERT (nkiov > 0);
+
+                if (kiov->kiov_offset != 0 ||
+                    ((resid > PAGE_SIZE) && 
+                     kiov->kiov_len < PAGE_SIZE)) {
+                        int i;
+                        /* Can't have gaps */
+                        CERROR("Can't make payload contiguous in I/O VM:"
+                               "page %d, offset %d, len %d \n", nphys, 
+                               kiov->kiov_offset, kiov->kiov_len);
+
+                        for (i = -nphys; i < nkiov; i++) {
+                                CERROR("kiov[%d] %p +%d for %d\n",
+                                       i, kiov[i].kiov_page, 
+                                       kiov[i].kiov_offset, kiov[i].kiov_len);
+                        }
+                        
+                        return -EINVAL;
+                }
+
+                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
+                        CERROR ("payload too big (%d)\n", phys - tx->tx_phys);
+                        return -EMSGSIZE;
+                }
+
+                phys->Address = kranal_page2phys(kiov->kiov_page);
+                phys->Length  = PAGE_SIZE;
+                phys++;
+
+                resid -= PAGE_SIZE;
+        }
+
+        tx->tx_phys_npages = phys - tx->tx_phys;
+        return 0;
+}
+
+static inline int
+kranal_setup_buffer (kra_tx_t *tx, int niov, 
+                     struct iovec *iov, ptl_kiov_t *kiov,
+                     int offset, int nob)
+{
+        LASSERT ((iov == NULL) != (kiov == NULL));
+        
+        if (kiov != NULL)
+                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
+        
+        return kranal_setup_virt_buffer(tx, niov, kiov, offset, nob);
+}
+
+void
+kranal_map_buffer (kra_tx_t *tx)
+{
+        kra_conn_t     *conn = tx->tx_conn;
+        kra_device_t   *dev = conn->rac_device;
+
+        switch (tx->tx_buftype) {
+        default:
+                
+        case RANAL_BUF_PHYS_UNMAPPED:
+                rrc = RapkRegisterPhys(conn->rac_device->rad_handle,
+                                       tx->tx_phys, tx->tx_phys_npages,
+                                       conn->rac_device->rad_ptag,
+                                       &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
+                return;
+
+        case RANAL_BUF_VIRT_UNMAPPED:
+                rrc = RapkRegisterMemory(conn->rac_device->rad_handle,
+                                         tx->tx_buffer, tx->tx_nob,
+                                         conn->rac_device->rad_ptag,
+                                         &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
+                return;
+        }
+}
+
+kra_conn_t *
+kranal_find_conn_locked (kra_peer_t *peer)
+{
+        struct list_head *tmp;
+
+        /* just return the first connection */
+        list_for_each (tmp, &peer->rap_conns) {
+                return list_entry(tmp, kra_conn_t, rac_list);
+        }
+
+        return NULL;
+}
+
+void
+kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
+{
+        unsigned long    flags;
+
+        tx->tx_conn = conn;
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
+        tx->tx_qtime = jiffies;
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+        kranal_schedule_conn(conn);
+}
+
+void
+kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
+{
+        unsigned long    flags;
+        kra_peer_t      *peer;
+        kra_conn_t      *conn;
+        unsigned long    now;
+        rwlock_t        *g_lock = &kranal_data.kra_global_lock;
+
+        /* If I get here, I've committed to send, so I complete the tx with
+         * failure on any problems */
+        
+        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
+
+        read_lock(g_lock);
+        
+        peer = kranal_find_peer_locked(nid);
+        if (peer == NULL) {
+                read_unlock(g_lock);
+                kranal_tx_done(tx, -EHOSTUNREACH);
+                return;
+        }
+
+        conn = kranal_find_conn_locked(peer);
+        if (conn != NULL) {
+                kranal_post_fma(conn, tx);
+                read_unlock(g_lock);
+                return;
+        }
+        
+        /* Making one or more connections; I'll need a write lock... */
+        read_unlock(g_lock);
+        write_lock_irqsave(g_lock, flags);
+
+        peer = kranal_find_peer_locked(nid);
+        if (peer == NULL) {
+                write_unlock_irqrestore(g_lock, flags);
+                kranal_tx_done(tx -EHOSTUNREACH);
+                return;
+        }
+
+        conn = kranal_find_conn_locked(peer);
+        if (conn != NULL) {
+                /* Connection exists; queue message on it */
+                kranal_post_fma(conn, tx);
+                write_unlock_irqrestore(g_lock, flags);
+                return;
+        }
+
+        LASSERT (peer->rap_persistence > 0);
+
+        if (!peer->rap_connecting) {
+                now = CURRENT_TIME;
+                if (now < peer->rap_reconnect_time) {
+                        write_unlock_irqrestore(g_lock, flags);
+                        kranal_tx_done(tx, -EHOSTUNREACH);
+                        return;
+                }
+        
+                peer->rap_connecting = 1;
+                kranal_peer_addref(peer); /* extra ref for connd */
+        
+                spin_lock(&kranal_data.kra_connd_lock);
+        
+                list_add_tail(&peer->rap_connd_list,
+                             &kranal_data.kra_connd_peers);
+                wake_up(&kranal_data.kra_connd_waitq);
+        
+                spin_unlock(&kranal_data.kra_connd_lock);
+        }
+        
+        /* A connection is being established; queue the message... */
+        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
+
+        write_unlock_irqrestore(g_lock, flags);
+}
+
+static void
+kranal_rdma(kra_tx_t *tx, int type, 
+            kra_rdma_desc_t *rard, int nob, __u64 cookie)
+{
+        kra_conn_t *conn = tx->tx_conn;
+        RAP_RETURN  rrc;
+
+        /* prep final completion message */
+        kranal_init_msg(&tx->tx_msg, type);
+        tx->tx_msg.ram_u.completion.racm_cookie = cookie;
+        
+        LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
+                 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+        LASSERT (nob <= rard->rard_nob);
+
+        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
+        tx->tx_rdma_desc.SrcPtr = tx->tx_buffer;
+        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
+        tx->tx_rdma_desc.DstPtr = rard->rard_addr;
+        tx->tx_rdma_desc.DstKey = rard->rard_key;
+        tx->tx_rdma_desc.Length = nob;
+        tx->tx_rdma_desc.AppPtr = tx;
+
+        if (nob == 0) { /* Immediate completion */
+                kranal_post_fma(conn, tx);
+                return;
+        }
+        
+        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
+        LASSERT (rrc == RAP_SUCCESS);
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
+        tx->tx_qtime = jiffies;
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+}
+
+int
+kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
+{
+        __u32      nob_received = nob;
+        RAP_RETURN rrc;
+
+        LASSERT (conn->rac_rxmsg != NULL);
+
+        rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
+                                &nob_received, sizeof(kra_msg_t));
+        LASSERT (rrc == RAP_SUCCESS);
+
+        conn->rac_rxmsg = NULL;
+
+        if (nob_received != nob) {
+                CWARN("Expected %d immediate bytes but got %d\n",
+                      nob, nob_received);
+                return -EPROTO;
+        }
+        
+        return 0;
+}
+
+ptl_err_t
+kranal_do_send (lib_nal_t    *nal, 
+                void         *private,
+                lib_msg_t    *libmsg,
+                ptl_hdr_t    *hdr, 
+                int           type, 
+                ptl_nid_t     nid, 
+                ptl_pid_t     pid,
+                unsigned int  niov, 
+                struct iovec *iov, 
+                ptl_kiov_t   *kiov,
+                size_t        offset,
+                size_t        nob)
+{
+        kra_conn_t *conn;
+        kra_tx_t   *tx;
+
+        /* NB 'private' is different depending on what we're sending.... */
+
+        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
+               " pid %d\n", nob, niov, nid , pid);
+
+        LASSERT (nob == 0 || niov > 0);
+        LASSERT (niov <= PTL_MD_MAX_IOV);
+
+        LASSERT (!in_interrupt());
+        /* payload is either all vaddrs or all pages */
+        LASSERT (!(kiov != NULL && iov != NULL));
+
+        switch(type) {
+        default:
+                LBUG();
+                
+        case PTL_MSG_REPLY: {
+                /* reply's 'private' is the conn that received the GET_REQ */
+                conn = private;
+                LASSERT (conn->rac_rxmsg != NULL);
+
+                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
+                        if (nob > RANAL_MAX_IMMEDIATE) {
+                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
+                                       nob, nid);
+                                return PTL_FAIL;
+                        }
+                        break;                  /* RDMA not expected */
+                }
+                
+                /* Incoming message consistent with immediate reply? */
+                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
+                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
+                              nid, conn->rac_rxmsg->ram_type);
+                        return PTL_FAIL;
+                }
+
+                tx = kranal_get_idle_tx(0);
+                if (tx == NULL)
+                        return PTL_FAIL;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_conn = conn;
+                tx->tx_libmsg[0] = libmsg;
+
+                kranal_map_buffer(tx);
+                kranal_rdma(tx, RANAL_MSG_GET_DONE,
+                            &conn->rac_rxmsg->ram_u.getreq.ragm_desc, nob,
+                            &conn->rac_rxmsg->ram_u.getreq.ragm_cookie);
+                return PTL_OK;
+        }
+
+        case PTL_MSG_GET:
+                if (kiov == NULL &&             /* not paged */
+                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
+                    nob <= kranal_tunables.kra_max_immediate)
+                        break;                  /* send IMMEDIATE */
+
+                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ);
+                if (tx == NULL)
+                        return PTL_NO_SPACE;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
+                if (tx->tx_libmsg[1] == NULL) {
+                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_libmsg[0] = libmsg;
+                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
+                /* rest of tx_msg is setup just before it is sent */
+                kranal_launch_tx(tx, nid);
+                return PTL_OK
+
+        case PTL_MSG_ACK:
+                LASSERT (nob == 0);
+                break;
+
+        case PTL_MSG_PUT:
+                if (kiov == NULL &&             /* not paged */
+                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
+                    nob <= kranal_tunables.kra_max_immediate)
+                        break;                  /* send IMMEDIATE */
+                
+                tx = kranal_new_tx_msg(!in_interrupt(), RANA_MSG_PUT_REQ);
+                if (tx == NULL)
+                        return PTL_NO_SPACE;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                tx->tx_libmsg[0] = libmsg;
+                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
+                /* rest of tx_msg is setup just before it is sent */
+                kranal_launch_tx(tx, nid);
+                return PTL_OK;
+        }
+
+        LASSERT (kiov == NULL);
+        LASSERT (nob <= RANAL_MAX_IMMEDIATE);
+
+        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
+                                 type == PTL_MSG_REPLY ||
+                                 in_interrupt()), 
+                               RANAL_MSG_IMMEDIATE);
+        if (tx == NULL)
+                return PTL_NO_SPACE;
+
+        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
+        if (rc != 0) {
+                kranal_tx_done(tx, rc);
+                return PTL_FAIL;
+        }
+                
+        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
+        tx->tx_libmsg[0] = libmsg;
+        kranal_launch_tx(tx, nid);
+        return PTL_OK;
+}
+
+ptl_err_t
+kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
+            ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+            unsigned int niov, struct iovec *iov,
+            size_t offset, size_t len)
+{
+        return kranal_do_send(nal, private, cookie,
+                             hdr, type, nid, pid,
+                             niov, iov, NULL,
+                             offset, len);
+}
+
+ptl_err_t
+kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
+                  ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+                  unsigned int niov, ptl_kiov_t *kiov, 
+                  size_t offset, size_t len)
+{
+        return kranal_do_send(nal, private, cookie,
+                             hdr, type, nid, pid,
+                             niov, NULL, kiov,
+                             offset, len);
+}
+
+ptl_err_t
+kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
+               unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
+               size_t offset, size_t mlen, size_t rlen)
+{
+        kra_conn_t  *conn = private;
+        kra_msg_t   *rxmsg = conn->rac_rxmsg;
+        void        *buffer;
+        int          rc;
+        
+        LASSERT (mlen <= rlen);
+        LASSERT (!in_interrupt());
+        /* Either all pages or all vaddrs */
+        LASSERT (!(kiov != NULL && iov != NULL));
+
+        switch(rxmsg->ram_type) {
+        default:
+                LBUG();
+                return PTL_FAIL;
+                
+        case RANAL_MSG_IMMEDIATE:
+                if (mlen == 0) {
+                        buffer = NULL;
+                } else if (kiov != NULL) {
+                        CERROR("Can't recv immediate into paged buffer\n");
+                        return PTL_FAIL;
+                } else {
+                        LASSERT (niov > 0);
+                        while (offset >= iov->iov_len) {
+                                offset -= iov->iov_len;
+                                iov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        if (mlen > iov->iov_len - offset) {
+                                CERROR("Can't handle immediate frags\n");
+                                return PTL_FAIL;
+                        }
+                        buffer = ((char *)iov->iov_base) + offset;
+                }
+                rc = kranal_consume_rxmsg(conn, buffer, mlen);
+                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
+                return PTL_OK;
+
+        case RANAL_MSG_GET_REQ:
+                /* If the GET matched, we've already handled it in
+                 * kranal_do_send which is called to send the REPLY.  We're
+                 * only called here to complete the GET receive (if we needed
+                 * it which we don't, but I digress...) */
+                LASSERT (libmsg == NULL);
+                lib_finalize(nal, NULL, libmsg, PTL_OK);
+                return PTL_OK;
+
+        case RANAL_MSG_PUT_REQ:
+                if (libmsg == NULL) {           /* PUT didn't match... */
+                        lib_finalize(null, NULL, libmsg, PTL_OK);
+                        return PTL_OK;
+                }
+                
+                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
+                if (tx == NULL)
+                        return PTL_NO_SPACE;
+
+                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen);
+                if (rc != 0) {
+                        kranal_tx_done(tx, rc);
+                        return PTL_FAIL;
+                }
+
+                kranal_map_buffer(tx);
+                
+                tx->tx_msg.ram_u.putack.rapam_src_cookie = 
+                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
+                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
+                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_key = tx->tx_map_key;
+                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_addr = tx->tx_buffer;
+                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_nob = mlen;
+
+                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
+
+                kranal_post_fma(conn, tx);
+                
+                /* flag matched by consuming rx message */
+                kranal_consume_rxmsg(conn, NULL, 0);
+                return PTL_OK;
+        }
+}
+
+ptl_err_t
+kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
+            unsigned int niov, struct iovec *iov, 
+            size_t offset, size_t mlen, size_t rlen)
+{
+        return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
+                             offset, mlen, rlen);
+}
+
+ptl_err_t
+kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
+                  unsigned int niov, ptl_kiov_t *kiov, 
+                  size_t offset, size_t mlen, size_t rlen)
+{
+        return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
+                             offset, mlen, rlen);
+}
+
+int
+kranal_thread_start (int(*fn)(void *arg), void *arg)
+{
+        long    pid = kernel_thread(fn, arg, 0);
+
+        if (pid < 0)
+                return(int)pid;
+
+        atomic_inc(&kranal_data.kra_nthreads);
+        return 0;
+}
+
+void
+kranal_thread_fini (void)
+{
+        atomic_dec(&kranal_data.kra_nthreads);
+}
+
+int
+kranal_check_conn (kra_conn_t *conn)
+{
+        kra_tx_t          *tx;
+        struct list_head  *ttmp;
+        unsigned long      flags;
+        long               timeout;
+        unsigned long      now = jiffies;
+
+        if (!conn->rac_closing &&
+            time_after_eq(now, conn->rac_last_sent + conn->rac_keepalive * HZ)) {
+                /* not sent in a while; schedule conn so scheduler sends a keepalive */
+                kranal_schedule_conn(conn);
+        }
+
+        /* wait twice as long for CLOSE to be sure peer is dead */
+        timeout = (conn->rac_closing ? 1 : 2) * conn->rac_timeout * HZ;
+
+        if (!conn->rac_close_recvd &&
+            time_after_eq(now, conn->rac_last_rx + timeout)) {
+                CERROR("Nothing received from "LPX64" within %d seconds\n",
+                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
+                return -ETIMEDOUT;
+        }
+
+        if (conn->rac_closing)
+                return 0;
+        
+        /* Check the conn's queues are moving.  These are "belt+braces" checks,
+         * in case of hardware/software errors that make this conn seem
+         * responsive even though it isn't progressing its message queues. */
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
+        list_for_each (ttmp, &conn->rac_fmaq) {
+                tx = list_entry(ttmp, kra_tx_t, tx_list);
+                
+                if (time_after_eq(now, tx->tx_qtime + timeout)) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
+                        CERROR("tx on fmaq for "LPX64" blocked %d seconds\n",
+                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        return -ETIMEDOUT;
+                }
+        }
+        
+        list_for_each (ttmp, &conn->rac_rdmaq) {
+                tx = list_entry(ttmp, kra_tx_t, tx_list);
+                
+                if (time_after_eq(now, tx->tx_qtime + timeout)) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
+                        CERROR("tx on rdmaq for "LPX64" blocked %d seconds\n",
+                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        return -ETIMEDOUT;
+                }
+        }
+        
+        list_for_each (ttmp, &conn->rac_replyq) {
+                tx = list_entry(ttmp, kra_tx_t, tx_list);
+                
+                if (time_after_eq(now, tx->tx_qtime + timeout)) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
+                        CERROR("tx on replyq for "LPX64" blocked %d seconds\n",
+                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        return -ETIMEDOUT;
+                }
+        }
+        
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+        return 0;
+}
+
+void
+kranal_check_conns (int idx, unsigned long *min_timeoutp)
+{
+        struct list_head  *conns = &kranal_data.kra_conns[idx];
+        struct list_head  *ctmp;
+        kra_conn_t        *conn;
+
+ again:
+        /* NB. We expect to check all the conns and not find any problems, so
+         * we just use a shared lock while we take a look... */
+        read_lock(&kranal_data.kra_global_lock);
+
+        list_for_each (ctmp, conns) {
+                conn = list_entry(ptmp, kra_conn_t, rac_hashlist);
+
+                if (conn->rac_timeout < *min_timeoutp )
+                        *min_timeoutp = conn->rac_timeout;
+                if (conn->rac_keepalive < *min_timeoutp )
+                        *min_timeoutp = conn->rac_keepalive;
+
+                rc = kranal_check_conn(conn);
+                if (rc == 0)
+                        continue;
+
+                kranal_conn_addref(conn);
+                read_unlock(&kranal_data.kra_global_lock);
+
+                CERROR("Check on conn to "LPX64"failed: %d\n",
+                       conn->rac_peer->rap_nid, rc);
+
+                write_lock_irqsave(&kranal_data.kra_global_lock);
+
+                if (!conn->rac_closing)
+                        kranal_close_conn_locked(conn, -ETIMEDOUT);
+                else
+                        kranal_terminate_conn_locked(conn);
+                        
+                kranal_conn_decref(conn);
+
+                /* start again now I've dropped the lock */
+                goto again;
+        }
+
+        read_unlock(&kranal_data.kra_global_lock);
+}
+
+int
+kranal_connd (void *arg)
+{
+       char               name[16];
+        wait_queue_t       wait;
+        unsigned long      flags;
+        kra_peer_t        *peer;
+        int                i;
+
+       snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
+        kportal_daemonize(name);
+        kportal_blockallsigs();
+
+        init_waitqueue_entry(&wait, current);
+
+        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+
+        while (!kranal_data.kra_shutdown) {
+                /* Safe: kra_shutdown only set when quiescent */
+
+                if (!list_empty(&kranal_data.kra_connd_peers)) {
+                        peer = list_entry(kranal_data.kra_connd_peers.next,
+                                         kra_peer_t, rap_connd_list);
+                        
+                        list_del_init(&peer->rap_connd_list);
+                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+                        kranal_connect(peer);
+                        kranal_put_peer(peer);
+
+                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+                       continue;
+                }
+
+                set_current_state(TASK_INTERRUPTIBLE);
+                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
+                
+                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+                schedule ();
+                
+                set_current_state(TASK_RUNNING);
+                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
+
+                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+        }
+
+        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+        kranal_thread_fini();
+        return 0;
+}
+
+void
+kranal_update_reaper_timeout(long timeout) 
+{
+        unsigned long   flags;
+
+        LASSERT (timeout > 0);
+        
+        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+        
+        if (timeout < kranal_data.kra_new_min_timeout)
+                kranal_data.kra_new_min_timeout = timeout;
+
+        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
+}
+
+int
+kranal_reaper (void *arg)
+{
+        wait_queue_t       wait;
+        unsigned long      flags;
+        kra_conn_t        *conn;
+        kra_peer_t        *peer;
+        unsigned long      flags;
+        long               timeout;
+        int                i;
+        int                conn_entries = kranal_data.kra_conn_hash_size;
+        int                conn_index = 0;
+        int                base_index = conn_entries - 1;
+        unsigned long      next_check_time = jiffies;
+        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+        long               current_min_timeout = 1;
+        
+        kportal_daemonize("kranal_reaper");
+        kportal_blockallsigs();
+
+        init_waitqueue_entry(&wait, current);
+
+        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+        kranal_data.kra_new_min_timeout = 1;
+
+        while (!kranal_data.kra_shutdown) {
+
+                /* careful with the jiffy wrap... */
+                timeout = (long)(next_check_time - jiffies);
+                if (timeout <= 0) {
+                
+                        /* I wake up every 'p' seconds to check for
+                         * timeouts on some more peers.  I try to check
+                         * every connection 'n' times within the global
+                         * minimum of all keepalive and timeout intervals,
+                         * to ensure I attend to every connection within
+                         * (n+1)/n times its timeout intervals. */
+                
+                        const int     p = 1;
+                        const int     n = 3;
+                        unsigned long min_timeout;
+                        int           chunk;
+
+                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
+                                /* new min timeout set: restart min timeout scan */
+                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                                base_index = conn_index - 1;
+                                if (base_index < 0)
+                                        base_index = conn_entries - 1;
+
+                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
+                                        current_min_timeout = kranal_data.kra_new_min_timeout;
+                                        CWARN("Set new min timeout %ld\n",
+                                              current_min_timeout);
+                                }
+
+                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                        }
+                        min_timeout = current_min_timeout;
+
+                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
+                                               flags);
+
+                        LASSERT (min_timeout > 0);
+
+                        /* Compute how many table entries to check now so I
+                         * get round the whole table fast enough (NB I do
+                         * this at fixed intervals of 'p' seconds) */
+                       chunk = conn_entries;
+                        if (min_timeout > n * p)
+                                chunk = (chunk * n * p) / min_timeout;
+                        if (chunk == 0)
+                                chunk = 1;
+
+                        for (i = 0; i < chunk; i++) {
+                                kranal_check_conns(conn_index, 
+                                                   &next_min_timeout);
+                                conn_index = (conn_index + 1) % conn_entries;
+                        }
+
+                        next_check_time += p * HZ;
+
+                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+
+                        if (((conn_index - chunk <= base_index &&
+                              base_index < conn_index) ||
+                             (conn_index - conn_entries - chunk <= base_index &&
+                              base_index < conn_index - conn_entries))) {
+
+                                /* Scanned all conns: set current_min_timeout... */
+                                if (current_min_timeout != next_min_timeout) {
+                                        current_min_timeout = next_min_timeout;                                        
+                                        CWARN("Set new min timeout %ld\n",
+                                              current_min_timeout);
+                                }
+
+                                /* ...and restart min timeout scan */
+                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                                base_index = conn_index - 1;
+                                if (base_index < 0)
+                                        base_index = conn_entries - 1;
+                        }
+                }
+
+                set_current_state(TASK_INTERRUPTIBLE);
+                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+
+                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
+
+                busy_loops = 0;
+                schedule_timeout(timeout);
+
+                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+
+                set_current_state(TASK_RUNNING);
+                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+        }
+
+        kranal_thread_fini();
+        return 0;
+}
+
+void
+kranal_process_rdmaq (__u32 cqid)
+{
+        kra_conn_t          *conn;
+        kra_tx_t            *tx;
+        RAP_RETURN           rrc;
+        unsigned long        flags;
+        RAP_RDMA_DESCRIPTOR *desc;
+        
+        read_lock(&kranal_data.kra_global_lock);
+
+        conn = kranal_cqid2conn_locked(cqid);
+        LASSERT (conn != NULL);
+
+        rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
+        LASSERT (rrc == RAP_SUCCESS);
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
+        LASSERT (!list_empty(&conn->rac_rdmaq));
+        tx = list_entry(con->rac_rdmaq.next, kra_tx_t, tx_list);
+        list_del(&tx->tx_list);
+
+        LASSERT(desc->AppPtr == (void *)tx);
+        LASSERT(desc->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
+                desc->tx_msg.ram_type == RANAL_MSG_GET_DONE);
+
+        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
+        tx->tx_qtime = jiffies;
+        
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+        /* Get conn's fmaq processed, now I've just put something there */
+        kranal_schedule_conn(conn);
+
+        read_unlock(&kranal_data.kra_global_lock);
+}
+
+int
+kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
+               void *immediate, int immediatenob)
+{
+        int   sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
+
+        LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
+        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
+                 immediatenob <= RANAL_FMA_MAX_DATA_LEN :
+                 immediatenob == 0);
+
+        msg->ram_incarnation = conn->rac_incarnation;
+        msg->ram_seq = conn->rac_tx_seq;
+
+        if (sync)
+                rrc = RapkFmaSyncSend(conn->rac_device.rad_handle,
+                                      immediate, immediatenob,
+                                      msg, sizeof(*msg));
+        else
+                rrc = RapkFmaSend(conn->rac_device.rad_handle,
+                                  immediate, immediatenob,
+                                  msg, sizeof(*msg));
+
+        switch (rrc) {
+        case RAP_SUCCESS:
+                conn->rac_last_tx = jiffies;
+                conn->rac_tx_seq++;
+                return 0;
+                
+        case RAP_NOT_DONE:
+                return -EAGAIN;
+
+        default:
+                LBUG();
+        }
+}
+
+int
+kranal_process_fmaq (kra_conn_t *conn) 
+{
+        unsigned long flags;
+        int           more_to_do;
+        kra_tx_t     *tx;
+        int           rc;
+        int           expect_reply;
+
+        /* NB I will be rescheduled some via a rad_fma_cq event if my FMA is
+         * out of credits when I try to send right now... */
+
+        if (conn->rac_closing) {
+
+                if (!list_empty(&conn->rac_rdmaq)) {
+                        /* Can't send CLOSE yet; I'm still waiting for RDMAs I
+                         * posted to finish */
+                        LASSERT (!conn->rac_close_sent);
+                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                        return 0;
+                }
+
+                if (conn->rac_close_sent)
+                        return 0;
+                
+                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
+                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                conn->rac_close_sent = (rc == 0);
+                return 0;
+        }
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
+        if (list_empty(&conn->rac_fmaq)) {
+
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+                if (time_after_eq(conn->rac_last_tx + conn->rac_keepalive)) {
+                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                }
+                return 0;
+        }
+        
+        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
+        list_del(&tx->tx_list);
+        more_to_do = !list_empty(&conn->rac_fmaq);
+
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+        expect_reply = 0;
+        switch (tx->tx_msg.ram_type) {
+        default:
+                LBUG();
+                
+        case RANAL_MSG_IMMEDIATE:
+        case RANAL_MSG_PUT_NAK:
+        case RANAL_MSG_PUT_DONE:
+        case RANAL_MSG_GET_NAK:
+        case RANAL_MSG_GET_DONE:
+                rc = kranal_sendmsg(conn, &tx->tx_msg,
+                                    tx->tx_buffer, tx->tx_nob);
+                expect_reply = 0;
+                break;
+                
+        case RANAL_MSG_PUT_REQ:
+                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
+                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
+                kranal_map_buffer(tx);
+                expect_reply = 1;
+                break;
+
+        case RANAL_MSG_PUT_ACK:
+                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
+                expect_reply = 1;
+                break;
+
+        case RANAL_MSG_GET_REQ:
+                kranal_map_buffer(tx);
+                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
+                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
+                tx->tx_msg.ram_u.get.ragm_desc.rar