LU-14390 gnilnd: Use DIV_ROUND_UP to calculate niov
[fs/lustre-release.git] / lnet / klnds / gnilnd / gnilnd_cb.c
index 56be88a..edd60ce 100644
@@ -4,6 +4,7 @@
  * Copyright (C) 2009-2012 Cray, Inc.
  *
  *   Derived from work by Eric Barton <eric@bartonsoftware.com>
+ *   Author: James Shimek <jshimek@cray.com>
  *   Author: Nic Henke <nic@cray.com>
  *
  *   This file is part of Lustre, http://www.lustre.org.
  *
  */
 
+#include <asm/page.h>
 #include <linux/nmi.h>
+#include <linux/pagemap.h>
+
+#include <libcfs/linux/linux-mem.h>
+
 #include "gnilnd.h"
 
 /* this is useful when needed to debug wire corruption. */
@@ -79,12 +85,18 @@ kgnilnd_schedule_device(kgn_device_t *dev)
        if (!already_live) {
                wake_up_all(&dev->gnd_waitq);
        }
-       return;
 }
 
-void kgnilnd_schedule_device_timer(unsigned long arg)
+void kgnilnd_schedule_device_timer(cfs_timer_cb_arg_t data)
 {
-       kgn_device_t *dev = (kgn_device_t *) arg;
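+       /* cfs_from_timer() is a container_of-style helper: it recovers the
+        * kgn_device_t from its embedded gnd_map_timer, matching the timer
+        * API that passes the timer pointer rather than an unsigned long
+        */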
+       kgn_device_t *dev = cfs_from_timer(dev, data, gnd_map_timer);
+
+       kgnilnd_schedule_device(dev);
+}
+
+void kgnilnd_schedule_device_timer_rd(cfs_timer_cb_arg_t data)
+{
+       kgn_device_t *dev = cfs_from_timer(dev, data, gnd_rdmaq_timer);
 
        kgnilnd_schedule_device(dev);
 }
@@ -117,8 +129,11 @@ kgnilnd_device_callback(__u32 devid, __u64 arg)
  * < 0 : do not reschedule under any circumstances
  * == 0: reschedule if someone marked him WANTS_SCHED
  * > 0 : force a reschedule */
+/* Returns 0 if the conn was not scheduled, 1 if it was successfully
+ * scheduled.
+ */
 
-void
+int
 kgnilnd_schedule_process_conn(kgn_conn_t *conn, int sched_intent)
 {
        int     conn_sched;
@@ -136,19 +151,28 @@ kgnilnd_schedule_process_conn(kgn_conn_t *conn, int sched_intent)
 
        if (sched_intent >= 0) {
                if ((sched_intent > 0 || (conn_sched == GNILND_CONN_WANTS_SCHED))) {
-                       kgnilnd_schedule_conn(conn);
+                       return kgnilnd_schedule_conn_refheld(conn, 1);
                }
        }
+       return 0;
 }
 
-void
-kgnilnd_schedule_conn(kgn_conn_t *conn)
+/* Returns 0 if the conn was not scheduled, 1 if it was scheduled or
+ * marked as scheduled */
+
+int
+_kgnilnd_schedule_conn(kgn_conn_t *conn, const char *caller, int line, int refheld, int lock_held)
 {
        kgn_device_t        *dev = conn->gnc_device;
        int                  sched;
+       int                  rc;
 
        sched = xchg(&conn->gnc_scheduled, GNILND_CONN_WANTS_SCHED);
-
+       /* we only care about the last person who marked want_sched since they
+        * are most likely the culprit
+        */
+       memcpy(conn->gnc_sched_caller, caller, sizeof(conn->gnc_sched_caller));
+       conn->gnc_sched_line = line;
        /* if we are IDLE, add to list - only one guy sees IDLE and "wins"
         * the chance to put it onto gnd_ready_conns.
         * otherwise, leave marked as WANTS_SCHED and the thread that "owns"
@@ -158,25 +182,51 @@ kgnilnd_schedule_conn(kgn_conn_t *conn)
        if (sched == GNILND_CONN_IDLE) {
                /* if the conn is already scheduled, we've already requested
                 * the scheduler thread wakeup */
-               kgnilnd_conn_addref(conn);       /* +1 ref for scheduler */
-
+               if (!refheld) {
+                       /* Add a reference to the conn if we are not holding a reference
+                        * already from the existing scheduler. We now use the same
+                        * reference if we need to reschedule a conn while in a scheduler
+                        * thread.
+                        */
+                       kgnilnd_conn_addref(conn);
+               }
                LASSERTF(list_empty(&conn->gnc_schedlist), "conn %p already sched state %d\n",
                         conn, sched);
 
-               CDEBUG(D_INFO, "scheduling conn 0x%p\n", conn);
-
-               spin_lock(&dev->gnd_lock);
+               CDEBUG(D_INFO, "scheduling conn 0x%p caller %s:%d\n", conn, caller, line);
+               if (!lock_held)
+                       spin_lock(&dev->gnd_lock);
                list_add_tail(&conn->gnc_schedlist, &dev->gnd_ready_conns);
-               spin_unlock(&dev->gnd_lock);
+               if (!lock_held)
+                       spin_unlock(&dev->gnd_lock);
                set_mb(conn->gnc_last_sched_ask, jiffies);
-
+               rc = 1;
        } else {
-               CDEBUG(D_INFO, "not scheduling conn 0x%p: %d\n", conn, sched);
+               CDEBUG(D_INFO, "not scheduling conn 0x%p: %d caller %s:%d\n", conn, sched, caller, line);
+               rc = 0;
        }
 
        /* make sure thread(s) going to process conns - but let it make
         * separate decision from conn schedule */
+       if (!lock_held)
+               kgnilnd_schedule_device(dev);
+       return rc;
+}
+
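+/* Park the conn on the device's delay list (at most once) and poke the
+ * scheduler thread; used when a send must wait, e.g. for SMSG credits.
+ * Returns 1 if the conn was queued for delayed processing, 0 if it was
+ * already on the delay list.
+ */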
+int
+_kgnilnd_schedule_delay_conn(kgn_conn_t *conn)
+{
+       kgn_device_t    *dev = conn->gnc_device;
+       int rc = 0;
+
+       spin_lock(&dev->gnd_lock);
+       if (list_empty(&conn->gnc_delaylist)) {
+               list_add_tail(&conn->gnc_delaylist, &dev->gnd_delay_conns);
+               rc = 1;
+       }
+       spin_unlock(&dev->gnd_lock);
+
        kgnilnd_schedule_device(dev);
+       return rc;
 }
 
 void
@@ -208,27 +258,33 @@ kgnilnd_free_tx(kgn_tx_t *tx)
 
        /* we only allocate this if we need to */
        if (tx->tx_phys != NULL) {
-               cfs_mem_cache_free(kgnilnd_data.kgn_tx_phys_cache, tx->tx_phys);
+               kmem_cache_free(kgnilnd_data.kgn_tx_phys_cache, tx->tx_phys);
                CDEBUG(D_MALLOC, "slab-freed 'tx_phys': %lu at %p.\n",
                       LNET_MAX_IOV * sizeof(gni_mem_segment_t), tx->tx_phys);
        }
+
+       /* Only free the buffer if we used it */
+       if (tx->tx_buffer_copy != NULL) {
+               kgnilnd_vfree(tx->tx_buffer_copy, tx->tx_rdma_desc.length);
+               tx->tx_buffer_copy = NULL;
+               CDEBUG(D_MALLOC, "vfreed buffer2\n");
+       }
 #if 0
        KGNILND_POISON(tx, 0x5a, sizeof(kgn_tx_t));
 #endif
-       cfs_mem_cache_free(kgnilnd_data.kgn_tx_cache, tx);
-       CDEBUG(D_MALLOC, "slab-freed 'tx': %lu at %p.\n",
-              sizeof(*tx), tx);
+       CDEBUG(D_MALLOC, "slab-freed 'tx': %lu at %p.\n", sizeof(*tx), tx);
+       kmem_cache_free(kgnilnd_data.kgn_tx_cache, tx);
 }
 
 kgn_tx_t *
-kgnilnd_alloc_tx(void)
+kgnilnd_alloc_tx(void)
 {
-       kgn_tx_t      *tx = NULL;
+       kgn_tx_t        *tx = NULL;
 
        if (CFS_FAIL_CHECK(CFS_FAIL_GNI_ALLOC_TX))
                return tx;
 
-       tx = cfs_mem_cache_alloc(kgnilnd_data.kgn_tx_cache, CFS_ALLOC_ATOMIC);
+       tx = kmem_cache_zalloc(kgnilnd_data.kgn_tx_cache, GFP_ATOMIC);
        if (tx == NULL) {
                CERROR("failed to allocate tx\n");
                return NULL;
@@ -236,9 +292,6 @@ kgnilnd_alloc_tx(void)
        CDEBUG(D_MALLOC, "slab-alloced 'tx': %lu at %p.\n",
               sizeof(*tx), tx);
 
-       /* need this memset, cache alloc'd memory is not cleared */
-       memset(tx, 0, sizeof(*tx));
-
        /* setup everything here to minimize time under the lock */
        tx->tx_buftype = GNILND_BUF_NONE;
        tx->tx_msg.gnm_type = GNILND_MSG_NONE;
@@ -255,8 +308,8 @@ kgnilnd_alloc_tx(void)
 #define _kgnilnd_cksum(seed, ptr, nob)  csum_partial(ptr, nob, seed)
 
 /* we don't use offset as everyone is passing a buffer reference that already
- * includes the offset into the base address -
- *  see kgnilnd_setup_virt_buffer and kgnilnd_setup_immediate_buffer */
+ * includes the offset into the base address.
+ */
 static inline __u16
 kgnilnd_cksum(void *ptr, size_t nob)
 {
@@ -274,9 +327,9 @@ kgnilnd_cksum(void *ptr, size_t nob)
        return sum;
 }
 
-inline __u16
-kgnilnd_cksum_kiov(unsigned int nkiov, lnet_kiov_t *kiov,
-                   unsigned int offset, unsigned int nob, int dump_blob)
+__u16
+kgnilnd_cksum_kiov(unsigned int nkiov, struct bio_vec *kiov,
+                  unsigned int offset, unsigned int nob, int dump_blob)
 {
        __wsum             cksum = 0;
        __wsum             tmpck;
@@ -293,15 +346,15 @@ kgnilnd_cksum_kiov(unsigned int nkiov, lnet_kiov_t *kiov,
 
        /* if loops changes, please change kgnilnd_setup_phys_buffer */
 
-       while (offset >= kiov->kiov_len) {
-               offset -= kiov->kiov_len;
+       while (offset >= kiov->bv_len) {
+               offset -= kiov->bv_len;
                nkiov--;
                kiov++;
                LASSERT(nkiov > 0);
        }
 
-       /* ignore nob here, if nob < (kiov_len - offset), kiov == 1 */
-       odd = (unsigned long) (kiov[0].kiov_len - offset) & 1;
+       /* ignore nob here, if nob < (bv_len - offset), kiov == 1 */
+       odd = (unsigned long) (kiov[0].bv_len - offset) & 1;
 
        if ((odd || *kgnilnd_tunables.kgn_vmap_cksum) && nkiov > 1) {
                struct page **pages = kgnilnd_data.kgn_cksum_map_pages[get_cpu()];
@@ -310,10 +363,10 @@ kgnilnd_cksum_kiov(unsigned int nkiov, lnet_kiov_t *kiov,
                         get_cpu(), kgnilnd_data.kgn_cksum_map_pages);
 
                CDEBUG(D_BUFFS, "odd %d len %u offset %u nob %u\n",
-                      odd, kiov[0].kiov_len, offset, nob);
+                      odd, kiov[0].bv_len, offset, nob);
 
                for (i = 0; i < nkiov; i++) {
-                       pages[i] = kiov[i].kiov_page;
+                       pages[i] = kiov[i].bv_page;
                }
 
                addr = vmap(pages, nkiov, VM_MAP, PAGE_KERNEL);
@@ -326,42 +379,46 @@ kgnilnd_cksum_kiov(unsigned int nkiov, lnet_kiov_t *kiov,
                }
                atomic_inc(&kgnilnd_data.kgn_nvmap_cksum);
 
-               tmpck = _kgnilnd_cksum(0, (void *) addr + kiov[0].kiov_offset + offset, nob);
+               tmpck = _kgnilnd_cksum(0, ((void *) addr + kiov[0].bv_offset +
+                                          offset), nob);
                cksum = tmpck;
 
                if (dump_blob) {
                        kgnilnd_dump_blob(D_BUFFS, "flat kiov RDMA payload",
-                                         (void *)addr + kiov[0].kiov_offset + offset, nob);
+                                         (void *)addr + kiov[0].bv_offset +
+                                         offset, nob);
                }
                CDEBUG(D_BUFFS, "cksum 0x%x (+0x%x) for addr 0x%p+%u len %u offset %u\n",
-                      cksum, tmpck, addr, kiov[0].kiov_offset, nob, offset);
+                      cksum, tmpck, addr, kiov[0].bv_offset, nob, offset);
                vunmap(addr);
        } else {
                do {
-                       fraglen = min(kiov->kiov_len - offset, nob);
+                       fraglen = min(kiov->bv_len - offset, nob);
 
                        /* make dang sure we don't send a bogus checksum if somehow we get
                         * an odd length fragment on anything but the last entry in a kiov  -
                         * we know from kgnilnd_setup_rdma_buffer that we can't have non
                         * PAGE_SIZE pages in the middle, so if nob < PAGE_SIZE, it is the last one */
                        LASSERTF(!(fraglen&1) || (nob < PAGE_SIZE),
-                                "odd fraglen %u on nkiov %d, nob %u kiov_len %u offset %u kiov 0x%p\n",
-                                fraglen, nkiov, nob, kiov->kiov_len, offset, kiov);
+                                "odd fraglen %u on nkiov %d, nob %u bv_len %u offset %u kiov 0x%p\n",
+                                fraglen, nkiov, nob, kiov->bv_len,
+                                offset, kiov);
 
-                       addr = (void *)kmap(kiov->kiov_page) + kiov->kiov_offset + offset;
+                       addr = (void *)kmap(kiov->bv_page) + kiov->bv_offset +
+                               offset;
                        tmpck = _kgnilnd_cksum(cksum, addr, fraglen);
 
                        CDEBUG(D_BUFFS,
                               "cksum 0x%x (+0x%x) for page 0x%p+%u (0x%p) len %u offset %u\n",
-                              cksum, tmpck, kiov->kiov_page, kiov->kiov_offset, addr,
-                              fraglen, offset);
+                              cksum, tmpck, kiov->bv_page, kiov->bv_offset,
+                              addr, fraglen, offset);
 
                        cksum = tmpck;
 
                        if (dump_blob)
                                kgnilnd_dump_blob(D_BUFFS, "kiov cksum", addr, fraglen);
 
-                       kunmap(kiov->kiov_page);
+                       kunmap(kiov->bv_page);
 
                        kiov++;
                        nkiov--;
@@ -416,14 +473,40 @@ kgnilnd_new_tx_msg(int type, lnet_nid_t source)
 }
 
 static void
-kgnilnd_nak_rdma(kgn_conn_t *conn, int type, int error, __u64 cookie, lnet_nid_t source) {
+kgnilnd_nak_rdma(kgn_conn_t *conn, int rx_type, int error, __u64 cookie, lnet_nid_t source) {
        kgn_tx_t        *tx;
 
+       int             nak_type;
+
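+       /* map the message type that triggered the NAK onto the NAK type
+        * matching that operation and RDMA direction (normal vs _REV)
+        */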
+       switch (rx_type) {
+       case GNILND_MSG_GET_REQ:
+       case GNILND_MSG_GET_DONE:
+               nak_type = GNILND_MSG_GET_NAK;
+               break;
+       case GNILND_MSG_PUT_REQ:
+       case GNILND_MSG_PUT_ACK:
+       case GNILND_MSG_PUT_DONE:
+               nak_type = GNILND_MSG_PUT_NAK;
+               break;
+       case GNILND_MSG_PUT_REQ_REV:
+       case GNILND_MSG_PUT_DONE_REV:
+               nak_type = GNILND_MSG_PUT_NAK_REV;
+               break;
+       case GNILND_MSG_GET_REQ_REV:
+       case GNILND_MSG_GET_ACK_REV:
+       case GNILND_MSG_GET_DONE_REV:
+               nak_type = GNILND_MSG_GET_NAK_REV;
+               break;
+       default:
+               CERROR("invalid msg type %s (%d)\n",
+                       kgnilnd_msgtype2str(rx_type), rx_type);
+               LBUG();
+       }
        /* only allow NAK on error and truncate to zero */
-       LASSERTF(error <= 0, "error %d conn 0x%p, cookie "LPU64"\n",
+       LASSERTF(error <= 0, "error %d conn 0x%p, cookie %llu\n",
                 error, conn, cookie);
 
-       tx = kgnilnd_new_tx_msg(type, source);
+       tx = kgnilnd_new_tx_msg(nak_type, source);
        if (tx == NULL) {
                CNETERR("can't get TX to NAK RDMA to %s\n",
                        libcfs_nid2str(conn->gnc_peer->gnp_nid));
@@ -435,10 +518,10 @@ kgnilnd_nak_rdma(kgn_conn_t *conn, int type, int error, __u64 cookie, lnet_nid_t
        kgnilnd_queue_tx(conn, tx);
 }
 
-int
-kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov, struct iovec *iov,
-                              lnet_kiov_t *kiov, unsigned int offset, unsigned int nob)
-
+static int
+kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov,
+                              struct bio_vec *kiov,
+                              unsigned int offset, unsigned int nob)
 {
        kgn_msg_t       *msg = &tx->tx_msg;
        int              i;
@@ -447,42 +530,50 @@ kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov, struct iovec *io
         * gni_smsg_send to send that as the payload */
 
        LASSERT(tx->tx_buftype == GNILND_BUF_NONE);
-       LASSERT(nob >= 0);
 
        if (nob == 0) {
                tx->tx_buffer = NULL;
-       } else if (kiov != NULL) {
+       } else {
+
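+               /* the caller's niov may count more pages than this payload
+                * touches; recompute it as the pages actually spanned, e.g.
+                * with 4K pages, nob = 6144 and offset + bv_offset = 1024
+                * give DIV_ROUND_UP(7168, 4096) = 2 pages
+                */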
+               if (niov && niov > (nob >> PAGE_SHIFT))
+                       niov = DIV_ROUND_UP(nob + offset + kiov->bv_offset,
+                                           PAGE_SIZE);
+
                LASSERTF(niov > 0 && niov < GNILND_MAX_IMMEDIATE/PAGE_SIZE,
-                        "bad niov %d\n", niov);
+                       "bad niov %d msg %p kiov %p offset %d nob %d\n",
+                       niov, msg, kiov, offset, nob);
 
-               while (offset >= kiov->kiov_len) {
-                       offset -= kiov->kiov_len;
+               while (offset >= kiov->bv_len) {
+                       offset -= kiov->bv_len;
                        niov--;
                        kiov++;
                        LASSERT(niov > 0);
                }
                for (i = 0; i < niov; i++) {
-                       /* We can't have a kiov_offset on anything but the first entry,
-                        * otherwise we'll have a hole at the end of the mapping as we only map
-                        * whole pages.
-                        * Also, if we have a kiov_len < PAGE_SIZE but we need to map more
-                        * than kiov_len, we will also have a whole at the end of that page
-                        * which isn't allowed */
-                       if ((kiov[i].kiov_offset != 0 && i > 0) ||
-                           (kiov[i].kiov_offset + kiov[i].kiov_len != CFS_PAGE_SIZE && i < niov - 1)) {
-                               CNETERR("Can't make payload contiguous in I/O VM:"
-                                      "page %d, offset %u, nob %u, kiov_offset %u kiov_len %u \n",
-                                      i, offset, nob, kiov->kiov_offset, kiov->kiov_len);
+                       /* We can't have a bv_offset on anything but the first
+                        * entry, otherwise we'll have a hole at the end of the
+                        * mapping as we only map whole pages.
+                        * Also, if we have a bv_len < PAGE_SIZE but we need to
+                        * map more than bv_len, we will also have a hole at
+                        * the end of that page which isn't allowed
+                        */
+                       if ((kiov[i].bv_offset != 0 && i > 0) ||
+                           (kiov[i].bv_offset + kiov[i].bv_len != PAGE_SIZE &&
+                            i < niov - 1)) {
+                               CNETERR("Can't make payload contiguous in I/O VM:page %d, offset %u, nob %u, bv_offset %u bv_len %u\n",
+                                      i, offset, nob, kiov->bv_offset,
+                                       kiov->bv_len);
                                RETURN(-EINVAL);
                        }
-                       tx->tx_imm_pages[i] = kiov[i].kiov_page;
+                       tx->tx_imm_pages[i] = kiov[i].bv_page;
                }
 
                /* hijack tx_phys for the later unmap */
                if (niov == 1) {
                        /* tx->tx_phys being equal to NULL is the signal for unmap to discern between kmap and vmap */
                        tx->tx_phys = NULL;
-                       tx->tx_buffer = (void *)kmap(tx->tx_imm_pages[0]) + kiov[0].kiov_offset + offset;
+                       tx->tx_buffer = (void *)kmap(tx->tx_imm_pages[0]) +
+                               kiov[0].bv_offset + offset;
                        atomic_inc(&kgnilnd_data.kgn_nkmap_short);
                        GNIDBG_TX(D_NET, tx, "kmapped page for %d bytes for kiov 0x%p, buffer 0x%p",
                                nob, kiov, tx->tx_buffer);
@@ -494,37 +585,18 @@ kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov, struct iovec *io
 
                        }
                        atomic_inc(&kgnilnd_data.kgn_nvmap_short);
-                       /* make sure we take into account the kiov offset as the start of the buffer */
-                       tx->tx_buffer = (void *)tx->tx_phys + kiov[0].kiov_offset + offset;
-                       GNIDBG_TX(D_NET, tx, "mapped %d pages for %d bytes from kiov 0x%p to 0x%p, buffer 0x%p",
-                               niov, nob, kiov, tx->tx_phys, tx->tx_buffer);
+                       /* make sure we take into account the kiov offset as the
+                        * start of the buffer
+                        */
+                       tx->tx_buffer = (void *)tx->tx_phys + kiov[0].bv_offset
+                               + offset;
+                       GNIDBG_TX(D_NET, tx,
+                                 "mapped %d pages for %d bytes from kiov 0x%p to 0x%p, buffer 0x%p",
+                                 niov, nob, kiov, tx->tx_phys, tx->tx_buffer);
                }
                tx->tx_buftype = GNILND_BUF_IMMEDIATE_KIOV;
                tx->tx_nob = nob;
 
-       } else {
-               /* For now this is almost identical to kgnilnd_setup_virt_buffer, but we
-                * could "flatten" the payload into a single contiguous buffer ready
-                * for sending direct over an FMA if we ever needed to. */
-
-               LASSERT(niov > 0);
-
-               while (offset >= iov->iov_len) {
-                       offset -= iov->iov_len;
-                       niov--;
-                       iov++;
-                       LASSERT(niov > 0);
-               }
-
-               if (nob > iov->iov_len - offset) {
-                       CERROR("Can't handle multiple vaddr fragments\n");
-                       return -EMSGSIZE;
-               }
-
-               tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
-
-               tx->tx_buftype = GNILND_BUF_IMMEDIATE;
-               tx->tx_nob = nob;
        }
 
        /* checksum payload early - it shouldn't be changing after lnd_send */
@@ -545,40 +617,12 @@ kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov, struct iovec *io
 }
 
 int
-kgnilnd_setup_virt_buffer(kgn_tx_t *tx,
-                         unsigned int niov, struct iovec *iov,
-                         unsigned int offset, unsigned int nob)
-
-{
-       LASSERT(nob > 0);
-       LASSERT(niov > 0);
-       LASSERT(tx->tx_buftype == GNILND_BUF_NONE);
-
-       while (offset >= iov->iov_len) {
-               offset -= iov->iov_len;
-               niov--;
-               iov++;
-               LASSERT(niov > 0);
-       }
-
-       if (nob > iov->iov_len - offset) {
-               CERROR("Can't handle multiple vaddr fragments\n");
-               return -EMSGSIZE;
-       }
-
-       tx->tx_buftype = GNILND_BUF_VIRT_UNMAPPED;
-       tx->tx_nob = nob;
-       tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
-       return 0;
-}
-
-int
-kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
+kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, struct bio_vec *kiov,
                          unsigned int offset, unsigned int nob)
 {
        gni_mem_segment_t *phys;
-       int                rc = 0;
-       unsigned int       fraglen;
+       int             rc = 0;
+       unsigned int    fraglen;
 
        GNIDBG_TX(D_NET, tx, "niov %d kiov 0x%p offset %u nob %u", nkiov, kiov, offset, nob);
 
@@ -587,8 +631,8 @@ kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
        LASSERT(tx->tx_buftype == GNILND_BUF_NONE);
 
        /* only allocate this if we are going to use it */
-       tx->tx_phys = cfs_mem_cache_alloc(kgnilnd_data.kgn_tx_phys_cache,
-                                         CFS_ALLOC_ATOMIC);
+       tx->tx_phys = kmem_cache_alloc(kgnilnd_data.kgn_tx_phys_cache,
+                                             GFP_ATOMIC);
        if (tx->tx_phys == NULL) {
                CERROR("failed to allocate tx_phys\n");
                rc = -ENOMEM;
@@ -601,8 +645,8 @@ kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
        /* if loops changes, please change kgnilnd_cksum_kiov
         *   and kgnilnd_setup_immediate_buffer */
 
-       while (offset >= kiov->kiov_len) {
-               offset -= kiov->kiov_len;
+       while (offset >= kiov->bv_len) {
+               offset -= kiov->bv_len;
                nkiov--;
                kiov++;
                LASSERT(nkiov > 0);
@@ -614,31 +658,31 @@ kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
        tx->tx_buftype = GNILND_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
 
-       /* kiov_offset is start of 'valid' buffer, so index offset past that */
-       tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
+       /* bv_offset is start of 'valid' buffer, so index offset past that */
+       tx->tx_buffer = (void *)((unsigned long)(kiov->bv_offset + offset));
        phys = tx->tx_phys;
 
        CDEBUG(D_NET, "tx 0x%p buffer 0x%p map start kiov 0x%p+%u niov %d offset %u\n",
-              tx, tx->tx_buffer, kiov, kiov->kiov_offset, nkiov, offset);
+              tx, tx->tx_buffer, kiov, kiov->bv_offset, nkiov, offset);
 
        do {
-               fraglen = min(kiov->kiov_len - offset, nob);
-
-               /* We can't have a kiov_offset on anything but the first entry,
-                * otherwise we'll have a hole at the end of the mapping as we only map
-                * whole pages. Only the first page is allowed to have an offset -
-                * we'll add that into tx->tx_buffer and that will get used when we
-                * map in the segments (see kgnilnd_map_buffer).
-                * Also, if we have a kiov_len < PAGE_SIZE but we need to map more
-                * than kiov_len, we will also have a whole at the end of that page
-                * which isn't allowed */
+               fraglen = min(kiov->bv_len - offset, nob);
+
+               /* We can't have a bv_offset on anything but the first entry,
+                * otherwise we'll have a hole at the end of the mapping as we
+                * only map whole pages.  Only the first page is allowed to
+                * have an offset - we'll add that into tx->tx_buffer and that
+                * will get used when we map in the segments (see
+                * kgnilnd_map_buffer).  Also, if we have a bv_len < PAGE_SIZE
+                * but we need to map more than bv_len, we will also have a
+                * hole at the end of that page which isn't allowed
+                */
                if ((phys != tx->tx_phys) &&
-                   ((kiov->kiov_offset != 0) ||
-                    ((kiov->kiov_len < PAGE_SIZE) && (nob > kiov->kiov_len)))) {
-                       CERROR("Can't make payload contiguous in I/O VM:"
-                              "page %d, offset %u, nob %u, kiov_offset %u kiov_len %u \n",
+                   ((kiov->bv_offset != 0) ||
+                    ((kiov->bv_len < PAGE_SIZE) && (nob > kiov->bv_len)))) {
+                       CERROR("Can't make payload contiguous in I/O VM:page %d, offset %u, nob %u, bv_offset %u bv_len %u\n",
                               (int)(phys - tx->tx_phys),
-                              offset, nob, kiov->kiov_offset, kiov->kiov_len);
+                              offset, nob, kiov->bv_offset, kiov->bv_len);
                        rc = -EINVAL;
                        GOTO(error, rc);
                }
@@ -654,11 +698,12 @@ kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
                        GOTO(error, rc);
                }
 
-               CDEBUG(D_BUFFS, "page 0x%p kiov_offset %u kiov_len %u nob %u "
-                              "nkiov %u offset %u\n",
-                     kiov->kiov_page, kiov->kiov_offset, kiov->kiov_len, nob, nkiov, offset);
+               CDEBUG(D_BUFFS,
+                      "page 0x%p bv_offset %u bv_len %u nob %u nkiov %u offset %u\n",
+                      kiov->bv_page, kiov->bv_offset, kiov->bv_len, nob, nkiov,
+                      offset);
 
-               phys->address = lnet_page2phys(kiov->kiov_page);
+               phys->address = page_to_phys(kiov->bv_page);
                phys++;
                kiov++;
                nkiov--;
@@ -676,7 +721,7 @@ kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
 
 error:
        if (tx->tx_phys != NULL) {
-               cfs_mem_cache_free(kgnilnd_data.kgn_tx_phys_cache, tx->tx_phys);
+               kmem_cache_free(kgnilnd_data.kgn_tx_phys_cache, tx->tx_phys);
                CDEBUG(D_MALLOC, "slab-freed 'tx_phys': %lu at %p.\n",
                       sizeof(*tx->tx_phys), tx->tx_phys);
                tx->tx_phys = NULL;
@@ -686,31 +731,33 @@ error:
 
 static inline int
 kgnilnd_setup_rdma_buffer(kgn_tx_t *tx, unsigned int niov,
-                         struct iovec *iov, lnet_kiov_t *kiov,
+                         struct bio_vec *kiov,
                          unsigned int offset, unsigned int nob)
 {
-       int     rc;
-
-       LASSERT((iov == NULL) != (kiov == NULL));
-
-       if (kiov != NULL) {
-               rc = kgnilnd_setup_phys_buffer(tx, niov, kiov, offset, nob);
-       } else {
-               rc = kgnilnd_setup_virt_buffer(tx, niov, iov, offset, nob);
-       }
-       return rc;
+       return kgnilnd_setup_phys_buffer(tx, niov, kiov, offset, nob);
 }
 
+/* kgnilnd_parse_lnet_rdma()
+ * lntmsg - message passed in from lnet.
+ * niov, kiov, offset - see lnd_t in lib-types.h for descriptions.
+ * nob - actual number of bytes in this message.
+ * put_len - It is possible for PUTs to have a different length than the
+ *           length stored in lntmsg->msg_len since LNET can adjust this
+ *           length based on its buffer size and offset.
+ *           lnet_try_match_md() sets the mlength that we use to do the RDMA
+ *           transfer.
+ */
 static void
-kgnilnd_parse_lnet_rdma(lnet_msg_t *lntmsg, unsigned int *niov, unsigned int *offset,
-                       unsigned int *nob, lnet_kiov_t **kiov)
+kgnilnd_parse_lnet_rdma(struct lnet_msg *lntmsg, unsigned int *niov,
+                       unsigned int *offset, unsigned int *nob,
+                       struct bio_vec **kiov, int put_len)
 {
        /* GETs are weird, see kgnilnd_send */
        if (lntmsg->msg_type == LNET_MSG_GET) {
                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0) {
                        *kiov = NULL;
                } else {
-                       *kiov = lntmsg->msg_md->md_iov.kiov;
+                       *kiov = lntmsg->msg_md->md_kiov;
                }
                *niov = lntmsg->msg_md->md_niov;
                *nob = lntmsg->msg_md->md_length;
@@ -718,24 +765,32 @@ kgnilnd_parse_lnet_rdma(lnet_msg_t *lntmsg, unsigned int *niov, unsigned int *of
        } else {
                *kiov = lntmsg->msg_kiov;
                *niov = lntmsg->msg_niov;
-               *nob = lntmsg->msg_len;
+               *nob = put_len;
                *offset = lntmsg->msg_offset;
        }
 }
 
 static inline void
-kgnilnd_compute_rdma_cksum(kgn_tx_t *tx)
+kgnilnd_compute_rdma_cksum(kgn_tx_t *tx, int put_len)
 {
-       unsigned int     niov, offset, nob;
-       lnet_kiov_t     *kiov;
-       lnet_msg_t      *lntmsg = tx->tx_lntmsg[0];
-       int              dump_cksum = (*kgnilnd_tunables.kgn_checksum_dump > 1);
+       unsigned int niov, offset, nob;
+       struct bio_vec *kiov;
+       struct lnet_msg *lntmsg = tx->tx_lntmsg[0];
+       int dump_cksum = (*kgnilnd_tunables.kgn_checksum_dump > 1);
 
        GNITX_ASSERTF(tx, ((tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE) ||
-                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE)),
+                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_ACK_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ_REV)),
                      "bad type %s", kgnilnd_msgtype2str(tx->tx_msg.gnm_type));
 
-
+       if ((tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV) ||
+           (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV)) {
+               tx->tx_msg.gnm_payload_cksum = 0;
+               return;
+       }
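+       /* RDMA payload checksums are only generated when kgn_checksum >= 3 */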
        if (*kgnilnd_tunables.kgn_checksum < 3) {
                tx->tx_msg.gnm_payload_cksum = 0;
                return;
@@ -743,7 +798,8 @@ kgnilnd_compute_rdma_cksum(kgn_tx_t *tx)
 
        GNITX_ASSERTF(tx, lntmsg, "no LNet message!", NULL);
 
-       kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov);
+       kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov,
+                               put_len);
 
        if (kiov != NULL) {
                tx->tx_msg.gnm_payload_cksum = kgnilnd_cksum_kiov(niov, kiov, offset, nob, dump_cksum);
@@ -759,21 +815,35 @@ kgnilnd_compute_rdma_cksum(kgn_tx_t *tx)
        }
 }
 
+/* kgnilnd_verify_rdma_cksum()
+ * tx - PUT_DONE/GET_DONE matched tx.
+ * rx_cksum - received checksum to compare against.
+ * put_len - see kgnilnd_parse_lnet_rdma comments.
+ */
 static inline int
-kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum)
+kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum, int put_len)
 {
        int              rc = 0;
        __u16            cksum;
        unsigned int     niov, offset, nob;
-       lnet_kiov_t     *kiov;
-       lnet_msg_t      *lntmsg = tx->tx_lntmsg[0];
+       struct bio_vec  *kiov;
+       struct lnet_msg *lntmsg = tx->tx_lntmsg[0];
        int dump_on_err = *kgnilnd_tunables.kgn_checksum_dump;
 
        /* we can only match certain requests */
        GNITX_ASSERTF(tx, ((tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) ||
-                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK)),
+                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_ACK_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV)),
                      "bad type %s", kgnilnd_msgtype2str(tx->tx_msg.gnm_type));
 
+       if ((tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ_REV) ||
+           (tx->tx_msg.gnm_type == GNILND_MSG_GET_ACK_REV)) {
+               return 0;
+       }
+
        if (rx_cksum == 0)  {
                if (*kgnilnd_tunables.kgn_checksum >= 3) {
                        GNIDBG_MSG(D_WARNING, &tx->tx_msg,
@@ -784,7 +854,7 @@ kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum)
 
        GNITX_ASSERTF(tx, lntmsg, "no LNet message!", NULL);
 
-       kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov);
+       kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov, put_len);
 
        if (kiov != NULL) {
                cksum = kgnilnd_cksum_kiov(niov, kiov, offset, nob, 0);
@@ -805,7 +875,7 @@ kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum)
                                kgnilnd_dump_blob(D_BUFFS, "RDMA payload",
                                                  tx->tx_buffer, nob);
                        }
-                       /* fall through to dump log */
+                       /* fallthrough */
                case 1:
                        libcfs_debug_dumplog();
                        break;
@@ -840,18 +910,12 @@ kgnilnd_mem_add_map_list(kgn_device_t *dev, kgn_tx_t *tx)
                dev->gnd_map_nphys++;
                dev->gnd_map_physnop += tx->tx_phys_npages;
                break;
-
-       case GNILND_BUF_VIRT_MAPPED:
-               bytes = tx->tx_nob;
-               dev->gnd_map_nvirt++;
-               dev->gnd_map_virtnob += tx->tx_nob;
-               break;
        }
 
        if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK ||
            tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) {
                atomic64_add(bytes, &dev->gnd_rdmaq_bytes_out);
-               GNIDBG_TX(D_NETTRACE, tx, "rdma ++ %d to "LPD64"",
+               GNIDBG_TX(D_NETTRACE, tx, "rdma ++ %d to %lld",
                          bytes, atomic64_read(&dev->gnd_rdmaq_bytes_out));
        }
 
@@ -889,21 +953,16 @@ kgnilnd_mem_del_map_list(kgn_device_t *dev, kgn_tx_t *tx)
                dev->gnd_map_nphys--;
                dev->gnd_map_physnop -= tx->tx_phys_npages;
                break;
-
-       case GNILND_BUF_VIRT_UNMAPPED:
-               bytes = tx->tx_nob;
-               dev->gnd_map_nvirt--;
-               dev->gnd_map_virtnob -= tx->tx_nob;
-               break;
        }
 
        if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK ||
            tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) {
                atomic64_sub(bytes, &dev->gnd_rdmaq_bytes_out);
                LASSERTF(atomic64_read(&dev->gnd_rdmaq_bytes_out) >= 0,
-                        "bytes_out negative! %ld\n", atomic64_read(&dev->gnd_rdmaq_bytes_out));
-               GNIDBG_TX(D_NETTRACE, tx, "rdma -- %d to "LPD64"",
-                         bytes, atomic64_read(&dev->gnd_rdmaq_bytes_out));
+                        "bytes_out negative! %lld\n",
+                        (s64)atomic64_read(&dev->gnd_rdmaq_bytes_out));
+               GNIDBG_TX(D_NETTRACE, tx, "rdma -- %d to %lld",
+                         bytes, (s64)atomic64_read(&dev->gnd_rdmaq_bytes_out));
        }
 
        atomic_dec(&dev->gnd_n_mdd);
@@ -947,7 +1006,6 @@ kgnilnd_map_buffer(kgn_tx_t *tx)
        case GNILND_BUF_IMMEDIATE:
        case GNILND_BUF_IMMEDIATE_KIOV:
        case GNILND_BUF_PHYS_MAPPED:
-       case GNILND_BUF_VIRT_MAPPED:
                return 0;
 
        case GNILND_BUF_PHYS_UNMAPPED:
@@ -960,55 +1018,33 @@ kgnilnd_map_buffer(kgn_tx_t *tx)
                 * - this needs to turn into a non-fatal error soon to allow
                 *  GART resource, etc starvation handling */
                if (rrc != GNI_RC_SUCCESS) {
-                       GNIDBG_TX(D_NET, tx, "Can't map %d pages: dev %d "
-                               "phys %u pp %u, virt %u nob "LPU64"",
+                       GNIDBG_TX(D_NET, tx,
+                                 "Can't map %d pages: dev %d phys %u pp %u",
                                tx->tx_phys_npages, dev->gnd_id,
-                               dev->gnd_map_nphys, dev->gnd_map_physnop,
-                               dev->gnd_map_nvirt, dev->gnd_map_virtnob);
+                               dev->gnd_map_nphys, dev->gnd_map_physnop);
                        RETURN(rrc == GNI_RC_ERROR_RESOURCE ? -ENOMEM : -EINVAL);
                }
 
                tx->tx_buftype = GNILND_BUF_PHYS_MAPPED;
                kgnilnd_mem_add_map_list(dev, tx);
                return 0;
-
-       case GNILND_BUF_VIRT_UNMAPPED:
-               rrc = kgnilnd_mem_register(dev->gnd_handle,
-                       (__u64)tx->tx_buffer, tx->tx_nob,
-                       NULL, flags, &tx->tx_map_key);
-               if (rrc != GNI_RC_SUCCESS) {
-                       GNIDBG_TX(D_NET, tx, "Can't map %u bytes: dev %d "
-                               "phys %u pp %u, virt %u nob "LPU64"",
-                               tx->tx_nob, dev->gnd_id,
-                               dev->gnd_map_nphys, dev->gnd_map_physnop,
-                               dev->gnd_map_nvirt, dev->gnd_map_virtnob);
-                       RETURN(rrc == GNI_RC_ERROR_RESOURCE ? -ENOMEM : -EINVAL);
-               }
-
-               tx->tx_buftype = GNILND_BUF_VIRT_MAPPED;
-               kgnilnd_mem_add_map_list(dev, tx);
-               if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK ||
-                   tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) {
-                       atomic64_add(tx->tx_nob, &dev->gnd_rdmaq_bytes_out);
-                       GNIDBG_TX(D_NETTRACE, tx, "rdma ++ %d to %ld\n",
-                              tx->tx_nob, atomic64_read(&dev->gnd_rdmaq_bytes_out));
-               }
-
-               return 0;
        }
 }
 
 void
 kgnilnd_add_purgatory_tx(kgn_tx_t *tx)
 {
-       kgn_conn_t                  *conn = tx->tx_conn;
-       kgn_mdd_purgatory_t         *gmp;
+       kgn_conn_t              *conn = tx->tx_conn;
+       kgn_mdd_purgatory_t     *gmp;
 
        LIBCFS_ALLOC(gmp, sizeof(*gmp));
        LASSERTF(gmp != NULL, "couldn't allocate MDD purgatory member;"
                " asserting to avoid data corruption\n");
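+       /* if the RDMA used a bounce buffer, that is the registration the
+        * peer may still reference, so purgatory must hold its key
+        */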
+       if (tx->tx_buffer_copy)
+               gmp->gmp_map_key = tx->tx_buffer_copy_map_key;
+       else
+               gmp->gmp_map_key = tx->tx_map_key;
 
-       gmp->gmp_map_key = tx->tx_map_key;
        atomic_inc(&conn->gnc_device->gnd_n_mdd_held);
 
        /* ensure that we don't have a blank purgatory - indicating the
@@ -1033,8 +1069,8 @@ kgnilnd_unmap_buffer(kgn_tx_t *tx, int error)
        int               hold_timeout = 0;
 
        /* code below relies on +1 relationship ... */
-       CLASSERT(GNILND_BUF_PHYS_MAPPED == (GNILND_BUF_PHYS_UNMAPPED + 1));
-       CLASSERT(GNILND_BUF_VIRT_MAPPED == (GNILND_BUF_VIRT_UNMAPPED + 1));
+       BUILD_BUG_ON(GNILND_BUF_PHYS_MAPPED !=
+                    (GNILND_BUF_PHYS_UNMAPPED + 1));
 
        switch (tx->tx_buftype) {
        default:
@@ -1043,7 +1079,6 @@ kgnilnd_unmap_buffer(kgn_tx_t *tx, int error)
        case GNILND_BUF_NONE:
        case GNILND_BUF_IMMEDIATE:
        case GNILND_BUF_PHYS_UNMAPPED:
-       case GNILND_BUF_VIRT_UNMAPPED:
                break;
        case GNILND_BUF_IMMEDIATE_KIOV:
                if (tx->tx_phys != NULL) {
@@ -1057,7 +1092,6 @@ kgnilnd_unmap_buffer(kgn_tx_t *tx, int error)
                break;
 
        case GNILND_BUF_PHYS_MAPPED:
-       case GNILND_BUF_VIRT_MAPPED:
                LASSERT(tx->tx_conn != NULL);
 
                dev = tx->tx_conn->gnc_device;
@@ -1066,6 +1100,7 @@ kgnilnd_unmap_buffer(kgn_tx_t *tx, int error)
                 * verified peer notification  - the theory is that
                 * a TX error can be communicated in all other cases */
                if (tx->tx_conn->gnc_state != GNILND_CONN_ESTABLISHED &&
+                   error != -GNILND_NOPURG &&
                    kgnilnd_check_purgatory_conn(tx->tx_conn)) {
                        kgnilnd_add_purgatory_tx(tx);
 
@@ -1075,14 +1110,19 @@ kgnilnd_unmap_buffer(kgn_tx_t *tx, int error)
                        hold_timeout = GNILND_TIMEOUT2DEADMAN;
 
                        GNIDBG_TX(D_NET, tx,
-                                "dev %p delaying MDD release for %dms key "LPX64"."LPX64"",
+                                "dev %p delaying MDD release for %dms key %#llx.%#llx",
                                 tx->tx_conn->gnc_device, hold_timeout,
                                 tx->tx_map_key.qword1, tx->tx_map_key.qword2);
                }
-
-               rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, hold_timeout);
-
-               LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc);
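+               /* a tx that used a bounce buffer holds two registrations:
+                * keep the copy's MDD through the purgatory hold and drop
+                * the original mapping immediately
+                */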
+               if (tx->tx_buffer_copy != NULL) {
+                       rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_buffer_copy_map_key, hold_timeout);
+                       LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc);
+                       rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, 0);
+                       LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc);
+               } else {
+                       rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, hold_timeout);
+                       LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc);
+               }
 
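+               /* relies on the +1 relationship asserted above: decrementing
+                * tx_buftype flips PHYS_MAPPED back to PHYS_UNMAPPED
+                */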
                tx->tx_buftype--;
                kgnilnd_mem_del_map_list(dev, tx);
@@ -1093,9 +1133,9 @@ kgnilnd_unmap_buffer(kgn_tx_t *tx, int error)
 void
 kgnilnd_tx_done(kgn_tx_t *tx, int completion)
 {
-       lnet_msg_t      *lntmsg0, *lntmsg1;
+       struct lnet_msg      *lntmsg0, *lntmsg1;
        int             status0, status1;
-       lnet_ni_t       *ni = NULL;
+       struct lnet_ni       *ni = NULL;
        kgn_conn_t      *conn = tx->tx_conn;
 
        LASSERT(!in_interrupt());
@@ -1112,7 +1152,7 @@ kgnilnd_tx_done(kgn_tx_t *tx, int completion)
                       libcfs_nid2str(conn->gnc_peer->gnp_nid) : "<?>",
                       tx->tx_id.txe_smsg_id, tx->tx_id.txe_idx,
                       kgnilnd_tx_state2str(tx->tx_list_state),
-                      cfs_duration_sec((long)jiffies - tx->tx_qtime));
+                      cfs_duration_sec((unsigned long)jiffies - tx->tx_qtime));
        }
 
        /* The error codes determine if we hold onto the MDD */
@@ -1159,10 +1199,11 @@ kgnilnd_tx_done(kgn_tx_t *tx, int completion)
        /* warning - we should hold no locks here - calling lnet_finalize
         * could free up lnet credits, resulting in a call chain back into
         * the LND via kgnilnd_send and friends */
-       lnet_finalize(ni, lntmsg0, status0);
+
+       lnet_finalize(lntmsg0, status0);
 
        if (lntmsg1 != NULL) {
-               lnet_finalize(ni, lntmsg1, status1);
+               lnet_finalize(lntmsg1, status1);
        }
 }
 
@@ -1229,7 +1270,7 @@ search_again:
         * if we are sending to the same node faster than 256000/sec.
         * To help guard against this, we OR in the tx_seq - that is 32 bits */
 
-       tx->tx_id.txe_chips = (__u32)(jiffies | conn->gnc_tx_seq);
+       tx->tx_id.txe_chips = (__u32)(jiffies | atomic_read(&conn->gnc_tx_seq));
 
        GNIDBG_TX(D_NET, tx, "set cookie/id/bits", NULL);
 
@@ -1237,70 +1278,35 @@ search_again:
        return 0;
 }
 
-static inline int
-kgnilnd_tx_should_retry(kgn_conn_t *conn, kgn_tx_t *tx)
+static inline void
+kgnilnd_tx_log_retrans(kgn_conn_t *conn, kgn_tx_t *tx)
 {
-       int             max_retrans = *kgnilnd_tunables.kgn_max_retransmits;
        int             log_retrans;
-       int             log_retrans_level;
-
-       /* I need kgni credits to send this.  Replace tx at the head of the
-        * fmaq and I'll get rescheduled when credits appear */
-       tx->tx_state = 0;
-       tx->tx_retrans++;
-       conn->gnc_tx_retrans++;
-       log_retrans = ((tx->tx_retrans < 25) || ((tx->tx_retrans % 25) == 0) ||
-                       (tx->tx_retrans > (max_retrans / 2)));
-       log_retrans_level = tx->tx_retrans < (max_retrans / 2) ? D_NET : D_NETERROR;
 
-       /* Decision time - either error, warn or just retransmit */
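+       /* throttle the log: each of the first 25 retransmits, then every
+        * 25th thereafter
+        */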
+       log_retrans = ((tx->tx_retrans < 25) || ((tx->tx_retrans % 25) == 0));
 
        /* we don't care about TX timeout - it could be that the network is slower
         * or throttled. We'll keep retransmitting - so if the network is so slow
         * that we fill up our mailbox, we'll keep trying to resend that msg
         * until we exceed the max_retrans _or_ gnc_last_rx expires, indicating
         * that the peer hasn't sent us any traffic in return */
-
-       if (tx->tx_retrans > max_retrans) {
-               /* this means we are not backing off the retransmits
-                * in a healthy manner and are likely chewing up the
-                * CPU cycles quite badly */
-               GNIDBG_TOMSG(D_ERROR, &tx->tx_msg,
-                       "SOFTWARE BUG: too many retransmits (%d) for tx id %x "
-                       "conn 0x%p->%s\n",
-                       tx->tx_retrans, tx->tx_id, conn,
-                       libcfs_nid2str(conn->gnc_peer->gnp_nid));
-
-               /* yes - double errors to help debug this condition */
-               GNIDBG_TOMSG(D_NETERROR, &tx->tx_msg, "connection dead. "
-                       "unable to send to %s for %lu secs (%d tries)",
-                       libcfs_nid2str(tx->tx_conn->gnc_peer->gnp_nid),
-                       cfs_duration_sec(jiffies - tx->tx_cred_wait),
-                       tx->tx_retrans);
-
-               kgnilnd_close_conn(conn, -ETIMEDOUT);
-
-               /* caller should terminate */
-               RETURN(0);
-       } else {
-               /* some reasonable throttling of the debug message */
-               if (log_retrans) {
-                       unsigned long now = jiffies;
-                       /* XXX Nic: Mystical TX debug here... */
-                       GNIDBG_SMSG_CREDS(log_retrans_level, conn);
-                       GNIDBG_TOMSG(log_retrans_level, &tx->tx_msg,
-                               "NOT_DONE on conn 0x%p->%s id %x retrans %d wait %dus"
-                               " last_msg %uus/%uus last_cq %uus/%uus",
-                               conn, libcfs_nid2str(conn->gnc_peer->gnp_nid),
-                               tx->tx_id, tx->tx_retrans,
-                               jiffies_to_usecs(now - tx->tx_cred_wait),
-                               jiffies_to_usecs(now - conn->gnc_last_tx),
-                               jiffies_to_usecs(now - conn->gnc_last_rx),
-                               jiffies_to_usecs(now - conn->gnc_last_tx_cq),
-                               jiffies_to_usecs(now - conn->gnc_last_rx_cq));
-               }
-               /* caller should retry */
-               RETURN(1);
+
+       /* some reasonable throttling of the debug message */
+       if (log_retrans) {
+               unsigned long now = jiffies;
+               /* XXX Nic: Mystical TX debug here... */
+               /* We expect retransmissions so only log when D_NET is enabled */
+               GNIDBG_SMSG_CREDS(D_NET, conn);
+               GNIDBG_TOMSG(D_NET, &tx->tx_msg,
+                       "NOT_DONE on conn 0x%p->%s id %x retrans %d wait %dus"
+                       " last_msg %uus/%uus last_cq %uus/%uus",
+                       conn, libcfs_nid2str(conn->gnc_peer->gnp_nid),
+                       tx->tx_id, tx->tx_retrans,
+                       jiffies_to_usecs(now - tx->tx_cred_wait),
+                       jiffies_to_usecs(now - conn->gnc_last_tx),
+                       jiffies_to_usecs(now - conn->gnc_last_rx),
+                       jiffies_to_usecs(now - conn->gnc_last_tx_cq),
+                       jiffies_to_usecs(now - conn->gnc_last_rx_cq));
        }
 }
 
@@ -1313,7 +1319,6 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
 {
        kgn_conn_t      *conn = tx->tx_conn;
        kgn_msg_t       *msg = &tx->tx_msg;
-       int              retry_send;
        gni_return_t     rrc;
        unsigned long    newest_last_rx, timeout;
        unsigned long    now;
@@ -1331,7 +1336,8 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
         * close message.
         */
        if (atomic_read(&conn->gnc_peer->gnp_dirty_eps) != 0 && msg->gnm_type != GNILND_MSG_CLOSE) {
-               mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+               kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
+               kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
                /* Return -ETIME, we are closing the connection already so we don't want to
                 * have this tx hit the wire. The tx will be killed by the calling function.
                 * Once the EP is marked dirty the close message will be the last
@@ -1349,11 +1355,13 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
        }
 
        if (time_after_eq(now, newest_last_rx + GNILND_TIMEOUTRX(timeout))) {
-               GNIDBG_CONN(D_NETERROR|D_CONSOLE, conn, "Cant send to %s after timeout lapse of %lu; TO %lu",
+               GNIDBG_CONN(D_NETERROR|D_CONSOLE, conn,
+                           "Can't send to %s after timeout lapse of %lu; TO %lu\n",
                libcfs_nid2str(conn->gnc_peer->gnp_nid),
                cfs_duration_sec(now - newest_last_rx),
                cfs_duration_sec(GNILND_TIMEOUTRX(timeout)));
-               mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+               kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
+               kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
                return -ETIME;
        }
 
@@ -1364,7 +1372,7 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
         */
        msg->gnm_connstamp = conn->gnc_my_connstamp;
        msg->gnm_payload_len = immediatenob;
-       msg->gnm_seq = conn->gnc_tx_seq;
+       msg->gnm_seq = atomic_read(&conn->gnc_tx_seq);
 
        /* always init here - kgn_checksum is a /sys module tunable
         * and can be flipped at any point, even between msg init and sending */
@@ -1388,14 +1396,15 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
        if (unlikely(tx->tx_state & GNILND_TX_FAIL_SMSG)) {
                rrc = cfs_fail_val ? cfs_fail_val : GNI_RC_NOT_DONE;
        } else {
-       rrc = kgnilnd_smsg_send(conn->gnc_ephandle,
-                                   msg, sizeof(*msg), immediate, immediatenob,
-                           tx->tx_id.txe_smsg_id);
+               rrc = kgnilnd_smsg_send(conn->gnc_ephandle,
+                                       msg, sizeof(*msg), immediate,
+                                       immediatenob,
+                                       tx->tx_id.txe_smsg_id);
        }
 
        switch (rrc) {
        case GNI_RC_SUCCESS:
-               conn->gnc_tx_seq++;
+               atomic_inc(&conn->gnc_tx_seq);
                conn->gnc_last_tx = jiffies;
                /* no locking here as LIVE isn't a list */
                kgnilnd_tx_add_state_locked(tx, NULL, conn, GNILND_TX_LIVE_FMAQ, 1);
@@ -1409,7 +1418,8 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
 
                /* serialize with seeing CQ events for completion on this, as well as
                 * tx_seq */
-               mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+               kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
+               kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
 
                atomic_inc(&conn->gnc_device->gnd_short_ntx);
                atomic64_add(immediatenob, &conn->gnc_device->gnd_short_txbytes);
@@ -1418,11 +1428,13 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
                return 0;
 
        case GNI_RC_NOT_DONE:
-               /* XXX Nic: We need to figure out how to track this
-                * - there are bound to be good reasons for it,
-                * but we want to know when it happens */
-
-               mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+               /* Jshimek: We can get GNI_RC_NOT_DONE for 3 reasons currently
+                * 1: out of mbox credits
+                * 2: out of mbox payload credits
+                * 3: On Aries out of dla credits
+                */
+               kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
+               kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
                /* We'll handle this error inline - makes the calling logic
                 * much cleaner */
 
@@ -1431,35 +1443,41 @@ kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
                        return -EAGAIN;
                }
 
-               retry_send = kgnilnd_tx_should_retry(conn, tx);
-               if (retry_send) {
-                       /* add to head of list for the state and retries */
-                       spin_lock(state_lock);
-                       kgnilnd_tx_add_state_locked(tx, conn->gnc_peer, conn, state, 0);
-                       spin_unlock(state_lock);
-
-                       /* We only reschedule for a certain number of retries, then
-                        * we will wait for the CQ events indicating a release of SMSG
-                        * credits */
-                       if (tx->tx_retrans < (*kgnilnd_tunables.kgn_max_retransmits/4)) {
-                               kgnilnd_schedule_conn(conn);
-                               return 0;
-                       } else {
-                               /* CQ event coming in signifies either TX completed or
-                                * RX receive. Either of these *could* free up credits
-                                * in the SMSG mbox and we should try sending again */
-                               GNIDBG_TX(D_NET, tx, "waiting for CQID %u event to resend",
-                                        tx->tx_conn->gnc_cqid);
-                               /* use +ve return code to let upper layers know they
-                                * should stop looping on sends */
-                               return EAGAIN;
-                       }
+               /* I need kgni credits to send this.  Replace tx at the head of the
+                * fmaq and I'll get rescheduled when credits appear. Reset the tx_state
+                * and bump retrans counts since we are requeueing the tx.
+                */
+               tx->tx_state = 0;
+               tx->tx_retrans++;
+               conn->gnc_tx_retrans++;
+
+               kgnilnd_tx_log_retrans(conn, tx);
+               /* add to head of list for the state and retries */
+               spin_lock(state_lock);
+               kgnilnd_tx_add_state_locked(tx, conn->gnc_peer, conn, state, 0);
+               spin_unlock(state_lock);
+
+               /* We only reschedule for a certain number of retries, then
+                * we will wait for the CQ events indicating a release of SMSG
+                * credits */
+               if (tx->tx_retrans < *kgnilnd_tunables.kgn_max_retransmits) {
+                       kgnilnd_schedule_conn(conn);
+                       return 0;
                } else {
-                       return -EAGAIN;
+                       /* CQ event coming in signifies either TX completed or
+                        * RX receive. Either of these *could* free up credits
+                        * in the SMSG mbox and we should try sending again */
+                       GNIDBG_TX(D_NET, tx, "waiting for CQID %u event to resend",
+                                tx->tx_conn->gnc_cqid);
+                       kgnilnd_schedule_delay_conn(conn);
+                       /* use +ve return code to let upper layers know they
+                        * should stop looping on sends */
+                       return EAGAIN;
                }
        default:
                /* handle bad retcode gracefully */
-               mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+               kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
+               kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
                return -EIO;
        }
 }
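
Condensing the GNI_RC_NOT_DONE branch above into one helper makes the policy easier to see. This is a sketch using only names from the surrounding code, not a drop-in replacement:

	/* Sketch: requeue the tx, then either reschedule the conn now or,
	 * once retransmits hit the tunable cap, schedule it with a delay
	 * and hand back +EAGAIN so callers stop looping on sends. */
	static int requeue_on_credit_stall(kgn_conn_t *conn, kgn_tx_t *tx)
	{
		tx->tx_state = 0;
		tx->tx_retrans++;
		conn->gnc_tx_retrans++;

		if (tx->tx_retrans < *kgnilnd_tunables.kgn_max_retransmits) {
			kgnilnd_schedule_conn(conn);	/* retry promptly */
			return 0;
		}
		kgnilnd_schedule_delay_conn(conn);	/* wait for CQ events */
		return EAGAIN;				/* +ve on purpose */
	}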
@@ -1474,7 +1492,8 @@ kgnilnd_sendmsg(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
        int              rc;
 
        timestamp = jiffies;
-       mutex_lock(&dev->gnd_cq_mutex);
+       kgnilnd_gl_mutex_lock(&dev->gnd_cq_mutex);
+       kgnilnd_conn_mutex_lock(&tx->tx_conn->gnc_smsg_mutex);
        /* delay in jiffies - we are really concerned only with things that
         * result in a schedule() or really holding this off for long times .
         * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
@@ -1519,7 +1538,8 @@ kgnilnd_sendmsg_trylock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob
                rc = 0;
        } else {
                atomic_inc(&conn->gnc_device->gnd_fast_try);
-               rc = mutex_trylock(&conn->gnc_device->gnd_cq_mutex);
+               rc = kgnilnd_trylock(&conn->gnc_device->gnd_cq_mutex,
+                                    &conn->gnc_smsg_mutex);
        }
        if (!rc) {
                rc = -EAGAIN;
@@ -1542,7 +1562,7 @@ kgnilnd_sendmsg_trylock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob
 }
 
 /* lets us know if we can push this RDMA through now */
-inline int
+static int
 kgnilnd_auth_rdma_bytes(kgn_device_t *dev, kgn_tx_t *tx)
 {
        long    bytes_left;
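
The rest of kgnilnd_auth_rdma_bytes is elided by this hunk. As a rough sketch of the idea - the gnd_rdmaq_bytes_ok field name below is an assumption, not taken from this diff - the device keeps a running byte budget and a tx is only authorized while the budget holds out:

	/* Illustrative only: reserve tx_nob against the device's RDMA byte
	 * budget; on overdraft, give the bytes back and tell the caller to
	 * queue the tx on the rdmaq instead. */
	static int auth_rdma_bytes_sketch(kgn_device_t *dev, kgn_tx_t *tx)
	{
		long	bytes_left;

		bytes_left = atomic64_sub_return(tx->tx_nob,
						 &dev->gnd_rdmaq_bytes_ok);
		if (bytes_left < 0) {
			atomic64_add(tx->tx_nob, &dev->gnd_rdmaq_bytes_ok);
			return 0;	/* not now: defer to the rdmaq */
		}
		return 1;		/* authorized, push it through */
	}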
@@ -1599,7 +1619,7 @@ kgnilnd_queue_rdma(kgn_conn_t *conn, kgn_tx_t *tx)
 void
 kgnilnd_queue_tx(kgn_conn_t *conn, kgn_tx_t *tx)
 {
-       int            rc;
+       int            rc = 0;
        int            add_tail = 1;
 
        /* set the tx_id here, we delay it until we have an actual conn
@@ -1627,6 +1647,8 @@ kgnilnd_queue_tx(kgn_conn_t *conn, kgn_tx_t *tx)
        switch (tx->tx_msg.gnm_type) {
        case GNILND_MSG_PUT_ACK:
        case GNILND_MSG_GET_REQ:
+       case GNILND_MSG_PUT_REQ_REV:
+       case GNILND_MSG_GET_ACK_REV:
                /* hijacking time! If this message will authorize our peer to
                 * send his dirty little bytes in an RDMA, we need to get permission */
                kgnilnd_queue_rdma(conn, tx);
@@ -1638,17 +1660,9 @@ kgnilnd_queue_tx(kgn_conn_t *conn, kgn_tx_t *tx)
                if (rc >= 0) {
                        /* it was sent, break out of switch to avoid default case of queueing */
                        break;
-               } else if (rc == -EAGAIN) {
-                       /* needs to queue to try again, so  fall through to default case */
-               } else {
-                       /* bail: it wasnt sent and we didn't get EAGAIN indicating
-                        * we should retrans - We do not close the conn due to locking
-                        * we let the reaper thread take care of it. There are no hard
-                        * errors from send_msg that would require close to be called
-                        */
-                       kgnilnd_tx_done(tx, rc);
-                       break;
                }
+               /* needs to queue to try again, so... */
+               /* fall through... */
        case GNILND_MSG_NOOP:
                /* Just make sure this goes out first for this conn */
                add_tail = 0;
@@ -1663,12 +1677,13 @@ kgnilnd_queue_tx(kgn_conn_t *conn, kgn_tx_t *tx)
 }
 
 void
-kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target)
+kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, struct lnet_process_id *target)
 {
        kgn_peer_t      *peer;
        kgn_peer_t      *new_peer = NULL;
        kgn_conn_t      *conn = NULL;
        int              rc;
+       int              node_state;
 
        ENTRY;
 
@@ -1695,6 +1710,13 @@ kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target)
                        read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
                        RETURN_EXIT;
                }
+
+               /* don't create a connection if the peer is marked down */
+               if (peer->gnp_state != GNILND_PEER_UP) {
+                       read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+                       rc = -ENETRESET;
+                       GOTO(no_peer, rc);
+               }
        }
 
        /* creating peer or conn; I'll need a write lock... */
@@ -1702,6 +1724,8 @@ kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target)
 
        CFS_RACE(CFS_FAIL_GNI_FIND_TARGET);
 
+       node_state = kgnilnd_get_node_state(LNET_NIDADDR(target->nid));
+
        /* NB - this will not block during normal operations -
         * the only writer of this is in the startup/shutdown path. */
        rc = down_read_trylock(&kgnilnd_data.kgn_net_rw_sem);
@@ -1713,7 +1737,7 @@ kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target)
        /* ignore previous peer entirely - we cycled the lock, so we
         * will create new peer and at worst drop it if peer is still
         * in the tables */
-       rc = kgnilnd_create_peer_safe(&new_peer, target->nid, net);
+       rc = kgnilnd_create_peer_safe(&new_peer, target->nid, net, node_state);
        if (rc != 0) {
                up_read(&kgnilnd_data.kgn_net_rw_sem);
                GOTO(no_peer, rc);
@@ -1726,7 +1750,20 @@ kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target)
         * if we don't find it, add our new one to the list */
        kgnilnd_add_peer_locked(target->nid, new_peer, &peer);
 
+       /* don't create a connection if the peer is not up */
+       if (peer->gnp_state != GNILND_PEER_UP) {
+               write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+               rc = -ENETRESET;
+               GOTO(no_peer, rc);
+       }
+
        conn = kgnilnd_find_or_create_conn_locked(peer);
+
+       if (CFS_FAIL_CHECK(CFS_FAIL_GNI_DGRAM_DROP_TX)) {
+               write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+               GOTO(no_peer, rc);
+       }
+
        if (conn != NULL) {
                /* oh hey, found a conn now... magical */
                kgnilnd_queue_tx(conn, tx);
@@ -1742,14 +1779,18 @@ no_peer:
        RETURN_EXIT;
 }
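
Stripped of error handling and refcounting, the locking order in kgnilnd_launch_tx above runs like this (a sketch of the flow, not a replacement):

	/* Sketch: look up under the read lock; only take the write lock
	 * when a peer or conn may need creating, and re-check peer state
	 * under the write lock since it can change between the two. */
	read_lock(&kgnilnd_data.kgn_peer_conn_lock);
	/* existing peer + conn: queue the tx and return;
	 * peer not GNILND_PEER_UP: bail with -ENETRESET */
	read_unlock(&kgnilnd_data.kgn_peer_conn_lock);

	/* slow path: create a new peer outside the lock, then... */
	write_lock(&kgnilnd_data.kgn_peer_conn_lock);
	/* add the peer (or find the one that raced us in), re-check
	 * gnp_state, then kgnilnd_find_or_create_conn_locked() */
	write_unlock(&kgnilnd_data.kgn_peer_conn_lock);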
 
-void
+int
 kgnilnd_rdma(kgn_tx_t *tx, int type,
            kgn_rdma_desc_t *sink, unsigned int nob, __u64 cookie)
 {
        kgn_conn_t   *conn = tx->tx_conn;
        unsigned long timestamp;
+       gni_post_type_t post_type;
        gni_return_t  rrc;
-
+       int rc = 0;
+       unsigned int desc_nob = nob;
+       void *desc_buffer = tx->tx_buffer;
+       gni_mem_handle_t desc_map_key = tx->tx_map_key;
        LASSERTF(kgnilnd_tx_mapped(tx),
                "unmapped tx %p\n", tx);
        LASSERTF(conn != NULL,
@@ -1761,45 +1802,113 @@ kgnilnd_rdma(kgn_tx_t *tx, int type,
                "nob %d > tx(%p)->tx_nob %d\n",
                nob, tx, tx->tx_nob);
 
+       switch (type) {
+       case GNILND_MSG_GET_DONE:
+       case GNILND_MSG_PUT_DONE:
+               post_type = GNI_POST_RDMA_PUT;
+               break;
+       case GNILND_MSG_GET_DONE_REV:
+       case GNILND_MSG_PUT_DONE_REV:
+               post_type = GNI_POST_RDMA_GET;
+               break;
+       default:
+               CERROR("invalid msg type %s (%d)\n",
+                       kgnilnd_msgtype2str(type), type);
+               LBUG();
+       }
+       if (post_type == GNI_POST_RDMA_GET) {
+               /* Check remote buffer, local buffer, and length alignment;
+                * all must be 4-byte aligned. If the local buffer is
+                * misaligned, using the copy buffer fixes it. If the length
+                * is misaligned, the copy buffer also fixes it: we transfer
+                * a few extra bytes into the buffer but copy only the
+                * correct nob back into the original buffer. Remote offset
+                * correction is done by adjusting the offset, making sure
+                * the length and addr are aligned, and copying the data
+                * into the correct location once the transfer completes.
+                */
+               if ((((__u64)((unsigned long)tx->tx_buffer)) & 3) ||
+                   (sink->gnrd_addr & 3) ||
+                   (nob & 3)) {
+                       tx->tx_offset = ((__u64)((unsigned long)sink->gnrd_addr)) & 3;
+                       if (tx->tx_offset)
+                               atomic_inc(&kgnilnd_data.kgn_rev_offset);
+
+                       if ((nob + tx->tx_offset) & 3) {
+                               desc_nob = ((nob + tx->tx_offset) + (4 - ((nob + tx->tx_offset) & 3)));
+                               atomic_inc(&kgnilnd_data.kgn_rev_length);
+                       } else {
+                               desc_nob = (nob + tx->tx_offset);
+                       }
+
+                       if (tx->tx_buffer_copy == NULL) {
+                               /* Allocate the largest copy buffer we will need;
+                                * this prevents us from overwriting data and costs
+                                * at most a few extra bytes. */
+                               tx->tx_buffer_copy = kgnilnd_vzalloc(desc_nob);
+
+                               if (!tx->tx_buffer_copy) {
+                                       /* allocation of the buffer failed; NAK the RDMA */
+                                       kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type,
+                                                        -EFAULT, cookie,
+                                                        tx->tx_msg.gnm_srcnid);
+                                       kgnilnd_tx_done(tx, -EFAULT);
+                                       return 0;
+                               }
+                               atomic_inc(&kgnilnd_data.kgn_rev_copy_buff);
+                               rc = kgnilnd_mem_register(conn->gnc_device->gnd_handle,
+                                                         (__u64)tx->tx_buffer_copy,
+                                                         desc_nob, NULL,
+                                                         GNI_MEM_READWRITE,
+                                                         &tx->tx_buffer_copy_map_key);
+                               if (rc != GNI_RC_SUCCESS) {
+                                       /* registration failed; NAK the RDMA and kill the tx */
+                                       kgnilnd_vfree(tx->tx_buffer_copy,
+                                                     desc_nob);
+                                       tx->tx_buffer_copy = NULL;
+                                       kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type,
+                                                        -EFAULT, cookie,
+                                                        tx->tx_msg.gnm_srcnid);
+                                       kgnilnd_tx_done(tx, -EFAULT);
+                                       return 0;
+                               }
+                       }
+                       desc_map_key = tx->tx_buffer_copy_map_key;
+                       desc_buffer = tx->tx_buffer_copy;
+               }
+       }
+
        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.post_id = tx->tx_id.txe_cookie;
-       tx->tx_rdma_desc.type = GNI_POST_RDMA_PUT;
+       tx->tx_rdma_desc.type = post_type;
        tx->tx_rdma_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
-       tx->tx_rdma_desc.local_addr = (__u64)((unsigned long)tx->tx_buffer);
-       tx->tx_rdma_desc.local_mem_hndl = tx->tx_map_key;
-       tx->tx_rdma_desc.remote_addr = sink->gnrd_addr;
+       tx->tx_rdma_desc.local_addr = (__u64)((unsigned long)desc_buffer);
+       tx->tx_rdma_desc.local_mem_hndl = desc_map_key;
+       tx->tx_rdma_desc.remote_addr = sink->gnrd_addr - tx->tx_offset;
        tx->tx_rdma_desc.remote_mem_hndl = sink->gnrd_key;
-       tx->tx_rdma_desc.length = nob;
-       if (!*kgnilnd_tunables.kgn_bte_hash)
-               tx->tx_rdma_desc.dlvr_mode |= GNI_DLVMODE_NO_HASH;
-       if (!*kgnilnd_tunables.kgn_bte_adapt)
-               tx->tx_rdma_desc.dlvr_mode |= (GNI_DLVMODE_NO_ADAPT | GNI_DLVMODE_NO_RADAPT);
-
+       tx->tx_rdma_desc.length = desc_nob;
+       tx->tx_nob_rdma = nob;
+       if (post_type == GNI_POST_RDMA_PUT && *kgnilnd_tunables.kgn_bte_put_dlvr_mode)
+               tx->tx_rdma_desc.dlvr_mode = *kgnilnd_tunables.kgn_bte_put_dlvr_mode;
+       if (post_type == GNI_POST_RDMA_GET && *kgnilnd_tunables.kgn_bte_get_dlvr_mode)
+               tx->tx_rdma_desc.dlvr_mode = *kgnilnd_tunables.kgn_bte_get_dlvr_mode;
        /* prep final completion message */
        kgnilnd_init_msg(&tx->tx_msg, type, tx->tx_msg.gnm_srcnid);
        tx->tx_msg.gnm_u.completion.gncm_cookie = cookie;
        /* send actual size RDMA'd in retval */
        tx->tx_msg.gnm_u.completion.gncm_retval = nob;
 
-       kgnilnd_compute_rdma_cksum(tx);
+       kgnilnd_compute_rdma_cksum(tx, nob);
 
        if (nob == 0) {
                kgnilnd_queue_tx(conn, tx);
-               return;
+               return 0;
        }
 
        /* Don't lie (CLOSE == RDMA idle) */
        LASSERTF(!conn->gnc_close_sent, "tx %p on conn %p after close sent %d\n",
                 tx, conn, conn->gnc_close_sent);
 
-       GNIDBG_TX(D_NET, tx, "Post RDMA type 0x%02x dlvr_mode 0x%x",
-              type, tx->tx_rdma_desc.dlvr_mode);
+       GNIDBG_TX(D_NET, tx, "Post RDMA type 0x%02x conn %p dlvr_mode "
+               "0x%x cookie:%#llx",
+               type, conn, tx->tx_rdma_desc.dlvr_mode, cookie);
 
        /* set CQ dedicated for RDMA */
        tx->tx_rdma_desc.src_cq_hndl = conn->gnc_device->gnd_snd_rdma_cqh;
 
        timestamp = jiffies;
-       mutex_lock(&conn->gnc_device->gnd_cq_mutex);
+       kgnilnd_conn_mutex_lock(&conn->gnc_rdma_mutex);
+       kgnilnd_gl_mutex_lock(&conn->gnc_device->gnd_cq_mutex);
        /* delay in jiffies - we are really concerned only with things that
         * result in a schedule() or really holding this off for long times .
         * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
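
A worked example of the alignment fix-up above, as plain userspace C (the helper name is mine): with a sink address of 0x1003 and nob = 10, tx_offset becomes 3 and the 13 misaligned bytes round up to a 16-byte transfer; the surplus lands in tx_buffer_copy, and only the original nob is copied back on completion (see kgnilnd_recv_bte_get later in this patch).

	#include <assert.h>
	#include <stdint.h>

	/* Mirrors the tx_offset/desc_nob arithmetic in kgnilnd_rdma(). */
	static unsigned int aligned_desc_nob(uint64_t sink_addr,
					     unsigned int nob,
					     unsigned int *tx_offset)
	{
		unsigned int desc_nob;

		*tx_offset = sink_addr & 3;	/* remote misalignment */
		if ((nob + *tx_offset) & 3)	/* round up to 4 bytes */
			desc_nob = (nob + *tx_offset) +
				   (4 - ((nob + *tx_offset) & 3));
		else
			desc_nob = nob + *tx_offset;
		return desc_nob;
	}

	int main(void)
	{
		unsigned int off;

		assert(aligned_desc_nob(0x1003, 10, &off) == 16 && off == 3);
		assert(aligned_desc_nob(0x1000, 12, &off) == 12 && off == 0);
		return 0;
	}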
@@ -1807,25 +1916,43 @@ kgnilnd_rdma(kgn_tx_t *tx, int type,
 
        rrc = kgnilnd_post_rdma(conn->gnc_ephandle, &tx->tx_rdma_desc);
 
+       if (rrc == GNI_RC_ERROR_RESOURCE) {
+               kgnilnd_conn_mutex_unlock(&conn->gnc_rdma_mutex);
+               kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+               kgnilnd_unmap_buffer(tx, 0);
+
+               if (tx->tx_buffer_copy != NULL) {
+                       kgnilnd_vfree(tx->tx_buffer_copy, desc_nob);
+                       tx->tx_buffer_copy = NULL;
+               }
+
+               spin_lock(&tx->tx_conn->gnc_device->gnd_lock);
+               kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn,
+                                           GNILND_TX_MAPQ, 0);
+               spin_unlock(&tx->tx_conn->gnc_device->gnd_lock);
+               kgnilnd_schedule_device(tx->tx_conn->gnc_device);
+               return -EAGAIN;
+       }
+
        spin_lock(&conn->gnc_list_lock);
        kgnilnd_tx_add_state_locked(tx, conn->gnc_peer, conn, GNILND_TX_LIVE_RDMAQ, 1);
        tx->tx_qtime = jiffies;
        spin_unlock(&conn->gnc_list_lock);
-
-       mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+       kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+       kgnilnd_conn_mutex_unlock(&conn->gnc_rdma_mutex);
 
        /* XXX Nic: is this a place we should handle more errors for
         * robustness sake */
        LASSERT(rrc == GNI_RC_SUCCESS);
-
+       return 0;
 }
 
 kgn_rx_t *
 kgnilnd_alloc_rx(void)
 {
-       kgn_rx_t        *rx;
+       kgn_rx_t        *rx;
 
-       rx = cfs_mem_cache_alloc(kgnilnd_data.kgn_rx_cache, CFS_ALLOC_ATOMIC);
+       rx = kmem_cache_alloc(kgnilnd_data.kgn_rx_cache, GFP_ATOMIC);
        if (rx == NULL) {
                CERROR("failed to allocate rx\n");
                return NULL;
@@ -1848,19 +1975,19 @@ kgnilnd_release_msg(kgn_conn_t *conn)
        CDEBUG(D_NET, "consuming %p\n", conn);
 
        timestamp = jiffies;
-       mutex_lock(&conn->gnc_device->gnd_cq_mutex);
+       kgnilnd_gl_mutex_lock(&conn->gnc_device->gnd_cq_mutex);
        /* delay in jiffies - we are really concerned only with things that
         * result in a schedule() or really holding this off for long times .
         * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
        conn->gnc_device->gnd_mutex_delay += (long) jiffies - timestamp;
 
        rrc = kgnilnd_smsg_release(conn->gnc_ephandle);
-       mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+       kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
 
        LASSERTF(rrc == GNI_RC_SUCCESS, "bad rrc %d\n", rrc);
        GNIDBG_SMSG_CREDS(D_NET, conn);
 
-       return;
+       kgnilnd_schedule_conn(conn);
 }
 
 void
@@ -1872,6 +1999,7 @@ kgnilnd_consume_rx(kgn_rx_t *rx)
        /* if we are eager, free the cache alloc'd msg */
        if (unlikely(rx->grx_eager)) {
                LIBCFS_FREE(rxmsg, sizeof(*rxmsg) + *kgnilnd_tunables.kgn_max_immediate);
+               atomic_dec(&kgnilnd_data.kgn_neager_allocs);
 
                /* release ref from eager_recv */
                kgnilnd_conn_decref(conn);
@@ -1880,31 +2008,30 @@ kgnilnd_consume_rx(kgn_rx_t *rx)
                kgnilnd_release_msg(conn);
        }
 
-       cfs_mem_cache_free(kgnilnd_data.kgn_rx_cache, rx);
+       kmem_cache_free(kgnilnd_data.kgn_rx_cache, rx);
        CDEBUG(D_MALLOC, "slab-freed 'rx': %lu at %p.\n",
               sizeof(*rx), rx);
-
-       return;
 }
 
 int
-kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
+kgnilnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg)
 {
-       lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
+       struct lnet_hdr  *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
-       lnet_process_id_t target = lntmsg->msg_target;
+       struct lnet_process_id target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      niov = lntmsg->msg_niov;
-       struct iovec     *iov = lntmsg->msg_iov;
-       lnet_kiov_t      *kiov = lntmsg->msg_kiov;
+       struct bio_vec   *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        unsigned int      msg_vmflush = lntmsg->msg_vmflush;
        kgn_net_t        *net = ni->ni_data;
        kgn_tx_t         *tx;
        int               rc = 0;
-       int               mpflag = 0;
+       /* '1' for consistency with code that checks !mpflag to restore */
+       unsigned int mpflag = 1;
+       int               reverse_rdma_flag = *kgnilnd_tunables.kgn_reverse_rdma;
 
        /* NB 'private' is different depending on what we're sending.... */
        LASSERT(!in_interrupt());
@@ -1917,12 +2044,8 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
        LASSERTF(niov <= LNET_MAX_IOV,
                "lntmsg %p niov %d\n", lntmsg, niov);
 
-       /* payload is either all vaddrs or all pages */
-       LASSERTF(!(kiov != NULL && iov != NULL),
-               "lntmsg %p kiov %p iov %p\n", lntmsg, kiov, iov);
-
        if (msg_vmflush)
-               mpflag = cfs_memory_pressure_get_and_set();
+               mpflag = memalloc_noreclaim_save();
 
        switch (type) {
        default:
@@ -1951,22 +2074,18 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                if (lntmsg->msg_md->md_length <= *kgnilnd_tunables.kgn_max_immediate)
                       break;
 
-               tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_REQ, ni->ni_nid);
+               if ((reverse_rdma_flag & GNILND_REVERSE_GET) == 0)
+                       tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_REQ, ni->ni_nid);
+               else
+                       tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_REQ_REV, ni->ni_nid);
+
                if (tx == NULL) {
                        rc = -ENOMEM;
                        goto out;
                }
-
-               /* slightly different options as we might actually have a GET with a
-                * MD_KIOV set but a non-NULL md_iov.iov */
-               if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
-                       rc = kgnilnd_setup_rdma_buffer(tx, lntmsg->msg_md->md_niov,
-                                                     lntmsg->msg_md->md_iov.iov, NULL,
-                                                     0, lntmsg->msg_md->md_length);
-               else
-                       rc = kgnilnd_setup_rdma_buffer(tx, lntmsg->msg_md->md_niov,
-                                                     NULL, lntmsg->msg_md->md_iov.kiov,
-                                                     0, lntmsg->msg_md->md_length);
+               rc = kgnilnd_setup_rdma_buffer(tx, lntmsg->msg_md->md_niov,
+                                              lntmsg->msg_md->md_kiov,
+                                              0, lntmsg->msg_md->md_length);
                if (rc != 0) {
                        CERROR("unable to setup buffer: %d\n", rc);
                        kgnilnd_tx_done(tx, rc);
@@ -1984,11 +2103,14 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                }
 
                tx->tx_lntmsg[0] = lntmsg;
-               tx->tx_msg.gnm_u.get.gngm_hdr = *hdr;
+               if ((reverse_rdma_flag & GNILND_REVERSE_GET) == 0)
+                       tx->tx_msg.gnm_u.get.gngm_hdr = *hdr;
+               else
+                       tx->tx_msg.gnm_u.putreq.gnprm_hdr = *hdr;
+
                /* rest of tx_msg is setup just before it is sent */
                kgnilnd_launch_tx(tx, net, &target);
                goto out;
-
        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                /* to save on MDDs, we'll handle short kiov by vmap'ing
@@ -1996,13 +2118,18 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                if (nob <= *kgnilnd_tunables.kgn_max_immediate)
                       break;
 
-               tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_REQ, ni->ni_nid);
+               if ((reverse_rdma_flag & GNILND_REVERSE_PUT) == 0)
+                       tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_REQ, ni->ni_nid);
+               else
+                       tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_REQ_REV, ni->ni_nid);
+
                if (tx == NULL) {
                        rc = -ENOMEM;
                        goto out;
                }
 
-               rc = kgnilnd_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
+               rc = kgnilnd_setup_rdma_buffer(tx, niov,
+                                              kiov, offset, nob);
                if (rc != 0) {
                        kgnilnd_tx_done(tx, rc);
                        rc = -EIO;
@@ -2010,7 +2137,11 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                }
 
                tx->tx_lntmsg[0] = lntmsg;
-               tx->tx_msg.gnm_u.putreq.gnprm_hdr = *hdr;
+               if ((reverse_rdma_flag & GNILND_REVERSE_PUT) == 0)
+                       tx->tx_msg.gnm_u.putreq.gnprm_hdr = *hdr;
+               else
+                       tx->tx_msg.gnm_u.get.gngm_hdr = *hdr;
+
                /* rest of tx_msg is setup just before it is sent */
                kgnilnd_launch_tx(tx, net, &target);
                goto out;
@@ -2027,7 +2158,7 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                goto out;
        }
 
-       rc = kgnilnd_setup_immediate_buffer(tx, niov, iov, kiov, offset, nob);
+       rc = kgnilnd_setup_immediate_buffer(tx, niov, kiov, offset, nob);
        if (rc != 0) {
                kgnilnd_tx_done(tx, rc);
                goto out;
@@ -2040,24 +2171,39 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
 out:
        /* use stored value as we could have already finalized lntmsg here from a failed launch */
        if (msg_vmflush)
-               cfs_memory_pressure_restore(mpflag);
+               memalloc_noreclaim_restore(mpflag);
        return rc;
 }
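
The vmflush handling in kgnilnd_send above is the standard save/restore bracket; sketched on its own (the mpflag = 1 initializer only matters if a restore path ever tests the flag, per the comment at its declaration):

	unsigned int mpflag = 1;

	if (msg_vmflush)
		mpflag = memalloc_noreclaim_save();	/* PF_MEMALLOC on */

	/* ... build and launch the tx; allocations in here must not
	 * recurse into reclaim, since this send may itself be part of
	 * flushing dirty pages ... */

	if (msg_vmflush)
		memalloc_noreclaim_restore(mpflag);	/* old state back */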
 
 void
-kgnilnd_reply(lnet_ni_t *ni, kgn_rx_t *rx, lnet_msg_t *lntmsg)
+kgnilnd_setup_rdma(struct lnet_ni *ni, kgn_rx_t *rx, struct lnet_msg *lntmsg, int mlen)
 {
        kgn_conn_t    *conn = rx->grx_conn;
        kgn_msg_t     *rxmsg = rx->grx_msg;
        unsigned int   niov = lntmsg->msg_niov;
-       struct iovec  *iov = lntmsg->msg_iov;
-       lnet_kiov_t   *kiov = lntmsg->msg_kiov;
+       struct bio_vec *kiov = lntmsg->msg_kiov;
        unsigned int   offset = lntmsg->msg_offset;
        unsigned int   nob = lntmsg->msg_len;
+       int            done_type;
        kgn_tx_t      *tx;
        int            rc = 0;
 
-       tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_DONE, ni->ni_nid);
+       switch (rxmsg->gnm_type) {
+       case GNILND_MSG_PUT_REQ_REV:
+               done_type = GNILND_MSG_PUT_DONE_REV;
+               nob = mlen;
+               break;
+       case GNILND_MSG_GET_REQ:
+               done_type = GNILND_MSG_GET_DONE;
+               break;
+       default:
+               CERROR("invalid msg type %s (%d)\n",
+                       kgnilnd_msgtype2str(rxmsg->gnm_type),
+                       rxmsg->gnm_type);
+               LBUG();
+       }
+
+       tx = kgnilnd_new_tx_msg(done_type, ni->ni_nid);
        if (tx == NULL)
                goto failed_0;
 
@@ -2065,7 +2211,7 @@ kgnilnd_reply(lnet_ni_t *ni, kgn_rx_t *rx, lnet_msg_t *lntmsg)
        if (rc != 0)
                goto failed_1;
 
-       rc = kgnilnd_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
+       rc = kgnilnd_setup_rdma_buffer(tx, niov, kiov, offset, nob);
        if (rc != 0)
                goto failed_1;
 
@@ -2084,19 +2230,21 @@ kgnilnd_reply(lnet_ni_t *ni, kgn_rx_t *rx, lnet_msg_t *lntmsg)
 
  failed_1:
        kgnilnd_tx_done(tx, rc);
-       kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
+       kgnilnd_nak_rdma(conn, done_type, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
  failed_0:
-       lnet_finalize(ni, lntmsg, rc);
+       lnet_finalize(lntmsg, rc);
 }
 
 int
-kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
+kgnilnd_eager_recv(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg,
                   void **new_private)
 {
        kgn_rx_t        *rx = private;
        kgn_conn_t      *conn = rx->grx_conn;
        kgn_msg_t       *rxmsg = rx->grx_msg;
        kgn_msg_t       *eagermsg = NULL;
+       kgn_peer_t      *peer = NULL;
+       kgn_conn_t      *found_conn = NULL;
 
        GNIDBG_MSG(D_NET, rxmsg, "eager recv for conn %p, rxmsg %p, lntmsg %p",
                conn, rxmsg, lntmsg);
@@ -2106,11 +2254,56 @@ kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                        rxmsg->gnm_payload_len);
                return -EPROTO;
        }
+       /* Grab a read lock so the connection doesn't disappear on us
+        * while we look it up
+        */
+       read_lock(&kgnilnd_data.kgn_peer_conn_lock);
+
+       peer = kgnilnd_find_peer_locked(rxmsg->gnm_srcnid);
+       if (peer != NULL)
+               found_conn = kgnilnd_find_conn_locked(peer);
+
+       /* Verify that the connection found is the same one the message
+        * is supposed to be using; if it is not, output an error message
+        * and return.
+        */
+       if (!peer || !found_conn ||
+           found_conn->gnc_peer_connstamp != rxmsg->gnm_connstamp) {
+               read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+               CERROR("Couldnt find matching peer %p or conn %p / %p\n",
+                       peer, conn, found_conn);
+               if (found_conn) {
+                       CERROR("Unexpected connstamp %#llx(%#llx expected)"
+                               " from %s", rxmsg->gnm_connstamp,
+                               found_conn->gnc_peer_connstamp,
+                               libcfs_nid2str(peer->gnp_nid));
+               }
+               return -ENOTCONN;
+       }
+
+       /* add conn ref to ensure it doesn't go away until all eager
+        * messages processed */
+       kgnilnd_conn_addref(conn);
+
+       /* Now that we have verified the connection is valid and added a
+        * reference we can remove the read_lock on the peer_conn_lock */
+       read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
 
        /* we have no credits or buffers for this message, so copy it
         * somewhere for a later kgnilnd_recv */
+       if (atomic_read(&kgnilnd_data.kgn_neager_allocs) >=
+                       *kgnilnd_tunables.kgn_eager_credits) {
+               CERROR("Out of eager credits to %s\n",
+                       libcfs_nid2str(conn->gnc_peer->gnp_nid));
+               /* drop the ref taken above so the conn is not leaked */
+               kgnilnd_conn_decref(conn);
+               return -ENOMEM;
+       }
+
+       atomic_inc(&kgnilnd_data.kgn_neager_allocs);
+
        LIBCFS_ALLOC(eagermsg, sizeof(*eagermsg) + *kgnilnd_tunables.kgn_max_immediate);
        if (eagermsg == NULL) {
+               kgnilnd_conn_decref(conn);
                CERROR("couldn't allocate eager rx message for conn %p to %s\n",
                        conn, libcfs_nid2str(conn->gnc_peer->gnp_nid));
                return -ENOMEM;
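
The eager-credit scheme above is just a global counter tested against a tunable cap; here is a standalone sketch with userspace atomics (the names and the default are mine). As in the original, the test and increment are not one atomic step, which is fine for a soft cap:

	#include <stdatomic.h>
	#include <stdbool.h>

	static atomic_int neager_allocs;	/* kgn_neager_allocs stand-in */
	static int eager_credits = 256;		/* kgn_eager_credits stand-in */

	/* take a credit before allocating an eager rx buffer */
	static bool eager_credit_get(void)
	{
		if (atomic_load(&neager_allocs) >= eager_credits)
			return false;	/* caller returns -ENOMEM */
		atomic_fetch_add(&neager_allocs, 1);
		return true;
	}

	/* returned when the buffer is consumed (kgnilnd_consume_rx) */
	static void eager_credit_put(void)
	{
		atomic_fetch_sub(&neager_allocs, 1);
	}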
@@ -2124,9 +2317,6 @@ kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
        /* stash this for lnet_finalize on cancel-on-conn-close */
        rx->grx_lntmsg = lntmsg;
 
-       /* add conn ref to ensure it doesn't go away until all eager messages processed */
-       kgnilnd_conn_addref(conn);
-
        /* keep the same rx_t, it just has a new grx_msg now */
        *new_private = private;
 
@@ -2137,9 +2327,9 @@ kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
 }
 
 int
-kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
+kgnilnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg,
             int delayed, unsigned int niov,
-            struct iovec *iov, lnet_kiov_t *kiov,
+            struct bio_vec *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
 {
        kgn_rx_t    *rx = private;
@@ -2152,14 +2342,11 @@ kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
 
        LASSERT(!in_interrupt());
        LASSERTF(mlen <= rlen, "%d <= %d\n", mlen, rlen);
-       /* Either all pages or all vaddrs */
-       LASSERTF(!(kiov != NULL && iov != NULL), "kiov %p iov %p\n",
-               kiov, iov);
 
        GNIDBG_MSG(D_NET, rxmsg, "conn %p, rxmsg %p, lntmsg %p"
-               " niov=%d kiov=%p iov=%p offset=%d mlen=%d rlen=%d",
+               " niov=%d kiov=%p offset=%d mlen=%d rlen=%d",
                conn, rxmsg, lntmsg,
-               niov, kiov, iov, offset, mlen, rlen);
+               niov, kiov, offset, mlen, rlen);
 
        /* we need to lock here as recv can be called from any context */
        read_lock(&kgnilnd_data.kgn_peer_conn_lock);
@@ -2168,13 +2355,16 @@ kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
 
                /* someone closed the conn after we copied this out, nuke it */
                kgnilnd_consume_rx(rx);
-               lnet_finalize(ni, lntmsg, conn->gnc_error);
+               lnet_finalize(lntmsg, conn->gnc_error);
                RETURN(0);
        }
        read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
 
        switch (rxmsg->gnm_type) {
        default:
+               GNIDBG_MSG(D_NETERROR, rxmsg, "conn %p, rx %p, rxmsg %p, lntmsg %p"
+                          " niov=%d kiov=%p offset=%d mlen=%d rlen=%d",
+                          conn, rx, rxmsg, lntmsg, niov, kiov, offset, mlen, rlen);
                LBUG();
 
        case GNILND_MSG_IMMEDIATE:
@@ -2212,7 +2402,7 @@ kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                                case 2:
                                        kgnilnd_dump_blob(D_BUFFS, "bad payload checksum",
                                                          &rxmsg[1], rxmsg->gnm_payload_len);
-                                       /* fall through to dump */
+                                       /* fallthrough */
                                case 1:
                                        libcfs_debug_dumplog();
                                        break;
@@ -2227,88 +2417,178 @@ kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                        }
                }
 
-               if (kiov != NULL)
-                       lnet_copy_flat2kiov(
-                               niov, kiov, offset,
-                               *kgnilnd_tunables.kgn_max_immediate,
-                               &rxmsg[1], 0, mlen);
-               else
-                       lnet_copy_flat2iov(
-                               niov, iov, offset,
-                               *kgnilnd_tunables.kgn_max_immediate,
-                               &rxmsg[1], 0, mlen);
-
+               lnet_copy_flat2kiov(
+                       niov, kiov, offset,
+                       *kgnilnd_tunables.kgn_max_immediate,
+                       &rxmsg[1], 0, mlen);
+
+               kgnilnd_consume_rx(rx);
+               lnet_finalize(lntmsg, 0);
+               RETURN(0);
+
+       case GNILND_MSG_PUT_REQ:
+               /* LNET wants to truncate or drop transaction, sending NAK */
+               if (mlen == 0) {
+                       kgnilnd_consume_rx(rx);
+                       lnet_finalize(lntmsg, 0);
+
+                       /* only error if lntmsg == NULL, otherwise we are just
+                        * short circuiting the rdma process of 0 bytes */
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
+                                       lntmsg == NULL ? -ENOENT : 0,
+                                       rxmsg->gnm_u.get.gngm_cookie,
+                                       ni->ni_nid);
+                       RETURN(0);
+               }
+               /* sending ACK with sink buff. info */
+               tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_ACK, ni->ni_nid);
+               if (tx == NULL) {
+                       kgnilnd_consume_rx(rx);
+                       RETURN(-ENOMEM);
+               }
+
+               rc = kgnilnd_set_tx_id(tx, conn);
+               if (rc != 0) {
+                       GOTO(nak_put_req, rc);
+               }
+
+               rc = kgnilnd_setup_rdma_buffer(tx, niov,
+                                              kiov, offset, mlen);
+               if (rc != 0) {
+                       GOTO(nak_put_req, rc);
+               }
+
+               tx->tx_msg.gnm_u.putack.gnpam_src_cookie =
+                       rxmsg->gnm_u.putreq.gnprm_cookie;
+               tx->tx_msg.gnm_u.putack.gnpam_dst_cookie = tx->tx_id.txe_cookie;
+               tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_addr =
+                       (__u64)((unsigned long)tx->tx_buffer);
+               tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_nob = mlen;
+
+               tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */
+               tx->tx_qtime = jiffies;
+               /* we only queue from kgnilnd_recv - we might get called from
+                * other contexts and we don't want to block on the mutex in
+                * those cases */
+
+               spin_lock(&tx->tx_conn->gnc_device->gnd_lock);
+               kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 1);
+               spin_unlock(&tx->tx_conn->gnc_device->gnd_lock);
+               kgnilnd_schedule_device(tx->tx_conn->gnc_device);
+
+               kgnilnd_consume_rx(rx);
+               RETURN(0);
+
+nak_put_req:
+               /* make sure we send an error back when the PUT fails */
+               kgnilnd_nak_rdma(conn, rxmsg->gnm_type, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
+               kgnilnd_tx_done(tx, rc);
+               kgnilnd_consume_rx(rx);
+
+               /* return magic LNet network error */
+               RETURN(-EIO);
+       case GNILND_MSG_GET_REQ_REV:
+               /* LNET wants to truncate or drop transaction, sending NAK */
+               if (mlen == 0) {
+                       kgnilnd_consume_rx(rx);
+                       lnet_finalize(lntmsg, 0);
+
+                       /* only error if lntmsg == NULL, otherwise we are just
+                        * short circuiting the rdma process of 0 bytes */
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
+                                       lntmsg == NULL ? -ENOENT : 0,
+                                       rxmsg->gnm_u.get.gngm_cookie,
+                                       ni->ni_nid);
+                       RETURN(0);
+               }
+               /* lntmsg can be null when parsing a LNET_GET */
+               if (lntmsg != NULL) {
+                       /* sending ACK with sink buff. info */
+                       tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_ACK_REV, ni->ni_nid);
+                       if (tx == NULL) {
+                               kgnilnd_consume_rx(rx);
+                               RETURN(-ENOMEM);
+                       }
+
+                       rc = kgnilnd_set_tx_id(tx, conn);
+                       if (rc != 0)
+                               GOTO(nak_get_req_rev, rc);
+
+                       rc = kgnilnd_setup_rdma_buffer(tx, niov,
+                                                      kiov, offset, mlen);
+                       if (rc != 0)
+                               GOTO(nak_get_req_rev, rc);
+
+                       tx->tx_msg.gnm_u.putack.gnpam_src_cookie =
+                               rxmsg->gnm_u.putreq.gnprm_cookie;
+                       tx->tx_msg.gnm_u.putack.gnpam_dst_cookie = tx->tx_id.txe_cookie;
+                       tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_addr =
+                               (__u64)((unsigned long)tx->tx_buffer);
+                       tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_nob = mlen;
+
+                       tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */
+
+                       /* we only queue from kgnilnd_recv - we might get called
+                        * from other contexts and we don't want to block on the
+                        * mutex in those cases */
+
+                       spin_lock(&tx->tx_conn->gnc_device->gnd_lock);
+                       kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 1);
+                       spin_unlock(&tx->tx_conn->gnc_device->gnd_lock);
+                       kgnilnd_schedule_device(tx->tx_conn->gnc_device);
+               } else {
+                       /* No match */
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
+                                       -ENOENT,
+                                       rxmsg->gnm_u.get.gngm_cookie,
+                                       ni->ni_nid);
+               }
+
                kgnilnd_consume_rx(rx);
-               lnet_finalize(ni, lntmsg, 0);
                RETURN(0);
 
-       case GNILND_MSG_PUT_REQ:
+nak_get_req_rev:
+               /* make sure we send an error back when the GET fails */
+               kgnilnd_nak_rdma(conn, rxmsg->gnm_type, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
+               kgnilnd_tx_done(tx, rc);
+               kgnilnd_consume_rx(rx);
+
+               /* return magic LNet network error */
+               RETURN(-EIO);
+
+       case GNILND_MSG_PUT_REQ_REV:
                /* LNET wants to truncate or drop transaction, sending NAK */
                if (mlen == 0) {
                        kgnilnd_consume_rx(rx);
-                       lnet_finalize(ni, lntmsg, 0);
+                       lnet_finalize(lntmsg, 0);
 
                        /* only error if lntmsg == NULL, otherwise we are just
                         * short circuiting the rdma process of 0 bytes */
-                       kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK,
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
                                        lntmsg == NULL ? -ENOENT : 0,
                                        rxmsg->gnm_u.get.gngm_cookie,
                                        ni->ni_nid);
                        RETURN(0);
                }
-               /* sending ACK with sink buff. info */
-               tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_ACK, ni->ni_nid);
-               if (tx == NULL) {
-                       kgnilnd_consume_rx(rx);
-                       RETURN(-ENOMEM);
-               }
-
-               rc = kgnilnd_set_tx_id(tx, conn);
-               if (rc != 0) {
-                       GOTO(nak_put_req, rc);
-               }
 
-               rc = kgnilnd_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
-               if (rc != 0) {
-                       GOTO(nak_put_req, rc);
+               if (lntmsg != NULL) {
+                       /* Matched! */
+                       kgnilnd_setup_rdma(ni, rx, lntmsg, mlen);
+               } else {
+                       /* No match */
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
+                                       -ENOENT,
+                                       rxmsg->gnm_u.get.gngm_cookie,
+                                       ni->ni_nid);
                }
-
-               tx->tx_msg.gnm_u.putack.gnpam_src_cookie =
-                       rxmsg->gnm_u.putreq.gnprm_cookie;
-               tx->tx_msg.gnm_u.putack.gnpam_dst_cookie = tx->tx_id.txe_cookie;
-               tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_addr =
-                       (__u64)((unsigned long)tx->tx_buffer);
-               tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_nob = mlen;
-
-               tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */
-
-               /* we only queue from kgnilnd_recv - we might get called from other contexts
-                * and we don't want to block the mutex in those cases */
-
-               spin_lock(&tx->tx_conn->gnc_device->gnd_lock);
-               kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 1);
-               spin_unlock(&tx->tx_conn->gnc_device->gnd_lock);
-               kgnilnd_schedule_device(tx->tx_conn->gnc_device);
-
                kgnilnd_consume_rx(rx);
                RETURN(0);
-
-nak_put_req:
-               /* make sure we send an error back when the PUT fails */
-               kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
-               kgnilnd_tx_done(tx, rc);
-               kgnilnd_consume_rx(rx);
-
-               /* return magic LNet network error */
-               RETURN(-EIO);
-
        case GNILND_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Matched! */
-                       kgnilnd_reply(ni, rx, lntmsg);
+                       kgnilnd_setup_rdma(ni, rx, lntmsg, mlen);
                } else {
                        /* No match */
-                       kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK,
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
                                        -ENOENT,
                                        rxmsg->gnm_u.get.gngm_cookie,
                                        ni->ni_nid);
@@ -2352,7 +2632,13 @@ kgnilnd_check_conn_timeouts_locked(kgn_conn_t *conn)
        newest_last_rx = GNILND_LASTRX(conn);
 
        if (time_after_eq(now, newest_last_rx + timeout)) {
-               GNIDBG_CONN(D_CONSOLE|D_NETERROR, conn, "No gnilnd traffic received from %s for %lu "
+               uint32_t level = D_CONSOLE|D_NETERROR;
+
+               if (conn->gnc_peer->gnp_state == GNILND_PEER_DOWN)
+                       level = D_NET;
+
+               GNIDBG_CONN(level, conn,
+                       "No gnilnd traffic received from %s for %lu "
                        "seconds, terminating connection. Is node down? ",
                        libcfs_nid2str(conn->gnc_peer->gnp_nid),
                        cfs_duration_sec(now - newest_last_rx));
@@ -2401,14 +2687,16 @@ kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie,
        int                     rc = 0;
        int                     count = 0;
        int                     reconnect;
+       int                     to_reconn;
        short                   releaseconn = 0;
        unsigned long           first_rx = 0;
+       int                     purgatory_conn_cnt = 0;
 
        CDEBUG(D_NET, "checking peer 0x%p->%s for timeouts; interval %lus\n",
                peer, libcfs_nid2str(peer->gnp_nid),
                peer->gnp_reconnect_interval);
 
-       timeout = cfs_time_seconds(MAX(*kgnilnd_tunables.kgn_timeout,
+       timeout = cfs_time_seconds(max(*kgnilnd_tunables.kgn_timeout,
                                       GNILND_MIN_TIMEOUT));
 
        conn = kgnilnd_find_conn_locked(peer);
@@ -2422,6 +2710,14 @@ kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie,
                                conn->gnc_close_recvd = GNILND_CLOSE_INJECT1;
                                conn->gnc_peer_error = -ETIMEDOUT;
                        }
+
+                       if (*kgnilnd_tunables.kgn_to_reconn_disable &&
+                           rc == -ETIMEDOUT) {
+                               peer->gnp_state = GNILND_PEER_TIMED_OUT;
+                               CDEBUG(D_WARNING, "%s conn timed out, will "
+                                      "reconnect upon request from peer\n",
+                                      libcfs_nid2str(conn->gnc_peer->gnp_nid));
+                       }
                        /* Once we mark closed, any of the scheduler threads could
                         * get it and move through before we hit the fail loc code */
                        kgnilnd_close_conn_locked(conn, rc);
@@ -2465,20 +2761,28 @@ kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie,
        /* Don't reconnect if we are still trying to clear out old conns.
         * This prevents us sending traffic on the new mbox before ensuring we are done
         * with the old one */
-       reconnect = (atomic_read(&peer->gnp_dirty_eps) == 0);
+       reconnect = (peer->gnp_state == GNILND_PEER_UP) &&
+                   (atomic_read(&peer->gnp_dirty_eps) == 0);
+
+       /* fast reconnect after a timeout */
+       to_reconn = !conn &&
+                   (peer->gnp_last_errno == -ETIMEDOUT) &&
+                   *kgnilnd_tunables.kgn_fast_reconn;
 
        /* if we are not connected and there are tx on the gnp_tx_queue waiting
         * to be sent, we'll check the reconnect interval and fire up a new
         * connection request */
 
-       if ((peer->gnp_connecting == GNILND_PEER_IDLE) &&
+       if (reconnect &&
+           (peer->gnp_connecting == GNILND_PEER_IDLE) &&
            (time_after_eq(jiffies, peer->gnp_reconnect_time)) &&
-            !list_empty(&peer->gnp_tx_queue) && reconnect) {
+           (!list_empty(&peer->gnp_tx_queue) || to_reconn)) {
 
                CDEBUG(D_NET, "starting connect to %s\n",
                        libcfs_nid2str(peer->gnp_nid));
-               LASSERTF(peer->gnp_connecting == GNILND_PEER_IDLE, "Peer was idle and we"
-                       "have a write_lock, state issue %d\n", peer->gnp_connecting);
+               LASSERTF(peer->gnp_connecting == GNILND_PEER_IDLE,
+                        "Peer was idle and we have a write_lock, state issue %d\n",
+                        peer->gnp_connecting);
 
                peer->gnp_connecting = GNILND_PEER_CONNECT;
                kgnilnd_peer_addref(peer); /* extra ref for connd */
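
Collecting the reconnect gating above into a single predicate (a sketch; every name comes from the surrounding code):

	/* conn is the result of kgnilnd_find_conn_locked(peer) */
	static int should_start_connect(kgn_peer_t *peer, kgn_conn_t *conn)
	{
		int reconnect = (peer->gnp_state == GNILND_PEER_UP) &&
				(atomic_read(&peer->gnp_dirty_eps) == 0);
		int to_reconn = !conn &&
				(peer->gnp_last_errno == -ETIMEDOUT) &&
				*kgnilnd_tunables.kgn_fast_reconn;

		return reconnect &&
		       peer->gnp_connecting == GNILND_PEER_IDLE &&
		       time_after_eq(jiffies, peer->gnp_reconnect_time) &&
		       (!list_empty(&peer->gnp_tx_queue) || to_reconn);
	}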
@@ -2501,8 +2805,8 @@ kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie,
         */
        if (first_rx &&
                time_after(jiffies, first_rx + cfs_time_seconds(*kgnilnd_tunables.kgn_hardware_timeout))) {
-               CDEBUG(D_NET,"We can release conn %p from purgatory %lu\n",
-                      conn, first_rx + cfs_time_seconds(*kgnilnd_tunables.kgn_hardware_timeout));
+               CDEBUG(D_INFO, "We can release peer %s conn's from purgatory %lu\n",
+                       libcfs_nid2str(peer->gnp_nid), first_rx + cfs_time_seconds(*kgnilnd_tunables.kgn_hardware_timeout));
                releaseconn = 1;
        }
 
@@ -2541,11 +2845,33 @@ kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie,
                                        cfs_duration_sec(waiting));
 
                                kgnilnd_detach_purgatory_locked(conn, souls);
+                       } else {
+                               purgatory_conn_cnt++;
                        }
                }
        }
 
-       return;
+       /* If we have too many connections in purgatory we could run out of
+        * resources. Limit the number of connections to a tunable number;
+        * when that limit is exceeded, clean up down to the minimum in one
+        * fell swoop. There are situations where DVS will retry tx's and we
+        * can eat up several hundred connection requests at once.
+        */
+       if (purgatory_conn_cnt > *kgnilnd_tunables.kgn_max_purgatory) {
+               list_for_each_entry_safe(conn, connN, &peer->gnp_conns,
+                                        gnc_list) {
+                       if (conn->gnc_in_purgatory &&
+                           conn->gnc_state == GNILND_CONN_DONE) {
+                               CDEBUG(D_NET, "Dropping Held resource due to"
+                                             " resource limits being hit\n");
+                               kgnilnd_detach_purgatory_locked(conn, souls);
+
+                               if (purgatory_conn_cnt-- <
+                                   *kgnilnd_tunables.kgn_max_purgatory)
+                                       break;
+                       }
+               }
+       }
 }
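
One subtlety in the purgatory drain above: the detach happens before the post-decrement bound check, so the walk keeps releasing conns until the running count drops below the cap, and can finish a couple below it. A tiny userspace demo of the loop's termination (assuming every visited conn is detachable):

	#include <stdio.h>

	int main(void)
	{
		int purgatory_conn_cnt = 8;	/* conns found in purgatory */
		int max_purgatory = 5;		/* kgn_max_purgatory stand-in */
		int detached = 0;

		for (;;) {
			detached++;	/* kgnilnd_detach_purgatory_locked() */
			if (purgatory_conn_cnt-- < max_purgatory)
				break;
		}
		/* prints "detached 5, remaining 3" */
		printf("detached %d, remaining %d\n", detached, 8 - detached);
		return 0;
	}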
 
 void
@@ -2553,11 +2879,8 @@ kgnilnd_reaper_check(int idx)
 {
        struct list_head  *peers = &kgnilnd_data.kgn_peers[idx];
        struct list_head  *ctmp, *ctmpN;
-       struct list_head   geriatrics;
-       struct list_head   souls;
-
-       INIT_LIST_HEAD(&geriatrics);
-       INIT_LIST_HEAD(&souls);
+       LIST_HEAD(geriatrics);
+       LIST_HEAD(souls);
 
        write_lock(&kgnilnd_data.kgn_peer_conn_lock);
 
@@ -2593,7 +2916,7 @@ kgnilnd_update_reaper_timeout(long timeout)
 }
 
 static void
-kgnilnd_reaper_poke_with_stick(unsigned long arg)
+kgnilnd_reaper_poke_with_stick(cfs_timer_cb_arg_t arg)
 {
        wake_up(&kgnilnd_data.kgn_reaper_waitq);
 }
@@ -2609,9 +2932,6 @@ kgnilnd_reaper(void *arg)
        struct timer_list  timer;
        DEFINE_WAIT(wait);
 
-       cfs_daemonize("kgnilnd_rpr");
-       cfs_block_allsigs();
-
        /* all gnilnd threads need to run fairly urgently */
        set_user_nice(current, *kgnilnd_tunables.kgn_nice);
        spin_lock(&kgnilnd_data.kgn_reaper_lock);
@@ -2639,11 +2959,11 @@ kgnilnd_reaper(void *arg)
                        prepare_to_wait(&kgnilnd_data.kgn_reaper_waitq, &wait,
                                        TASK_INTERRUPTIBLE);
                        spin_unlock(&kgnilnd_data.kgn_reaper_lock);
-                       setup_timer(&timer, kgnilnd_reaper_poke_with_stick,
-                                   next_check_time);
+                       cfs_timer_setup(&timer, kgnilnd_reaper_poke_with_stick,
+                                       next_check_time, 0);
                        mod_timer(&timer, (long) jiffies + timeout);
 
-                       /* check flag variables before comitting */
+                       /* check flag variables before committing */
                        if (!kgnilnd_data.kgn_shutdown &&
                            !kgnilnd_data.kgn_quiesce_trigger) {
                                CDEBUG(D_INFO, "schedule timeout %ld (%lu sec)\n",
@@ -2696,6 +3016,25 @@ kgnilnd_reaper(void *arg)
 }
 
 int
+kgnilnd_recv_bte_get(kgn_tx_t *tx)
+{
+       unsigned niov, offset, nob;
+       struct bio_vec *kiov;
+       struct lnet_msg *lntmsg = tx->tx_lntmsg[0];
+       kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov, tx->tx_nob_rdma);
+
+       if (kiov != NULL) {
+               lnet_copy_flat2kiov(
+                       niov, kiov, offset,
+                       nob,
+                       tx->tx_buffer_copy + tx->tx_offset, 0, nob);
+       } else {
+               memcpy(tx->tx_buffer, tx->tx_buffer_copy + tx->tx_offset, nob);
+       }
+       return 0;
+}
+
+int
 kgnilnd_check_rdma_cq(kgn_device_t *dev)
 {
        gni_return_t           rrc;
@@ -2707,6 +3046,9 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev)
        long                   num_processed = 0;
        kgn_conn_t            *conn = NULL;
        kgn_tx_t              *tx = NULL;
+       kgn_rdma_desc_t       *rdesc;
+       unsigned int           rnob;
+       __u64                  rcookie;
 
        for (;;) {
                /* make sure we don't keep looping if we need to reset */
@@ -2729,7 +3071,7 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev)
                }
 
                if (rrc == GNI_RC_NOT_DONE) {
-                       mutex_unlock(&dev->gnd_cq_mutex);
+                       kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex);
                        CDEBUG(D_INFO, "SEND RDMA CQ %d empty processed %ld\n",
                               dev->gnd_id, num_processed);
                        return num_processed;
@@ -2741,12 +3083,12 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev)
                        "this is bad, somehow our credits didn't protect us"
                        " from CQ overrun\n");
                LASSERTF(GNI_CQ_GET_TYPE(event_data) == GNI_CQ_EVENT_TYPE_POST,
-                       "rrc %d, GNI_CQ_GET_TYPE("LPX64") = "LPX64"\n", rrc,
+                       "rrc %d, GNI_CQ_GET_TYPE(%#llx) = %#llx\n", rrc,
                        event_data, GNI_CQ_GET_TYPE(event_data));
 
                rrc = kgnilnd_get_completed(dev->gnd_snd_rdma_cqh, event_data,
                                            &desc);
-               mutex_unlock(&dev->gnd_cq_mutex);
+               kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex);
 
                /* XXX Nic: Need better error handling here... */
                LASSERTF((rrc == GNI_RC_SUCCESS) ||
@@ -2764,17 +3106,44 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev)
                }
 
                GNITX_ASSERTF(tx, tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE ||
-                       tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE,
+                       tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE ||
+                       tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV ||
+                       tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV,
                        "tx %p with type %d\n", tx, tx->tx_msg.gnm_type);
 
                GNIDBG_TX(D_NET, tx, "RDMA completion for %d bytes", tx->tx_nob);
 
+               if (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) {
+                       lnet_set_reply_msg_len(NULL, tx->tx_lntmsg[1],
+                                              tx->tx_msg.gnm_u.completion.gncm_retval);
+               }
+
+               rc = 0;
+               if (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV && desc->status == GNI_RC_SUCCESS) {
+                       if (tx->tx_buffer_copy != NULL)
+                               kgnilnd_recv_bte_get(tx);
+                       rc = kgnilnd_verify_rdma_cksum(tx, tx->tx_putinfo.gnpam_payload_cksum, tx->tx_nob_rdma);
+               }
+
+               if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV && desc->status == GNI_RC_SUCCESS) {
+                       if (tx->tx_buffer_copy != NULL)
+                               kgnilnd_recv_bte_get(tx);
+                       rc = kgnilnd_verify_rdma_cksum(tx, tx->tx_getinfo.gngm_payload_cksum, tx->tx_nob_rdma);
+               }
+
                /* remove from rdmaq */
+               kgnilnd_conn_mutex_lock(&conn->gnc_rdma_mutex);
                spin_lock(&conn->gnc_list_lock);
                kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD);
                spin_unlock(&conn->gnc_list_lock);
+               kgnilnd_conn_mutex_unlock(&conn->gnc_rdma_mutex);
 
-               if (likely(desc->status == GNI_RC_SUCCESS)) {
+               if (CFS_FAIL_CHECK(CFS_FAIL_GNI_RDMA_CQ_ERROR)) {
+                       event_data = 1LL << 48;
+                       rc = 1;
+               }
+
+               if (likely(desc->status == GNI_RC_SUCCESS) && rc == 0) {
                        atomic_inc(&dev->gnd_rdma_ntx);
                        atomic64_add(tx->tx_nob, &dev->gnd_rdma_txbytes);
                        /* transaction succeeded, add into fmaq */
@@ -2782,7 +3151,9 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev)
                        kgnilnd_peer_alive(conn->gnc_peer);
 
                        /* drop ref from kgnilnd_validate_tx_ev_id */
+                       kgnilnd_admin_decref(conn->gnc_tx_in_use);
                        kgnilnd_conn_decref(conn);
+
                        continue;
                }
 
@@ -2804,35 +3175,34 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev)
                GNIDBG_TX(D_NETERROR, tx, "RDMA %s error (%s)",
                        should_retry ? "transient" : "unrecoverable", err_str);
 
-               if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE) {
-                       if (should_retry) {
-                               kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE,
-                                            &tx->tx_putinfo.gnpam_desc,
-                                            tx->tx_putinfo.gnpam_desc.gnrd_nob,
-                                            tx->tx_putinfo.gnpam_dst_cookie);
-                       } else {
-                               kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK,
-                                               -EFAULT,
-                                               tx->tx_putinfo.gnpam_dst_cookie,
-                                               tx->tx_msg.gnm_srcnid);
-                               kgnilnd_tx_done(tx, -EFAULT);
-                       }
+               if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE ||
+                   tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) {
+                       rdesc    = &tx->tx_putinfo.gnpam_desc;
+                       rnob     = tx->tx_putinfo.gnpam_desc.gnrd_nob;
+                       rcookie  = tx->tx_putinfo.gnpam_dst_cookie;
                } else {
-                       if (should_retry) {
-                               kgnilnd_rdma(tx, GNILND_MSG_GET_DONE,
-                                            &tx->tx_getinfo.gngm_desc,
-                                            tx->tx_lntmsg[0]->msg_len,
-                                            tx->tx_getinfo.gngm_cookie);
-                       } else {
-                               kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK,
-                                               -EFAULT,
-                                               tx->tx_getinfo.gngm_cookie,
-                                               tx->tx_msg.gnm_srcnid);
-                               kgnilnd_tx_done(tx, -EFAULT);
-                       }
+                       rdesc    = &tx->tx_getinfo.gngm_desc;
+                       rnob     = tx->tx_lntmsg[0]->msg_len;
+                       rcookie  = tx->tx_getinfo.gngm_cookie;
+               }
+
+               if (should_retry) {
+                       kgnilnd_rdma(tx,
+                                    tx->tx_msg.gnm_type,
+                                    rdesc,
+                                    rnob, rcookie);
+               } else {
+                       kgnilnd_nak_rdma(conn,
+                                        tx->tx_msg.gnm_type,
+                                        -EFAULT,
+                                        rcookie,
+                                        tx->tx_msg.gnm_srcnid);
+                       kgnilnd_tx_done(tx, -GNILND_NOPURG);
+                       kgnilnd_close_conn(conn, -ECOMM);
                }
 
                /* drop ref from kgnilnd_validate_tx_ev_id */
+               kgnilnd_admin_decref(conn->gnc_tx_in_use);
                kgnilnd_conn_decref(conn);
        }
 }
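
The consolidated error path above keys all four RDMA completion types off a single should_retry decision: transient GNI errors re-post the transfer with the saved rdesc/rnob/rcookie triple, while anything else NAKs the peer, retires the tx with -GNILND_NOPURG, and closes the conn. A minimal sketch of that classification, with hypothetical names standing in for the driver's helpers:

    /* Illustrative only: a status classifier mirroring the single
     * retry/NAK branch above; not the driver's actual helper. */
    #include <stdbool.h>

    enum cq_status { CQ_OK, CQ_TRANSIENT, CQ_FATAL };

    static bool should_retry_status(enum cq_status st, int attempt, int budget)
    {
            /* transient errors re-post until the attempt budget runs out;
             * anything else NAKs the peer and tears the conn down */
            return st == CQ_TRANSIENT && attempt < budget;
    }
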
@@ -2847,6 +3217,7 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev)
        kgn_conn_t            *conn = NULL;
        int                    queued_fma, saw_reply, rc;
        long                   num_processed = 0;
+       struct list_head      *ctmp, *ctmpN;
 
        for (;;) {
                /* make sure we don't keep looping if we need to reset */
@@ -2862,13 +3233,29 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev)
                }
 
                rrc = kgnilnd_cq_get_event(dev->gnd_snd_fma_cqh, &event_data);
-               mutex_unlock(&dev->gnd_cq_mutex);
+               kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex);
 
                if (rrc == GNI_RC_NOT_DONE) {
                        CDEBUG(D_INFO,
-                              "SMSG send CQ %d not ready (data "LPX64") "
+                              "SMSG send CQ %d not ready (data %#llx) "
                               "processed %ld\n", dev->gnd_id, event_data,
                               num_processed);
+
+                       if (num_processed > 0) {
+                               spin_lock(&dev->gnd_lock);
+                               if (!list_empty(&dev->gnd_delay_conns)) {
+                                       list_for_each_safe(ctmp, ctmpN, &dev->gnd_delay_conns) {
+                                               conn = list_entry(ctmp, kgn_conn_t, gnc_delaylist);
+                                               list_del_init(&conn->gnc_delaylist);
+                                               CDEBUG(D_NET, "Moving Conn %p from delay queue to ready_queue\n", conn);
+                                               kgnilnd_schedule_conn_nolock(conn);
+                                       }
+                                       spin_unlock(&dev->gnd_lock);
+                                       kgnilnd_schedule_device(dev);
+                               } else {
+                                       spin_unlock(&dev->gnd_lock);
+                               }
+                       }
                        return num_processed;
                }
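
When the send CQ reports NOT_DONE after at least one completion was processed, credits have been freed, so conns parked on gnd_delay_conns are moved back to the ready queue before returning. The _safe list iterator is essential because each node is unlinked mid-walk; a minimal sketch of the same idiom using the entry-based variant, assuming only <linux/list.h>:

    /* Sketch of the drain pattern above: the _safe iterator caches the
     * next node before the callback can unlink the current one. */
    #include <linux/list.h>

    struct delayed_conn {
            struct list_head dc_link;
    };

    static void drain_delayed(struct list_head *delay_q,
                              void (*ready)(struct delayed_conn *))
    {
            struct delayed_conn *dc, *next;

            list_for_each_entry_safe(dc, next, delay_q, dc_link) {
                    list_del_init(&dc->dc_link);    /* safe: 'next' saved */
                    ready(dc);
            }
    }
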
 
@@ -2879,7 +3266,7 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev)
                        "this is bad, somehow our credits didn't "
                        "protect us from CQ overrun\n");
                LASSERTF(GNI_CQ_GET_TYPE(event_data) == GNI_CQ_EVENT_TYPE_SMSG,
-                       "rrc %d, GNI_CQ_GET_TYPE("LPX64") = "LPX64"\n", rrc,
+                       "rrc %d, GNI_CQ_GET_TYPE(%#llx) = %#llx\n", rrc,
                        event_data, GNI_CQ_GET_TYPE(event_data));
 
                /* if SMSG couldn't handle an error, time for conn to die */
@@ -2893,7 +3280,7 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev)
                        if (conn == NULL) {
                                /* Conn was destroyed? */
                                CDEBUG(D_NET,
-                                       "SMSG CQID lookup "LPX64" failed\n",
+                                       "SMSG CQID lookup %#llx failed\n",
                                        GNI_CQ_GET_INST_ID(event_data));
                                write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
                                continue;
@@ -2928,6 +3315,7 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev)
                }
 
                /* lock tx_list_state and tx_state */
+               kgnilnd_conn_mutex_lock(&conn->gnc_smsg_mutex);
                spin_lock(&tx->tx_conn->gnc_list_lock);
 
                GNITX_ASSERTF(tx, tx->tx_list_state == GNILND_TX_LIVE_FMAQ,
@@ -2948,6 +3336,7 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev)
                saw_reply = !(tx->tx_state & GNILND_TX_WAITING_REPLY);
 
                spin_unlock(&tx->tx_conn->gnc_list_lock);
+               kgnilnd_conn_mutex_unlock(&conn->gnc_smsg_mutex);
 
                if (queued_fma) {
                        CDEBUG(D_NET, "scheduling conn 0x%p->%s for fmaq\n",
@@ -2985,6 +3374,7 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev)
                }
 
                /* drop ref from kgnilnd_validate_tx_ev_id */
+               kgnilnd_admin_decref(conn->gnc_tx_in_use);
                kgnilnd_conn_decref(conn);
 
                /* if we are waiting for a REPLY, we'll handle the tx then */
@@ -3015,10 +3405,10 @@ kgnilnd_check_fma_rcv_cq(kgn_device_t *dev)
                        return 1;
                }
                rrc = kgnilnd_cq_get_event(dev->gnd_rcv_fma_cqh, &event_data);
-               mutex_unlock(&dev->gnd_cq_mutex);
+               kgnilnd_gl_mutex_unlock(&dev->gnd_cq_mutex);
 
                if (rrc == GNI_RC_NOT_DONE) {
-                       CDEBUG(D_INFO, "SMSG RX CQ %d empty data "LPX64" "
+                       CDEBUG(D_INFO, "SMSG RX CQ %d empty data %#llx "
                                "processed %ld\n",
                                dev->gnd_id, event_data, num_processed);
                        return num_processed;
@@ -3035,14 +3425,13 @@ kgnilnd_check_fma_rcv_cq(kgn_device_t *dev)
                                /* set overrun too */
                                event_data |= (1UL << 63);
                                LASSERTF(GNI_CQ_OVERRUN(event_data),
-                                        "(1UL << 63) is no longer the bit to"
-                                        "set to indicate CQ_OVERRUN\n");
+                                        "(1UL << 63) is no longer the bit to set to indicate CQ_OVERRUN\n");
                        }
                }
                /* sender should get the error event too and take care
                 * of the failed transaction by re-transmitting */
                if (rrc == GNI_RC_TRANSACTION_ERROR) {
-                       CDEBUG(D_NET, "SMSG RX CQ error "LPX64"\n", event_data);
+                       CDEBUG(D_NET, "SMSG RX CQ error %#llx\n", event_data);
                        continue;
                }
 
@@ -3051,12 +3440,12 @@ kgnilnd_check_fma_rcv_cq(kgn_device_t *dev)
                        conn = kgnilnd_cqid2conn_locked(
                                                 GNI_CQ_GET_INST_ID(event_data));
                        if (conn == NULL) {
-                               CDEBUG(D_NET, "SMSG RX CQID lookup "LPU64" "
-                                       "failed, dropping event "LPX64"\n",
+                               CDEBUG(D_NET, "SMSG RX CQID lookup %llu "
+                                       "failed, dropping event %#llx\n",
                                        GNI_CQ_GET_INST_ID(event_data),
                                        event_data);
                        } else {
-                               CDEBUG(D_NET, "SMSG RX: CQID "LPU64" "
+                               CDEBUG(D_NET, "SMSG RX: CQID %llu "
                                       "conn %p->%s\n",
                                        GNI_CQ_GET_INST_ID(event_data),
                                        conn, conn->gnc_peer ?
@@ -3126,7 +3515,8 @@ kgnilnd_send_mapped_tx(kgn_tx_t *tx, int try_map_if_full)
                rc = kgnilnd_map_buffer(tx);
        }
 
-       /* rc should be 0 if we mapped succesfully here, if non-zero we are queueing */
+       /* rc should be 0 if we mapped successfully here, if non-zero
+        * we are queueing */
        if (rc != 0) {
                /* if try_map_if_full set, they handle requeuing */
                if (unlikely(try_map_if_full)) {
@@ -3150,7 +3540,12 @@ kgnilnd_send_mapped_tx(kgn_tx_t *tx, int try_map_if_full)
         * remote node where the RDMA will be started
         * Special case -EAGAIN logic - this should just be queued as if the mapping couldn't
         * be satisfied. The rest of the errors are "hard" errors that require
-        * upper layers to handle themselves */
+        * upper layers to handle themselves.
+        * If kgnilnd_post_rdma returns a resource error, kgnilnd_rdma will put
+        * the tx back on the TX_MAPQ. When this tx is pulled back off the MAPQ,
+        * its gnm_type will now be GNILND_MSG_PUT_DONE or
+        * GNILND_MSG_GET_DONE_REV.
+        */
        case GNILND_MSG_GET_REQ:
                tx->tx_msg.gnm_u.get.gngm_desc.gnrd_key = tx->tx_map_key;
                tx->tx_msg.gnm_u.get.gngm_cookie = tx->tx_id.txe_cookie;
@@ -3174,18 +3569,59 @@ kgnilnd_send_mapped_tx(kgn_tx_t *tx, int try_map_if_full)
                break;
 
        /* PUT_REQ and GET_DONE are where we do the actual RDMA */
+       case GNILND_MSG_PUT_DONE:
        case GNILND_MSG_PUT_REQ:
-               kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE,
+               rc = kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE,
                             &tx->tx_putinfo.gnpam_desc,
                             tx->tx_putinfo.gnpam_desc.gnrd_nob,
                             tx->tx_putinfo.gnpam_dst_cookie);
+               RETURN(try_map_if_full ? rc : 0);
                break;
        case GNILND_MSG_GET_DONE:
-               kgnilnd_rdma(tx, GNILND_MSG_GET_DONE,
+               rc = kgnilnd_rdma(tx, GNILND_MSG_GET_DONE,
                             &tx->tx_getinfo.gngm_desc,
                             tx->tx_lntmsg[0]->msg_len,
                             tx->tx_getinfo.gngm_cookie);
+               RETURN(try_map_if_full ? rc : 0);
+               break;
+       case GNILND_MSG_PUT_REQ_REV:
+               tx->tx_msg.gnm_u.get.gngm_desc.gnrd_key = tx->tx_map_key;
+               tx->tx_msg.gnm_u.get.gngm_cookie = tx->tx_id.txe_cookie;
+               tx->tx_msg.gnm_u.get.gngm_desc.gnrd_addr = (__u64)((unsigned long)tx->tx_buffer);
+               tx->tx_msg.gnm_u.get.gngm_desc.gnrd_nob = tx->tx_nob;
+               tx->tx_state = GNILND_TX_WAITING_COMPLETION | GNILND_TX_WAITING_REPLY;
+               kgnilnd_compute_rdma_cksum(tx, tx->tx_nob);
+               tx->tx_msg.gnm_u.get.gngm_payload_cksum = tx->tx_msg.gnm_payload_cksum;
+
+               rc = kgnilnd_sendmsg(tx, NULL, 0, &tx->tx_conn->gnc_list_lock, GNILND_TX_FMAQ);
+               break;
+       case GNILND_MSG_PUT_DONE_REV:
+               rc = kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE_REV,
+                            &tx->tx_getinfo.gngm_desc,
+                            tx->tx_nob,
+                            tx->tx_getinfo.gngm_cookie);
+               RETURN(try_map_if_full ? rc : 0);
+               break;
+       case GNILND_MSG_GET_ACK_REV:
+               tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_key = tx->tx_map_key;
+               tx->tx_state = GNILND_TX_WAITING_COMPLETION | GNILND_TX_WAITING_REPLY;
+               /* LNET_GETS are a special case for parse */
+               kgnilnd_compute_rdma_cksum(tx, tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_nob);
+               tx->tx_msg.gnm_u.putack.gnpam_payload_cksum = tx->tx_msg.gnm_payload_cksum;
+
+               if (CFS_FAIL_CHECK(CFS_FAIL_GNI_PUT_ACK_AGAIN))
+                       tx->tx_state |= GNILND_TX_FAIL_SMSG;
 
+               /* redirect to FMAQ on failure, no need to infinite loop here in MAPQ */
+               rc = kgnilnd_sendmsg(tx, NULL, 0, &tx->tx_conn->gnc_list_lock, GNILND_TX_FMAQ);
+               break;
+       case GNILND_MSG_GET_DONE_REV:
+       case GNILND_MSG_GET_REQ_REV:
+               rc = kgnilnd_rdma(tx, GNILND_MSG_GET_DONE_REV,
+                               &tx->tx_putinfo.gnpam_desc,
+                               tx->tx_putinfo.gnpam_desc.gnrd_nob,
+                               tx->tx_putinfo.gnpam_dst_cookie);
+               RETURN(try_map_if_full ? rc : 0);
                break;
        }
 
@@ -3269,7 +3705,7 @@ kgnilnd_process_fmaq(kgn_conn_t *conn)
        GNITX_ASSERTF(tx, tx->tx_id.txe_smsg_id != 0,
                      "tx with zero id", NULL);
 
-       CDEBUG(D_NET, "sending regular msg: %p, type %s(0x%02x), cookie "LPX64"\n",
+       CDEBUG(D_NET, "sending regular msg: %p, type %s(0x%02x), cookie %#llx\n",
               tx, kgnilnd_msgtype2str(tx->tx_msg.gnm_type),
               tx->tx_msg.gnm_type, tx->tx_id.txe_cookie);
 
@@ -3289,15 +3725,22 @@ kgnilnd_process_fmaq(kgn_conn_t *conn)
 
        case GNILND_MSG_GET_DONE:
        case GNILND_MSG_PUT_DONE:
+       case GNILND_MSG_PUT_DONE_REV:
+       case GNILND_MSG_GET_DONE_REV:
        case GNILND_MSG_PUT_NAK:
        case GNILND_MSG_GET_NAK:
+       case GNILND_MSG_GET_NAK_REV:
+       case GNILND_MSG_PUT_NAK_REV:
                tx->tx_state = GNILND_TX_WAITING_COMPLETION;
                break;
 
        case GNILND_MSG_PUT_REQ:
+       case GNILND_MSG_GET_REQ_REV:
                tx->tx_msg.gnm_u.putreq.gnprm_cookie = tx->tx_id.txe_cookie;
-
+               /* fallthrough */
        case GNILND_MSG_PUT_ACK:
+       case GNILND_MSG_PUT_REQ_REV:
+       case GNILND_MSG_GET_ACK_REV:
        case GNILND_MSG_GET_REQ:
                /* This is really only to handle the retransmit of SMSG once these
                 * two messages are setup in send_mapped_tx */
@@ -3374,10 +3817,9 @@ kgnilnd_process_rdmaq(kgn_device_t *dev)
                        new_ok -= atomic64_read(&dev->gnd_rdmaq_bytes_out);
                        atomic64_set(&dev->gnd_rdmaq_bytes_ok, new_ok);
 
-                       CDEBUG(D_NET, "resetting rdmaq bytes to %ld, deadline +%lu -> %lu, "
-                                      "current out %ld\n",
-                              atomic64_read(&dev->gnd_rdmaq_bytes_ok), dead_bump, dev->gnd_rdmaq_deadline,
-                              atomic64_read(&dev->gnd_rdmaq_bytes_out));
+                       CDEBUG(D_NET, "resetting rdmaq bytes to %lld, deadline +%lu -> %lu, current out %lld\n",
+                              (s64)atomic64_read(&dev->gnd_rdmaq_bytes_ok), dead_bump, dev->gnd_rdmaq_deadline,
+                              (s64)atomic64_read(&dev->gnd_rdmaq_bytes_out));
                }
                spin_unlock(&dev->gnd_rdmaq_lock);
        }
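
The CDEBUG fix above is needed because the return type of atomic64_read() has differed across architectures and kernel versions, so the old %ld conversions were not portable; casting to s64 and printing with %lld works everywhere. A minimal sketch:

    /* Sketch: printing an atomic64_t portably across architectures. */
    #include <linux/atomic.h>
    #include <linux/printk.h>
    #include <linux/types.h>

    static void show_rdmaq_bytes(atomic64_t *ctr)
    {
            pr_info("rdmaq bytes %lld\n", (s64)atomic64_read(ctr));
    }
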
@@ -3460,8 +3902,8 @@ _kgnilnd_match_reply(kgn_conn_t *conn, int type1, int type2, __u64 cookie)
                GNITX_ASSERTF(tx, ((tx->tx_id.txe_idx == ev_id.txe_idx) &&
                                  (tx->tx_id.txe_cookie == cookie)),
                              "conn 0x%p->%s tx_ref_table hosed: wanted "
-                             "txe_cookie "LPX64" txe_idx %d "
-                             "found tx %p cookie "LPX64" txe_idx %d\n",
+                             "txe_cookie %#llx txe_idx %d "
+                             "found tx %p cookie %#llx txe_idx %d\n",
                              conn, libcfs_nid2str(conn->gnc_peer->gnp_nid),
                              cookie, ev_id.txe_idx,
                              tx, tx->tx_id.txe_cookie, tx->tx_id.txe_idx);
@@ -3475,7 +3917,7 @@ _kgnilnd_match_reply(kgn_conn_t *conn, int type1, int type2, __u64 cookie)
                        tx->tx_state, GNILND_TX_WAITING_REPLY,
                        libcfs_nid2str(conn->gnc_peer->gnp_nid));
        } else {
-               CWARN("Unmatched reply %02x, or %02x/"LPX64" from %s\n",
+               CWARN("Unmatched reply %02x, or %02x/%#llx from %s\n",
                      type1, type2, cookie, libcfs_nid2str(conn->gnc_peer->gnp_nid));
        }
        return tx;
@@ -3486,6 +3928,13 @@ kgnilnd_complete_tx(kgn_tx_t *tx, int rc)
 {
        int             complete = 0;
        kgn_conn_t      *conn = tx->tx_conn;
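+       /* snapshot identifying tx fields for the -EFAULT diagnostics below */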
+       __u64            nob = tx->tx_nob;
+       __u32            physnop = tx->tx_phys_npages;
+       int              id = tx->tx_id.txe_smsg_id;
+       int              buftype = tx->tx_buftype;
+       gni_mem_handle_t hndl;
+       hndl.qword1 = tx->tx_map_key.qword1;
+       hndl.qword2 = tx->tx_map_key.qword2;
 
        spin_lock(&conn->gnc_list_lock);
 
@@ -3495,6 +3944,22 @@ kgnilnd_complete_tx(kgn_tx_t *tx, int rc)
        tx->tx_rc = rc;
        tx->tx_state &= ~GNILND_TX_WAITING_REPLY;
 
+       if (rc == -EFAULT) {
+               CDEBUG(D_NETERROR, "Error %d TX data: TX %p tx_id %x nob %16llu physnop %8d buffertype %#8x MemHandle %#llx.%#llx\n",
+                       rc, tx, id, nob, physnop, buftype, hndl.qword1, hndl.qword2);
+
+               if (*kgnilnd_tunables.kgn_efault_lbug) {
+                       GNIDBG_TOMSG(D_NETERROR, &tx->tx_msg,
+                               "error %d on tx 0x%p->%s id %u/%d state %s age %ds",
+                               rc, tx, conn ?
+                               libcfs_nid2str(conn->gnc_peer->gnp_nid) : "<?>",
+                               tx->tx_id.txe_smsg_id, tx->tx_id.txe_idx,
+                               kgnilnd_tx_state2str(tx->tx_list_state),
+                               cfs_duration_sec((unsigned long) jiffies - tx->tx_qtime));
+                       LBUG();
+               }
+       }
+
        if (!(tx->tx_state & GNILND_TX_WAITING_COMPLETION)) {
                kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD);
                /* sample under lock as follow on steps require gnc_list_lock
@@ -3518,7 +3983,9 @@ kgnilnd_finalize_rx_done(kgn_tx_t *tx, kgn_msg_t *msg)
        atomic_inc(&conn->gnc_device->gnd_rdma_nrx);
        atomic64_add(tx->tx_nob, &conn->gnc_device->gnd_rdma_rxbytes);
 
-       rc = kgnilnd_verify_rdma_cksum(tx, msg->gnm_payload_cksum);
+       /* the gncm_retval is passed in for PUTs */
+       rc = kgnilnd_verify_rdma_cksum(tx, msg->gnm_payload_cksum,
+                                      msg->gnm_u.completion.gncm_retval);
 
        kgnilnd_complete_tx(tx, rc);
 }
@@ -3539,7 +4006,6 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
        int           repost = 1, saw_complete;
        unsigned long timestamp, newest_last_rx, timeout;
        int           last_seq;
-       void         *memory = NULL;
        ENTRY;
 
        /* Short circuit if the ep_handle is null.
@@ -3549,7 +4015,7 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
                RETURN_EXIT;
 
        timestamp = jiffies;
-       mutex_lock(&conn->gnc_device->gnd_cq_mutex);
+       kgnilnd_gl_mutex_lock(&conn->gnc_device->gnd_cq_mutex);
        /* delay in jiffies - we are really concerned only with things that
         * result in a schedule() or really holding this off for long times .
         * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
@@ -3578,7 +4044,7 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
                libcfs_nid2str(conn->gnc_peer->gnp_nid),
                cfs_duration_sec(timestamp - newest_last_rx),
                cfs_duration_sec(GNILND_TIMEOUTRX(timeout)));
-               mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+               kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
                rc = -ETIME;
                kgnilnd_close_conn(conn, rc);
                RETURN_EXIT;
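
Throughout these hunks the raw mutex_lock()/mutex_unlock() calls on the shared CQ mutex become kgnilnd_gl_mutex_* wrappers. Their definition is not part of this patch; assuming they are thin wrappers that keep the same locking semantics while adding bookkeeping, a hypothetical sketch might look like:

    /* Hypothetical wrapper (the real kgnilnd_gl_mutex_* definitions are
     * outside this hunk): identical locking, plus hold-time bookkeeping. */
    #include <linux/jiffies.h>
    #include <linux/mutex.h>

    static unsigned long gl_locked_at;

    static inline void gl_mutex_lock(struct mutex *m)
    {
            mutex_lock(m);
            gl_locked_at = jiffies;         /* for hold-time diagnostics */
    }

    static inline void gl_mutex_unlock(struct mutex *m)
    {
            /* a real version could warn when held too long */
            mutex_unlock(m);
    }
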
@@ -3587,40 +4053,49 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
        rrc = kgnilnd_smsg_getnext(conn->gnc_ephandle, &prefix);
 
        if (rrc == GNI_RC_NOT_DONE) {
-               mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
-               CDEBUG(D_INFO, "SMSG RX empty\n");
+               kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+               CDEBUG(D_INFO, "SMSG RX empty conn 0x%p\n", conn);
                RETURN_EXIT;
        }
 
+       /* Instead of asserting when we get mailbox corruption let's attempt
+        * to close the conn and recover. We can put the conn/mailbox into
+        * purgatory and let purgatory deal with the problem. If we see
+        * this NETERROR reported on production systems in large amounts
+        * we will need to revisit the state machine to see if we can tighten
+        * it up further to improve data protection.
+        */
+
        if (rrc == GNI_RC_INVALID_STATE) {
-               LIBCFS_ALLOC(memory, conn->gnpr_smsg_attr.buff_size);
-               if (memory == NULL) {
-                       memory = (void *)0xdeadbeef;
-               } else {
-                       memcpy(memory, conn->gnpr_smsg_attr.msg_buffer + conn->gnpr_smsg_attr.mbox_offset, conn->gnpr_smsg_attr.buff_size);
-               }
+               kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+               GNIDBG_CONN(D_NETERROR | D_CONSOLE, conn, "Mailbox corruption "
+                       "detected, closing conn %p from peer %s\n", conn,
+                       libcfs_nid2str(conn->gnc_peer->gnp_nid));
+               rc = -EIO;
+               kgnilnd_close_conn(conn, rc);
+               RETURN_EXIT;
        }
 
        LASSERTF(rrc == GNI_RC_SUCCESS,
-               "bad rc %d on conn %p from peer %s mailbox copy %p\n",
-                rrc, conn, libcfs_nid2str(peer->gnp_nid), memory);
+               "bad rc %d on conn %p from peer %s\n",
+               rrc, conn, libcfs_nid2str(peer->gnp_nid));
 
        msg = (kgn_msg_t *)prefix;
 
        rx = kgnilnd_alloc_rx();
        if (rx == NULL) {
-               mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+               kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
                kgnilnd_release_msg(conn);
                GNIDBG_MSG(D_NETERROR, msg, "Dropping SMSG RX from 0x%p->%s, no RX memory",
                           conn, libcfs_nid2str(peer->gnp_nid));
                RETURN_EXIT;
        }
 
-       GNIDBG_MSG(D_INFO, msg, "SMSG RX on %p from %s",
-               conn, libcfs_nid2str(peer->gnp_nid));
+       GNIDBG_MSG(D_INFO, msg, "SMSG RX on %p", conn);
 
        timestamp = conn->gnc_last_rx;
-       last_seq = conn->gnc_rx_seq;
+       seq = last_seq = atomic_read(&conn->gnc_rx_seq);
+       atomic_inc(&conn->gnc_rx_seq);
 
        conn->gnc_last_rx = jiffies;
        /* stash first rx so we can clear out purgatory
@@ -3628,16 +4103,14 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
        if (conn->gnc_first_rx == 0)
                conn->gnc_first_rx = jiffies;
 
-       seq = conn->gnc_rx_seq++;
-
        /* needs to linger to protect gnc_rx_seq like we do with gnc_tx_seq */
-       mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+       kgnilnd_gl_mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
        kgnilnd_peer_alive(conn->gnc_peer);
 
        rx->grx_msg = msg;
        rx->grx_conn = conn;
        rx->grx_eager = 0;
-       rx->grx_received = current_kernel_time();
+       ktime_get_ts64(&rx->grx_received);
 
        if (CFS_FAIL_CHECK(CFS_FAIL_GNI_NET_LOOKUP)) {
                rc = -ENONET;
@@ -3688,10 +4161,12 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
 
                /* NB message type checked below; NOT here... */
                switch (msg->gnm_type) {
+               case GNILND_MSG_GET_ACK_REV:
                case GNILND_MSG_PUT_ACK:
                        kgnilnd_swab_rdma_desc(&msg->gnm_u.putack.gnpam_desc);
                        break;
 
+               case GNILND_MSG_PUT_REQ_REV:
                case GNILND_MSG_GET_REQ:
                        kgnilnd_swab_rdma_desc(&msg->gnm_u.get.gngm_desc);
                        break;
@@ -3717,7 +4192,7 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
        }
 
        if (msg->gnm_connstamp != conn->gnc_peer_connstamp) {
-               GNIDBG_MSG(D_NETERROR, msg, "Unexpected connstamp "LPX64"("LPX64
+               GNIDBG_MSG(D_NETERROR, msg, "Unexpected connstamp %#llx(%#llx"
                       " expected) from %s",
                       msg->gnm_connstamp, conn->gnc_peer_connstamp,
                       libcfs_nid2str(peer->gnp_nid));
@@ -3753,7 +4228,7 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
                                       conn, last_seq,
                                       cfs_duration_sec(now - timestamp),
                                       cfs_duration_sec(now - conn->gnc_last_rx_cq),
-                                      conn->gnc_tx_seq,
+                                      atomic_read(&conn->gnc_tx_seq),
                                       cfs_duration_sec(now - conn->gnc_last_tx),
                                       cfs_duration_sec(now - conn->gnc_last_tx_cq),
                                       cfs_duration_sec(now - conn->gnc_last_noop_want),
@@ -3795,13 +4270,20 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
                                msg->gnm_srcnid, rx, 0);
                repost = rc < 0;
                break;
-
+       case GNILND_MSG_GET_REQ_REV:
        case GNILND_MSG_PUT_REQ:
                rc = lnet_parse(net->gnn_ni, &msg->gnm_u.putreq.gnprm_hdr,
                                msg->gnm_srcnid, rx, 1);
                repost = rc < 0;
                break;
+       case GNILND_MSG_GET_NAK_REV:
+               tx = kgnilnd_match_reply_either(conn, GNILND_MSG_GET_REQ_REV, GNILND_MSG_GET_ACK_REV,
+                                       msg->gnm_u.completion.gncm_cookie);
+               if (tx == NULL)
+                       break;
 
+               kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval);
+               break;
        case GNILND_MSG_PUT_NAK:
                tx = kgnilnd_match_reply_either(conn, GNILND_MSG_PUT_REQ, GNILND_MSG_PUT_ACK,
                                        msg->gnm_u.completion.gncm_cookie);
@@ -3810,7 +4292,6 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
 
                kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval);
                break;
-
        case GNILND_MSG_PUT_ACK:
                tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_REQ,
                                        msg->gnm_u.putack.gnpam_src_cookie);
@@ -3848,20 +4329,54 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
                                kgnilnd_tx_done(tx, rc);
                }
                break;
+       case GNILND_MSG_GET_ACK_REV:
+               tx = kgnilnd_match_reply(conn, GNILND_MSG_GET_REQ_REV,
+                                       msg->gnm_u.putack.gnpam_src_cookie);
+               if (tx == NULL)
+                       break;
+
+               /* store putack data for later: deferred rdma or re-try */
+               tx->tx_putinfo = msg->gnm_u.putack;
+               saw_complete = 0;
+               spin_lock(&tx->tx_conn->gnc_list_lock);
 
+               GNITX_ASSERTF(tx, tx->tx_state & GNILND_TX_WAITING_REPLY,
+                       "not waiting for reply", NULL);
+
+               tx->tx_state &= ~GNILND_TX_WAITING_REPLY;
+
+               if (likely(!(tx->tx_state & GNILND_TX_WAITING_COMPLETION))) {
+                       kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD);
+                       /* sample under lock as follow on steps require gnc_list_lock
+                        * - or call kgnilnd_tx_done which requires no locks held over
+                        *   call to lnet_finalize */
+                       saw_complete = 1;
+               } else {
+                       /* cannot launch rdma if still waiting for fma-msg completion */
+                       CDEBUG(D_NET, "tx 0x%p type 0x%02x will need to "
+                                       "wait for SMSG completion\n", tx, tx->tx_msg.gnm_type);
+                       tx->tx_state |= GNILND_TX_PENDING_RDMA;
+               }
+               spin_unlock(&tx->tx_conn->gnc_list_lock);
+
+               if (saw_complete) {
+                       rc = kgnilnd_send_mapped_tx(tx, 0);
+                       if (rc < 0)
+                               kgnilnd_tx_done(tx, rc);
+               }
+               break;
        case GNILND_MSG_PUT_DONE:
                tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_ACK,
                                        msg->gnm_u.completion.gncm_cookie);
                if (tx == NULL)
                        break;
 
-               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED ||
-                              tx->tx_buftype == GNILND_BUF_VIRT_MAPPED,
+               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED,
                               "bad tx buftype %d", tx->tx_buftype);
 
                kgnilnd_finalize_rx_done(tx, msg);
                break;
-
+       case GNILND_MSG_PUT_REQ_REV:
        case GNILND_MSG_GET_REQ:
                rc = lnet_parse(net->gnn_ni, &msg->gnm_u.get.gngm_hdr,
                                msg->gnm_srcnid, rx, 1);
@@ -3874,8 +4389,7 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
                if (tx == NULL)
                        break;
 
-               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED ||
-                              tx->tx_buftype == GNILND_BUF_VIRT_MAPPED,
+               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED,
                               "bad tx buftype %d", tx->tx_buftype);
 
                kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval);
@@ -3887,8 +4401,7 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
                if (tx == NULL)
                        break;
 
-               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED ||
-                              tx->tx_buftype == GNILND_BUF_VIRT_MAPPED,
+               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED,
                               "bad tx buftype %d", tx->tx_buftype);
 
                lnet_set_reply_msg_len(net->gnn_ni, tx->tx_lntmsg[1],
@@ -3896,6 +4409,42 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
 
                kgnilnd_finalize_rx_done(tx, msg);
                break;
+       case GNILND_MSG_GET_DONE_REV:
+               tx = kgnilnd_match_reply(conn, GNILND_MSG_GET_ACK_REV,
+                                       msg->gnm_u.completion.gncm_cookie);
+               if (tx == NULL)
+                       break;
+
+               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED,
+                               "bad tx buftype %d", tx->tx_buftype);
+
+               kgnilnd_finalize_rx_done(tx, msg);
+               break;
+
+       case GNILND_MSG_PUT_DONE_REV:
+               tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_REQ_REV,
+                                       msg->gnm_u.completion.gncm_cookie);
+
+               if (tx == NULL)
+                       break;
+
+               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED,
+                              "bad tx buftype %d", tx->tx_buftype);
+
+               kgnilnd_finalize_rx_done(tx, msg);
+               break;
+       case GNILND_MSG_PUT_NAK_REV:
+               tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_REQ_REV,
+                                       msg->gnm_u.completion.gncm_cookie);
+
+               if (tx == NULL)
+                       break;
+
+               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED,
+                               "bad tx buftype %d", tx->tx_buftype);
+
+               kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval);
+               break;
        }
 
  out:
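
All of the *_REV completions dispatched above rely on the same cookie handshake: the requester stamps a 64-bit cookie into the outgoing message, the peer echoes it back in the DONE/NAK, and kgnilnd_match_reply()/kgnilnd_match_reply_either() resolve it to the tx still waiting on the conn. A toy illustration of that lookup (hypothetical structures, not the driver's tx_ref_table):

    /* Toy cookie-to-tx resolution; the driver indexes a per-conn
     * tx_ref_table rather than scanning, but the contract is the same. */
    #include <stddef.h>
    #include <stdint.h>

    struct pending_tx {
            uint64_t cookie;            /* echoed back by the peer */
            int      awaiting_reply;
    };

    static struct pending_tx *match_reply(struct pending_tx *tbl, size_t n,
                                          uint64_t cookie)
    {
            size_t i;

            for (i = 0; i < n; i++)
                    if (tbl[i].awaiting_reply && tbl[i].cookie == cookie)
                            return &tbl[i];
            return NULL;    /* unmatched: stale or duplicated completion */
    }
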
@@ -4031,7 +4580,10 @@ kgnilnd_send_conn_close(kgn_conn_t *conn)
                }
        }
 
+       /* When changing gnc_state we need to take the kgn_peer_conn_lock */
+       write_lock(&kgnilnd_data.kgn_peer_conn_lock);
        conn->gnc_state = GNILND_CONN_CLOSED;
+       write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
        /* mark this conn as CLOSED now that we processed it
         * do after TX, so we can use CLOSING in asserts */
 
@@ -4053,13 +4605,15 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev)
        int             found_work = 0;
        int             rc = 0;
        kgn_tx_t        *tx;
-       int             max_retrans = *kgnilnd_tunables.kgn_max_retransmits;
+       int              fast_remaps = GNILND_FAST_MAPPING_TRY;
        int             log_retrans, log_retrans_level;
        static int      last_map_version;
        ENTRY;
 
        spin_lock(&dev->gnd_lock);
        if (list_empty(&dev->gnd_map_tx)) {
+               /* if the list is empty make sure we don't have a timer running */
+               del_singleshot_timer_sync(&dev->gnd_map_timer);
                spin_unlock(&dev->gnd_lock);
                RETURN(0);
        }
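
The map queue now uses a single-shot kernel timer (gnd_map_timer) to re-wake the device at gnd_next_map, and cancels it synchronously whenever the queue drains. A sketch of that lifecycle against the timer_setup()-era API, which these del_singleshot_timer_sync()/mod_timer() calls belong to:

    /* Sketch of the single-shot retry timer lifecycle; assumes
     * timer_setup(&mq->retry_timer, mapq_retry, 0) ran at init time. */
    #include <linux/timer.h>

    struct mapq {
            struct timer_list retry_timer;
    };

    static void mapq_retry(struct timer_list *t)
    {
            struct mapq *mq = from_timer(mq, t, retry_timer);

            /* wake the device scheduler to reprocess the map queue */
            (void)mq;
    }

    /* arm (or re-arm) the retry for 'when', expressed in jiffies */
    static void mapq_arm(struct mapq *mq, unsigned long when)
    {
            mod_timer(&mq->retry_timer, when);
    }

    /* queue went empty: cancel and wait out any running handler */
    static void mapq_idle(struct mapq *mq)
    {
            del_timer_sync(&mq->retry_timer);
    }
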
@@ -4070,13 +4624,23 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev)
         * backing off until our map version changes - indicating we unmapped
         * something */
        tx = list_first_entry(&dev->gnd_map_tx, kgn_tx_t, tx_list);
-       if ((tx->tx_retrans > (max_retrans / 4)) &&
-           (last_map_version == dev->gnd_map_version)) {
+       if (likely(dev->gnd_map_attempt == 0) ||
+               time_after_eq(jiffies, dev->gnd_next_map) ||
+               last_map_version != dev->gnd_map_version) {
+
+               /* if this is our first attempt at mapping, set the last-mapped
+                * time to the current jiffies so we can time out our attempts
+                * correctly.
+                */
+               if (dev->gnd_map_attempt == 0)
+                       dev->gnd_last_map = jiffies;
+       } else {
                GNIDBG_TX(D_NET, tx, "waiting for mapping event event to retry", NULL);
                spin_unlock(&dev->gnd_lock);
                RETURN(0);
        }
 
+       /* delete the previous timer if it exists */
+       del_singleshot_timer_sync(&dev->gnd_map_timer);
        /* stash the last map version to let us know when a good one was seen */
        last_map_version = dev->gnd_map_version;
 
@@ -4116,45 +4680,80 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev)
                         * this function is called again - we operate on a copy of the original
                         * list and not the live list */
                        spin_lock(&dev->gnd_lock);
+                       /* reset map attempts back to zero; we successfully
+                        * mapped, so we can reset our timers */
+                       dev->gnd_map_attempt = 0;
                        continue;
+               } else if (rc == -EAGAIN) {
+                       spin_lock(&dev->gnd_lock);
+                       mod_timer(&dev->gnd_map_timer, dev->gnd_next_map);
+                       spin_unlock(&dev->gnd_lock);
+                       GOTO(get_out_mapped, rc);
                } else if (rc != -ENOMEM) {
                        /* carp, failure we can't handle */
                        kgnilnd_tx_done(tx, rc);
                        spin_lock(&dev->gnd_lock);
+                       /* reset map attempts back to zero; we don't know what
+                        * happened, but it wasn't a failed mapping
+                        */
+                       dev->gnd_map_attempt = 0;
                        continue;
                }
 
-               /* time to handle the retry cases.. */
-               tx->tx_retrans++;
-               if (tx->tx_retrans == 1)
-                       tx->tx_qtime = jiffies;
+               /* time to handle the retry cases.. lock so we don't have two
+                * threads mucking with gnd_map_attempt or gnd_next_map at the
+                * same time.
+                */
+               spin_lock(&dev->gnd_lock);
+               dev->gnd_map_attempt++;
+               if (dev->gnd_map_attempt < fast_remaps) {
+                       /* just set gnd_next_map to the current jiffies so the
+                        * retry is processed as fast as possible.
+                        */
+                       dev->gnd_next_map = jiffies;
+               } else {
+                       /* Retry based on GNILND_MAP_RETRY_RATE */
+                       dev->gnd_next_map = jiffies + GNILND_MAP_RETRY_RATE;
+               }
 
-               /* only log occasionally once we've retried max / 2 */
-               log_retrans = (tx->tx_retrans >= (max_retrans / 2)) &&
-                             ((tx->tx_retrans % 32) == 0);
+               /* only log occasionally once we've retried fast_remaps */
+               log_retrans = (dev->gnd_map_attempt >= fast_remaps) &&
+                             ((dev->gnd_map_attempt % fast_remaps) == 0);
                log_retrans_level = log_retrans ? D_NETERROR : D_NET;
 
                /* make sure we are not off in the weeds with this tx */
-               if (tx->tx_retrans > *kgnilnd_tunables.kgn_max_retransmits) {
+               if (time_after(jiffies, dev->gnd_last_map + GNILND_MAP_TIMEOUT)) {
                       GNIDBG_TX(D_NETERROR, tx,
                               "giving up on TX, too many retries", NULL);
+                      spin_unlock(&dev->gnd_lock);
+                      if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ ||
+                          tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ_REV) {
+                              kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type,
+                                               -ENOMEM,
+                                               tx->tx_putinfo.gnpam_dst_cookie,
+                                               tx->tx_msg.gnm_srcnid);
+                       } else {
+                               kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type,
+                                               -ENOMEM,
+                                               tx->tx_getinfo.gngm_cookie,
+                                               tx->tx_msg.gnm_srcnid);
+                       }
                       kgnilnd_tx_done(tx, -ENOMEM);
                       GOTO(get_out_mapped, rc);
                } else {
                       GNIDBG_TX(log_retrans_level, tx,
                                "transient map failure #%d %d pages/%d bytes phys %u@%u "
-                               "virt %u@"LPU64" "
                                "nq_map %d mdd# %d/%d GART %ld",
-                               tx->tx_retrans, tx->tx_phys_npages, tx->tx_nob,
+                               dev->gnd_map_attempt, tx->tx_phys_npages, tx->tx_nob,
                                dev->gnd_map_nphys, dev->gnd_map_physnop * PAGE_SIZE,
-                               dev->gnd_map_nvirt, dev->gnd_map_virtnob,
                                atomic_read(&dev->gnd_nq_map),
                                atomic_read(&dev->gnd_n_mdd), atomic_read(&dev->gnd_n_mdd_held),
                                atomic64_read(&dev->gnd_nbytes_map));
                }
 
                /* we need to stop processing the rest of the list, so add it back in */
-               spin_lock(&dev->gnd_lock);
+               /* set timer to wake device when we need to schedule this tx */
+               mod_timer(&dev->gnd_map_timer, dev->gnd_next_map);
                kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 0);
                spin_unlock(&dev->gnd_lock);
                GOTO(get_out_mapped, rc);
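
The retry policy above replaces per-tx retransmit counting with a per-device schedule: the first GNILND_FAST_MAPPING_TRY attempts remap immediately, later attempts are spaced GNILND_MAP_RETRY_RATE apart, and the whole effort is abandoned once GNILND_MAP_TIMEOUT elapses from the first failure. A condensed sketch of the next-retry computation (constant values assumed for illustration only):

    /* Sketch of the remap backoff; the two constants stand in for the
     * driver's GNILND_FAST_MAPPING_TRY / GNILND_MAP_RETRY_RATE. */
    #include <linux/jiffies.h>

    #define FAST_MAPPING_TRY        10              /* assumed value */
    #define MAP_RETRY_RATE          (HZ / 10)       /* assumed value */

    static unsigned long next_map_retry(unsigned long now, int attempt)
    {
            /* early attempts go immediately; later ones are rate limited */
            return attempt < FAST_MAPPING_TRY ? now : now + MAP_RETRY_RATE;
    }
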
@@ -4165,16 +4764,20 @@ get_out_mapped:
 }
 
 int
-kgnilnd_process_conns(kgn_device_t *dev)
+kgnilnd_process_conns(kgn_device_t *dev, unsigned long deadline)
 {
        int              found_work = 0;
        int              conn_sched;
        int              intent = 0;
+       int              error_inject = 0;
+       int              rc = 0;
        kgn_conn_t      *conn;
 
        spin_lock(&dev->gnd_lock);
-       while (!list_empty(&dev->gnd_ready_conns)) {
+       while (!list_empty(&dev->gnd_ready_conns) && time_before(jiffies, deadline)) {
                dev->gnd_sched_alive = jiffies;
+               error_inject = 0;
+               rc = 0;
 
                if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
                        /* break with lock held */
@@ -4183,6 +4786,12 @@ kgnilnd_process_conns(kgn_device_t *dev)
 
                conn = list_first_entry(&dev->gnd_ready_conns, kgn_conn_t, gnc_schedlist);
                list_del_init(&conn->gnc_schedlist);
+               /* Since we are processing the conn now, we don't need to be
+                * on the delaylist any longer. */
+               if (!list_empty(&conn->gnc_delaylist))
+                       list_del_init(&conn->gnc_delaylist);
                spin_unlock(&dev->gnd_lock);
 
                conn_sched = xchg(&conn->gnc_scheduled, GNILND_CONN_PROCESS);
@@ -4201,10 +4810,16 @@ kgnilnd_process_conns(kgn_device_t *dev)
                if (kgnilnd_check_conn_fail_loc(dev, conn, &intent)) {
 
                        /* based on intent see if we should run again. */
-                       kgnilnd_schedule_process_conn(conn, intent);
-
+                       rc = kgnilnd_schedule_process_conn(conn, intent);
+                       error_inject = 1;
                        /* drop ref from gnd_ready_conns */
-                       kgnilnd_conn_decref(conn);
+                       if (atomic_read(&conn->gnc_refcount) == 1 && rc != 1) {
+                               down_write(&dev->gnd_conn_sem);
+                               kgnilnd_conn_decref(conn);
+                               up_write(&dev->gnd_conn_sem);
+                       } else if (rc != 1) {
+                               kgnilnd_conn_decref(conn);
+                       }
                        /* clear this so that scheduler thread doesn't spin */
                        found_work = 0;
                        /* break with lock held... */
@@ -4213,30 +4828,60 @@ kgnilnd_process_conns(kgn_device_t *dev)
                }
 
                if (unlikely(conn->gnc_state == GNILND_CONN_CLOSED)) {
+                       down_write(&dev->gnd_conn_sem);
+
                        /* CONN_CLOSED set in process_fmaq when CLOSE is sent */
-                       kgnilnd_complete_closed_conn(conn);
+                       if (unlikely(atomic_read(&conn->gnc_tx_in_use))) {
+                               /* If there are txs currently in use in another
+                                * thread we don't want to complete the close
+                                * yet. Cycle this conn back through
+                                * the scheduler. */
+                               kgnilnd_schedule_conn(conn);
+                       } else {
+                               kgnilnd_complete_closed_conn(conn);
+                       }
+                       up_write(&dev->gnd_conn_sem);
                } else if (unlikely(conn->gnc_state == GNILND_CONN_DESTROY_EP)) {
                        /* DESTROY_EP set in kgnilnd_conn_decref on gnc_refcount = 1 */
                        /* serialize SMSG CQs with ep_bind and smsg_release */
+                       down_write(&dev->gnd_conn_sem);
                        kgnilnd_destroy_conn_ep(conn);
+                       up_write(&dev->gnd_conn_sem);
                } else if (unlikely(conn->gnc_state == GNILND_CONN_CLOSING)) {
                       /* if we need to do some CLOSE sending, etc done here do it */
+                       down_write(&dev->gnd_conn_sem);
                        kgnilnd_send_conn_close(conn);
                        kgnilnd_check_fma_rx(conn);
+                       up_write(&dev->gnd_conn_sem);
                } else if (atomic_read(&conn->gnc_peer->gnp_dirty_eps) == 0) {
                        /* start moving traffic if the old conns are cleared out */
+                       down_read(&dev->gnd_conn_sem);
                        kgnilnd_check_fma_rx(conn);
                        kgnilnd_process_fmaq(conn);
+                       up_read(&dev->gnd_conn_sem);
                }
 
-               kgnilnd_schedule_process_conn(conn, 0);
+               rc = kgnilnd_schedule_process_conn(conn, 0);
 
                /* drop ref from gnd_ready_conns */
-               kgnilnd_conn_decref(conn);
+               if (atomic_read(&conn->gnc_refcount) == 1 && rc != 1) {
+                       down_write(&dev->gnd_conn_sem);
+                       kgnilnd_conn_decref(conn);
+                       up_write(&dev->gnd_conn_sem);
+               } else if (rc != 1) {
+                       kgnilnd_conn_decref(conn);
+               }
 
                /* check list again with lock held */
                spin_lock(&dev->gnd_lock);
        }
+
+       /* If we are short-circuiting due to timing we want to be scheduled
+        * as soon as possible.
+        */
+       if (!list_empty(&dev->gnd_ready_conns) && !error_inject)
+               found_work++;
+
        spin_unlock(&dev->gnd_lock);
 
        RETURN(found_work);
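
Dropping what may be the final conn reference is now done under the write side of gnd_conn_sem, so endpoint teardown cannot race scheduler threads that hold the read side around CQ processing; when the refcount is known to stay positive, the plain decref suffices. A sketch of the pattern, with generic names standing in for the conn fields:

    /* Sketch of the "last ref under write lock" pattern used above. */
    #include <linux/atomic.h>
    #include <linux/rwsem.h>

    static void conn_put(atomic_t *refs, struct rw_semaphore *conn_sem,
                         void (*destroy)(void))
    {
            if (atomic_read(refs) == 1) {
                    /* likely-final put: exclude readers so endpoint
                     * teardown cannot race CQ processing */
                    down_write(conn_sem);
                    if (atomic_dec_and_test(refs))
                            destroy();
                    up_write(conn_sem);
            } else if (atomic_dec_and_test(refs)) {
                    destroy();      /* raced to zero anyway */
            }
    }
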
@@ -4246,20 +4891,16 @@ int
 kgnilnd_scheduler(void *arg)
 {
        int               threadno = (long)arg;
-       kgn_device_t     *dev;
-       char              name[16];
-       int               busy_loops = 0;
+       kgn_device_t     *dev;
+       int               busy_loops = 0;
+       unsigned long     deadline = 0;
        DEFINE_WAIT(wait);
 
        dev = &kgnilnd_data.kgn_devices[(threadno + 1) % kgnilnd_data.kgn_ndevs];
 
-       snprintf(name, sizeof(name), "kgnilnd_sd_%02d", threadno);
-       cfs_daemonize(name);
-       cfs_block_allsigs();
-
        /* all gnilnd threads need to run fairly urgently */
-       set_user_nice(current, *kgnilnd_tunables.kgn_nice);
-
+       set_user_nice(current, *kgnilnd_tunables.kgn_sched_nice);
+       deadline = jiffies + cfs_time_seconds(*kgnilnd_tunables.kgn_sched_timeout);
        while (!kgnilnd_data.kgn_shutdown) {
                int     found_work = 0;
                /* Safe: kgn_shutdown only set when quiescent */
@@ -4273,12 +4914,15 @@ kgnilnd_scheduler(void *arg)
                /* tracking for when thread goes AWOL */
                dev->gnd_sched_alive = jiffies;
 
+               CFS_FAIL_TIMEOUT(CFS_FAIL_GNI_SCHED_DEADLINE,
+                       (*kgnilnd_tunables.kgn_sched_timeout + 1));
                /* let folks know we are up and kicking
                 * - they can use this for latency savings, etc
                 * - only change if IRQ, if IDLE leave alone as that
                 *   schedule_device calls to put us back to IRQ */
                (void)cmpxchg(&dev->gnd_ready, GNILND_DEV_IRQ, GNILND_DEV_LOOP);
 
+               down_read(&dev->gnd_conn_sem);
                /* always check these - they are super low cost  */
                found_work += kgnilnd_check_fma_send_cq(dev);
                found_work += kgnilnd_check_fma_rcv_cq(dev);
@@ -4299,21 +4943,23 @@ kgnilnd_scheduler(void *arg)
                 * transition
                 * ...should.... */
 
+               up_read(&dev->gnd_conn_sem);
+
                /* process all conns ready now */
-               found_work += kgnilnd_process_conns(dev);
+               found_work += kgnilnd_process_conns(dev, deadline);
 
                /* do an eager check to avoid the IRQ disabling in
                 * prepare_to_wait and friends */
 
-               if (found_work && busy_loops++ < *kgnilnd_tunables.kgn_loops) {
+               if (found_work &&
+                  (busy_loops++ < *kgnilnd_tunables.kgn_loops) &&
+                  time_before(jiffies, deadline)) {
                        found_work = 0;
                        if ((busy_loops % 10) == 0) {
                                /* tickle heartbeat and watchdog to ensure our
                                 * piggishness doesn't turn into heartbeat failure */
                                touch_nmi_watchdog();
-                               if (kgnilnd_hssops.hb_to_l0 != NULL) {
-                                       kgnilnd_hssops.hb_to_l0();
-                               }
+                               kgnilnd_hw_hb();
                        }
                        continue;
                }
@@ -4332,7 +4978,8 @@ kgnilnd_scheduler(void *arg)
 
                found_work += xchg(&dev->gnd_ready, GNILND_DEV_IDLE);
 
-               if (busy_loops >= *kgnilnd_tunables.kgn_loops) {
+               if ((busy_loops >= *kgnilnd_tunables.kgn_loops) ||
+                  time_after_eq(jiffies, deadline)) {
                        CDEBUG(D_INFO,
                               "yeilding: found_work %d busy_loops %d\n",
                               found_work, busy_loops);
@@ -4346,8 +4993,10 @@ kgnilnd_scheduler(void *arg)
                         * again. yield() ensures we wake up without another
                         * waitq poke in that case */
                        atomic_inc(&dev->gnd_n_yield);
+                       kgnilnd_data.kgn_last_condresched = jiffies;
                        yield();
                        CDEBUG(D_INFO, "awake after yeild\n");
+                       deadline = jiffies + cfs_time_seconds(*kgnilnd_tunables.kgn_sched_timeout);
                } else if (found_work == GNILND_DEV_IDLE) {
                        /* busy_loops is low and there is nothing to do,
                         * go to sleep and wait for a waitq poke */
@@ -4355,8 +5004,10 @@ kgnilnd_scheduler(void *arg)
                               "scheduling: found_work %d busy_loops %d\n",
                               found_work, busy_loops);
                        atomic_inc(&dev->gnd_n_schedule);
+                       kgnilnd_data.kgn_last_scheduled = jiffies;
                        schedule();
                        CDEBUG(D_INFO, "awake after schedule\n");
+                       deadline = jiffies + cfs_time_seconds(*kgnilnd_tunables.kgn_sched_timeout);
                }
                finish_wait(&dev->gnd_waitq, &wait);
        }