Whamcloud - gitweb
* ranal passes netregression
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd_cb.c
index b491d71..38f1b77 100644 (file)
@@ -38,11 +38,14 @@ kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
 }
 
 void
-kranal_device_callback(RAP_INT32 devid)
+kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
 {
         kra_device_t *dev;
         int           i;
-        
+        unsigned long flags;
+
+        CDEBUG(D_NET, "callback for device %d\n", devid);
+
         for (i = 0; i < kranal_data.kra_ndevs; i++) {
 
                 dev = &kranal_data.kra_devices[i];
@@ -59,7 +62,7 @@ kranal_device_callback(RAP_INT32 devid)
                 spin_unlock_irqrestore(&dev->rad_lock, flags);
                 return;
         }
-        
+
         CWARN("callback for unknown device %d\n", devid);
 }
 
@@ -68,9 +71,9 @@ kranal_schedule_conn(kra_conn_t *conn)
 {
         kra_device_t    *dev = conn->rac_device;
         unsigned long    flags;
-        
+
         spin_lock_irqsave(&dev->rad_lock, flags);
-        
+
         if (!conn->rac_scheduled) {
                 kranal_conn_addref(conn);       /* +1 ref for scheduler */
                 conn->rac_scheduled = 1;
@@ -81,135 +84,19 @@ kranal_schedule_conn(kra_conn_t *conn)
         spin_unlock_irqrestore(&dev->rad_lock, flags);
 }
 
-void
-kranal_schedule_cqid (__u32 cqid)
-{
-        kra_conn_t         *conn;
-        struct list_head   *conns;
-        struct list_head   *tmp;
-
-        conns = kranal_cqid2connlist(cqid);
-
-        read_lock(&kranal_data.kra_global_lock);
-
-        conn = kranal_cqid2conn_locked(cqid);
-        
-        if (conn == NULL)
-                CWARN("no cqid %x\n", cqid);
-        else
-                kranal_schedule_conn(conn);
-        
-        read_unlock(&kranal_data.kra_global_lock);
-}
-
-void
-kranal_schedule_dev(kra_device_t *dev)
-{
-        kra_conn_t         *conn;
-        struct list_head   *conns;
-        struct list_head   *tmp;
-        int                 i;
-
-        /* Don't do this in IRQ context (servers may have 1000s of clients) */
-        LASSERT (!in_interrupt()); 
-
-        CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
-
-        for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
-
-                /* Drop the lock on each hash bucket to ensure we don't
-                 * block anyone for too long at IRQ priority on another CPU */
-                
-                read_lock(&kranal_data.kra_global_lock);
-        
-                conns = &kranal_data.kra_conns[i];
-
-                list_for_each (tmp, conns) {
-                        conn = list_entry(tmp, kra_conn_t, rac_hashlist);
-                
-                        if (conn->rac_device == dev)
-                                kranal_schedule_conn(conn);
-                }
-                read_unlock(&kranal_data.kra_global_lock);
-        }
-}
-
-void
-kranal_tx_done (kra_tx_t *tx, int completion)
-{
-        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
-        kra_device_t    *dev;
-        unsigned long    flags;
-        int              i;
-        RAP_RETURN       rrc;
-
-        LASSERT (!in_interrupt());
-
-        switch (tx->tx_buftype) {
-        default:
-                LBUG();
-
-        case RANAL_BUF_NONE:
-        case RANAL_BUF_IMMEDIATE:
-        case RANAL_BUF_PHYS_UNMAPPED:
-        case RANAL_BUF_VIRT_UNMAPPED:
-                break;
-
-        case RANAL_BUF_PHYS_MAPPED:
-                LASSERT (tx->tx_conn != NULL);
-                dev = tx->tx_con->rac_device;
-                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
-                                           dev->rad_ptag, &tx->tx_map_key);
-                LASSERT (rrc == RAP_SUCCESS);
-                break;
-
-        case RANAL_BUF_VIRT_MAPPED:
-                LASSERT (tx->tx_conn != NULL);
-                dev = tx->tx_con->rac_device;
-                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer
-                                           dev->rad_ptag, &tx->tx_map_key);
-                LASSERT (rrc == RAP_SUCCESS);
-                break;
-        }
-
-        for (i = 0; i < 2; i++) {
-                /* tx may have up to 2 libmsgs to finalise */
-                if (tx->tx_libmsg[i] == NULL)
-                        continue;
-
-                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
-                tx->tx_libmsg[i] = NULL;
-        }
-
-        tx->tx_buftype = RANAL_BUF_NONE;
-        tx->tx_msg.ram_type = RANAL_MSG_NONE;
-        tx->tx_conn = NULL;
-
-        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
-
-        if (tx->tx_isnblk) {
-                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
-        } else {
-                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
-                wake_up(&kranal_data.kra_idle_tx_waitq);
-        }
-
-        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
-}
-
 kra_tx_t *
-kranal_get_idle_tx (int may_block) 
+kranal_get_idle_tx (int may_block)
 {
         unsigned long  flags;
         kra_tx_t      *tx = NULL;
-        
+
         for (;;) {
                 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
 
                 /* "normal" descriptor is free */
                 if (!list_empty(&kranal_data.kra_idle_txs)) {
                         tx = list_entry(kranal_data.kra_idle_txs.next,
-                                       kra_tx_t, tx_list);
+                                        kra_tx_t, tx_list);
                         break;
                 }
 
@@ -221,7 +108,7 @@ kranal_get_idle_tx (int may_block)
                         }
 
                         tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
-                                       kra_tx_t, tx_list);
+                                        kra_tx_t, tx_list);
                         break;
                 }
 
@@ -229,7 +116,7 @@ kranal_get_idle_tx (int may_block)
                 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
 
                 wait_event(kranal_data.kra_idle_tx_waitq,
-                          !list_empty(&kranal_data.kra_idle_txs));
+                           !list_empty(&kranal_data.kra_idle_txs));
         }
 
         if (tx != NULL) {
@@ -247,7 +134,7 @@ kranal_get_idle_tx (int may_block)
         }
 
         spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
-        
+
         return tx;
 }
 
@@ -258,10 +145,10 @@ kranal_init_msg(kra_msg_t *msg, int type)
         msg->ram_version = RANAL_MSG_VERSION;
         msg->ram_type = type;
         msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
-        /* ram_incarnation gets set when FMA is sent */
+        /* ram_connstamp gets set when FMA is sent */
 }
 
-kra_tx_t
+kra_tx_t *
 kranal_new_tx_msg (int may_block, int type)
 {
         kra_tx_t *tx = kranal_get_idle_tx(may_block);
@@ -274,36 +161,46 @@ kranal_new_tx_msg (int may_block, int type)
 }
 
 int
-kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
+kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                                int offset, int nob)
-                 
+
 {
-        LASSERT (nob > 0);
-        LASSERT (niov > 0);
+        /* For now this is almost identical to kranal_setup_virt_buffer, but we
+         * could "flatten" the payload into a single contiguous buffer ready
+         * for sending direct over an FMA if we ever needed to. */
+
         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+        LASSERT (nob >= 0);
 
-        while (offset >= iov->iov_len) {
-                offset -= iov->iov_len;
-                niov--;
-                iov++;
+        if (nob == 0) {
+                tx->tx_buffer = NULL;
+        } else {
                 LASSERT (niov > 0);
-        }
 
-        if (nob > iov->iov_len - offset) {
-                CERROR("Can't handle multiple vaddr fragments\n");
-                return -EMSGSIZE;
+                while (offset >= iov->iov_len) {
+                        offset -= iov->iov_len;
+                        niov--;
+                        iov++;
+                        LASSERT (niov > 0);
+                }
+
+                if (nob > iov->iov_len - offset) {
+                        CERROR("Can't handle multiple vaddr fragments\n");
+                        return -EMSGSIZE;
+                }
+
+                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
         }
 
-        tx->tx_bufftype = RANAL_BUF_IMMEDIATE;
+        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
         tx->tx_nob = nob;
-        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
         return 0;
 }
 
 int
-kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
+kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                           int offset, int nob)
-                 
+
 {
         LASSERT (nob > 0);
         LASSERT (niov > 0);
@@ -321,7 +218,7 @@ kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                 return -EMSGSIZE;
         }
 
-        tx->tx_bufftype = RANAL_BUF_VIRT_UNMAPPED;
+        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
         tx->tx_nob = nob;
         tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
         return 0;
@@ -347,13 +244,11 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                 LASSERT (nkiov > 0);
         }
 
-        tx->tx_bufftype = RANAL_BUF_PHYS_UNMAPPED;
+        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
         tx->tx_nob = nob;
-        tx->tx_buffer = NULL;
-        tx->tx_phys_offset = kiov->kiov_offset + offset;
-        
+        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
+
         phys->Address = kranal_page2phys(kiov->kiov_page);
-        phys->Length  = PAGE_SIZE;
         phys++;
 
         resid = nob - (kiov->kiov_len - offset);
@@ -363,30 +258,22 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                 LASSERT (nkiov > 0);
 
                 if (kiov->kiov_offset != 0 ||
-                    ((resid > PAGE_SIZE) && 
+                    ((resid > PAGE_SIZE) &&
                      kiov->kiov_len < PAGE_SIZE)) {
-                        int i;
                         /* Can't have gaps */
                         CERROR("Can't make payload contiguous in I/O VM:"
-                               "page %d, offset %d, len %d \n", nphys, 
+                               "page %d, offset %d, len %d \n",
+                               (int)(phys - tx->tx_phys),
                                kiov->kiov_offset, kiov->kiov_len);
-
-                        for (i = -nphys; i < nkiov; i++) {
-                                CERROR("kiov[%d] %p +%d for %d\n",
-                                       i, kiov[i].kiov_page, 
-                                       kiov[i].kiov_offset, kiov[i].kiov_len);
-                        }
-                        
                         return -EINVAL;
                 }
 
                 if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
-                        CERROR ("payload too big (%d)\n", phys - tx->tx_phys);
+                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                         return -EMSGSIZE;
                 }
 
                 phys->Address = kranal_page2phys(kiov->kiov_page);
-                phys->Length  = PAGE_SIZE;
                 phys++;
 
                 resid -= PAGE_SIZE;
@@ -397,16 +284,16 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
 }
 
 static inline int
-kranal_setup_buffer (kra_tx_t *tx, int niov, 
-                     struct iovec *iov, ptl_kiov_t *kiov,
-                     int offset, int nob)
+kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
+                          struct iovec *iov, ptl_kiov_t *kiov,
+                          int offset, int nob)
 {
         LASSERT ((iov == NULL) != (kiov == NULL));
-        
+
         if (kiov != NULL)
                 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
-        
-        return kranal_setup_virt_buffer(tx, niov, kiov, offset, nob);
+
+        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
 }
 
 void
@@ -414,30 +301,112 @@ kranal_map_buffer (kra_tx_t *tx)
 {
         kra_conn_t     *conn = tx->tx_conn;
         kra_device_t   *dev = conn->rac_device;
+        RAP_RETURN      rrc;
+
+        LASSERT (current == dev->rad_scheduler);
 
         switch (tx->tx_buftype) {
         default:
-                
+                LBUG();
+
+        case RANAL_BUF_NONE:
+        case RANAL_BUF_IMMEDIATE:
+        case RANAL_BUF_PHYS_MAPPED:
+        case RANAL_BUF_VIRT_MAPPED:
+                break;
+
         case RANAL_BUF_PHYS_UNMAPPED:
-                rrc = RapkRegisterPhys(conn->rac_device->rad_handle,
+                rrc = RapkRegisterPhys(dev->rad_handle,
                                        tx->tx_phys, tx->tx_phys_npages,
-                                       conn->rac_device->rad_ptag,
                                        &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
                 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
-                return;
+                break;
 
         case RANAL_BUF_VIRT_UNMAPPED:
-                rrc = RapkRegisterMemory(conn->rac_device->rad_handle,
+                rrc = RapkRegisterMemory(dev->rad_handle,
                                          tx->tx_buffer, tx->tx_nob,
-                                         conn->rac_device->rad_ptag,
                                          &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
                 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
-                return;
+                break;
+        }
+}
+
+void
+kranal_unmap_buffer (kra_tx_t *tx)
+{
+        kra_device_t   *dev;
+        RAP_RETURN      rrc;
+
+        switch (tx->tx_buftype) {
+        default:
+                LBUG();
+
+        case RANAL_BUF_NONE:
+        case RANAL_BUF_IMMEDIATE:
+        case RANAL_BUF_PHYS_UNMAPPED:
+        case RANAL_BUF_VIRT_UNMAPPED:
+                break;
+
+        case RANAL_BUF_PHYS_MAPPED:
+                LASSERT (tx->tx_conn != NULL);
+                dev = tx->tx_conn->rac_device;
+                LASSERT (current == dev->rad_scheduler);
+                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
+                                           &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
+                break;
+
+        case RANAL_BUF_VIRT_MAPPED:
+                LASSERT (tx->tx_conn != NULL);
+                dev = tx->tx_conn->rac_device;
+                LASSERT (current == dev->rad_scheduler);
+                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
+                                           &tx->tx_map_key);
+                LASSERT (rrc == RAP_SUCCESS);
+                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
+                break;
         }
 }
 
+void
+kranal_tx_done (kra_tx_t *tx, int completion)
+{
+        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
+        unsigned long    flags;
+        int              i;
+
+        LASSERT (!in_interrupt());
+
+        kranal_unmap_buffer(tx);
+
+        for (i = 0; i < 2; i++) {
+                /* tx may have up to 2 libmsgs to finalise */
+                if (tx->tx_libmsg[i] == NULL)
+                        continue;
+
+                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
+                tx->tx_libmsg[i] = NULL;
+        }
+
+        tx->tx_buftype = RANAL_BUF_NONE;
+        tx->tx_msg.ram_type = RANAL_MSG_NONE;
+        tx->tx_conn = NULL;
+
+        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
+
+        if (tx->tx_isnblk) {
+                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
+        } else {
+                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
+                wake_up(&kranal_data.kra_idle_tx_waitq);
+        }
+
+        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+}
+
 kra_conn_t *
 kranal_find_conn_locked (kra_peer_t *peer)
 {
@@ -477,11 +446,11 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
 
         /* If I get here, I've committed to send, so I complete the tx with
          * failure on any problems */
-        
+
         LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
 
         read_lock(g_lock);
-        
+
         peer = kranal_find_peer_locked(nid);
         if (peer == NULL) {
                 read_unlock(g_lock);
@@ -495,7 +464,7 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
                 read_unlock(g_lock);
                 return;
         }
-        
+
         /* Making one or more connections; I'll need a write lock... */
         read_unlock(g_lock);
         write_lock_irqsave(g_lock, flags);
@@ -503,7 +472,7 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
         peer = kranal_find_peer_locked(nid);
         if (peer == NULL) {
                 write_unlock_irqrestore(g_lock, flags);
-                kranal_tx_done(tx -EHOSTUNREACH);
+                kranal_tx_done(tx, -EHOSTUNREACH);
                 return;
         }
 
@@ -518,59 +487,67 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
         LASSERT (peer->rap_persistence > 0);
 
         if (!peer->rap_connecting) {
-                now = CURRENT_TIME;
+                LASSERT (list_empty(&peer->rap_tx_queue));
+
+                now = CURRENT_SECONDS;
                 if (now < peer->rap_reconnect_time) {
                         write_unlock_irqrestore(g_lock, flags);
                         kranal_tx_done(tx, -EHOSTUNREACH);
                         return;
                 }
-        
+
                 peer->rap_connecting = 1;
                 kranal_peer_addref(peer); /* extra ref for connd */
-        
+
                 spin_lock(&kranal_data.kra_connd_lock);
-        
+
                 list_add_tail(&peer->rap_connd_list,
-                             &kranal_data.kra_connd_peers);
+                              &kranal_data.kra_connd_peers);
                 wake_up(&kranal_data.kra_connd_waitq);
-        
+
                 spin_unlock(&kranal_data.kra_connd_lock);
         }
-        
+
         /* A connection is being established; queue the message... */
         list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
 
         write_unlock_irqrestore(g_lock, flags);
 }
 
-static void
-kranal_rdma(kra_tx_t *tx, int type, 
-            kra_rdma_desc_t *rard, int nob, __u64 cookie)
+void
+kranal_rdma(kra_tx_t *tx, int type,
+            kra_rdma_desc_t *sink, int nob, __u64 cookie)
 {
-        kra_conn_t *conn = tx->tx_conn;
-        RAP_RETURN  rrc;
+        kra_conn_t   *conn = tx->tx_conn;
+        RAP_RETURN    rrc;
+        unsigned long flags;
 
-        /* prep final completion message */
-        kranal_init_msg(&tx->tx_msg, type);
-        tx->tx_msg.ram_u.completion.racm_cookie = cookie;
-        
-        LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
-                 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
-        LASSERT (nob <= rard->rard_nob);
+        LASSERT (kranal_tx_mapped(tx));
+        LASSERT (nob <= sink->rard_nob);
+        LASSERT (nob <= tx->tx_nob);
+
+        /* No actual race with scheduler sending CLOSE (I'm she!) */
+        LASSERT (current == conn->rac_device->rad_scheduler);
 
         memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
-        tx->tx_rdma_desc.SrcPtr = tx->tx_buffer;
+        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
         tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
-        tx->tx_rdma_desc.DstPtr = rard->rard_addr;
-        tx->tx_rdma_desc.DstKey = rard->rard_key;
+        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
+        tx->tx_rdma_desc.DstKey = sink->rard_key;
         tx->tx_rdma_desc.Length = nob;
         tx->tx_rdma_desc.AppPtr = tx;
 
+        /* prep final completion message */
+        kranal_init_msg(&tx->tx_msg, type);
+        tx->tx_msg.ram_u.completion.racm_cookie = cookie;
+
         if (nob == 0) { /* Immediate completion */
                 kranal_post_fma(conn, tx);
                 return;
         }
-        
+
+        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
+
         rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
         LASSERT (rrc == RAP_SUCCESS);
 
@@ -587,43 +564,46 @@ kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
         RAP_RETURN rrc;
 
         LASSERT (conn->rac_rxmsg != NULL);
+        CDEBUG(D_NET, "Consuming %p\n", conn);
 
-        rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
-                                &nob_received, sizeof(kra_msg_t));
+        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
+                             &nob_received, sizeof(kra_msg_t));
         LASSERT (rrc == RAP_SUCCESS);
 
         conn->rac_rxmsg = NULL;
 
-        if (nob_received != nob) {
-                CWARN("Expected %d immediate bytes but got %d\n",
-                      nob, nob_received);
+        if (nob_received < nob) {
+                CWARN("Incomplete immediate msg from "LPX64
+                      ": expected %d, got %d\n",
+                      conn->rac_peer->rap_nid, nob, nob_received);
                 return -EPROTO;
         }
-        
+
         return 0;
 }
 
 ptl_err_t
-kranal_do_send (lib_nal_t    *nal, 
+kranal_do_send (lib_nal_t    *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
-                ptl_hdr_t    *hdr, 
-                int           type, 
-                ptl_nid_t     nid, 
+                ptl_hdr_t    *hdr,
+                int           type,
+                ptl_nid_t     nid,
                 ptl_pid_t     pid,
-                unsigned int  niov, 
-                struct iovec *iov, 
+                unsigned int  niov,
+                struct iovec *iov,
                 ptl_kiov_t   *kiov,
-                size_t        offset,
-                size_t        nob)
+                int           offset,
+                int           nob)
 {
         kra_conn_t *conn;
         kra_tx_t   *tx;
+        int         rc;
 
         /* NB 'private' is different depending on what we're sending.... */
 
-        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
-               " pid %d\n", nob, niov, nid , pid);
+        CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
+               nob, niov, nid, pid);
 
         LASSERT (nob == 0 || niov > 0);
         LASSERT (niov <= PTL_MD_MAX_IOV);
@@ -635,25 +615,25 @@ kranal_do_send (lib_nal_t    *nal,
         switch(type) {
         default:
                 LBUG();
-                
+
         case PTL_MSG_REPLY: {
                 /* reply's 'private' is the conn that received the GET_REQ */
                 conn = private;
                 LASSERT (conn->rac_rxmsg != NULL);
 
                 if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
-                        if (nob > RANAL_MAX_IMMEDIATE) {
+                        if (nob > RANAL_FMA_MAX_DATA) {
                                 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                        nob, nid);
                                 return PTL_FAIL;
                         }
                         break;                  /* RDMA not expected */
                 }
-                
+
                 /* Incoming message consistent with immediate reply? */
                 if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                         CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
-                              nid, conn->rac_rxmsg->ram_type);
+                               nid, conn->rac_rxmsg->ram_type);
                         return PTL_FAIL;
                 }
 
@@ -661,7 +641,7 @@ kranal_do_send (lib_nal_t    *nal,
                 if (tx == NULL)
                         return PTL_FAIL;
 
-                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                 if (rc != 0) {
                         kranal_tx_done(tx, rc);
                         return PTL_FAIL;
@@ -672,22 +652,40 @@ kranal_do_send (lib_nal_t    *nal,
 
                 kranal_map_buffer(tx);
                 kranal_rdma(tx, RANAL_MSG_GET_DONE,
-                            &conn->rac_rxmsg->ram_u.getreq.ragm_desc, nob,
-                            &conn->rac_rxmsg->ram_u.getreq.ragm_cookie);
+                            &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
+                            conn->rac_rxmsg->ram_u.get.ragm_cookie);
+
+                /* flag matched by consuming rx message */
+                kranal_consume_rxmsg(conn, NULL, 0);
                 return PTL_OK;
         }
 
         case PTL_MSG_GET:
-                if (kiov == NULL &&             /* not paged */
-                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
-                    nob <= kranal_tunables.kra_max_immediate)
-                        break;                  /* send IMMEDIATE */
+                LASSERT (niov == 0);
+                LASSERT (nob == 0);
+                /* We have to consider the eventual sink buffer rather than any
+                 * payload passed here (there isn't any, and strictly, looking
+                 * inside libmsg is a layering violation).  We send a simple
+                 * IMMEDIATE GET if the sink buffer is mapped already and small
+                 * enough for FMA */
+
+                if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
+                    libmsg->md->length <= RANAL_FMA_MAX_DATA &&
+                    libmsg->md->length <= kranal_tunables.kra_max_immediate)
+                        break;
 
-                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ);
+                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
                 if (tx == NULL)
                         return PTL_NO_SPACE;
 
-                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                if ((libmsg->md->options & PTL_MD_KIOV) == 0)
+                        rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
+                                                      libmsg->md->md_iov.iov,
+                                                      0, libmsg->md->length);
+                else
+                        rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
+                                                      libmsg->md->md_iov.kiov,
+                                                      0, libmsg->md->length);
                 if (rc != 0) {
                         kranal_tx_done(tx, rc);
                         return PTL_FAIL;
@@ -704,7 +702,7 @@ kranal_do_send (lib_nal_t    *nal,
                 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                 /* rest of tx_msg is setup just before it is sent */
                 kranal_launch_tx(tx, nid);
-                return PTL_OK
+                return PTL_OK;
 
         case PTL_MSG_ACK:
                 LASSERT (nob == 0);
@@ -712,15 +710,15 @@ kranal_do_send (lib_nal_t    *nal,
 
         case PTL_MSG_PUT:
                 if (kiov == NULL &&             /* not paged */
-                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
+                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                     nob <= kranal_tunables.kra_max_immediate)
                         break;                  /* send IMMEDIATE */
-                
-                tx = kranal_new_tx_msg(!in_interrupt(), RANA_MSG_PUT_REQ);
+
+                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                 if (tx == NULL)
                         return PTL_NO_SPACE;
 
-                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
+                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                 if (rc != 0) {
                         kranal_tx_done(tx, rc);
                         return PTL_FAIL;
@@ -734,11 +732,11 @@ kranal_do_send (lib_nal_t    *nal,
         }
 
         LASSERT (kiov == NULL);
-        LASSERT (nob <= RANAL_MAX_IMMEDIATE);
+        LASSERT (nob <= RANAL_FMA_MAX_DATA);
 
         tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                  type == PTL_MSG_REPLY ||
-                                 in_interrupt()), 
+                                 in_interrupt()),
                                RANAL_MSG_IMMEDIATE);
         if (tx == NULL)
                 return PTL_NO_SPACE;
@@ -748,7 +746,7 @@ kranal_do_send (lib_nal_t    *nal,
                 kranal_tx_done(tx, rc);
                 return PTL_FAIL;
         }
-                
+
         tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
         tx->tx_libmsg[0] = libmsg;
         kranal_launch_tx(tx, nid);
@@ -757,48 +755,58 @@ kranal_do_send (lib_nal_t    *nal,
 
 ptl_err_t
 kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
-            ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-            unsigned int niov, struct iovec *iov,
-            size_t offset, size_t len)
+             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+             unsigned int niov, struct iovec *iov,
+             size_t offset, size_t len)
 {
         return kranal_do_send(nal, private, cookie,
-                             hdr, type, nid, pid,
-                             niov, iov, NULL,
-                             offset, len);
+                              hdr, type, nid, pid,
+                              niov, iov, NULL,
+                              offset, len);
 }
 
 ptl_err_t
-kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
-                  ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-                  unsigned int niov, ptl_kiov_t *kiov, 
-                  size_t offset, size_t len)
+kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
+                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+                   unsigned int niov, ptl_kiov_t *kiov,
+                   size_t offset, size_t len)
 {
         return kranal_do_send(nal, private, cookie,
-                             hdr, type, nid, pid,
-                             niov, NULL, kiov,
-                             offset, len);
+                              hdr, type, nid, pid,
+                              niov, NULL, kiov,
+                              offset, len);
 }
 
 ptl_err_t
-kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
-               unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
-               size_t offset, size_t mlen, size_t rlen)
+kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
+                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
+                int offset, int mlen, int rlen)
 {
         kra_conn_t  *conn = private;
         kra_msg_t   *rxmsg = conn->rac_rxmsg;
+        kra_tx_t    *tx;
         void        *buffer;
         int          rc;
-        
+
         LASSERT (mlen <= rlen);
         LASSERT (!in_interrupt());
         /* Either all pages or all vaddrs */
         LASSERT (!(kiov != NULL && iov != NULL));
 
+        CDEBUG(D_NET, "conn %p, rxmsg %p, libmsg %p\n", conn, rxmsg, libmsg);
+
+        if (libmsg == NULL) {
+                /* GET or ACK or portals is discarding */
+                LASSERT (mlen == 0);
+                lib_finalize(nal, NULL, libmsg, PTL_OK);
+                return PTL_OK;
+        }
+
         switch(rxmsg->ram_type) {
         default:
                 LBUG();
                 return PTL_FAIL;
-                
+
         case RANAL_MSG_IMMEDIATE:
                 if (mlen == 0) {
                         buffer = NULL;
@@ -823,44 +831,32 @@ kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                 lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                 return PTL_OK;
 
-        case RANAL_MSG_GET_REQ:
-                /* If the GET matched, we've already handled it in
-                 * kranal_do_send which is called to send the REPLY.  We're
-                 * only called here to complete the GET receive (if we needed
-                 * it which we don't, but I digress...) */
-                LASSERT (libmsg == NULL);
-                lib_finalize(nal, NULL, libmsg, PTL_OK);
-                return PTL_OK;
-
         case RANAL_MSG_PUT_REQ:
-                if (libmsg == NULL) {           /* PUT didn't match... */
-                        lib_finalize(null, NULL, libmsg, PTL_OK);
-                        return PTL_OK;
-                }
-                
                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                 if (tx == NULL)
                         return PTL_NO_SPACE;
 
-                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen);
+                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                 if (rc != 0) {
                         kranal_tx_done(tx, rc);
                         return PTL_FAIL;
                 }
 
+                tx->tx_conn = conn;
                 kranal_map_buffer(tx);
-                
-                tx->tx_msg.ram_u.putack.rapam_src_cookie = 
+
+                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                         conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
-                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_addr = tx->tx_buffer;
-                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_nob = mlen;
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
+                        (__u64)((unsigned long)tx->tx_buffer);
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
 
                 tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
 
                 kranal_post_fma(conn, tx);
-                
+
                 /* flag matched by consuming rx message */
                 kranal_consume_rxmsg(conn, NULL, 0);
                 return PTL_OK;
@@ -869,20 +865,20 @@ kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
 
 ptl_err_t
 kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
-            unsigned int niov, struct iovec *iov, 
-            size_t offset, size_t mlen, size_t rlen)
+             unsigned int niov, struct iovec *iov,
+             size_t offset, size_t mlen, size_t rlen)
 {
-        return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
-                             offset, mlen, rlen);
+        return kranal_do_recv(nal, private, msg, niov, iov, NULL,
+                              offset, mlen, rlen);
 }
 
 ptl_err_t
 kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
-                  unsigned int niov, ptl_kiov_t *kiov, 
-                  size_t offset, size_t mlen, size_t rlen)
+                   unsigned int niov, ptl_kiov_t *kiov,
+                   size_t offset, size_t mlen, size_t rlen)
 {
-        return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
-                             offset, mlen, rlen);
+        return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
+                              offset, mlen, rlen);
 }
 
 int
@@ -904,7 +900,7 @@ kranal_thread_fini (void)
 }
 
 int
-kranal_check_conn (kra_conn_t *conn)
+kranal_check_conn_timeouts (kra_conn_t *conn)
 {
         kra_tx_t          *tx;
         struct list_head  *ttmp;
@@ -912,25 +908,31 @@ kranal_check_conn (kra_conn_t *conn)
         long               timeout;
         unsigned long      now = jiffies;
 
-        if (!conn->rac_closing &&
-            time_after_eq(now, conn->rac_last_sent + conn->rac_keepalive * HZ)) {
+        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
+                 conn->rac_state == RANAL_CONN_CLOSING);
+
+        if (!conn->rac_close_sent &&
+            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                 /* not sent in a while; schedule conn so scheduler sends a keepalive */
+                CDEBUG(D_NET, "Scheduling keepalive %p->"LPX64"\n",
+                       conn, conn->rac_peer->rap_nid);
                 kranal_schedule_conn(conn);
         }
 
-        /* wait twice as long for CLOSE to be sure peer is dead */
-        timeout = (conn->rac_closing ? 1 : 2) * conn->rac_timeout * HZ;
+        timeout = conn->rac_timeout * HZ;
 
         if (!conn->rac_close_recvd &&
             time_after_eq(now, conn->rac_last_rx + timeout)) {
-                CERROR("Nothing received from "LPX64" within %d seconds\n",
+                CERROR("%s received from "LPX64" within %lu seconds\n",
+                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
+                       "Nothing" : "CLOSE not",
                        conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                 return -ETIMEDOUT;
         }
 
-        if (conn->rac_closing)
+        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                 return 0;
-        
+
         /* Check the conn's queues are moving.  These are "belt+braces" checks,
          * in case of hardware/software errors that make this conn seem
          * responsive even though it isn't progressing its message queues. */
@@ -939,47 +941,49 @@ kranal_check_conn (kra_conn_t *conn)
 
         list_for_each (ttmp, &conn->rac_fmaq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
-                
+
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
-                        CERROR("tx on fmaq for "LPX64" blocked %d seconds\n",
-                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
+                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                         return -ETIMEDOUT;
                 }
         }
-        
+
         list_for_each (ttmp, &conn->rac_rdmaq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
-                
+
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
-                        CERROR("tx on rdmaq for "LPX64" blocked %d seconds\n",
-                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
+                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                         return -ETIMEDOUT;
                 }
         }
-        
+
         list_for_each (ttmp, &conn->rac_replyq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
-                
+
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
-                        CERROR("tx on replyq for "LPX64" blocked %d seconds\n",
-                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
+                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                         return -ETIMEDOUT;
                 }
         }
-        
+
         spin_unlock_irqrestore(&conn->rac_lock, flags);
         return 0;
 }
 
 void
-kranal_check_conns (int idx, unsigned long *min_timeoutp)
+kranal_reaper_check (int idx, unsigned long *min_timeoutp)
 {
         struct list_head  *conns = &kranal_data.kra_conns[idx];
         struct list_head  *ctmp;
         kra_conn_t        *conn;
+        unsigned long      flags;
+        int                rc;
 
  again:
         /* NB. We expect to check all the conns and not find any problems, so
@@ -987,30 +991,40 @@ kranal_check_conns (int idx, unsigned long *min_timeoutp)
         read_lock(&kranal_data.kra_global_lock);
 
         list_for_each (ctmp, conns) {
-                conn = list_entry(ptmp, kra_conn_t, rac_hashlist);
+                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);
 
                 if (conn->rac_timeout < *min_timeoutp )
                         *min_timeoutp = conn->rac_timeout;
                 if (conn->rac_keepalive < *min_timeoutp )
                         *min_timeoutp = conn->rac_keepalive;
 
-                rc = kranal_check_conn(conn);
+                rc = kranal_check_conn_timeouts(conn);
                 if (rc == 0)
                         continue;
 
                 kranal_conn_addref(conn);
                 read_unlock(&kranal_data.kra_global_lock);
 
-                CERROR("Check on conn to "LPX64"failed: %d\n",
-                       conn->rac_peer->rap_nid, rc);
+                CERROR("Conn to "LPX64", cqid %d timed out\n",
+                       conn->rac_peer->rap_nid, conn->rac_cqid);
+
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
-                write_lock_irqsave(&kranal_data.kra_global_lock);
+                switch (conn->rac_state) {
+                default:
+                        LBUG();
 
-                if (!conn->rac_closing)
+                case RANAL_CONN_ESTABLISHED:
                         kranal_close_conn_locked(conn, -ETIMEDOUT);
-                else
+                        break;
+
+                case RANAL_CONN_CLOSING:
                         kranal_terminate_conn_locked(conn);
-                        
+                        break;
+                }
+
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
                 kranal_conn_decref(conn);
 
                 /* start again now I've dropped the lock */
@@ -1023,13 +1037,15 @@ kranal_check_conns (int idx, unsigned long *min_timeoutp)
 int
 kranal_connd (void *arg)
 {
-       char               name[16];
+        long               id = (long)arg;
+        char               name[16];
         wait_queue_t       wait;
         unsigned long      flags;
         kra_peer_t        *peer;
-        int                i;
+        kra_acceptsock_t  *ras;
+        int                did_something;
 
-       snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
+        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
         kportal_daemonize(name);
         kportal_blockallsigs();
 
@@ -1038,29 +1054,50 @@ kranal_connd (void *arg)
         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
 
         while (!kranal_data.kra_shutdown) {
-                /* Safe: kra_shutdown only set when quiescent */
+                did_something = 0;
+
+                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
+                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
+                                         kra_acceptsock_t, ras_list);
+                        list_del(&ras->ras_list);
+
+                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+
+                        CDEBUG(D_NET,"About to handshake someone\n");
+
+                        kranal_conn_handshake(ras->ras_sock, NULL);
+                        kranal_free_acceptsock(ras);
+
+                        CDEBUG(D_NET,"Finished handshaking someone\n");
+
+                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+                        did_something = 1;
+                }
 
                 if (!list_empty(&kranal_data.kra_connd_peers)) {
                         peer = list_entry(kranal_data.kra_connd_peers.next,
-                                         kra_peer_t, rap_connd_list);
-                        
+                                          kra_peer_t, rap_connd_list);
+
                         list_del_init(&peer->rap_connd_list);
                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
                         kranal_connect(peer);
-                        kranal_put_peer(peer);
+                        kranal_peer_decref(peer);
 
                         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
-                       continue;
+                        did_something = 1;
                 }
 
+                if (did_something)
+                        continue;
+
                 set_current_state(TASK_INTERRUPTIBLE);
                 add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
-                
+
                 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
                 schedule ();
-                
+
                 set_current_state(TASK_RUNNING);
                 remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
 
@@ -1074,14 +1111,14 @@ kranal_connd (void *arg)
 }
 
 void
-kranal_update_reaper_timeout(long timeout) 
+kranal_update_reaper_timeout(long timeout)
 {
         unsigned long   flags;
 
         LASSERT (timeout > 0);
-        
+
         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
-        
+
         if (timeout < kranal_data.kra_new_min_timeout)
                 kranal_data.kra_new_min_timeout = timeout;
 
@@ -1093,9 +1130,6 @@ kranal_reaper (void *arg)
 {
         wait_queue_t       wait;
         unsigned long      flags;
-        kra_conn_t        *conn;
-        kra_peer_t        *peer;
-        unsigned long      flags;
         long               timeout;
         int                i;
         int                conn_entries = kranal_data.kra_conn_hash_size;
@@ -1104,106 +1138,100 @@ kranal_reaper (void *arg)
         unsigned long      next_check_time = jiffies;
         long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
         long               current_min_timeout = 1;
-        
+
         kportal_daemonize("kranal_reaper");
         kportal_blockallsigs();
 
         init_waitqueue_entry(&wait, current);
 
         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
-        kranal_data.kra_new_min_timeout = 1;
 
         while (!kranal_data.kra_shutdown) {
+                /* I wake up every 'p' seconds to check for timeouts on some
+                 * more peers.  I try to check every connection 'n' times
+                 * within the global minimum of all keepalive and timeout
+                 * intervals, to ensure I attend to every connection within
+                 * (n+1)/n times its timeout intervals. */
+                const int     p = 1;
+                const int     n = 3;
+                unsigned long min_timeout;
+                int           chunk;
 
                 /* careful with the jiffy wrap... */
                 timeout = (long)(next_check_time - jiffies);
-                if (timeout <= 0) {
-                
-                        /* I wake up every 'p' seconds to check for
-                         * timeouts on some more peers.  I try to check
-                         * every connection 'n' times within the global
-                         * minimum of all keepalive and timeout intervals,
-                         * to ensure I attend to every connection within
-                         * (n+1)/n times its timeout intervals. */
-                
-                        const int     p = 1;
-                        const int     n = 3;
-                        unsigned long min_timeout;
-                        int           chunk;
-
-                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
-                                /* new min timeout set: restart min timeout scan */
-                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
-                                base_index = conn_index - 1;
-                                if (base_index < 0)
-                                        base_index = conn_entries - 1;
-
-                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
-                                        current_min_timeout = kranal_data.kra_new_min_timeout;
-                                        CWARN("Set new min timeout %ld\n",
-                                              current_min_timeout);
-                                }
-
-                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
-                        }
-                        min_timeout = current_min_timeout;
-
-                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
-                                               flags);
-
-                        LASSERT (min_timeout > 0);
-
-                        /* Compute how many table entries to check now so I
-                         * get round the whole table fast enough (NB I do
-                         * this at fixed intervals of 'p' seconds) */
-                       chunk = conn_entries;
-                        if (min_timeout > n * p)
-                                chunk = (chunk * n * p) / min_timeout;
-                        if (chunk == 0)
-                                chunk = 1;
-
-                        for (i = 0; i < chunk; i++) {
-                                kranal_check_conns(conn_index, 
-                                                   &next_min_timeout);
-                                conn_index = (conn_index + 1) % conn_entries;
-                        }
+                if (timeout > 0) {
+                        set_current_state(TASK_INTERRUPTIBLE);
+                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+
+                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
 
-                        next_check_time += p * HZ;
+                        schedule_timeout(timeout);
 
                         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
 
-                        if (((conn_index - chunk <= base_index &&
-                              base_index < conn_index) ||
-                             (conn_index - conn_entries - chunk <= base_index &&
-                              base_index < conn_index - conn_entries))) {
-
-                                /* Scanned all conns: set current_min_timeout... */
-                                if (current_min_timeout != next_min_timeout) {
-                                        current_min_timeout = next_min_timeout;                                        
-                                        CWARN("Set new min timeout %ld\n",
-                                              current_min_timeout);
-                                }
-
-                                /* ...and restart min timeout scan */
-                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
-                                base_index = conn_index - 1;
-                                if (base_index < 0)
-                                        base_index = conn_entries - 1;
-                        }
+                        set_current_state(TASK_RUNNING);
+                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+                        continue;
                 }
 
-                set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+                if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
+                        /* new min timeout set: restart min timeout scan */
+                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                        base_index = conn_index - 1;
+                        if (base_index < 0)
+                                base_index = conn_entries - 1;
+
+                        if (kranal_data.kra_new_min_timeout < current_min_timeout) {
+                                current_min_timeout = kranal_data.kra_new_min_timeout;
+                                CDEBUG(D_NET, "Set new min timeout %ld\n",
+                                       current_min_timeout);
+                        }
+
+                        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                }
+                min_timeout = current_min_timeout;
 
                 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
 
-                busy_loops = 0;
-                schedule_timeout(timeout);
+                LASSERT (min_timeout > 0);
+
+                /* Compute how many table entries to check now so I get round
+                 * the whole table fast enough (NB I do this at fixed
+                 * intervals of 'p' seconds) */
+                chunk = conn_entries;
+                if (min_timeout > n * p)
+                        chunk = (chunk * n * p) / min_timeout;
+                if (chunk == 0)
+                        chunk = 1;
+
+                for (i = 0; i < chunk; i++) {
+                        kranal_reaper_check(conn_index,
+                                            &next_min_timeout);
+                        conn_index = (conn_index + 1) % conn_entries;
+                }
+
+                next_check_time += p * HZ;
 
                 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
 
-                set_current_state(TASK_RUNNING);
-                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+                if (((conn_index - chunk <= base_index &&
+                      base_index < conn_index) ||
+                     (conn_index - conn_entries - chunk <= base_index &&
+                      base_index < conn_index - conn_entries))) {
+
+                        /* Scanned all conns: set current_min_timeout... */
+                        if (current_min_timeout != next_min_timeout) {
+                                current_min_timeout = next_min_timeout;
+                                CDEBUG(D_NET, "Set new min timeout %ld\n",
+                                       current_min_timeout);
+                        }
+
+                        /* ...and restart min timeout scan */
+                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+                        base_index = conn_index - 1;
+                        if (base_index < 0)
+                                base_index = conn_entries - 1;
+                }
         }
 
         kranal_thread_fini();
@@ -1211,82 +1239,170 @@ kranal_reaper (void *arg)
 }
 
 void
-kranal_process_rdmaq (__u32 cqid)
+kranal_check_rdma_cq (kra_device_t *dev)
 {
         kra_conn_t          *conn;
         kra_tx_t            *tx;
         RAP_RETURN           rrc;
         unsigned long        flags;
         RAP_RDMA_DESCRIPTOR *desc;
-        
-        read_lock(&kranal_data.kra_global_lock);
+        __u32                cqid;
+        __u32                event_type;
 
-        conn = kranal_cqid2conn_locked(cqid);
-        LASSERT (conn != NULL);
+        for (;;) {
+                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
+                if (rrc == RAP_NOT_DONE) {
+                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
+                        return;
+                }
 
-        rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
-        LASSERT (rrc == RAP_SUCCESS);
+                LASSERT (rrc == RAP_SUCCESS);
+                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
 
-        spin_lock_irqsave(&conn->rac_lock, flags);
+                read_lock(&kranal_data.kra_global_lock);
 
-        LASSERT (!list_empty(&conn->rac_rdmaq));
-        tx = list_entry(con->rac_rdmaq.next, kra_tx_t, tx_list);
-        list_del(&tx->tx_list);
+                conn = kranal_cqid2conn_locked(cqid);
+                if (conn == NULL) {
+                        /* Conn was destroyed? */
+                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
+                        read_unlock(&kranal_data.kra_global_lock);
+                        continue;
+                }
 
-        LASSERT(desc->AppPtr == (void *)tx);
-        LASSERT(desc->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
-                desc->tx_msg.ram_type == RANAL_MSG_GET_DONE);
+                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
+                LASSERT (rrc == RAP_SUCCESS);
 
-        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
-        tx->tx_qtime = jiffies;
-        
-        spin_unlock_irqrestore(&conn->rac_lock, flags);
+                CDEBUG(D_NET, "Completed %p\n",
+                       list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));
 
-        /* Get conn's fmaq processed, now I've just put something there */
-        kranal_schedule_conn(conn);
+                spin_lock_irqsave(&conn->rac_lock, flags);
 
-        read_unlock(&kranal_data.kra_global_lock);
+                LASSERT (!list_empty(&conn->rac_rdmaq));
+                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
+                list_del(&tx->tx_list);
+
+                LASSERT(desc->AppPtr == (void *)tx);
+                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
+                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
+
+                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
+                tx->tx_qtime = jiffies;
+
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
+
+                /* Get conn's fmaq processed, now I've just put something
+                 * there */
+                kranal_schedule_conn(conn);
+
+                read_unlock(&kranal_data.kra_global_lock);
+        }
+}
+
+void
+kranal_check_fma_cq (kra_device_t *dev)
+{
+        kra_conn_t         *conn;
+        RAP_RETURN          rrc;
+        __u32               cqid;
+        __u32               event_type;
+        struct list_head   *conns;
+        struct list_head   *tmp;
+        int                 i;
+
+        for (;;) {
+                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
+                if (rrc == RAP_NOT_DONE) {
+                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
+                        return;
+                }
+
+                LASSERT (rrc == RAP_SUCCESS);
+
+                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
+
+                        read_lock(&kranal_data.kra_global_lock);
+
+                        conn = kranal_cqid2conn_locked(cqid);
+                        if (conn == NULL) {
+                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
+                                       cqid);
+                        } else {
+                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
+                                       conn, cqid);
+                                kranal_schedule_conn(conn);
+                        }
+
+                        read_unlock(&kranal_data.kra_global_lock);
+                        continue;
+                }
+
+                /* FMA CQ has overflowed: check ALL conns */
+                CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
+
+                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
+
+                        read_lock(&kranal_data.kra_global_lock);
+
+                        conns = &kranal_data.kra_conns[i];
+
+                        list_for_each (tmp, conns) {
+                                conn = list_entry(tmp, kra_conn_t,
+                                                  rac_hashlist);
+
+                                if (conn->rac_device == dev)
+                                        kranal_schedule_conn(conn);
+                        }
+
+                        /* don't block write lockers for too long... */
+                        read_unlock(&kranal_data.kra_global_lock);
+                }
+        }
 }
 
 int
 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
                void *immediate, int immediatenob)
 {
-        int   sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
+        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
+        RAP_RETURN rrc;
 
-        LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
+        CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
+               conn, msg, msg->ram_type, sync ? "(sync)" : "",
+               immediate, immediatenob);
+
+        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
-                 immediatenob <= RANAL_FMA_MAX_DATA_LEN :
+                 immediatenob <= RANAL_FMA_MAX_DATA :
                  immediatenob == 0);
 
-        msg->ram_incarnation = conn->rac_incarnation;
+        msg->ram_connstamp = conn->rac_my_connstamp;
         msg->ram_seq = conn->rac_tx_seq;
 
         if (sync)
-                rrc = RapkFmaSyncSend(conn->rac_device.rad_handle,
+                rrc = RapkFmaSyncSend(conn->rac_rihandle,
                                       immediate, immediatenob,
                                       msg, sizeof(*msg));
         else
-                rrc = RapkFmaSend(conn->rac_device.rad_handle,
+                rrc = RapkFmaSend(conn->rac_rihandle,
                                   immediate, immediatenob,
                                   msg, sizeof(*msg));
 
         switch (rrc) {
+        default:
+                LBUG();
+
         case RAP_SUCCESS:
                 conn->rac_last_tx = jiffies;
                 conn->rac_tx_seq++;
                 return 0;
-                
+
         case RAP_NOT_DONE:
                 return -EAGAIN;
-
-        default:
-                LBUG();
         }
 }
 
-int
-kranal_process_fmaq (kra_conn_t *conn) 
+void
+kranal_process_fmaq (kra_conn_t *conn)
 {
         unsigned long flags;
         int           more_to_do;
@@ -1294,27 +1410,52 @@ kranal_process_fmaq (kra_conn_t *conn)
         int           rc;
         int           expect_reply;
 
-        /* NB I will be rescheduled some via a rad_fma_cq event if my FMA is
-         * out of credits when I try to send right now... */
+        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
+         *       However I will be rescheduled some by an FMA completion event
+         *       when I eventually get some.
+         * NB 2. Sampling rac_state here races with setting it elsewhere.
+         *       But it doesn't matter if I try to send a "real" message just
+         *       as I start closing because I'll get scheduled to send the
+         *       close anyway. */
 
-        if (conn->rac_closing) {
+        /* Not racing with incoming message processing! */
+        LASSERT (current == conn->rac_device->rad_scheduler);
 
+        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                 if (!list_empty(&conn->rac_rdmaq)) {
-                        /* Can't send CLOSE yet; I'm still waiting for RDMAs I
-                         * posted to finish */
+                        /* RDMAs in progress */
                         LASSERT (!conn->rac_close_sent);
-                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
-                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
-                        return 0;
+
+                        if (time_after_eq(jiffies,
+                                          conn->rac_last_tx +
+                                          conn->rac_keepalive * HZ)) {
+                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
+                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+                        }
+                        return;
                 }
 
                 if (conn->rac_close_sent)
-                        return 0;
-                
+                        return;
+
+                CWARN("sending CLOSE to "LPX64"\n", conn->rac_peer->rap_nid);
                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
-                conn->rac_close_sent = (rc == 0);
-                return 0;
+                if (rc != 0)
+                        return;
+
+                conn->rac_close_sent = 1;
+                if (!conn->rac_close_recvd)
+                        return;
+
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+
+                if (conn->rac_state == RANAL_CONN_CLOSING)
+                        kranal_terminate_conn_locked(conn);
+
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+                return;
         }
 
         spin_lock_irqsave(&conn->rac_lock, flags);
@@ -1323,13 +1464,15 @@ kranal_process_fmaq (kra_conn_t *conn)
 
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
 
-                if (time_after_eq(conn->rac_last_tx + conn->rac_keepalive)) {
+                if (time_after_eq(jiffies,
+                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
+                        CDEBUG(D_NET, "sending NOOP (idle)\n");
                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 }
-                return 0;
+                return;
         }
-        
+
         tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
         list_del(&tx->tx_list);
         more_to_do = !list_empty(&conn->rac_fmaq);
@@ -1337,20 +1480,26 @@ kranal_process_fmaq (kra_conn_t *conn)
         spin_unlock_irqrestore(&conn->rac_lock, flags);
 
         expect_reply = 0;
+        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
+               tx, tx->tx_msg.ram_type, tx->tx_cookie);
         switch (tx->tx_msg.ram_type) {
         default:
                 LBUG();
-                
+
         case RANAL_MSG_IMMEDIATE:
+                rc = kranal_sendmsg(conn, &tx->tx_msg,
+                                    tx->tx_buffer, tx->tx_nob);
+                expect_reply = 0;
+                break;
+
         case RANAL_MSG_PUT_NAK:
         case RANAL_MSG_PUT_DONE:
         case RANAL_MSG_GET_NAK:
         case RANAL_MSG_GET_DONE:
-                rc = kranal_sendmsg(conn, &tx->tx_msg,
-                                    tx->tx_buffer, tx->tx_nob);
+                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                 expect_reply = 0;
                 break;
-                
+
         case RANAL_MSG_PUT_REQ:
                 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
@@ -1367,7 +1516,8 @@ kranal_process_fmaq (kra_conn_t *conn)
                 kranal_map_buffer(tx);
                 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.get.ragm_desc.rard_addr = tx->tx_buffer;
+                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
+                        (__u64)((unsigned long)tx->tx_buffer);
                 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                 expect_reply = 1;
@@ -1375,26 +1525,32 @@ kranal_process_fmaq (kra_conn_t *conn)
         }
 
         if (rc == -EAGAIN) {
-                /* replace at the head of the list for later */
+                /* I need credits to send this.  Replace tx at the head of the
+                 * fmaq and I'll get rescheduled when credits appear */
+                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                 spin_lock_irqsave(&conn->rac_lock, flags);
                 list_add(&tx->tx_list, &conn->rac_fmaq);
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
-
-                return 0;
+                return;
         }
 
         LASSERT (rc == 0);
-        
+
         if (!expect_reply) {
                 kranal_tx_done(tx, 0);
         } else {
+                /* LASSERT(current) above ensures this doesn't race with reply
+                 * processing */
                 spin_lock_irqsave(&conn->rac_lock, flags);
                 list_add_tail(&tx->tx_list, &conn->rac_replyq);
                 tx->tx_qtime = jiffies;
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
         }
 
-        return more_to_do;
+        if (more_to_do) {
+                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
+                kranal_schedule_conn(conn);
+        }
 }
 
 static inline void
@@ -1404,52 +1560,73 @@ kranal_swab_rdma_desc (kra_rdma_desc_t *d)
         __swab16s(&d->rard_key.Cookie);
         __swab16s(&d->rard_key.MdHandle);
         __swab32s(&d->rard_key.Flags);
-        __swab64s(&d->rard_addr);
+        __swab64s(&d->rard_addr.AddressBits);
         __swab32s(&d->rard_nob);
 }
 
 kra_tx_t *
 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
 {
-        unsigned long     flags;
         struct list_head *ttmp;
         kra_tx_t         *tx;
-        
+        unsigned long     flags;
+
+        spin_lock_irqsave(&conn->rac_lock, flags);
+
         list_for_each(ttmp, &conn->rac_replyq) {
                 tx = list_entry(ttmp, kra_tx_t, tx_list);
-                
+
+                CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
+                       tx, tx->tx_msg.ram_type, tx->tx_cookie);
+
                 if (tx->tx_cookie != cookie)
                         continue;
-                
+
                 if (tx->tx_msg.ram_type != type) {
+                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                         CWARN("Unexpected type %x (%x expected) "
                               "matched reply from "LPX64"\n",
                               tx->tx_msg.ram_type, type,
                               conn->rac_peer->rap_nid);
                         return NULL;
                 }
+
+                list_del(&tx->tx_list);
+                spin_unlock_irqrestore(&conn->rac_lock, flags);
+                return tx;
         }
-        
-        CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
+
+        spin_unlock_irqrestore(&conn->rac_lock, flags);
+        CWARN("Unmatched reply %02x/"LPX64" from "LPX64"\n",
+              type, cookie, conn->rac_peer->rap_nid);
         return NULL;
 }
 
-int
-kranal_process_receives(kra_conn_t *conn)
+void
+kranal_check_fma_rx (kra_conn_t *conn)
 {
         unsigned long flags;
         __u32         seq;
-        __u32         nob;
+        kra_tx_t     *tx;
         kra_msg_t    *msg;
-        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &msg);
+        void         *prefix;
+        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
         kra_peer_t   *peer = conn->rac_peer;
 
         if (rrc == RAP_NOT_DONE)
-                return 0;
-        
+                return;
+
+        CDEBUG(D_NET, "RX on %p\n", conn);
+
         LASSERT (rrc == RAP_SUCCESS);
         conn->rac_last_rx = jiffies;
-        seq = conn->rac_seq++;
+        seq = conn->rac_rx_seq++;
+        msg = (kra_msg_t *)prefix;
+
+        /* stash message for portals callbacks they'll NULL
+         * rac_rxmsg if they consume it */
+        LASSERT (conn->rac_rxmsg == NULL);
+        conn->rac_rxmsg = msg;
 
         if (msg->ram_magic != RANAL_MSG_MAGIC) {
                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
@@ -1462,7 +1639,7 @@ kranal_process_receives(kra_conn_t *conn)
                 __swab16s(&msg->ram_version);
                 __swab16s(&msg->ram_type);
                 __swab64s(&msg->ram_srcnid);
-                __swab64s(&msg->ram_incarnation);
+                __swab64s(&msg->ram_connstamp);
                 __swab32s(&msg->ram_seq);
 
                 /* NB message type checked below; NOT here... */
@@ -1474,7 +1651,7 @@ kranal_process_receives(kra_conn_t *conn)
                 case RANAL_MSG_GET_REQ:
                         kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                         break;
-                        
+
                 default:
                         break;
                 }
@@ -1491,15 +1668,15 @@ kranal_process_receives(kra_conn_t *conn)
                        msg->ram_srcnid, peer->rap_nid);
                 goto out;
         }
-        
-        if (msg->ram_incarnation != conn->rac_incarnation) {
-                CERROR("Unexpected incarnation "LPX64"("LPX64
+
+        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
+                CERROR("Unexpected connstamp "LPX64"("LPX64
                        " expected) from "LPX64"\n",
-                       msg->ram_incarnation, conn->rac_incarnation,
+                       msg->ram_connstamp, conn->rac_peer_connstamp,
                        peer->rap_nid);
                 goto out;
         }
-        
+
         if (msg->ram_seq != seq) {
                 CERROR("Unexpected sequence number %d(%d expected) from "
                        LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
@@ -1507,38 +1684,48 @@ kranal_process_receives(kra_conn_t *conn)
         }
 
         if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
-                /* This message signals RDMA completion: wait now... */
+                /* This message signals RDMA completion... */
                 rrc = RapkFmaSyncWait(conn->rac_rihandle);
                 LASSERT (rrc == RAP_SUCCESS);
         }
+
+        if (conn->rac_close_recvd) {
+                CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
+                       msg->ram_type, conn->rac_peer->rap_nid);
+                goto out;
+        }
+
         if (msg->ram_type == RANAL_MSG_CLOSE) {
+                CWARN("RX CLOSE from "LPX64"\n", conn->rac_peer->rap_nid);
                 conn->rac_close_recvd = 1;
-                write_lock_irqsave(&kranal_data.kra_global_lock);
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
-                if (!conn->rac_closing)
-                        kranal_close_conn_locked(conn, -ETIMEDOUT);
-                else if (conn->rac_close_sent)
+                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
+                        kranal_close_conn_locked(conn, 0);
+                else if (conn->rac_state == RANAL_CONN_CLOSING &&
+                         conn->rac_close_sent)
                         kranal_terminate_conn_locked(conn);
-                
+
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                 goto out;
         }
 
-        if (conn->rac_closing)
+        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                 goto out;
-        
-        conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
-                                                /* they'll NULL rac_rxmsg if they consume it */
+
         switch (msg->ram_type) {
         case RANAL_MSG_NOOP:
                 /* Nothing to do; just a keepalive */
+                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                 break;
-                
+
         case RANAL_MSG_IMMEDIATE:
+                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                 lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
                 break;
-                
+
         case RANAL_MSG_PUT_REQ:
+                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                 lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
 
                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
@@ -1547,35 +1734,39 @@ kranal_process_receives(kra_conn_t *conn)
                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
                 if (tx == NULL)
                         break;
-                
-                tx->tx_msg.ram_u.racm_cookie = msg->msg_u.putreq.raprm_cookie;
+
+                tx->tx_msg.ram_u.completion.racm_cookie =
+                        msg->ram_u.putreq.raprm_cookie;
                 kranal_post_fma(conn, tx);
                 break;
 
         case RANAL_MSG_PUT_NAK:
+                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
                         break;
-                
+
                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                 kranal_tx_done(tx, -ENOENT);    /* no match */
                 break;
-                
+
         case RANAL_MSG_PUT_ACK:
+                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                         msg->ram_u.putack.rapam_src_cookie);
                 if (tx == NULL)
                         break;
 
                 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
-                            &msg->ram_u.putack.rapam_desc, 
-                            msg->msg_u.putack.rapam_desc.rard_nob,
+                            &msg->ram_u.putack.rapam_desc,
+                            msg->ram_u.putack.rapam_desc.rard_nob,
                             msg->ram_u.putack.rapam_dst_cookie);
                 break;
 
         case RANAL_MSG_PUT_DONE:
+                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
@@ -1587,8 +1778,9 @@ kranal_process_receives(kra_conn_t *conn)
                 break;
 
         case RANAL_MSG_GET_REQ:
-                lib_parse(&kranal_lib, &msg->ram_u.getreq.ragm_hdr, conn);
-                
+                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
+                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
+
                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                         break;
 
@@ -1596,27 +1788,29 @@ kranal_process_receives(kra_conn_t *conn)
                 if (tx == NULL)
                         break;
 
-                tx->tx_msg.ram_u.racm_cookie = msg->msg_u.getreq.ragm_cookie;
+                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                 kranal_post_fma(conn, tx);
                 break;
-                
+
         case RANAL_MSG_GET_NAK:
+                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
                         break;
-                
+
                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                 kranal_tx_done(tx, -ENOENT);    /* no match */
                 break;
-                
+
         case RANAL_MSG_GET_DONE:
+                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                         msg->ram_u.completion.racm_cookie);
                 if (tx == NULL)
                         break;
-                
+
                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                 kranal_tx_done(tx, 0);
@@ -1624,10 +1818,37 @@ kranal_process_receives(kra_conn_t *conn)
         }
 
  out:
-        if (conn->rac_msg != NULL)
+        if (conn->rac_rxmsg != NULL)
                 kranal_consume_rxmsg(conn, NULL, 0);
 
-        return 1;
+        /* check again later */
+        kranal_schedule_conn(conn);
+}
+
+void
+kranal_complete_closed_conn (kra_conn_t *conn)
+{
+        kra_tx_t   *tx;
+
+        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
+        LASSERT (list_empty(&conn->rac_list));
+        LASSERT (list_empty(&conn->rac_hashlist));
+
+        while (!list_empty(&conn->rac_fmaq)) {
+                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -ECONNABORTED);
+        }
+
+        LASSERT (list_empty(&conn->rac_rdmaq));
+
+        while (!list_empty(&conn->rac_replyq)) {
+                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
+
+                list_del(&tx->tx_list);
+                kranal_tx_done(tx, -ECONNABORTED);
+        }
 }
 
 int
@@ -1638,89 +1859,63 @@ kranal_scheduler (void *arg)
         char            name[16];
         kra_conn_t     *conn;
         unsigned long   flags;
-        int             rc;
-        int             i;
-        __u32           cqid;
-        int             did_something;
         int             busy_loops = 0;
 
-        snprintf(name, sizeof(name), "kranal_sd_%02ld", dev->rad_idx);
+        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
         kportal_daemonize(name);
         kportal_blockallsigs();
 
+        dev->rad_scheduler = current;
         init_waitqueue_entry(&wait, current);
 
         spin_lock_irqsave(&dev->rad_lock, flags);
 
         while (!kranal_data.kra_shutdown) {
                 /* Safe: kra_shutdown only set when quiescent */
-                
-               if (busy_loops++ >= RANAL_RESCHED) {
+
+                if (busy_loops++ >= RANAL_RESCHED) {
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
 
                         our_cond_resched();
-                       busy_loops = 0;
+                        busy_loops = 0;
 
                         spin_lock_irqsave(&dev->rad_lock, flags);
-               }
-
-                did_something = 0;
+                }
 
                 if (dev->rad_ready) {
+                        /* Device callback fired since I last checked it */
                         dev->rad_ready = 0;
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
 
-                        rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
+                        kranal_check_rdma_cq(dev);
+                        kranal_check_fma_cq(dev);
 
-                        LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
-                        LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
-                        
-                        if (rrc == RAP_SUCCESS) {
-                                kranal_process_rdmaq(cqid);
-                                did_something = 1;
-                        }
-                        
-                        rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
-                        LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
-                        
-                        if (rrc == RAP_SUCCESS) {
-                                if ((event_type & RAPK_CQ_EVENT_OVERRUN) != 0)
-                                        kranal_schedule_dev(dev);
-                                else
-                                        kranal_schedule_cqid(cqid);
-                                did_something = 1;
-                        }
-                        
                         spin_lock_irqsave(&dev->rad_lock, flags);
-
-                        /* If there were no completions to handle, I leave
-                         * rad_ready clear.  NB I cleared it BEFORE I checked
-                         * the completion queues since I'm racing with the
-                         * device callback. */
-
-                        if (did_something)
-                                dev->rad_ready = 1;
                 }
-               
+
                 if (!list_empty(&dev->rad_connq)) {
+                        /* Connection needs attention */
                         conn = list_entry(dev->rad_connq.next,
                                           kra_conn_t, rac_schedlist);
-                        list_del(&conn->rac_schedlist);
+                        list_del_init(&conn->rac_schedlist);
+                        LASSERT (conn->rac_scheduled);
+                        conn->rac_scheduled = 0;
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
 
-                        LASSERT (conn->rac_scheduled);
+                        kranal_check_fma_rx(conn);
+                        kranal_process_fmaq(conn);
 
-                        resched  = kranal_process_fmaq(conn);
-                        resched |= kranal_process_receives(conn);
-                        did_something = 1;
+                        if (conn->rac_state == RANAL_CONN_CLOSED)
+                                kranal_complete_closed_conn(conn);
+
+                        kranal_conn_decref(conn);
 
                         spin_lock_irqsave(&dev->rad_lock, flags);
-                        if (resched)
-                                list_add_tail(&conn->rac_schedlist,
-                                              &dev->rad_connq);
+                        continue;
                 }
 
-                if (did_something)
+                /* recheck device callback fired before sleeping */
+                if (dev->rad_ready)
                         continue;
 
                 add_wait_queue(&dev->rad_waitq, &wait);
@@ -1739,6 +1934,7 @@ kranal_scheduler (void *arg)
 
         spin_unlock_irqrestore(&dev->rad_lock, flags);
 
+        dev->rad_scheduler = NULL;
         kranal_thread_fini();
         return 0;
 }