i=liangzhen
diff --git a/lnet/klnds/viblnd/viblnd_cb.c b/lnet/klnds/viblnd/viblnd_cb.c
index 12dcdfd..3b05751 100644
--- a/lnet/klnds/viblnd/viblnd_cb.c
+++ b/lnet/klnds/viblnd/viblnd_cb.c
  *
  */
 
-#include "vibnal.h"
+#include "viblnd.h"
 
 void
 kibnal_tx_done (kib_tx_t *tx)
 {
-        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
-        int              i;
+        lnet_msg_t *lntmsg[2];
+        int         rc = tx->tx_status;
+        int         i;
 
         LASSERT (!in_interrupt());
         LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
         LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
         LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */
 
-#if !IBNAL_WHOLE_MEM
-        switch (tx->tx_mapped) {
-        default:
-                LBUG();
-
-        case KIB_TX_UNMAPPED:
-                break;
-
-        case KIB_TX_MAPPED: {
+#if IBNAL_USE_FMR
+        if (tx->tx_md.md_fmrcount == 0 ||
+            (rc != 0 && tx->tx_md.md_active)) {
                 vv_return_t      vvrc;
 
-                vvrc = vv_mem_region_destroy(kibnal_data.kib_hca,
-                                             tx->tx_md.md_handle);
+                /* mapping must be active (it dropped fmrcount to 0) */
+                LASSERT (tx->tx_md.md_active); 
+
+                vvrc = vv_unmap_fmr(kibnal_data.kib_hca,
+                                    1, &tx->tx_md.md_fmrhandle);
                 LASSERT (vvrc == vv_return_ok);
-                tx->tx_mapped = KIB_TX_UNMAPPED;
-                break;
-        }
+
+                tx->tx_md.md_fmrcount = *kibnal_tunables.kib_fmr_remaps;
         }
+        tx->tx_md.md_active = 0;
 #endif
-        for (i = 0; i < 2; i++) {
-                /* tx may have up to 2 libmsgs to finalise */
-                if (tx->tx_libmsg[i] == NULL)
-                        continue;
 
-                lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
-                tx->tx_libmsg[i] = NULL;
-        }
+        /* tx may have up to 2 lnet msgs to finalise */
+        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
+        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
         
         if (tx->tx_conn != NULL) {
                 kibnal_conn_decref(tx->tx_conn);
@@ -73,89 +67,85 @@ kibnal_tx_done (kib_tx_t *tx)
 
         spin_lock(&kibnal_data.kib_tx_lock);
 
-        if (tx->tx_isnblk) {
-                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
-        } else {
-                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
-                wake_up (&kibnal_data.kib_idle_tx_waitq);
-        }
+        list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
 
         spin_unlock(&kibnal_data.kib_tx_lock);
+
+        /* delay finalize until my descs have been freed */
+        for (i = 0; i < 2; i++) {
+                if (lntmsg[i] == NULL)
+                        continue;
+
+                lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
+        }
 }
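
The reordering above matters: the tx descriptor is put back on the idle list first, and the LNET messages are finalized only afterwards, outside kib_tx_lock. A minimal sketch of that ordering, with every name except lnet_finalize() hypothetical (presumably the point is that whatever lnet_finalize() triggers can grab a fresh descriptor without deadlocking on the same lock):

static void
sketch_tx_done(my_tx_t *tx, int status)
{
        lnet_msg_t *msg = tx->tx_lntmsg;        /* detach before recycling */

        tx->tx_lntmsg = NULL;

        spin_lock(&my_tx_lock);
        list_add(&tx->tx_list, &my_idle_txs);   /* descriptor reusable from here on */
        spin_unlock(&my_tx_lock);

        if (msg != NULL)                        /* notify LNET last, lock dropped */
                lnet_finalize(my_ni, msg, status);
}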
 
-kib_tx_t *
-kibnal_get_idle_tx (int may_block) 
+void
+kibnal_txlist_done (struct list_head *txlist, int status)
 {
-        kib_tx_t      *tx = NULL;
-        ENTRY;
-        
-        for (;;) {
-                spin_lock(&kibnal_data.kib_tx_lock);
+        kib_tx_t *tx;
 
-                /* "normal" descriptor is free */
-                if (!list_empty (&kibnal_data.kib_idle_txs)) {
-                        tx = list_entry (kibnal_data.kib_idle_txs.next,
-                                         kib_tx_t, tx_list);
-                        break;
-                }
+        while (!list_empty (txlist)) {
+                tx = list_entry (txlist->next, kib_tx_t, tx_list);
 
-                if (!may_block) {
-                        /* may dip into reserve pool */
-                        if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
-                                CERROR ("reserved tx desc pool exhausted\n");
-                                break;
-                        }
+                list_del (&tx->tx_list);
+                /* complete now */
+                tx->tx_waiting = 0;
+                tx->tx_status = status;
+                kibnal_tx_done (tx);
+        }
+}
 
-                        tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
-                                         kib_tx_t, tx_list);
-                        break;
-                }
+kib_tx_t *
+kibnal_get_idle_tx (void) 
+{
+        kib_tx_t      *tx;
+        
+        spin_lock(&kibnal_data.kib_tx_lock);
 
-                /* block for idle tx */
+        if (list_empty (&kibnal_data.kib_idle_txs)) {
                 spin_unlock(&kibnal_data.kib_tx_lock);
-
-                wait_event (kibnal_data.kib_idle_tx_waitq,
-                            !list_empty (&kibnal_data.kib_idle_txs) ||
-                            kibnal_data.kib_shutdown);
+                return NULL;
         }
 
-        if (tx != NULL) {
-                list_del (&tx->tx_list);
+        tx = list_entry (kibnal_data.kib_idle_txs.next, kib_tx_t, tx_list);
+        list_del (&tx->tx_list);
 
-                /* Allocate a new completion cookie.  It might not be needed,
-                 * but we've got a lock right now and we're unlikely to
-                 * wrap... */
-                tx->tx_cookie = kibnal_data.kib_next_tx_cookie++;
-#if IBNAL_WHOLE_MEM
-                LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
-#endif
-                LASSERT (tx->tx_nwrq == 0);
-                LASSERT (!tx->tx_queued);
-                LASSERT (tx->tx_sending == 0);
-                LASSERT (!tx->tx_waiting);
-                LASSERT (tx->tx_status == 0);
-                LASSERT (tx->tx_conn == NULL);
-                LASSERT (tx->tx_libmsg[0] == NULL);
-                LASSERT (tx->tx_libmsg[1] == NULL);
-        }
+        /* Allocate a new completion cookie.  It might not be needed,
+         * but we've got a lock right now and we're unlikely to
+         * wrap... */
+        tx->tx_cookie = kibnal_data.kib_next_tx_cookie++;
 
         spin_unlock(&kibnal_data.kib_tx_lock);
+
+        LASSERT (tx->tx_nwrq == 0);
+        LASSERT (!tx->tx_queued);
+        LASSERT (tx->tx_sending == 0);
+        LASSERT (!tx->tx_waiting);
+        LASSERT (tx->tx_status == 0);
+        LASSERT (tx->tx_conn == NULL);
+        LASSERT (tx->tx_lntmsg[0] == NULL);
+        LASSERT (tx->tx_lntmsg[1] == NULL);
         
-        RETURN(tx);
+        return tx;
 }
 
 int
-kibnal_post_rx (kib_rx_t *rx, int credit)
+kibnal_post_rx (kib_rx_t *rx, int credit, int rsrvd_credit)
 {
         kib_conn_t   *conn = rx->rx_conn;
         int           rc = 0;
+        __u64         addr = (__u64)((unsigned long)((rx)->rx_msg));
         vv_return_t   vvrc;
 
         LASSERT (!in_interrupt());
+        /* old peers don't reserve rxs for RDMA replies */
+        LASSERT (!rsrvd_credit ||
+                 conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
         
         rx->rx_gl = (vv_scatgat_t) {
-                .v_address = KIBNAL_ADDR2SG(KIBNAL_RX_VADDR(rx)),
-                .l_key     = KIBNAL_RX_LKEY(rx),
+                .v_address = KIBNAL_ADDR2SG(addr),
+                .l_key     = rx->rx_lkey,
                 .length    = IBNAL_MSG_SIZE,
         };
 
@@ -168,7 +158,7 @@ kibnal_post_rx (kib_rx_t *rx, int credit)
         };
 
         LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
-        LASSERT (!rx->rx_posted);
+        LASSERT (rx->rx_nob >= 0);              /* not posted */
 
         CDEBUG(D_NET, "posting rx [%d %x "LPX64"]\n", 
                rx->rx_wrq.scatgat_list->length,
@@ -181,27 +171,31 @@ kibnal_post_rx (kib_rx_t *rx, int credit)
                 return 0;
         }
         
-        rx->rx_posted = 1;
-
+        rx->rx_nob = -1;                        /* flag posted */
+        
         spin_lock(&conn->ibc_lock);
         /* Serialise vv_post_receive; it's not re-entrant on the same QP */
         vvrc = vv_post_receive(kibnal_data.kib_hca,
                                conn->ibc_qp, &rx->rx_wrq);
-        spin_unlock(&conn->ibc_lock);
 
-        if (vvrc == 0) {
-                if (credit) {
-                        spin_lock(&conn->ibc_lock);
+        if (vvrc == vv_return_ok) {
+                if (credit)
                         conn->ibc_outstanding_credits++;
-                        spin_unlock(&conn->ibc_lock);
+                if (rsrvd_credit)
+                        conn->ibc_reserved_credits++;
+
+                spin_unlock(&conn->ibc_lock);
 
+                if (credit || rsrvd_credit)
                         kibnal_check_sends(conn);
-                }
+
                 return 0;
         }
         
-        CERROR ("post rx -> "LPX64" failed %d\n", 
-                conn->ibc_peer->ibp_nid, vvrc);
+        spin_unlock(&conn->ibc_lock);
+
+        CERROR ("post rx -> %s failed %d\n", 
+                libcfs_nid2str(conn->ibc_peer->ibp_nid), vvrc);
         rc = -EIO;
         kibnal_close_conn(rx->rx_conn, rc);
         /* No more posts for this rx; so lose its ref */
@@ -222,7 +216,7 @@ kibnal_post_receives (kib_conn_t *conn)
                 /* +1 ref for rx desc.  This ref remains until kibnal_post_rx
                  * fails (i.e. actual failure or we're disconnecting) */
                 kibnal_conn_addref(conn);
-                rc = kibnal_post_rx (&conn->ibc_rxs[i], 0);
+                rc = kibnal_post_rx (&conn->ibc_rxs[i], 0, 0);
                 if (rc != 0)
                         return rc;
         }
@@ -267,9 +261,8 @@ kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
         if (tx == NULL) {
                 spin_unlock(&conn->ibc_lock);
 
-                CWARN("Unmatched completion type %x cookie "LPX64
-                      " from "LPX64"\n",
-                      txtype, cookie, conn->ibc_peer->ibp_nid);
+                CWARN("Unmatched completion type %x cookie "LPX64" from %s\n",
+                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                 kibnal_close_conn (conn, -EPROTO);
                 return;
         }
@@ -278,12 +271,8 @@ kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
                 if (status < 0) {               /* failed? */
                         tx->tx_status = status;
                 } else if (txtype == IBNAL_MSG_GET_REQ) { 
-                        /* XXX layering violation: set REPLY data length */
-                        LASSERT (tx->tx_libmsg[1] != NULL);
-                        LASSERT (tx->tx_libmsg[1]->ev.type == 
-                                 PTL_EVENT_REPLY_END);
-
-                        tx->tx_libmsg[1]->ev.mlength = status;
+                        lnet_set_reply_msg_len(kibnal_data.kib_ni,
+                                               tx->tx_lntmsg[1], status);
                 }
         }
         
@@ -302,11 +291,11 @@ kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
 void
 kibnal_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie) 
 {
-        kib_tx_t    *tx = kibnal_get_idle_tx(0);
+        kib_tx_t    *tx = kibnal_get_idle_tx();
         
         if (tx == NULL) {
-                CERROR("Can't get tx for completion %x for "LPX64"\n",
-                       type, conn->ibc_peer->ibp_nid);
+                CERROR("Can't get tx for completion %x for %s\n",
+                       type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                 return;
         }
         
@@ -324,12 +313,15 @@ kibnal_handle_rx (kib_rx_t *rx)
         kib_conn_t   *conn = rx->rx_conn;
         int           credits = msg->ibm_credits;
         kib_tx_t     *tx;
-        int           rc;
+        int           rc = 0;
+        int           repost = 1;
+        int           rsrvd_credit = 0;
+        int           rc2;
 
         LASSERT (conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
 
-        CDEBUG (D_NET, "Received %x[%d] from "LPX64"\n",
-                msg->ibm_type, credits, conn->ibc_peer->ibp_nid);
+        CDEBUG (D_NET, "Received %x[%d] from %s\n",
+                msg->ibm_type, credits, libcfs_nid2str(conn->ibc_peer->ibp_nid));
         
         if (credits != 0) {
                 /* Have I received credits that will let me send? */
@@ -342,37 +334,38 @@ kibnal_handle_rx (kib_rx_t *rx)
 
         switch (msg->ibm_type) {
         default:
-                CERROR("Bad IBNAL message type %x from "LPX64"\n",
-                       msg->ibm_type, conn->ibc_peer->ibp_nid);
+                CERROR("Bad IBNAL message type %x from %s\n",
+                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
+                rc = -EPROTO;
                 break;
 
         case IBNAL_MSG_NOOP:
                 break;
 
         case IBNAL_MSG_IMMEDIATE:
-                lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
+                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
+                                msg->ibm_srcnid, rx, 0);
+                repost = rc < 0;                /* repost on error */
                 break;
                 
         case IBNAL_MSG_PUT_REQ:
-                rx->rx_responded = 0;
-                lib_parse(&kibnal_lib, &msg->ibm_u.putreq.ibprm_hdr, rx);
-                if (rx->rx_responded)
-                        break;
-
-                /* I wasn't asked to transfer any payload data.  This happens
-                 * if the PUT didn't match, or got truncated. */
-                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, 0,
-                                       msg->ibm_u.putreq.ibprm_cookie);
+                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.putreq.ibprm_hdr,
+                                msg->ibm_srcnid, rx, 1);
+                repost = rc < 0;                /* repost on error */
                 break;
 
         case IBNAL_MSG_PUT_NAK:
-                CWARN ("PUT_NACK from "LPX64"\n", conn->ibc_peer->ibp_nid);
+                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */
+                
+                CWARN ("PUT_NACK from %s\n", libcfs_nid2str(conn->ibc_peer->ibp_nid));
                 kibnal_handle_completion(conn, IBNAL_MSG_PUT_REQ, 
                                          msg->ibm_u.completion.ibcm_status,
                                          msg->ibm_u.completion.ibcm_cookie);
                 break;
 
         case IBNAL_MSG_PUT_ACK:
+                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */
+
                 spin_lock(&conn->ibc_lock);
                 tx = kibnal_find_waiting_tx_locked(conn, IBNAL_MSG_PUT_REQ,
                                                    msg->ibm_u.putack.ibpam_src_cookie);
@@ -381,9 +374,9 @@ kibnal_handle_rx (kib_rx_t *rx)
                 spin_unlock(&conn->ibc_lock);
 
                 if (tx == NULL) {
-                        CERROR("Unmatched PUT_ACK from "LPX64"\n",
-                               conn->ibc_peer->ibp_nid);
-                        kibnal_close_conn(conn, -EPROTO);
+                        CERROR("Unmatched PUT_ACK from %s\n",
+                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
+                        rc = -EPROTO;
                         break;
                 }
 
@@ -394,47 +387,55 @@ kibnal_handle_rx (kib_rx_t *rx)
 
                 tx->tx_nwrq = 0;                /* overwrite PUT_REQ */
 
-                rc = kibnal_init_rdma(tx, IBNAL_MSG_PUT_DONE, 
-                                      kibnal_rd_size(&msg->ibm_u.putack.ibpam_rd),
-                                      &msg->ibm_u.putack.ibpam_rd,
-                                      msg->ibm_u.putack.ibpam_dst_cookie);
-                if (rc < 0)
-                        CERROR("Can't setup rdma for PUT to "LPX64": %d\n",
-                               conn->ibc_peer->ibp_nid, rc);
+                rc2 = kibnal_init_rdma(tx, IBNAL_MSG_PUT_DONE, 
+                                       kibnal_rd_size(&msg->ibm_u.putack.ibpam_rd),
+                                       &msg->ibm_u.putack.ibpam_rd,
+                                       msg->ibm_u.putack.ibpam_dst_cookie);
+                if (rc2 < 0)
+                        CERROR("Can't setup rdma for PUT to %s: %d\n",
+                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);
 
                 spin_lock(&conn->ibc_lock);
-                if (tx->tx_status == 0 && rc < 0)
-                        tx->tx_status = rc;
+                if (tx->tx_status == 0 && rc2 < 0)
+                        tx->tx_status = rc2;
                 tx->tx_waiting = 0;             /* clear waiting and queue atomically */
                 kibnal_queue_tx_locked(tx, conn);
                 spin_unlock(&conn->ibc_lock);
                 break;
                 
         case IBNAL_MSG_PUT_DONE:
+                /* This buffer was pre-reserved by not returning the credit
+                 * when the PUT_REQ's buffer was reposted, so I just return it
+                 * now */
                 kibnal_handle_completion(conn, IBNAL_MSG_PUT_ACK,
                                          msg->ibm_u.completion.ibcm_status,
                                          msg->ibm_u.completion.ibcm_cookie);
                 break;
 
         case IBNAL_MSG_GET_REQ:
-                rx->rx_responded = 0;
-                lib_parse(&kibnal_lib, &msg->ibm_u.get.ibgm_hdr, rx);
-                if (rx->rx_responded)           /* I responded to the GET_REQ */
-                        break;
-                /* NB GET didn't match (I'd have responded even with no payload
-                 * data) */
-                kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, -ENODATA,
-                                       msg->ibm_u.get.ibgm_cookie);
+                rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.get.ibgm_hdr,
+                                msg->ibm_srcnid, rx, 1);
+                repost = rc < 0;                /* repost on error */
                 break;
 
         case IBNAL_MSG_GET_DONE:
+                rsrvd_credit = 1;               /* rdma reply (was pre-reserved) */
+                
                 kibnal_handle_completion(conn, IBNAL_MSG_GET_REQ,
                                          msg->ibm_u.completion.ibcm_status,
                                          msg->ibm_u.completion.ibcm_cookie);
                 break;
         }
 
-        kibnal_post_rx(rx, 1);
+        if (rc < 0)                             /* protocol error */
+                kibnal_close_conn(conn, rc);
+
+        if (repost) {
+                if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD)
+                        rsrvd_credit = 0;       /* peer isn't pre-reserving */
+
+                kibnal_post_rx(rx, !rsrvd_credit, rsrvd_credit);
+        }
 }
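
Every lnet_parse() call above follows the same contract: the LND hands over the wire header together with a private cookie (the rx) and an rdma flag, and a negative return means LNET rejected the message, so the receive buffer is reposted straight away; otherwise the buffer stays with LNET until the LND's receive callback completes. A minimal sketch of that pattern, with my_rx_t and sketch_repost_rx() as hypothetical stand-ins:

static void
sketch_handle_immediate(lnet_ni_t *ni, my_rx_t *rx,
                        lnet_hdr_t *hdr, lnet_nid_t srcnid)
{
        /* final 0: no RDMA buffer needs to be reserved for this message */
        int rc = lnet_parse(ni, hdr, srcnid, rx, 0);

        if (rc < 0)                     /* rejected: recycle the buffer now */
                sketch_repost_rx(rx);
}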
 
 void
@@ -445,42 +446,50 @@ kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob, __u64 rxseq)
         unsigned long flags;
         int           rc;
 
-        CDEBUG (D_NET, "rx %p conn %p\n", rx, conn);
-        LASSERT (rx->rx_posted);
-        rx->rx_posted = 0;
+        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
+        LASSERT (rx->rx_nob < 0);               /* was posted */
+        rx->rx_nob = 0;                         /* isn't now */
 
         if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                 goto ignore;
 
         if (vvrc != vv_comp_status_success) {
-                CERROR("Rx from "LPX64" failed: %d\n", 
-                       conn->ibc_peer->ibp_nid, vvrc);
+                CERROR("Rx from %s failed: %d\n", 
+                       libcfs_nid2str(conn->ibc_peer->ibp_nid), vvrc);
                 goto failed;
         }
 
-        rc = kibnal_unpack_msg(msg, nob);
+        rc = kibnal_unpack_msg(msg, conn->ibc_version, nob);
         if (rc != 0) {
-                CERROR ("Error %d unpacking rx from "LPX64"\n",
-                        rc, conn->ibc_peer->ibp_nid);
+                CERROR ("Error %d unpacking rx from %s\n",
+                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                 goto failed;
         }
 
-        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
+        rx->rx_nob = nob;                       /* Can trust 'nob' now */
+
+        if (!lnet_ptlcompat_matchnid(conn->ibc_peer->ibp_nid,
+                                     msg->ibm_srcnid) ||
+            !lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid, 
+                                     msg->ibm_dstnid) ||
             msg->ibm_srcstamp != conn->ibc_incarnation ||
-            msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
-                CERROR ("Stale rx from "LPX64"\n",
-                        conn->ibc_peer->ibp_nid);
+                CERROR ("Stale rx from %s\n",
+                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
                 goto failed;
         }
 
         if (msg->ibm_seq != rxseq) {
-                CERROR ("Out-of-sequence rx from "LPX64
+                CERROR ("Out-of-sequence rx from %s"
                         ": got "LPD64" but expected "LPD64"\n",
-                        conn->ibc_peer->ibp_nid, msg->ibm_seq, rxseq);
+                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
+                        msg->ibm_seq, rxseq);
                 goto failed;
         }
 
+        /* set time last known alive */
+        kibnal_peer_alive(conn->ibc_peer);
+
         /* racing with connection establishment/teardown! */
 
         if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
@@ -506,7 +515,31 @@ kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob, __u64 rxseq)
         kibnal_conn_decref(conn);
 }
 
-#if IBNAL_WHOLE_MEM
+struct page *
+kibnal_kvaddr_to_page (unsigned long vaddr)
+{
+        struct page *page;
+
+        if (vaddr >= VMALLOC_START &&
+            vaddr < VMALLOC_END) {
+                page = vmalloc_to_page ((void *)vaddr);
+                LASSERT (page != NULL);
+                return page;
+        }
+#ifdef CONFIG_HIGHMEM
+        if (vaddr >= PKMAP_BASE &&
+            vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
+                /* No highmem pages only used for bulk (kiov) I/O */
+                CERROR("find page for address in highmem\n");
+                LBUG();
+        }
+#endif
+        page = virt_to_page (vaddr);
+        LASSERT (page != NULL);
+        return page;
+}
+
+#if !IBNAL_USE_FMR
 int
 kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page, 
                      unsigned long page_offset, unsigned long len)
@@ -524,10 +557,12 @@ kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page,
                 return -EMSGSIZE;
         }
 
-        /* Try to create an address that adapter-tavor will munge into a valid
+        /* Try to create an address that adaptor-tavor will munge into a valid
          * network address, given how it maps all phys mem into 1 region */
-        addr = kibnal_page2phys(page) + page_offset + PAGE_OFFSET;
+        addr = lnet_page2phys(page) + page_offset + PAGE_OFFSET;
 
+        /* NB this relies entirely on there being a single region for the whole
+         * of memory, since "high" memory will wrap in the (void *) cast! */
         vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca, 
                                     (void *)((unsigned long)addr),
                                     len, &mem_h, &l_key, &r_key);
@@ -562,34 +597,10 @@ kibnal_append_rdfrag(kib_rdma_desc_t *rd, int active, struct page *page,
         return 0;
 }
 
-struct page *
-kibnal_kvaddr_to_page (unsigned long vaddr)
-{
-        struct page *page;
-
-        if (vaddr >= VMALLOC_START &&
-            vaddr < VMALLOC_END) {
-                page = vmalloc_to_page ((void *)vaddr);
-                LASSERT (page != NULL);
-                return page;
-        }
-#if CONFIG_HIGHMEM
-        if (vaddr >= PKMAP_BASE &&
-            vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
-                /* No highmem pages only used for bulk (kiov) I/O */
-                CERROR("find page for address in highmem\n");
-                LBUG();
-        }
-#endif
-        page = virt_to_page (vaddr);
-        LASSERT (page != NULL);
-        return page;
-}
-
 int
 kibnal_setup_rd_iov(kib_tx_t *tx, kib_rdma_desc_t *rd, 
                     vv_access_con_bit_mask_t access,
-                    int niov, struct iovec *iov, int offset, int nob)
+                    unsigned int niov, struct iovec *iov, int offset, int nob)
                  
 {
         /* active if I'm sending */
@@ -647,7 +658,7 @@ kibnal_setup_rd_iov(kib_tx_t *tx, kib_rdma_desc_t *rd,
 int
 kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd, 
                       vv_access_con_bit_mask_t access,
-                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
+                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
 {
         /* active if I'm sending */
         int            active = ((access & vv_acc_r_mem_write) == 0);
@@ -688,20 +699,66 @@ kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
 }
 #else
 int
+kibnal_map_tx (kib_tx_t *tx, kib_rdma_desc_t *rd, int active,
+               int npages, unsigned long page_offset, int nob)
+{
+        vv_return_t   vvrc;
+        vv_fmr_map_t  map_props;
+
+        LASSERT ((rd != tx->tx_rd) == !active);
+        LASSERT (!tx->tx_md.md_active);
+        LASSERT (tx->tx_md.md_fmrcount > 0);
+        LASSERT (page_offset < PAGE_SIZE);
+        LASSERT (npages >= (1 + ((page_offset + nob - 1)>>PAGE_SHIFT)));
+        LASSERT (npages <= LNET_MAX_IOV);
+
+        memset(&map_props, 0, sizeof(map_props));
+
+        map_props.start          = (void *)page_offset;
+        map_props.size           = nob;
+        map_props.page_array_len = npages;
+        map_props.page_array     = tx->tx_pages;
+
+        vvrc = vv_map_fmr(kibnal_data.kib_hca, tx->tx_md.md_fmrhandle,
+                          &map_props, &tx->tx_md.md_lkey, &tx->tx_md.md_rkey);
+        if (vvrc != vv_return_ok) {
+                CERROR ("Can't map vaddr %p for %d in %d pages: %d\n", 
+                        map_props.start, nob, npages, vvrc);
+                return -EFAULT;
+        }
+
+        tx->tx_md.md_addr = (unsigned long)map_props.start;
+        tx->tx_md.md_active = 1;
+        tx->tx_md.md_fmrcount--;
+
+        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
+        rd->rd_nob = nob;
+        rd->rd_addr = tx->tx_md.md_addr;
+
+        /* Compensate for adaptor-tavor's munging of gatherlist addresses */
+        if (active)
+                rd->rd_addr += PAGE_OFFSET;
+
+        return 0;
+}
+
+int
 kibnal_setup_rd_iov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                      vv_access_con_bit_mask_t access,
-                     int niov, struct iovec *iov, int offset, int nob)
+                     unsigned int niov, struct iovec *iov, int offset, int nob)
                  
 {
         /* active if I'm sending */
-        int         active = ((access & vv_acc_r_mem_write) == 0);
-        void       *vaddr;
-        vv_return_t vvrc;
+        int           active = ((access & vv_acc_r_mem_write) == 0);
+        int           resid;
+        int           fragnob;
+        struct page  *page;
+        int           npages;
+        unsigned long page_offset;
+        unsigned long vaddr;
 
         LASSERT (nob > 0);
         LASSERT (niov > 0);
-        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
-        LASSERT ((rd != tx->tx_rd) == !active);
 
         while (offset >= iov->iov_len) {
                 offset -= iov->iov_len;
@@ -715,49 +772,49 @@ kibnal_setup_rd_iov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                 return (-EMSGSIZE);
         }
 
-        vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
-        tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);
+        vaddr = ((unsigned long)iov->iov_base) + offset;
+        
+        page_offset = vaddr & (PAGE_SIZE - 1);
+        resid = nob;
+        npages = 0;
 
-        vvrc = vv_mem_region_register(kibnal_data.kib_hca, vaddr, nob,
-                                      kibnal_data.kib_pd, access,
-                                      &tx->tx_md.md_handle, 
-                                      &tx->tx_md.md_lkey,
-                                      &tx->tx_md.md_rkey);
-        if (vvrc != vv_return_ok) {
-                CERROR ("Can't map vaddr %p: %d\n", vaddr, vvrc);
-                return -EFAULT;
-        }
+        do {
+                LASSERT (npages < LNET_MAX_IOV);
 
-        tx->tx_mapped = KIB_TX_MAPPED;
+                page = kibnal_kvaddr_to_page(vaddr);
+                if (page == NULL) {
+                        CERROR("Can't find page for %lu\n", vaddr);
+                        return -EFAULT;
+                }
 
-        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
-        rd->rd_nfrag = 1;
-        kibnal_rf_set(&rd->rd_frags[0], tx->tx_md.md_addr, nob);
-        
-        return (0);
+                tx->tx_pages[npages++] = lnet_page2phys(page);
+
+                fragnob = PAGE_SIZE - (vaddr & (PAGE_SIZE - 1));
+                vaddr += fragnob;
+                resid -= fragnob;
+
+        } while (resid > 0);
+
+        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
 }
 
 int
 kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                       vv_access_con_bit_mask_t access,
-                      int nkiov, ptl_kiov_t *kiov, int offset, int nob)
+                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
 {
         /* active if I'm sending */
         int            active = ((access & vv_acc_r_mem_write) == 0);
-        vv_return_t    vvrc;
-        vv_phy_list_t  phys_pages;
-        vv_phy_buf_t  *phys;
-        int            page_offset;
-        int            nphys;
         int            resid;
-        int            phys_size;
-        int            rc;
-
+        int            npages;
+        unsigned long  page_offset;
+        
         CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
 
         LASSERT (nob > 0);
         LASSERT (nkiov > 0);
-        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
+        LASSERT (nkiov <= LNET_MAX_IOV);
+        LASSERT (!tx->tx_md.md_active);
         LASSERT ((rd != tx->tx_rd) == !active);
 
         while (offset >= kiov->kiov_len) {
@@ -767,92 +824,33 @@ kibnal_setup_rd_kiov (kib_tx_t *tx, kib_rdma_desc_t *rd,
                 LASSERT (nkiov > 0);
         }
 
-        phys_size = nkiov * sizeof (*phys);
-        PORTAL_ALLOC(phys, phys_size);
-        if (phys == NULL) {
-                CERROR ("Can't allocate tmp phys\n");
-                return (-ENOMEM);
-        }
-
         page_offset = kiov->kiov_offset + offset;
+        
+        resid = offset + nob;
+        npages = 0;
 
-        phys[0].start = kibnal_page2phys(kiov->kiov_page);
-        phys[0].size = PAGE_SIZE;
-
-        nphys = 1;
-        resid = nob - (kiov->kiov_len - offset);
-
-        while (resid > 0) {
-                kiov++;
-                nkiov--;
+        do {
+                LASSERT (npages < LNET_MAX_IOV);
                 LASSERT (nkiov > 0);
 
-                if (kiov->kiov_offset != 0 ||
-                    ((resid > PAGE_SIZE) && 
-                     kiov->kiov_len < PAGE_SIZE)) {
-                        int i;
+                if ((npages > 0 && kiov->kiov_offset != 0) ||
+                    (resid > kiov->kiov_len && 
+                     (kiov->kiov_offset + kiov->kiov_len) != PAGE_SIZE)) {
                         /* Can't have gaps */
                         CERROR ("Can't make payload contiguous in I/O VM:"
-                                "page %d, offset %d, len %d \n", nphys, 
-                                kiov->kiov_offset, kiov->kiov_len);
-
-                        for (i = -nphys; i < nkiov; i++)
-                                CERROR("kiov[%d] %p +%d for %d\n",
-                                       i, kiov[i].kiov_page, 
-                                       kiov[i].kiov_offset, 
-                                       kiov[i].kiov_len);
+                                "page %d, offset %d, len %d \n",
+                                npages, kiov->kiov_offset, kiov->kiov_len);
                         
-                        rc = -EINVAL;
-                        goto out;
+                        return -EINVAL;
                 }
 
-                LASSERT (nphys * sizeof (*phys) < phys_size);
-                phys[nphys].start = kibnal_page2phys(kiov->kiov_page);
-                phys[nphys].size = PAGE_SIZE;
-
-                nphys++;
-                resid -= PAGE_SIZE;
-        }
-
-#if 0
-        CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
-        for (i = 0; i < nphys; i++)
-                CWARN ("   [%d] "LPX64"\n", i, phys[i]);
-#endif
-
-        vvrc = vv_phy_mem_region_register(kibnal_data.kib_hca,
-                                          &phys_pages,
-                                          IBNAL_RDMA_BASE,
-                                          nphys,
-                                          page_offset,
-                                          kibnal_data.kib_pd,
-                                          access,
-                                          &tx->tx_md.md_handle,
-                                          &tx->tx_md.md_addr,
-                                          &tx->tx_md.md_lkey,
-                                          &tx->tx_md.md_rkey);
-
-        if (vvrc != vv_return_ok) {
-                CERROR ("Can't map phys: %d\n", vvrc);
-                rc = -EFAULT;
-                goto out;
-        }
-
-        CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: "
-               "lkey %x, rkey %x, addr "LPX64"\n",
-               nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey,
-               tx->tx_md.md_addr);
-
-        tx->tx_mapped = KIB_TX_MAPPED;
-        rc = 0;
+                tx->tx_pages[npages++] = lnet_page2phys(kiov->kiov_page);
+                resid -= kiov->kiov_len;
+                kiov++;
+                nkiov--;
+        } while (resid > 0);
 
-        rd->rd_key = active ? tx->tx_md.md_lkey : tx->tx_md.md_rkey;
-        rd->rd_nfrag = 1;
-        kibnal_rf_set(&rd->rd_frags[0], tx->tx_md.md_addr, nob);
-        
- out:
-        PORTAL_FREE(phys, phys_size);
-        return (rc);
+        return kibnal_map_tx(tx, rd, active, npages, page_offset, nob);
 }
 #endif
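
With IBNAL_USE_FMR set, the explicit registration calls are gone: both setup routines above now just collect physical page addresses into tx->tx_pages and hand the array to kibnal_map_tx(). A minimal sketch of the kiov walk, assuming hypothetical helpers phys_of_page() and fmr_map() in place of lnet_page2phys() and kibnal_map_tx(); only the first fragment may start mid-page and only the last may stop short of a page boundary, anything else is a gap the single FMR mapping cannot express:

static int
sketch_gather_kiov(lnet_kiov_t *kiov, int nkiov, int offset, int nob,
                   __u64 *pages, int max_pages)
{
        unsigned long page_offset;
        int           npages = 0;
        int           resid;

        while (offset >= kiov->kiov_len) {      /* skip fragments wholly before 'offset' */
                offset -= kiov->kiov_len;
                kiov++;
                nkiov--;
        }

        page_offset = kiov->kiov_offset + offset;
        resid = offset + nob;                   /* measured from the first fragment's start */

        do {
                if (npages == max_pages || nkiov == 0)
                        return -EINVAL;

                /* no gaps: later fragments start at 0, earlier ones fill their page */
                if ((npages > 0 && kiov->kiov_offset != 0) ||
                    (resid > kiov->kiov_len &&
                     kiov->kiov_offset + kiov->kiov_len != PAGE_SIZE))
                        return -EINVAL;

                pages[npages++] = phys_of_page(kiov->kiov_page);
                resid -= kiov->kiov_len;
                kiov++;
                nkiov--;
        } while (resid > 0);

        return fmr_map(pages, npages, page_offset, nob);
}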
 
@@ -873,25 +871,42 @@ void
 kibnal_check_sends (kib_conn_t *conn)
 {
         kib_tx_t       *tx;
-        vv_return_t     vvrc;                        
+        vv_return_t     vvrc;
         int             rc;
+        int             consume_cred;
         int             done;
 
         /* Don't send anything until after the connection is established */
         if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
-                CDEBUG(D_NET, LPX64"too soon\n", conn->ibc_peer->ibp_nid);
+                CDEBUG(D_NET, "%s too soon\n",
+                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                 return;
         }
         
         spin_lock(&conn->ibc_lock);
 
-        LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);
-
+        LASSERT (conn->ibc_nsends_posted <=
+                 *kibnal_tunables.kib_concurrent_sends);
+        LASSERT (conn->ibc_reserved_credits >= 0);
+        
+        while (conn->ibc_reserved_credits > 0 &&
+               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
+                LASSERT (conn->ibc_version != 
+                         IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
+                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
+                                kib_tx_t, tx_list);
+                list_del(&tx->tx_list);
+                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
+                conn->ibc_reserved_credits--;
+        }
+        
         if (list_empty(&conn->ibc_tx_queue) &&
-            conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
+            list_empty(&conn->ibc_tx_queue_nocred) &&
+            (conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER ||
+             kibnal_send_keepalive(conn))) {
                 spin_unlock(&conn->ibc_lock);
                 
-                tx = kibnal_get_idle_tx(0);     /* don't block */
+                tx = kibnal_get_idle_tx();
                 if (tx != NULL)
                         kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);
 
@@ -901,9 +916,22 @@ kibnal_check_sends (kib_conn_t *conn)
                         kibnal_queue_tx_locked(tx, conn);
         }
 
-        while (!list_empty (&conn->ibc_tx_queue)) {
-                tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);
-
+        for (;;) {
+                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
+                        LASSERT (conn->ibc_version != 
+                                 IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD);
+                        tx = list_entry (conn->ibc_tx_queue_nocred.next, 
+                                         kib_tx_t, tx_list);
+                        consume_cred = 0;
+                } else if (!list_empty (&conn->ibc_tx_queue)) {
+                        tx = list_entry (conn->ibc_tx_queue.next, 
+                                         kib_tx_t, tx_list);
+                        consume_cred = 1;
+                } else {
+                        /* nothing waiting */
+                        break;
+                }
+                
                 LASSERT (tx->tx_queued);
                 /* We rely on this for QP sizing */
                 LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);
@@ -913,23 +941,27 @@ kibnal_check_sends (kib_conn_t *conn)
                 LASSERT (conn->ibc_credits >= 0);
                 LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);
 
-                if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE) {
-                        CDEBUG(D_NET, LPX64": posted enough\n",
-                               conn->ibc_peer->ibp_nid);
+                if (conn->ibc_nsends_posted ==
+                    *kibnal_tunables.kib_concurrent_sends) {
+                        /* We've got some tx completions outstanding... */
+                        CDEBUG(D_NET, "%s: posted enough\n",
+                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                         break;
                 }
                 
-                if (conn->ibc_credits == 0) {   /* no credits */
-                        CDEBUG(D_NET, LPX64": no credits\n",
-                               conn->ibc_peer->ibp_nid);
-                        break;
-                }
-                
-                if (conn->ibc_credits == 1 &&   /* last credit reserved for */
-                    conn->ibc_outstanding_credits == 0) { /* giving back credits */
-                        CDEBUG(D_NET, LPX64": not using last credit\n",
-                               conn->ibc_peer->ibp_nid);
-                        break;
+                if (consume_cred) {
+                        if (conn->ibc_credits == 0) {   /* no credits */
+                                CDEBUG(D_NET, "%s: no credits\n",
+                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
+                                break;
+                        }
+                        
+                        if (conn->ibc_credits == 1 &&   /* last credit reserved for */
+                            conn->ibc_outstanding_credits == 0) { /* giving back credits */
+                                CDEBUG(D_NET, "%s: not using last credit\n",
+                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
+                                break;
+                        }
                 }
                 
                 list_del (&tx->tx_list);
@@ -939,24 +971,28 @@ kibnal_check_sends (kib_conn_t *conn)
 
                 if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                     (!list_empty(&conn->ibc_tx_queue) ||
-                     conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
+                     !list_empty(&conn->ibc_tx_queue_nocred) ||
+                     (conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER &&
+                      !kibnal_send_keepalive(conn)))) {
                         /* redundant NOOP */
                         spin_unlock(&conn->ibc_lock);
                         kibnal_tx_done(tx);
                         spin_lock(&conn->ibc_lock);
-                        CDEBUG(D_NET, LPX64": redundant noop\n",
-                               conn->ibc_peer->ibp_nid);
+                        CDEBUG(D_NET, "%s: redundant noop\n",
+                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                         continue;
                 }
 
-                kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits,
+                kibnal_pack_msg(tx->tx_msg, conn->ibc_version,
+                                conn->ibc_outstanding_credits,
                                 conn->ibc_peer->ibp_nid, conn->ibc_incarnation,
                                 conn->ibc_txseq);
 
                 conn->ibc_txseq++;
                 conn->ibc_outstanding_credits = 0;
                 conn->ibc_nsends_posted++;
-                conn->ibc_credits--;
+                if (consume_cred)
+                        conn->ibc_credits--;
 
                 /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                  * PUT.  If so, it was first queued here as a PUT_REQ, sent and
@@ -973,7 +1009,37 @@ kibnal_check_sends (kib_conn_t *conn)
                  * QP!! */
 
                 LASSERT (tx->tx_nwrq > 0);
-
+#if 0
+                if (tx->tx_wrq[0].wr_type == vv_wr_rdma_write) 
+                        CDEBUG(D_NET, "WORK[0]: RDMA gl %p for %d k %x -> "LPX64" k %x\n",
+                               tx->tx_wrq[0].scatgat_list->v_address,
+                               tx->tx_wrq[0].scatgat_list->length,
+                               tx->tx_wrq[0].scatgat_list->l_key,
+                               tx->tx_wrq[0].type.send.send_qp_type.rc_type.r_addr,
+                               tx->tx_wrq[0].type.send.send_qp_type.rc_type.r_r_key);
+                else
+                        CDEBUG(D_NET, "WORK[0]: %s gl %p for %d k %x\n",
+                               tx->tx_wrq[0].wr_type == vv_wr_send ? "SEND" : "????",
+                               tx->tx_wrq[0].scatgat_list->v_address,
+                               tx->tx_wrq[0].scatgat_list->length,
+                               tx->tx_wrq[0].scatgat_list->l_key);
+
+                if (tx->tx_nwrq > 1) {
+                        if (tx->tx_wrq[1].wr_type == vv_wr_rdma_write) 
+                                CDEBUG(D_NET, "WORK[1]: RDMA gl %p for %d k %x -> "LPX64" k %x\n",
+                                       tx->tx_wrq[1].scatgat_list->v_address,
+                                       tx->tx_wrq[1].scatgat_list->length,
+                                       tx->tx_wrq[1].scatgat_list->l_key,
+                                       tx->tx_wrq[1].type.send.send_qp_type.rc_type.r_addr,
+                                       tx->tx_wrq[1].type.send.send_qp_type.rc_type.r_r_key);
+                        else
+                                CDEBUG(D_NET, "WORK[1]: %s gl %p for %d k %x\n",
+                                       tx->tx_wrq[1].wr_type == vv_wr_send ? "SEND" : "????",
+                                       tx->tx_wrq[1].scatgat_list->v_address,
+                                       tx->tx_wrq[1].scatgat_list->length,
+                                       tx->tx_wrq[1].scatgat_list->l_key);
+                }
+#endif           
                 rc = -ECONNABORTED;
                 vvrc = vv_return_ok;
                 if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
@@ -986,11 +1052,14 @@ kibnal_check_sends (kib_conn_t *conn)
                         rc = (vvrc == vv_return_ok) ? 0 : -EIO;
                 }
 
+                conn->ibc_last_send = jiffies;
+
                 if (rc != 0) {
                         /* NB credits are transferred in the actual
                          * message, which can only be the last work item */
                         conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
-                        conn->ibc_credits++;
+                        if (consume_cred)
+                                conn->ibc_credits++;
                         conn->ibc_nsends_posted--;
 
                         tx->tx_status = rc;
@@ -1004,11 +1073,11 @@ kibnal_check_sends (kib_conn_t *conn)
                         spin_unlock(&conn->ibc_lock);
                         
                         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
-                                CERROR ("Error %d posting transmit to "LPX64"\n", 
-                                        vvrc, conn->ibc_peer->ibp_nid);
+                                CERROR ("Error %d posting transmit to %s\n", 
+                                        vvrc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                         else
-                                CDEBUG (D_NET, "Error %d posting transmit to "
-                                        LPX64"\n", rc, conn->ibc_peer->ibp_nid);
+                                CDEBUG (D_NET, "Error %d posting transmit to %s\n",
+                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
 
                         kibnal_close_conn (conn, rc);
 
@@ -1036,10 +1105,11 @@ kibnal_tx_complete (kib_tx_t *tx, vv_comp_status_t vvrc)
         if (failed &&
             tx->tx_status == 0 &&
             conn->ibc_state == IBNAL_CONN_ESTABLISHED)
-                CERROR("tx -> "LPX64" type %x cookie "LPX64
+                CDEBUG(D_NETERROR, "tx -> %s type %x cookie "LPX64
                        "sending %d waiting %d: failed %d\n", 
-                       conn->ibc_peer->ibp_nid, tx->tx_msg->ibm_type, 
-                       tx->tx_cookie, tx->tx_sending, tx->tx_waiting, vvrc);
+                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
+                       tx->tx_msg->ibm_type, tx->tx_cookie,
+                       tx->tx_sending, tx->tx_waiting, vvrc);
 
         spin_lock(&conn->ibc_lock);
 
@@ -1067,10 +1137,12 @@ kibnal_tx_complete (kib_tx_t *tx, vv_comp_status_t vvrc)
         if (idle)
                 kibnal_tx_done (tx);
 
-        if (failed)
+        if (failed) {
                 kibnal_close_conn (conn, -EIO);
-        else
+        } else {
+                kibnal_peer_alive(conn->ibc_peer);
                 kibnal_check_sends(conn);
+        }
 
         kibnal_conn_decref(conn);               /* ...until here */
 }
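
kibnal_check_sends() above applies two credit rules per transmission: a tx taken from the no-credit queue never consumes a credit, and a tx that does consume one may not take the peer's last credit unless the message is also carrying credits back (the in-line comments call this keeping the last credit for giving back credits). A minimal sketch of that test, as a hypothetical helper rather than a function from this patch:

static int
sketch_may_consume_credit(int credits, int outstanding_credits)
{
        if (credits == 0)               /* peer has no posted buffer for us */
                return 0;

        if (credits == 1 && outstanding_credits == 0)
                return 0;               /* keep the last credit for a credit-returning send */

        return 1;
}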
@@ -1081,6 +1153,7 @@ kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
         vv_scatgat_t *gl = &tx->tx_gl[tx->tx_nwrq];
         vv_wr_t      *wrq = &tx->tx_wrq[tx->tx_nwrq];
         int           nob = offsetof (kib_msg_t, ibm_u) + body_nob;
+        __u64         addr = (__u64)((unsigned long)((tx)->tx_msg));
 
         LASSERT (tx->tx_nwrq >= 0 && 
                  tx->tx_nwrq < (1 + IBNAL_MAX_RDMA_FRAGS));
@@ -1089,8 +1162,8 @@ kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
         kibnal_init_msg(tx->tx_msg, type, body_nob);
 
         *gl = (vv_scatgat_t) {
-                .v_address = KIBNAL_ADDR2SG(KIBNAL_TX_VADDR(tx)),
-                .l_key     = KIBNAL_TX_LKEY(tx),
+                .v_address = KIBNAL_ADDR2SG(addr),
+                .l_key     = tx->tx_lkey,
                 .length    = nob,
         };
 
@@ -1112,18 +1185,42 @@ int
 kibnal_init_rdma (kib_tx_t *tx, int type, int nob,
                   kib_rdma_desc_t *dstrd, __u64 dstcookie)
 {
-        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
-        int              resid = nob;
         kib_msg_t       *ibmsg = tx->tx_msg;
         kib_rdma_desc_t *srcrd = tx->tx_rd;
+        vv_scatgat_t    *gl;
+        vv_wr_t         *wrq;
+        int              rc;
+
+#if IBNAL_USE_FMR
+        LASSERT (tx->tx_nwrq == 0);
+
+        gl = &tx->tx_gl[0];
+        gl->length    = nob;
+        gl->v_address = KIBNAL_ADDR2SG(srcrd->rd_addr);
+        gl->l_key     = srcrd->rd_key;
+
+        wrq = &tx->tx_wrq[0];
+
+        wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);
+        wrq->completion_notification = 0;
+        wrq->scatgat_list = gl;
+        wrq->num_of_data_segments = 1;
+        wrq->wr_type = vv_wr_rdma_write;
+        wrq->type.send.solicited_event = 0;
+        wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;
+        wrq->type.send.send_qp_type.rc_type.r_addr = dstrd->rd_addr;
+        wrq->type.send.send_qp_type.rc_type.r_r_key = dstrd->rd_key;
+
+        tx->tx_nwrq = 1;
+        rc = nob;
+#else
+        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
+        int              resid = nob;
         kib_rdma_frag_t *srcfrag;
         int              srcidx;
         kib_rdma_frag_t *dstfrag;
         int              dstidx;
-        vv_scatgat_t    *gl;
-        vv_wr_t         *wrq;
         int              wrknob;
-        int              rc;
 
         /* Called by scheduler */
         LASSERT (!in_interrupt());
@@ -1200,6 +1297,7 @@ kibnal_init_rdma (kib_tx_t *tx, int type, int nob,
 
         if (rc < 0)                             /* no RDMA if completing with failure */
                 tx->tx_nwrq = 0;
+#endif
         
         ibmsg->ibm_u.completion.ibcm_status = rc;
         ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
@@ -1237,12 +1335,14 @@ kibnal_schedule_peer_arp (kib_peer_t *peer)
 }
 
 void
-kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
+kibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid)
 {
         kib_peer_t      *peer;
         kib_conn_t      *conn;
         unsigned long    flags;
         rwlock_t        *g_lock = &kibnal_data.kib_global_lock;
+        int              retry;
+        int              rc;
 
         /* If I get here, I've committed to send, so I complete the tx with
          * failure on any problems */
@@ -1250,38 +1350,51 @@ kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
         LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
         LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */
 
-        read_lock_irqsave(g_lock, flags);
+        for (retry = 0; ; retry = 1) {
+                read_lock_irqsave(g_lock, flags);
         
-        peer = kibnal_find_peer_locked (nid);
-        if (peer == NULL) {
-                read_unlock_irqrestore(g_lock, flags);
-                tx->tx_status = -EHOSTUNREACH;
-                tx->tx_waiting = 0;
-                kibnal_tx_done (tx);
-                return;
-        }
+                peer = kibnal_find_peer_locked (nid);
+                if (peer != NULL) {
+                        conn = kibnal_find_conn_locked (peer);
+                        if (conn != NULL) {
+                                kibnal_conn_addref(conn); /* 1 ref for me... */
+                                read_unlock_irqrestore(g_lock, flags);
 
-        conn = kibnal_find_conn_locked (peer);
-        if (conn != NULL) {
-                kibnal_conn_addref(conn);       /* 1 ref for me... */
-                read_unlock_irqrestore(g_lock, flags);
+                                kibnal_queue_tx (tx, conn);
+                                kibnal_conn_decref(conn); /* ...to here */
+                                return;
+                        }
+                }
                 
-                kibnal_queue_tx (tx, conn);
-                kibnal_conn_decref(conn);       /* ...to here */
-                return;
-        }
-        
-        /* Making one or more connections; I'll need a write lock... */
-        read_unlock(g_lock);
-        write_lock(g_lock);
+                /* Making one or more connections; I'll need a write lock... */
+                read_unlock(g_lock);
+                write_lock(g_lock);
+
+                peer = kibnal_find_peer_locked (nid);
+                if (peer != NULL)
+                        break;
 
-        peer = kibnal_find_peer_locked (nid);
-        if (peer == NULL) {
                 write_unlock_irqrestore(g_lock, flags);
-                tx->tx_status = -EHOSTUNREACH;
-                tx->tx_waiting = 0;
-                kibnal_tx_done (tx);
-                return;
+
+                if (retry) {
+                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
+
+                        tx->tx_status = -EHOSTUNREACH;
+                        tx->tx_waiting = 0;
+                        kibnal_tx_done (tx);
+                        return;
+                }
+
+                rc = kibnal_add_persistent_peer(nid, LNET_NIDADDR(nid));
+                if (rc != 0) {
+                        CERROR("Can't add peer %s: %d\n",
+                               libcfs_nid2str(nid), rc);
+                        
+                        tx->tx_status = -EHOSTUNREACH;
+                        tx->tx_waiting = 0;
+                        kibnal_tx_done (tx);
+                        return;
+                }
         }
 
         conn = kibnal_find_conn_locked (peer);
@@ -1295,17 +1408,19 @@ kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
                 return;
         }
 
-        if (peer->ibp_connecting == 0) {
-                if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
+        if (peer->ibp_connecting == 0 &&
+            peer->ibp_accepting == 0) {
+                if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */
+                      time_after_eq(jiffies, peer->ibp_reconnect_time))) {
                         write_unlock_irqrestore(g_lock, flags);
                         tx->tx_status = -EHOSTUNREACH;
                         tx->tx_waiting = 0;
                         kibnal_tx_done (tx);
                         return;
                 }
-        
+
                 peer->ibp_connecting = 1;
-                peer->ibp_arp_count = 1 + IBNAL_ARP_RETRIES;
+                peer->ibp_arp_count = 1 + *kibnal_tunables.kib_arp_retries;
                 kibnal_schedule_peer_arp(peer);
         }
         
@@ -1316,46 +1431,30 @@ kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
 }
 
 int
-kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
+kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
 {
-        /* I would guess that if kibnal_get_peer (nid) == NULL,
-           and we're not routing, then 'nid' is very distant :) */
-        if ( nal->libnal_ni.ni_pid.nid == nid ) {
-                *dist = 0;
-        } else {
-                *dist = 1;
-        }
-
-        return 0;
-}
-
-ptl_err_t
-kibnal_sendmsg(lib_nal_t    *nal, 
-               void         *private,
-               lib_msg_t    *libmsg,
-               ptl_hdr_t    *hdr, 
-               int           type, 
-               ptl_nid_t     nid, 
-               ptl_pid_t     pid,
-               unsigned int  payload_niov, 
-               struct iovec *payload_iov, 
-               ptl_kiov_t   *payload_kiov,
-               int           payload_offset,
-               int           payload_nob)
-{
-        kib_msg_t  *ibmsg;
-        kib_tx_t   *tx;
-        int         nob;
-        int         rc;
-        int         n;
+        lnet_hdr_t       *hdr = &lntmsg->msg_hdr; 
+        int               type = lntmsg->msg_type; 
+        lnet_process_id_t target = lntmsg->msg_target;
+        int               target_is_router = lntmsg->msg_target_is_router;
+        int               routing = lntmsg->msg_routing;
+        unsigned int      payload_niov = lntmsg->msg_niov; 
+        struct iovec     *payload_iov = lntmsg->msg_iov; 
+        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
+        unsigned int      payload_offset = lntmsg->msg_offset;
+        unsigned int      payload_nob = lntmsg->msg_len;
+        kib_msg_t        *ibmsg;
+        kib_tx_t         *tx;
+        int               nob;
+        int               rc;
 
         /* NB 'private' is different depending on what we're sending.... */
 
-        CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64
-               " pid %d\n", payload_nob, payload_niov, nid , pid);
+        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
+               payload_nob, payload_niov, libcfs_id2str(target));
 
         LASSERT (payload_nob == 0 || payload_niov > 0);
-        LASSERT (payload_niov <= PTL_MD_MAX_IOV);
+        LASSERT (payload_niov <= LNET_MAX_IOV);
 
         /* Thread context */
         LASSERT (!in_interrupt());
@@ -1365,138 +1464,90 @@ kibnal_sendmsg(lib_nal_t    *nal,
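         /* The switch below decides, per message type and payload size, whether
          * the payload travels inline in an IMMEDIATE message or is set up for
          * RDMA via a GET_REQ/PUT_REQ rendezvous */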
         switch (type) {
         default:
                 LBUG();
-                return (PTL_FAIL);
+                return (-EIO);
                 
-        case PTL_MSG_REPLY: {
-                /* reply's 'private' is the incoming receive */
-                kib_rx_t *rx = private;
-
-                LASSERT(rx != NULL);
-
-                if (rx->rx_msg->ibm_type == IBNAL_MSG_IMMEDIATE) {
-                        /* RDMA not expected */
-                        nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
-                        if (nob > IBNAL_MSG_SIZE) {
-                                CERROR("REPLY for "LPX64" too big (RDMA not requested):"
-                                       "%d (max for message is %d)\n", 
-                                       nid, payload_nob, IBNAL_MSG_SIZE);
-                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
-                                       nob, nid);
-                                return PTL_FAIL;
-                        }
-                        break;
-                }
-
-                /* Incoming message consistent with RDMA? */
-                if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_REQ) {
-                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
-                               nid, rx->rx_msg->ibm_type);
-                        return PTL_FAIL;
-                }
+        case LNET_MSG_ACK:
+                LASSERT (payload_nob == 0);
+                break;
 
-                /* NB rx_complete() will send GET_NAK when I return to it from
-                 * here, unless I set rx_responded! */
+        case LNET_MSG_GET:
+                if (routing || target_is_router)
+                        break;                  /* send IMMEDIATE */
+                
+                /* is the REPLY small enough not to need RDMA? */
+                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
+                if (nob <= IBNAL_MSG_SIZE)
+                        break;                  /* send IMMEDIATE */
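+                /* Optimized GET: map the reply sink buffers here and ask the
+                 * peer to RDMA its reply straight into them; GET_DONE signals
+                 * completion */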
 
-                tx = kibnal_get_idle_tx(0);
+                tx = kibnal_get_idle_tx();
                 if (tx == NULL) {
-                        CERROR("Can't get tx for REPLY to "LPX64"\n", nid);
-                        return PTL_FAIL;
-                }
-
-                if (payload_nob == 0)
-                        rc = 0;
-                else if (payload_kiov == NULL)
-                        rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0, 
-                                                 payload_niov, payload_iov, 
-                                                 payload_offset, payload_nob);
-                else
-                        rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
-                                                  payload_niov, payload_kiov,
-                                                  payload_offset, payload_nob);
-                if (rc != 0) {
-                        CERROR("Can't setup GET src for "LPX64": %d\n", nid, rc);
-                        kibnal_tx_done(tx);
-                        return PTL_FAIL;
+                        CERROR("Can't allocate txd for GET to %s\n",
+                               libcfs_nid2str(target.nid));
+                        return -ENOMEM;
                 }
                 
-                rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, payload_nob,
-                                      &rx->rx_msg->ibm_u.get.ibgm_rd,
-                                      rx->rx_msg->ibm_u.get.ibgm_cookie);
-                if (rc < 0) {
-                        CERROR("Can't setup rdma for GET from "LPX64": %d\n", 
-                               nid, rc);
-                } else if (rc == 0) {
-                        /* No RDMA: local completion may happen now! */
-                        lib_finalize (&kibnal_lib, NULL, libmsg, PTL_OK);
-                } else {
-                        /* RDMA: lib_finalize(libmsg) when it completes */
-                        tx->tx_libmsg[0] = libmsg;
-                }
-
-                kibnal_queue_tx(tx, rx->rx_conn);
-                rx->rx_responded = 1;
-                return (rc >= 0) ? PTL_OK : PTL_FAIL;
-        }
-
-        case PTL_MSG_GET:
-                /* will the REPLY message be small enough not to need RDMA? */
-                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
-                if (nob <= IBNAL_MSG_SIZE)
-                        break;
-
-                tx = kibnal_get_idle_tx(1);     /* may block; caller is an app thread */
-                LASSERT (tx != NULL);
-
                 ibmsg = tx->tx_msg;
                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
 
-                if ((libmsg->md->options & PTL_MD_KIOV) == 0)
+                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                         rc = kibnal_setup_rd_iov(tx, &ibmsg->ibm_u.get.ibgm_rd,
                                                  vv_acc_r_mem_write,
-                                                 libmsg->md->md_niov,
-                                                 libmsg->md->md_iov.iov,
-                                                 0, libmsg->md->length);
+                                                 lntmsg->msg_md->md_niov,
+                                                 lntmsg->msg_md->md_iov.iov,
+                                                 0, lntmsg->msg_md->md_length);
                 else
                         rc = kibnal_setup_rd_kiov(tx, &ibmsg->ibm_u.get.ibgm_rd,
                                                   vv_acc_r_mem_write,
-                                                  libmsg->md->md_niov,
-                                                  libmsg->md->md_iov.kiov,
-                                                  0, libmsg->md->length);
+                                                  lntmsg->msg_md->md_niov,
+                                                  lntmsg->msg_md->md_iov.kiov,
+                                                  0, lntmsg->msg_md->md_length);
                 if (rc != 0) {
-                        CERROR("Can't setup GET sink for "LPX64": %d\n", nid, rc);
+                        CERROR("Can't setup GET sink for %s: %d\n",
+                               libcfs_nid2str(target.nid), rc);
                         kibnal_tx_done(tx);
-                        return PTL_FAIL;
+                        return -EIO;
                 }
 
-                n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;
-                nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);
+#if IBNAL_USE_FMR
+                nob = sizeof(kib_get_msg_t);
+#else
+                {
+                        int n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;
+                        
+                        nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);
+                }
+#endif
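+                /* NB with FMR the descriptor is a fixed size; otherwise the
+                 * GET_REQ is sized to the number of fragments actually mapped */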
                 kibnal_init_tx_msg(tx, IBNAL_MSG_GET_REQ, nob);
 
-                tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib, nid, libmsg);
-                if (tx->tx_libmsg[1] == NULL) {
-                        CERROR("Can't create reply for GET -> "LPX64"\n", nid);
+                tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni,
+                                                         lntmsg);
+                if (tx->tx_lntmsg[1] == NULL) {
+                        CERROR("Can't create reply for GET -> %s\n",
+                               libcfs_nid2str(target.nid));
                         kibnal_tx_done(tx);
-                        return PTL_FAIL;
+                        return -EIO;
                 }
 
-                tx->tx_libmsg[0] = libmsg;      /* finalise libmsg[0,1] on completion */
+                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
                 tx->tx_waiting = 1;             /* waiting for GET_DONE */
-                kibnal_launch_tx(tx, nid);
-                return PTL_OK;
-
-        case PTL_MSG_ACK:
-                LASSERT (payload_nob == 0);
-                break;
+                kibnal_launch_tx(tx, target.nid);
+                return 0;
 
-        case PTL_MSG_PUT:
+        case LNET_MSG_REPLY:
+        case LNET_MSG_PUT:
                 /* Is the payload small enough not to need RDMA? */
                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                 if (nob <= IBNAL_MSG_SIZE)
-                        break;
+                        break;                  /* send IMMEDIATE */
 
-                tx = kibnal_get_idle_tx(1);     /* may block: caller is app thread */
-                LASSERT (tx != NULL);
+                tx = kibnal_get_idle_tx();
+                if (tx == NULL) {
+                        CERROR("Can't allocate %s txd for %s\n",
+                               type == LNET_MSG_PUT ? "PUT" : "REPLY",
+                               libcfs_nid2str(target.nid));
+                        return -ENOMEM;
+                }
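+                /* Map the source buffers for RDMA and send a PUT_REQ; the tx
+                 * then waits for the peer's PUT_ACK (or PUT_NAK) */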
 
                 if (payload_kiov == NULL)
                         rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0,
@@ -1507,9 +1558,10 @@ kibnal_sendmsg(lib_nal_t    *nal,
                                                   payload_niov, payload_kiov,
                                                   payload_offset, payload_nob);
                 if (rc != 0) {
-                        CERROR("Can't setup PUT src for "LPX64": %d\n", nid, rc);
+                        CERROR("Can't setup PUT src for %s: %d\n",
+                               libcfs_nid2str(target.nid), rc);
                         kibnal_tx_done(tx);
-                        return PTL_FAIL;
+                        return -EIO;
                 }
 
                 ibmsg = tx->tx_msg;
@@ -1517,74 +1569,133 @@ kibnal_sendmsg(lib_nal_t    *nal,
                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));
 
-                tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
+                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
                 tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */
-                kibnal_launch_tx(tx, nid);
-                return PTL_OK;
+                kibnal_launch_tx(tx, target.nid);
+                return 0;
         }
 
+        /* send IMMEDIATE */
+
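+        /* The payload (if any) fits in the message buffer, so it is copied
+         * inline below; no RDMA mapping is needed */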
         LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
                  <= IBNAL_MSG_SIZE);
 
-        tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
-                                  type == PTL_MSG_REPLY));
+        tx = kibnal_get_idle_tx();
         if (tx == NULL) {
-                CERROR ("Can't send %d to "LPX64": tx descs exhausted\n", type, nid);
-                return PTL_NO_SPACE;
+                CERROR ("Can't send %d to %s: tx descs exhausted\n",
+                        type, libcfs_nid2str(target.nid));
+                return -ENOMEM;
         }
 
         ibmsg = tx->tx_msg;
         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
 
-        if (payload_nob > 0) {
-                if (payload_kiov != NULL)
-                        lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
-                                          payload_niov, payload_kiov,
-                                          payload_offset, payload_nob);
-                else
-                        lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
-                                         payload_niov, payload_iov,
-                                         payload_offset, payload_nob);
-        }
+        if (payload_kiov != NULL)
+                lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
+                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
+                                    payload_niov, payload_kiov,
+                                    payload_offset, payload_nob);
+        else
+                lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
+                                   offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
+                                   payload_niov, payload_iov,
+                                   payload_offset, payload_nob);
 
         nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);
         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, nob);
 
-        tx->tx_libmsg[0] = libmsg;              /* finalise libmsg on completion */
-        kibnal_launch_tx(tx, nid);
-        return PTL_OK;
+        tx->tx_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
+        kibnal_launch_tx(tx, target.nid);
+        return 0;
 }
 
-ptl_err_t
-kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
-               ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-               unsigned int payload_niov, struct iovec *payload_iov,
-               size_t payload_offset, size_t payload_len)
+void
+kibnal_reply (lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
 {
-        CDEBUG(D_NET, "  pid = %d, nid="LPU64"\n",
-               pid, nid);
-        return (kibnal_sendmsg(nal, private, cookie,
-                               hdr, type, nid, pid,
-                               payload_niov, payload_iov, NULL,
-                               payload_offset, payload_len));
+        lnet_process_id_t target = lntmsg->msg_target;
+        unsigned int      niov = lntmsg->msg_niov; 
+        struct iovec     *iov = lntmsg->msg_iov; 
+        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
+        unsigned int      offset = lntmsg->msg_offset;
+        unsigned int      nob = lntmsg->msg_len;
+        kib_tx_t         *tx;
+        int               rc;
+        
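+        /* Optimized GET reply: RDMA the reply payload directly into the sink
+         * descriptor the peer sent in its GET_REQ, then signal completion
+         * with GET_DONE */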
+        tx = kibnal_get_idle_tx();
+        if (tx == NULL) {
+                CERROR("Can't get tx for REPLY to %s\n",
+                       libcfs_nid2str(target.nid));
+                goto failed_0;
+        }
+
+        if (nob == 0)
+                rc = 0;
+        else if (kiov == NULL)
+                rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0, 
+                                         niov, iov, offset, nob);
+        else
+                rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
+                                          niov, kiov, offset, nob);
+
+        if (rc != 0) {
+                CERROR("Can't setup GET src for %s: %d\n",
+                       libcfs_nid2str(target.nid), rc);
+                goto failed_1;
+        }
+        
+        rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, nob,
+                              &rx->rx_msg->ibm_u.get.ibgm_rd,
+                              rx->rx_msg->ibm_u.get.ibgm_cookie);
+        if (rc < 0) {
+                CERROR("Can't setup rdma for GET from %s: %d\n", 
+                       libcfs_nid2str(target.nid), rc);
+                goto failed_1;
+        }
+        
+        if (rc == 0) {
+                /* No RDMA: local completion may happen now! */
+                lnet_finalize(ni, lntmsg, 0);
+        } else {
+                /* RDMA: lnet_finalize(lntmsg) when it
+                 * completes */
+                tx->tx_lntmsg[0] = lntmsg;
+        }
+        
+        kibnal_queue_tx(tx, rx->rx_conn);
+        return;
+        
+ failed_1:
+        kibnal_tx_done(tx);
+ failed_0:
+        lnet_finalize(ni, lntmsg, -EIO);
 }
 
-ptl_err_t
-kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
-                     ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-                     unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
-                     size_t payload_offset, size_t payload_len)
+int
+kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
+                   void **new_private)
 {
-        return (kibnal_sendmsg(nal, private, cookie,
-                               hdr, type, nid, pid,
-                               payload_niov, NULL, payload_kiov,
-                               payload_offset, payload_len));
+        kib_rx_t    *rx = private;
+        kib_conn_t  *conn = rx->rx_conn;
+
+        if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
+                /* Can't block if RDMA completions need normal credits */
+                LCONSOLE_ERROR_MSG(0x129, "Dropping message from %s: no buffers"
+                                   " free. %s is running an old version of LNET "
+                                   "that may deadlock if messages wait for "
+                                   "buffers.\n",
+                                   libcfs_nid2str(conn->ibc_peer->ibp_nid),
+                                   libcfs_nid2str(conn->ibc_peer->ibp_nid));
+                return -EDEADLK;
+        }
+        
+        *new_private = private;
+        return 0;
 }
 
-ptl_err_t
-kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
-                 unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
-                 size_t offset, int mlen, int rlen)
+int
+kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
+             unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
+             unsigned int offset, unsigned int mlen, unsigned int rlen)
 {
         kib_rx_t    *rx = private;
         kib_msg_t   *rxmsg = rx->rx_msg;
@@ -1592,11 +1703,10 @@ kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
         kib_tx_t    *tx;
         kib_msg_t   *txmsg;
         int          nob;
-        int          rc;
-        int          n;
+        int          post_cred = 1;
+        int          rc = 0;
         
         LASSERT (mlen <= rlen);
-        LASSERT (mlen >= 0);
         LASSERT (!in_interrupt());
         /* Either all pages or all vaddrs */
         LASSERT (!(kiov != NULL && iov != NULL));
@@ -1607,38 +1717,42 @@ kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                 
         case IBNAL_MSG_IMMEDIATE:
                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
-                if (nob > IBNAL_MSG_SIZE) {
-                        CERROR ("Immediate message from "LPX64" too big: %d\n",
-                                rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
-                        return (PTL_FAIL);
+                if (nob > rx->rx_nob) {
+                        CERROR ("Immediate message from %s too big: %d(%d)\n",
+                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
+                                nob, rx->rx_nob);
+                        rc = -EPROTO;
+                        break;
                 }
 
                 if (kiov != NULL)
-                        lib_copy_buf2kiov(niov, kiov, offset,
-                                          rxmsg->ibm_u.immediate.ibim_payload,
-                                          mlen);
+                        lnet_copy_flat2kiov(niov, kiov, offset,
+                                            IBNAL_MSG_SIZE, rxmsg,
+                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
+                                            mlen);
                 else
-                        lib_copy_buf2iov(niov, iov, offset,
-                                         rxmsg->ibm_u.immediate.ibim_payload,
-                                         mlen);
-
-                lib_finalize (nal, NULL, libmsg, PTL_OK);
-                return (PTL_OK);
+                        lnet_copy_flat2iov(niov, iov, offset,
+                                           IBNAL_MSG_SIZE, rxmsg,
+                                           offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
+                                           mlen);
+                lnet_finalize (ni, lntmsg, 0);
+                break;
 
         case IBNAL_MSG_PUT_REQ:
-                /* NB rx_complete() will send PUT_NAK when I return to it from
-                 * here, unless I set rx_responded!  */
-
-                if (mlen == 0) { /* No payload to RDMA */
-                        lib_finalize(nal, NULL, libmsg, PTL_OK);
-                        return PTL_OK;
+                if (mlen == 0) {
+                        lnet_finalize(ni, lntmsg, 0);
+                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, 0,
+                                               rxmsg->ibm_u.putreq.ibprm_cookie);
+                        break;
                 }
-
-                tx = kibnal_get_idle_tx(0);
+                
+                tx = kibnal_get_idle_tx();
                 if (tx == NULL) {
-                        CERROR("Can't allocate tx for "LPX64"\n",
-                               conn->ibc_peer->ibp_nid);
-                        return PTL_FAIL;
+                        CERROR("Can't allocate tx for %s\n",
+                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
+                        /* Not replying will break the connection */
+                        rc = -ENOMEM;
+                        break;
                 }
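                 /* Set up the PUT sink and reply with PUT_ACK so the peer can
                  * RDMA the payload in; lntmsg is finalised when PUT_DONE
                  * arrives */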
 
                 txmsg = tx->tx_msg;
@@ -1653,52 +1767,51 @@ kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                                                   vv_acc_r_mem_write,
                                                   niov, kiov, offset, mlen);
                 if (rc != 0) {
-                        CERROR("Can't setup PUT sink for "LPX64": %d\n",
-                               conn->ibc_peer->ibp_nid, rc);
+                        CERROR("Can't setup PUT sink for %s: %d\n",
+                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                         kibnal_tx_done(tx);
-                        return PTL_FAIL;
+                        /* tell peer it's over */
+                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, rc,
+                                               rxmsg->ibm_u.putreq.ibprm_cookie);
+                        break;
                 }
 
                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
+#if IBNAL_USE_FMR
+                nob = sizeof(kib_putack_msg_t);
+#else
+                {
+                        int n = tx->tx_msg->ibm_u.putack.ibpam_rd.rd_nfrag;
 
-                n = tx->tx_msg->ibm_u.putack.ibpam_rd.rd_nfrag;
-                nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[n]);
+                        nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[n]);
+                }
+#endif
                 kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_ACK, nob);
 
-                tx->tx_libmsg[0] = libmsg;      /* finalise libmsg on completion */
+                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
                 tx->tx_waiting = 1;             /* waiting for PUT_DONE */
                 kibnal_queue_tx(tx, conn);
 
-                LASSERT (!rx->rx_responded);
-                rx->rx_responded = 1;
-                return PTL_OK;
+                if (conn->ibc_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD)
+                        post_cred = 0; /* peer still owns 'rx' for sending PUT_DONE */
+                break;
 
         case IBNAL_MSG_GET_REQ:
-                /* We get called here just to discard any junk after the
-                 * GET hdr. */
-                LASSERT (libmsg == NULL);
-                lib_finalize (nal, NULL, libmsg, PTL_OK);
-                return (PTL_OK);
+                if (lntmsg != NULL) {
+                        /* Optimized GET; RDMA lntmsg's payload */
+                        kibnal_reply(ni, rx, lntmsg);
+                } else {
+                        /* GET didn't match anything */
+                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, 
+                                               -ENODATA,
+                                               rxmsg->ibm_u.get.ibgm_cookie);
+                }
+                break;
         }
-}
 
-ptl_err_t
-kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
-              unsigned int niov, struct iovec *iov, 
-              size_t offset, size_t mlen, size_t rlen)
-{
-        return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
-                                offset, mlen, rlen));
-}
-
-ptl_err_t
-kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
-                     unsigned int niov, ptl_kiov_t *kiov, 
-                     size_t offset, size_t mlen, size_t rlen)
-{
-        return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
-                                offset, mlen, rlen));
+        kibnal_post_rx(rx, post_cred, 0);
+        return rc;
 }
 
 int
@@ -1720,6 +1833,41 @@ kibnal_thread_fini (void)
 }
 
 void
+kibnal_peer_alive (kib_peer_t *peer)
+{
+        /* This is racy, but everyone's only writing cfs_time_current() */
+        peer->ibp_last_alive = cfs_time_current();
+        mb();
+}
+
+void
+kibnal_peer_notify (kib_peer_t *peer)
+{
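+        /* Report the peer as dead to LNET (lnet_notify with alive == 0) once
+         * its last conn is gone, no connect/accept is in progress and a
+         * connection error was recorded */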
+        time_t        last_alive = 0;
+        int           error = 0;
+        unsigned long flags;
+        
+        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
+
+        if (list_empty(&peer->ibp_conns) &&
+            peer->ibp_accepting == 0 &&
+            peer->ibp_connecting == 0 &&
+            peer->ibp_error != 0) {
+                error = peer->ibp_error;
+                peer->ibp_error = 0;
+                
+                last_alive = cfs_time_current_sec() -
+                             cfs_duration_sec(cfs_time_current() -
+                                              peer->ibp_last_alive);
+        }
+        
+        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
+        
+        if (error != 0)
+                lnet_notify(kibnal_data.kib_ni, peer->ibp_nid, 0, last_alive);
+}
+
+void
 kibnal_schedule_conn (kib_conn_t *conn)
 {
         unsigned long flags;
@@ -1737,14 +1885,13 @@ kibnal_schedule_conn (kib_conn_t *conn)
 void
 kibnal_close_conn_locked (kib_conn_t *conn, int error)
 {
-        /* This just does the immmediate housekeeping.  'error' is zero for a
+        /* This just does the immediate housekeeping.  'error' is zero for a
          * normal shutdown which can happen only after the connection has been
          * established.  If the connection is established, schedule the
          * connection to be finished off by the connd.  Otherwise the connd is
          * already dealing with it (either to set it up or tear it down).
          * Caller holds kib_global_lock exclusively in irq context */
         kib_peer_t       *peer = conn->ibc_peer;
-        struct list_head *tmp;
         
         LASSERT (error != 0 || conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
 
@@ -1759,48 +1906,33 @@ kibnal_close_conn_locked (kib_conn_t *conn, int error)
 
         if (error == 0 &&
             list_empty(&conn->ibc_tx_queue) &&
+            list_empty(&conn->ibc_tx_queue_rsrvd) &&
+            list_empty(&conn->ibc_tx_queue_nocred) &&
             list_empty(&conn->ibc_active_txs)) {
-                CDEBUG(D_NET, "closing conn to "LPX64
+                CDEBUG(D_NET, "closing conn to %s"
                        " rx# "LPD64" tx# "LPD64"\n", 
-                       peer->ibp_nid, conn->ibc_txseq, conn->ibc_rxseq);
+                       libcfs_nid2str(peer->ibp_nid),
+                       conn->ibc_txseq, conn->ibc_rxseq);
         } else {
-                CERROR("Closing conn to "LPX64": error %d%s%s"
+                CDEBUG(D_NETERROR, "Closing conn to %s: error %d%s%s%s%s"
                        " rx# "LPD64" tx# "LPD64"\n",
-                       peer->ibp_nid, error,
+                       libcfs_nid2str(peer->ibp_nid), error,
                        list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
+                       list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
+                       list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
                        list_empty(&conn->ibc_active_txs) ? "" : "(waiting)",
                        conn->ibc_txseq, conn->ibc_rxseq);
-
-#if 0
-                /* can't skip down the queue without holding ibc_lock (see above) */
-                list_for_each(tmp, &conn->ibc_tx_queue) {
-                        kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
-                        
-                        CERROR("   queued tx type %x cookie "LPX64
-                               " sending %d waiting %d ticks %ld/%d\n", 
-                               tx->tx_msg->ibm_type, tx->tx_cookie, 
-                               tx->tx_sending, tx->tx_waiting,
-                               (long)(tx->tx_deadline - jiffies), HZ);
-                }
-
-                list_for_each(tmp, &conn->ibc_active_txs) {
-                        kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
-                        
-                        CERROR("   active tx type %x cookie "LPX64
-                               " sending %d waiting %d ticks %ld/%d\n", 
-                               tx->tx_msg->ibm_type, tx->tx_cookie, 
-                               tx->tx_sending, tx->tx_waiting,
-                               (long)(tx->tx_deadline - jiffies), HZ);
-                }
-#endif
         }
 
         list_del (&conn->ibc_list);
-        
-        if (list_empty (&peer->ibp_conns) &&    /* no more conns */
-            peer->ibp_persistence == 0 &&       /* non-persistent peer */
-            kibnal_peer_active(peer)) {         /* still in peer table */
-                kibnal_unlink_peer_locked (peer);
+
+        if (list_empty (&peer->ibp_conns)) {   /* no more conns */
+                if (peer->ibp_persistence == 0 && /* non-persistent peer */
+                    kibnal_peer_active(peer))     /* still in peer table */
+                        kibnal_unlink_peer_locked (peer);
+
+                /* set/clear error on last conn */
+                peer->ibp_error = conn->ibc_comms_error;
         }
 
         kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECT1);
@@ -1845,84 +1977,76 @@ kibnal_handle_early_rxs(kib_conn_t *conn)
 }
 
 void
-kibnal_conn_disconnected(kib_conn_t *conn)
+kibnal_abort_txs(kib_conn_t *conn, struct list_head *txs)
 {
-        LIST_HEAD        (zombies); 
-        struct list_head *tmp;
-        struct list_head *nxt;
-        kib_tx_t         *tx;
-
-        /* I'm the connd */
-        LASSERT (!in_interrupt());
-        LASSERT (current == kibnal_data.kib_connd);
-        LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
-        
-        kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTED);
-
-        /* move QP to error state to make posted work items complete */
-        kibnal_set_qp_state(conn, vv_qp_state_error);
+        LIST_HEAD           (zombies); 
+        struct list_head    *tmp;
+        struct list_head    *nxt;
+        kib_tx_t            *tx;
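+
+        /* Abort every tx on 'txs': txs still mid-send are left for their send
+         * completions to finish, the rest move to a private zombie list and
+         * are finalised with -ECONNABORTED outside ibc_lock */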
 
         spin_lock(&conn->ibc_lock);
 
-        /* Complete all tx descs not waiting for sends to complete.
-         * NB we should be safe from RDMA now that the QP has changed state */
-
-        list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
+        list_for_each_safe (tmp, nxt, txs) {
                 tx = list_entry (tmp, kib_tx_t, tx_list);
 
-                LASSERT (tx->tx_queued);
-
+                if (txs == &conn->ibc_active_txs) {
+                        LASSERT (!tx->tx_queued);
+                        LASSERT (tx->tx_waiting || tx->tx_sending != 0);
+                } else {
+                        LASSERT (tx->tx_queued);
+                }
+                
                 tx->tx_status = -ECONNABORTED;
                 tx->tx_queued = 0;
                 tx->tx_waiting = 0;
                 
-                if (tx->tx_sending != 0)
-                        continue;
-
-                list_del (&tx->tx_list);
-                list_add (&tx->tx_list, &zombies);
+                if (tx->tx_sending == 0) {
+                        list_del (&tx->tx_list);
+                        list_add (&tx->tx_list, &zombies);
+                }
         }
 
-        list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
-                tx = list_entry (tmp, kib_tx_t, tx_list);
-
-                LASSERT (!tx->tx_queued);
-                LASSERT (tx->tx_waiting ||
-                         tx->tx_sending != 0);
+        spin_unlock(&conn->ibc_lock);
 
-                tx->tx_status = -ECONNABORTED;
-                tx->tx_waiting = 0;
-                
-                if (tx->tx_sending != 0)
-                        continue;
+        kibnal_txlist_done(&zombies, -ECONNABORTED);
+}
 
-                list_del (&tx->tx_list);
-                list_add (&tx->tx_list, &zombies);
-        }
+void
+kibnal_conn_disconnected(kib_conn_t *conn)
+{
+        /* I'm the connd */
+        LASSERT (!in_interrupt());
+        LASSERT (current == kibnal_data.kib_connd);
+        LASSERT (conn->ibc_state >= IBNAL_CONN_INIT);
         
-        spin_unlock(&conn->ibc_lock);
+        kibnal_set_conn_state(conn, IBNAL_CONN_DISCONNECTED);
 
-        while (!list_empty(&zombies)) {
-                tx = list_entry (zombies.next, kib_tx_t, tx_list);
+        /* move QP to error state to make posted work items complete */
+        kibnal_set_qp_state(conn, vv_qp_state_error);
 
-                list_del(&tx->tx_list);
-                kibnal_tx_done (tx);
-        }
+        /* Complete all tx descs not waiting for sends to complete.
+         * NB we should be safe from RDMA now that the QP has changed state */
+
+        kibnal_abort_txs(conn, &conn->ibc_tx_queue);
+        kibnal_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
+        kibnal_abort_txs(conn, &conn->ibc_tx_queue_nocred);
+        kibnal_abort_txs(conn, &conn->ibc_active_txs);
 
         kibnal_handle_early_rxs(conn);
+
+        kibnal_peer_notify(conn->ibc_peer);
 }
 
 void
-kibnal_peer_connect_failed (kib_peer_t *peer, int active)
+kibnal_peer_connect_failed (kib_peer_t *peer, int active, int error)
 {
-        struct list_head  zombies;
-        kib_tx_t         *tx;
+        LIST_HEAD        (zombies);
         unsigned long     flags;
 
         /* Only the connd creates conns => single threaded */
+        LASSERT (error != 0);
         LASSERT (!in_interrupt());
         LASSERT (current == kibnal_data.kib_connd);
-        LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
 
         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
 
@@ -1930,10 +2054,12 @@ kibnal_peer_connect_failed (kib_peer_t *peer, int active)
                 LASSERT (peer->ibp_connecting != 0);
                 peer->ibp_connecting--;
         } else {
-                LASSERT (!kibnal_peer_active(peer));
+                LASSERT (peer->ibp_accepting != 0);
+                peer->ibp_accepting--;
         }
         
-        if (peer->ibp_connecting != 0) {
+        if (peer->ibp_connecting != 0 ||
+            peer->ibp_accepting != 0) {
                 /* another connection attempt under way (loopback?)... */
                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
                 return;
@@ -1941,11 +2067,17 @@ kibnal_peer_connect_failed (kib_peer_t *peer, int active)
 
         if (list_empty(&peer->ibp_conns)) {
                 /* Say when active connection can be re-attempted */
-                peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
-                /* Increase reconnection interval */
-                peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
-                                                    IBNAL_MAX_RECONNECT_INTERVAL);
-        
+                peer->ibp_reconnect_interval *= 2;
+                peer->ibp_reconnect_interval =
+                        MAX(peer->ibp_reconnect_interval,
+                            *kibnal_tunables.kib_min_reconnect_interval);
+                peer->ibp_reconnect_interval =
+                        MIN(peer->ibp_reconnect_interval,
+                            *kibnal_tunables.kib_max_reconnect_interval);
+                
+                peer->ibp_reconnect_time = jiffies + 
+                                           peer->ibp_reconnect_interval * HZ;
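+                /* ibp_reconnect_interval doubles on each failed attempt and is
+                 * clamped between the min/max reconnect interval tunables (in
+                 * seconds); it is reset to 0 when a connection is established */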
+
                 /* Take peer's blocked transmits to complete with error */
                 list_add(&zombies, &peer->ibp_tx_queue);
                 list_del_init(&peer->ibp_tx_queue);
@@ -1955,6 +2087,8 @@ kibnal_peer_connect_failed (kib_peer_t *peer, int active)
                         /* failed connection attempt on non-persistent peer */
                         kibnal_unlink_peer_locked (peer);
                 }
+
+                peer->ibp_error = error;
         } else {
                 /* Can't have blocked transmits if there are connections */
                 LASSERT (list_empty(&peer->ibp_tx_queue));
@@ -1962,31 +2096,49 @@ kibnal_peer_connect_failed (kib_peer_t *peer, int active)
         
         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
 
+        kibnal_peer_notify(peer);
+
         if (list_empty (&zombies)) 
                 return;
         
-        CERROR ("Deleting messages for "LPX64": connection failed\n", peer->ibp_nid);
-        do {
-                tx = list_entry (zombies.next, kib_tx_t, tx_list);
+        CDEBUG (D_NETERROR, "Deleting messages for %s: connection failed\n",
+                libcfs_nid2str(peer->ibp_nid));
 
-                list_del (&tx->tx_list);
-                /* complete now */
-                tx->tx_status = -EHOSTUNREACH;
-                kibnal_tx_done (tx);
-        } while (!list_empty (&zombies));
+        kibnal_txlist_done(&zombies, -EHOSTUNREACH);
 }
 
 void
-kibnal_connreq_done(kib_conn_t *conn, int active, int status)
+kibnal_reject(cm_cep_handle_t cep, int why)
 {
-        static cm_reject_data_t   rej;
+        static cm_reject_data_t   rejs[3];
+        cm_reject_data_t         *rej = &rejs[why];
+
+        LASSERT (why >= 0 && why < sizeof(rejs)/sizeof(rejs[0]));
+
+        /* If I wasn't so lazy, I'd initialise this only once; it's effectively
+         * read-only */
+        rej->reason = cm_rej_code_usr_rej;
+        rej->priv_data[0] = (IBNAL_MSG_MAGIC) & 0xff;
+        rej->priv_data[1] = (IBNAL_MSG_MAGIC >> 8) & 0xff;
+        rej->priv_data[2] = (IBNAL_MSG_MAGIC >> 16) & 0xff;
+        rej->priv_data[3] = (IBNAL_MSG_MAGIC >> 24) & 0xff;
+        rej->priv_data[4] = (IBNAL_MSG_VERSION) & 0xff;
+        rej->priv_data[5] = (IBNAL_MSG_VERSION >> 8) & 0xff;
+        rej->priv_data[6] = why;
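+        /* The priv_data bytes carry the magic, the message version and the
+         * reject reason so the rejected peer can decode them */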
+
+        cm_reject(cep, rej);
+}
 
+void
+kibnal_connreq_done(kib_conn_t *conn, int active, int status)
+{
         struct list_head   txs;
         kib_peer_t        *peer = conn->ibc_peer;
-        kib_peer_t        *peer2;
         unsigned long      flags;
         kib_tx_t          *tx;
 
+        CDEBUG(D_NET,"%d\n", status);
+
         /* Only the connd creates conns => single threaded */
         LASSERT (!in_interrupt());
         LASSERT (current == kibnal_data.kib_connd);
@@ -1995,10 +2147,10 @@ kibnal_connreq_done(kib_conn_t *conn, int active, int status)
         if (active) {
                 LASSERT (peer->ibp_connecting > 0);
         } else {
-                LASSERT (!kibnal_peer_active(peer));
+                LASSERT (peer->ibp_accepting > 0);
         }
         
-        PORTAL_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
+        LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
         conn->ibc_connvars = NULL;
 
         if (status != 0) {
@@ -2010,15 +2162,13 @@ kibnal_connreq_done(kib_conn_t *conn, int active, int status)
                 case IBNAL_CONN_ACTIVE_CHECK_REPLY:
                         /* got a connection reply but failed checks */
                         LASSERT (active);
-                        memset(&rej, 0, sizeof(rej));
-                        rej.reason = cm_rej_code_usr_rej;
-                        cm_reject(conn->ibc_cep, &rej);
+                        kibnal_reject(conn->ibc_cep, IBNAL_REJECT_FATAL);
                         break;
 
                 case IBNAL_CONN_ACTIVE_CONNECT:
                         LASSERT (active);
                         cm_cancel(conn->ibc_cep);
-                        kibnal_pause(HZ/10);
+                        cfs_pause(cfs_time_seconds(1)/10);
                         /* cm_connect() failed immediately or
                          * callback returned failure */
                         break;
@@ -2038,7 +2188,7 @@ kibnal_connreq_done(kib_conn_t *conn, int active, int status)
                         break;
                 }
 
-                kibnal_peer_connect_failed(conn->ibc_peer, active);
+                kibnal_peer_connect_failed(peer, active, status);
                 kibnal_conn_disconnected(conn);
                 return;
         }
@@ -2052,30 +2202,15 @@ kibnal_connreq_done(kib_conn_t *conn, int active, int status)
                 LASSERT(conn->ibc_state == IBNAL_CONN_PASSIVE_WAIT);
         }
         
+        conn->ibc_last_send = jiffies;
         kibnal_set_conn_state(conn, IBNAL_CONN_ESTABLISHED);
+        kibnal_peer_alive(peer);
 
-        if (!active) {
-                peer2 = kibnal_find_peer_locked(peer->ibp_nid);
-                if (peer2 != NULL) {
-                        /* already in the peer table; swap */
-                        conn->ibc_peer = peer2;
-                        kibnal_peer_addref(peer2);
-                        kibnal_peer_decref(peer);
-                        peer = conn->ibc_peer;
-                } else {
-                        /* add 'peer' to the peer table */
-                        kibnal_peer_addref(peer);
-                        list_add_tail(&peer->ibp_list,
-                                      kibnal_nid2peerlist(peer->ibp_nid));
-                }
-        }
-        
         /* Add conn to peer's list and nuke any dangling conns from a different
          * peer instance... */
         kibnal_conn_addref(conn);               /* +1 ref for ibc_list */
         list_add(&conn->ibc_list, &peer->ibp_conns);
-        kibnal_close_stale_conns_locked (conn->ibc_peer,
-                                         conn->ibc_incarnation);
+        kibnal_close_stale_conns_locked (peer, conn->ibc_incarnation);
 
         if (!kibnal_peer_active(peer) ||        /* peer has been deleted */
             conn->ibc_comms_error != 0 ||       /* comms error */
@@ -2085,19 +2220,21 @@ kibnal_connreq_done(kib_conn_t *conn, int active, int status)
                 kibnal_close_conn_locked(conn, -ECONNABORTED);
 
                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
-                kibnal_peer_connect_failed(peer, active);
+                kibnal_peer_connect_failed(peer, active, -ECONNABORTED);
                 return;
         }
 
         if (active)
                 peer->ibp_connecting--;
+        else
+                peer->ibp_accepting--;
 
         /* grab pending txs while I have the lock */
         list_add(&txs, &peer->ibp_tx_queue);
         list_del_init(&peer->ibp_tx_queue);
         
-        /* reset reconnect interval for next attempt */
-        peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
+        peer->ibp_reconnect_interval = 0;       /* OK to reconnect at any time */
+
         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
 
         /* Schedule blocked txs */
@@ -2156,12 +2293,12 @@ kibnal_cm_callback(cm_cep_handle_t cep, cm_conn_data_t *cmdata, void *arg)
                         break;
 
                 case IBNAL_CONN_DISCONNECT1:
-                        /* kibnal_terminate_conn is getting there; It'll see
+                        /* kibnal_disconnect_conn is getting there; It'll see
                          * ibc_disconnect set... */
                         break;
 
                 case IBNAL_CONN_DISCONNECT2:
-                        /* kibnal_terminate_conn got there already; complete
+                        /* kibnal_disconnect_conn got there already; complete
                          * the disconnect. */
                         kibnal_schedule_conn(conn);
                         break;
@@ -2176,7 +2313,7 @@ kibnal_cm_callback(cm_cep_handle_t cep, cm_conn_data_t *cmdata, void *arg)
                 LASSERT (!conn->ibc_disconnect);
                 conn->ibc_disconnect = 1;
 
-                /* kibnal_terminate_conn sent the disconnect request. */
+                /* kibnal_disconnect_conn sent the disconnect request. */
                 kibnal_schedule_conn(conn);
 
                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
@@ -2230,13 +2367,16 @@ kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
         static kib_msg_t        txmsg;
         static kib_msg_t        rxmsg;
         static cm_reply_data_t  reply;
-        static cm_reject_data_t reject;
 
         kib_conn_t         *conn = NULL;
         int                 rc = 0;
+        int                 reason;
         int                 rxmsgnob;
+        rwlock_t           *g_lock = &kibnal_data.kib_global_lock;
+        kib_peer_t         *peer;
+        kib_peer_t         *peer2;
+        unsigned long       flags;
         kib_connvars_t     *cv;
-        kib_peer_t         *tmp_peer;
         cm_return_t         cmrc;
         vv_return_t         vvrc;
         
@@ -2245,9 +2385,10 @@ kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
         LASSERT (!in_interrupt());
         LASSERT (current == kibnal_data.kib_connd);
 
-        if (cmreq->sid != IBNAL_SERVICE_NUMBER) {
+        if (cmreq->sid != (__u64)(*kibnal_tunables.kib_service_number)) {
                 CERROR(LPX64" != IBNAL_SERVICE_NUMBER("LPX64")\n",
-                       cmreq->sid, (__u64)IBNAL_SERVICE_NUMBER);
+                       cmreq->sid, (__u64)(*kibnal_tunables.kib_service_number));
+                reason = IBNAL_REJECT_FATAL;
                 goto reject;
         }
 
@@ -2255,63 +2396,131 @@ kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
         rxmsgnob = MIN(cm_REQ_priv_data_len, sizeof(rxmsg));
         memcpy(&rxmsg, cmreq->priv_data, rxmsgnob);
 
-        rc = kibnal_unpack_msg(&rxmsg, rxmsgnob);
+        rc = kibnal_unpack_msg(&rxmsg, 0, rxmsgnob);
         if (rc != 0) {
-                CERROR("Can't parse connection request: %d\n", rc);
+                /* SILENT! kibnal_unpack_msg() complains if required */
+                reason = IBNAL_REJECT_FATAL;
                 goto reject;
         }
 
+        if (rxmsg.ibm_version != IBNAL_MSG_VERSION)
+                CWARN("Connection from %s: old protocol version 0x%x\n",
+                      libcfs_nid2str(rxmsg.ibm_srcnid), rxmsg.ibm_version);
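+        /* NB an older version is tolerated; it is recorded in ibc_version
+         * below so the reply and all later messages are packed to match the
+         * peer */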
+
         if (rxmsg.ibm_type != IBNAL_MSG_CONNREQ) {
-                CERROR("Unexpected connreq msg type: %x from "LPX64"\n",
-                       rxmsg.ibm_type, rxmsg.ibm_srcnid);
+                CERROR("Unexpected connreq msg type: %x from %s\n",
+                       rxmsg.ibm_type, libcfs_nid2str(rxmsg.ibm_srcnid));
+                reason = IBNAL_REJECT_FATAL;
                 goto reject;
         }
 
-        if (rxmsg.ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid) {
-                CERROR("Can't accept "LPX64": bad dst nid "LPX64"\n",
-                       rxmsg.ibm_srcnid, rxmsg.ibm_dstnid);
+        if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
+                                     rxmsg.ibm_dstnid)) {
+                CERROR("Can't accept %s: bad dst nid %s\n",
+                       libcfs_nid2str(rxmsg.ibm_srcnid), 
+                       libcfs_nid2str(rxmsg.ibm_dstnid));
+                reason = IBNAL_REJECT_FATAL;
                 goto reject;
         }
 
         if (rxmsg.ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
-                CERROR("Can't accept "LPX64": incompatible queue depth %d (%d wanted)\n",
-                       rxmsg.ibm_srcnid, rxmsg.ibm_u.connparams.ibcp_queue_depth, 
+                CERROR("Can't accept %s: incompatible queue depth %d (%d wanted)\n",
+                       libcfs_nid2str(rxmsg.ibm_srcnid), 
+                       rxmsg.ibm_u.connparams.ibcp_queue_depth, 
                        IBNAL_MSG_QUEUE_SIZE);
+                reason = IBNAL_REJECT_FATAL;
                 goto reject;
         }
 
         if (rxmsg.ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
-                CERROR("Can't accept "LPX64": message size %d too big (%d max)\n",
-                       rxmsg.ibm_srcnid, rxmsg.ibm_u.connparams.ibcp_max_msg_size, 
+                CERROR("Can't accept %s: message size %d too big (%d max)\n",
+                       libcfs_nid2str(rxmsg.ibm_srcnid), 
+                       rxmsg.ibm_u.connparams.ibcp_max_msg_size, 
                        IBNAL_MSG_SIZE);
+                reason = IBNAL_REJECT_FATAL;
                 goto reject;
         }
                 
         if (rxmsg.ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
-                CERROR("Can't accept "LPX64": max frags %d too big (%d max)\n",
-                       rxmsg.ibm_srcnid, rxmsg.ibm_u.connparams.ibcp_max_frags, 
+                CERROR("Can't accept %s: max frags %d too big (%d max)\n",
+                       libcfs_nid2str(rxmsg.ibm_srcnid), 
+                       rxmsg.ibm_u.connparams.ibcp_max_frags, 
                        IBNAL_MAX_RDMA_FRAGS);
+                reason = IBNAL_REJECT_FATAL;
+                goto reject;
+        }
+        
+        /* assume 'rxmsg.ibm_srcnid' is a new peer; create */
+        rc = kibnal_create_peer (&peer, rxmsg.ibm_srcnid);
+        if (rc != 0) {
+                CERROR("Can't create peer for %s\n",
+                       libcfs_nid2str(rxmsg.ibm_srcnid));
+                reason = IBNAL_REJECT_NO_RESOURCES;
                 goto reject;
         }
+
+        write_lock_irqsave(g_lock, flags);
+
+        if (kibnal_data.kib_listen_handle == NULL) {
+                write_unlock_irqrestore(g_lock, flags);
+
+                CWARN ("Shutdown has started, rejecting connreq from %s\n",
+                       libcfs_nid2str(rxmsg.ibm_srcnid));
+                kibnal_peer_decref(peer);
+                reason = IBNAL_REJECT_FATAL;
+                goto reject;
+        }
+
+        peer2 = kibnal_find_peer_locked(rxmsg.ibm_srcnid);
+        if (peer2 != NULL) {
+                /* tie-break connection race in favour of the higher NID */                
+                if (peer2->ibp_connecting != 0 &&
+                    rxmsg.ibm_srcnid < kibnal_data.kib_ni->ni_nid) {
+                        write_unlock_irqrestore(g_lock, flags);
+
+                        CWARN("Conn race %s\n",
+                              libcfs_nid2str(peer2->ibp_nid));
+
+                        kibnal_peer_decref(peer);
+                        reason = IBNAL_REJECT_CONN_RACE;
+                        goto reject;
+                }
+
+                peer2->ibp_accepting++;
+                kibnal_peer_addref(peer2);
+
+                write_unlock_irqrestore(g_lock, flags);
+                kibnal_peer_decref(peer);
+                peer = peer2;
+        } else {
+                /* Brand new peer */
+                LASSERT (peer->ibp_accepting == 0);
+                peer->ibp_accepting = 1;
+
+                kibnal_peer_addref(peer);
+                list_add_tail(&peer->ibp_list, kibnal_nid2peerlist(rxmsg.ibm_srcnid));
+
+                write_unlock_irqrestore(g_lock, flags);
+        }
                 
         conn = kibnal_create_conn(cep);
         if (conn == NULL) {
-                CERROR("Can't create conn for "LPX64"\n", rxmsg.ibm_srcnid);
-                goto reject;
-        }
-        
-        /* assume 'rxmsg.ibm_srcnid' is a new peer */
-        tmp_peer = kibnal_create_peer (rxmsg.ibm_srcnid);
-        if (tmp_peer == NULL) {
-                CERROR("Can't create tmp peer for "LPX64"\n", rxmsg.ibm_srcnid);
-                kibnal_conn_decref(conn);
-                conn = NULL;
+                CERROR("Can't create conn for %s\n",
+                       libcfs_nid2str(rxmsg.ibm_srcnid));
+                kibnal_peer_connect_failed(peer, 0, -ENOMEM);
+                kibnal_peer_decref(peer);
+                reason = IBNAL_REJECT_NO_RESOURCES;
                 goto reject;
         }
 
-        conn->ibc_peer = tmp_peer;              /* conn takes over my ref */
+        conn->ibc_version = rxmsg.ibm_version;
+
+        conn->ibc_peer = peer;              /* conn takes over my ref */
         conn->ibc_incarnation = rxmsg.ibm_srcstamp;
         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
+        conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
+        LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
+                 <= IBNAL_RX_MSGS);
 
         cv = conn->ibc_connvars;
 
@@ -2324,25 +2533,43 @@ kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
 
         vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
                              &cv->cv_path.sgid, &cv->cv_sgid_index);
-        LASSERT (vvrc == vv_return_ok);
+        if (vvrc != vv_return_ok) {
+                CERROR("gid2gid_index failed for %s: %d\n",
+                       libcfs_nid2str(rxmsg.ibm_srcnid), vvrc);
+                rc = -EIO;
+                reason = IBNAL_REJECT_FATAL;
+                goto reject;
+        }
         
         vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
                                cv->cv_path.pkey, &cv->cv_pkey_index);
-        LASSERT (vvrc == vv_return_ok);
+        if (vvrc != vv_return_ok) {
+                CERROR("pkey2pkey_index failed for %s: %d\n",
+                       libcfs_nid2str(rxmsg.ibm_srcnid), vvrc);
+                rc = -EIO;
+                reason = IBNAL_REJECT_FATAL;
+                goto reject;
+        }
 
         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
-        if (rc != 0)
+        if (rc != 0) {
+                reason = IBNAL_REJECT_FATAL;
                 goto reject;
+        }
 
         rc = kibnal_post_receives(conn);
         if (rc != 0) {
-                CERROR("Can't post receives for "LPX64"\n", rxmsg.ibm_srcnid);
+                CERROR("Can't post receives for %s\n", 
+                       libcfs_nid2str(rxmsg.ibm_srcnid));
+                reason = IBNAL_REJECT_FATAL;
                 goto reject;
         }
 
         rc = kibnal_set_qp_state(conn, vv_qp_state_rtr);
-        if (rc != 0)
+        if (rc != 0) {
+                reason = IBNAL_REJECT_FATAL;
                 goto reject;
+        }
         
         memset(&reply, 0, sizeof(reply));
         reply.qpn                 = cv->cv_local_qpn;
@@ -2362,7 +2589,8 @@ kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
         txmsg.ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
         txmsg.ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
         txmsg.ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
-        kibnal_pack_msg(&txmsg, 0, rxmsg.ibm_srcnid, rxmsg.ibm_srcstamp, 0);
+        kibnal_pack_msg(&txmsg, conn->ibc_version,
+                        0, rxmsg.ibm_srcnid, rxmsg.ibm_srcstamp, 0);
 
         /* ...and copy into reply to avoid alignment issues */
         memcpy(&reply.priv_data, &txmsg, txmsg.ibm_nob);
@@ -2378,13 +2606,13 @@ kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
         /* back out state change (no callback happening) */
         kibnal_set_conn_state(conn, IBNAL_CONN_INIT);
         rc = -EIO;
+        reason = IBNAL_REJECT_FATAL;
                 
  reject:
-        CERROR("Rejected connreq from "LPX64"\n", rxmsg.ibm_srcnid);
+        CDEBUG(D_NET, "Rejecting connreq from %s\n",
+               libcfs_nid2str(rxmsg.ibm_srcnid));
 
-        memset(&reject, 0, sizeof(reject));
-        reject.reason = cm_rej_code_usr_rej;
-        cm_reject(cep, &reject);
+        kibnal_reject(cep, reason);
 
         if (conn != NULL) {
                 LASSERT (rc != 0);
@@ -2409,12 +2637,11 @@ kibnal_listen_callback(cm_cep_handle_t cep, cm_conn_data_t *data, void *arg)
                 return;
         }
 
-        PORTAL_ALLOC_ATOMIC(pcr, sizeof(*pcr));
+        LIBCFS_ALLOC_ATOMIC(pcr, sizeof(*pcr));
         if (pcr == NULL) {
                 CERROR("Can't allocate passive connreq\n");
 
-                cm_reject(cep, &((cm_reject_data_t) /* NB RO struct */
-                                 {.reason = cm_rej_code_no_res,}));
+                kibnal_reject(cep, IBNAL_REJECT_NO_RESOURCES);
                 cm_destroy_cep(cep);
                 return;
         }
@@ -2438,7 +2665,6 @@ kibnal_active_connect_callback (cm_cep_handle_t cep, cm_conn_data_t *cd,
         /* CAVEAT EMPTOR: tasklet context */
         kib_conn_t       *conn = (kib_conn_t *)arg;
         kib_connvars_t   *cv = conn->ibc_connvars;
-        unsigned long     flags;
 
         LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
         cv->cv_conndata = *cd;
@@ -2456,7 +2682,7 @@ kibnal_connect_conn (kib_conn_t *conn)
         kib_connvars_t           *cv = conn->ibc_connvars;
         kib_peer_t               *peer = conn->ibc_peer;
         cm_return_t               cmrc;
-        
+
         /* Only called by connd => statics OK */
         LASSERT (!in_interrupt());
         LASSERT (current == kibnal_data.kib_connd);
@@ -2464,12 +2690,12 @@ kibnal_connect_conn (kib_conn_t *conn)
 
         memset(&cmreq, 0, sizeof(cmreq));
         
-        cmreq.sid = IBNAL_SERVICE_NUMBER;
+        cmreq.sid = (__u64)(*kibnal_tunables.kib_service_number);
 
         cmreq.cep_data.ca_guid              = kibnal_data.kib_hca_attrs.guid;
         cmreq.cep_data.qpn                  = cv->cv_local_qpn;
-        cmreq.cep_data.retry_cnt            = IBNAL_RETRY_CNT;
-        cmreq.cep_data.rtr_retry_cnt        = IBNAL_RNR_CNT;
+        cmreq.cep_data.retry_cnt            = *kibnal_tunables.kib_retry_cnt;
+        cmreq.cep_data.rtr_retry_cnt        = *kibnal_tunables.kib_rnr_cnt;
         cmreq.cep_data.start_psn            = cv->cv_rxpsn;
         cmreq.cep_data.end_to_end_flow_ctrl = IBNAL_EE_FLOW_CNT;
         // XXX ack_timeout?
@@ -2486,12 +2712,27 @@ kibnal_connect_conn (kib_conn_t *conn)
         msg.ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
         msg.ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
         msg.ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
-        kibnal_pack_msg(&msg, 0, peer->ibp_nid, 0, 0);
+        kibnal_pack_msg(&msg, conn->ibc_version, 0, peer->ibp_nid, 0, 0);
+
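+        /* Fault injection: ln_testprotocompat bit 0 bumps the advertised
+         * message version and bit 1 swaps the magic for LNET_PROTO_MAGIC;
+         * each bit is cleared under LNET_LOCK after use, presumably so the
+         * peer's incompatible-protocol handling is exercised at most once. */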
+        if (the_lnet.ln_testprotocompat != 0) {
+                /* single-shot proto check */
+                LNET_LOCK();
+                if ((the_lnet.ln_testprotocompat & 1) != 0) {
+                        msg.ibm_version++;
+                        the_lnet.ln_testprotocompat &= ~1;
+                }
+                if ((the_lnet.ln_testprotocompat & 2) != 0) {
+                        msg.ibm_magic = LNET_PROTO_MAGIC;
+                        the_lnet.ln_testprotocompat &= ~2;
+                }
+                LNET_UNLOCK();
+        }
 
         /* ...and copy into cmreq to avoid alignment issues */
         memcpy(&cmreq.priv_data, &msg, msg.ibm_nob);
         
-        CDEBUG(D_NET, "Connecting %p to "LPX64"\n", conn, peer->ibp_nid);
+        CDEBUG(D_NET, "Connecting %p to %s\n", conn,
+               libcfs_nid2str(peer->ibp_nid));
 
         kibnal_conn_addref(conn);               /* ++ref for CM callback */
         kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CONNECT);
@@ -2499,17 +2740,67 @@ kibnal_connect_conn (kib_conn_t *conn)
         cmrc = cm_connect(conn->ibc_cep, &cmreq, 
                           kibnal_active_connect_callback, conn);
         if (cmrc == cm_stat_success) {
-                CDEBUG(D_NET, "connection REQ sent to "LPX64"\n",
-                       peer->ibp_nid);
+                CDEBUG(D_NET, "connection REQ sent to %s\n",
+                       libcfs_nid2str(peer->ibp_nid));
                 return;
         }
 
-        CERROR ("Connect "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
+        CERROR ("Connect %s failed: %d\n", libcfs_nid2str(peer->ibp_nid), cmrc);
         kibnal_conn_decref(conn);       /* drop callback's ref */
         kibnal_connreq_done(conn, 1, -EHOSTUNREACH);
 }
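+/* Re-drive an active connection attempt on a fresh CEP.  Called below when
+ * the peer rejects with IBNAL_REJECT_CONN_RACE, asks us to fall back to an
+ * older protocol version, or reports a stale connection.  The conn (and the
+ * ibp_connecting reference it holds) is reused, so the retry is immediate
+ * and needs no extra refcounting. */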
 
 void
+kibnal_reconnect (kib_conn_t *conn, int why)
+{
+        kib_peer_t      *peer = conn->ibc_peer;
+        int              retry;
+        unsigned long    flags;
+        cm_return_t      cmrc;
+        cm_cep_handle_t  cep;
+        
+        LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_CONNECT);
+
+        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
+
+        LASSERT (peer->ibp_connecting > 0);          /* 'conn' at least */
+
+        /* Retry the connection if it's still needed and no other connection
+         * attempt (active or passive) is in progress.  An immediate reconnect
+         * is required here, so don't bother with the reconnection timeout
+         * etc. */
+
+        retry = (!list_empty(&peer->ibp_tx_queue) &&
+                 peer->ibp_connecting == 1 &&
+                 peer->ibp_accepting == 0);
+        
+        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
+
+        if (!retry) {
+                kibnal_connreq_done(conn, 1, why);
+                return;
+        }
+
+        cep = cm_create_cep(cm_cep_transp_rc);
+        if (cep == NULL) {
+                CERROR("Can't create new CEP\n");
+                kibnal_connreq_done(conn, 1, -ENOMEM);
+                return;
+        }
+
+        cmrc = cm_cancel(conn->ibc_cep);
+        LASSERT (cmrc == cm_stat_success);
+        cmrc = cm_destroy_cep(conn->ibc_cep);
+        LASSERT (cmrc == cm_stat_success);
+
+        conn->ibc_cep = cep;
+
+        /* reuse conn; no need to bump peer->ibp_connecting */
+        kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);
+        kibnal_connect_conn(conn);
+}
+
+void
 kibnal_check_connreply (kib_conn_t *conn)
 {
         static cm_rtu_data_t  rtu;
@@ -2520,7 +2811,6 @@ kibnal_check_connreply (kib_conn_t *conn)
         kib_peer_t       *peer = conn->ibc_peer;
         int               msgnob;
         cm_return_t       cmrc;
-        cm_cep_handle_t   cep;
         unsigned long     flags;
         int               rc;
 
@@ -2541,64 +2831,73 @@ kibnal_check_connreply (kib_conn_t *conn)
                 msgnob = MIN(cm_REP_priv_data_len, sizeof(msg));
                 memcpy(&msg, &reply->priv_data, msgnob);
 
-                rc = kibnal_unpack_msg(&msg, msgnob);
+                rc = kibnal_unpack_msg(&msg, conn->ibc_version, msgnob);
                 if (rc != 0) {
-                        CERROR("Can't unpack reply from "LPX64"\n",
-                               peer->ibp_nid);
+                        CERROR("Can't unpack reply from %s\n",
+                               libcfs_nid2str(peer->ibp_nid));
                         kibnal_connreq_done(conn, 1, rc);
                         return;
                 }
 
                 if (msg.ibm_type != IBNAL_MSG_CONNACK ) {
-                        CERROR("Unexpected message type %d from "LPX64"\n",
-                               msg.ibm_type, peer->ibp_nid);
+                        CERROR("Unexpected message type %d from %s\n",
+                               msg.ibm_type, libcfs_nid2str(peer->ibp_nid));
                         kibnal_connreq_done(conn, 1, -EPROTO);
                         return;
                 }
 
                 if (msg.ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) {
-                        CERROR(LPX64" has incompatible queue depth %d(%d wanted)\n",
-                               peer->ibp_nid, msg.ibm_u.connparams.ibcp_queue_depth,
+                        CERROR("%s has incompatible queue depth %d(%d wanted)\n",
+                               libcfs_nid2str(peer->ibp_nid), 
+                               msg.ibm_u.connparams.ibcp_queue_depth,
                                IBNAL_MSG_QUEUE_SIZE);
                         kibnal_connreq_done(conn, 1, -EPROTO);
                         return;
                 }
                 
                 if (msg.ibm_u.connparams.ibcp_max_msg_size > IBNAL_MSG_SIZE) {
-                        CERROR(LPX64" max message size %d too big (%d max)\n",
-                               peer->ibp_nid, msg.ibm_u.connparams.ibcp_max_msg_size, 
+                        CERROR("%s max message size %d too big (%d max)\n",
+                               libcfs_nid2str(peer->ibp_nid), 
+                               msg.ibm_u.connparams.ibcp_max_msg_size, 
                                IBNAL_MSG_SIZE);
                         kibnal_connreq_done(conn, 1, -EPROTO);
                         return;
                 }
 
                 if (msg.ibm_u.connparams.ibcp_max_frags > IBNAL_MAX_RDMA_FRAGS) {
-                        CERROR(LPX64" max frags %d too big (%d max)\n",
-                               peer->ibp_nid, msg.ibm_u.connparams.ibcp_max_frags, 
+                        CERROR("%s max frags %d too big (%d max)\n",
+                               libcfs_nid2str(peer->ibp_nid),
+                               msg.ibm_u.connparams.ibcp_max_frags, 
                                IBNAL_MAX_RDMA_FRAGS);
                         kibnal_connreq_done(conn, 1, -EPROTO);
                         return;
                 }
                 
                 read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
-                rc = (msg.ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
-                      msg.ibm_dststamp != kibnal_data.kib_incarnation) ?
-                     -ESTALE : 0;
+                if (lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
+                                            msg.ibm_dstnid) &&
+                    msg.ibm_dststamp == kibnal_data.kib_incarnation)
+                        rc = 0;
+                else
+                        rc = -ESTALE;
                 read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
                 if (rc != 0) {
-                        CERROR("Stale connection reply from "LPX64"\n",
-                               peer->ibp_nid);
+                        CERROR("Stale connection reply from %s\n",
+                               libcfs_nid2str(peer->ibp_nid));
                         kibnal_connreq_done(conn, 1, rc);
                         return;
                 }
 
                 conn->ibc_incarnation = msg.ibm_srcstamp;
                 conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
+                conn->ibc_reserved_credits = IBNAL_MSG_QUEUE_SIZE;
+                LASSERT (conn->ibc_credits + conn->ibc_reserved_credits
+                         <= IBNAL_RX_MSGS);
                 
                 rc = kibnal_post_receives(conn);
                 if (rc != 0) {
-                        CERROR("Can't post receives for "LPX64"\n",
-                               peer->ibp_nid);
+                        CERROR("Can't post receives for %s\n",
+                               libcfs_nid2str(peer->ibp_nid));
                         kibnal_connreq_done(conn, 1, rc);
                         return;
                 }
@@ -2628,7 +2927,8 @@ kibnal_check_connreply (kib_conn_t *conn)
                         return;
                 }
 
-                CERROR("cm_accept "LPX64" failed: %d\n", peer->ibp_nid, cmrc);
+                CERROR("cm_accept %s failed: %d\n", 
+                       libcfs_nid2str(peer->ibp_nid), cmrc);
                 /* Back out of RTU: no callback coming */
                 kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_CHECK_REPLY);
                 kibnal_conn_decref(conn);
@@ -2638,37 +2938,72 @@ kibnal_check_connreply (kib_conn_t *conn)
 
         if (cv->cv_conndata.status == cm_event_conn_reject) {
 
-                if (cv->cv_conndata.data.reject.reason != cm_rej_code_stale_conn) {
-                        CERROR("conn -> "LPX64" rejected: %d\n", peer->ibp_nid,
-                               cv->cv_conndata.data.reject.reason);
-                        kibnal_connreq_done(conn, 1, -ECONNREFUSED);
-                        return;
-                }
+                if (cv->cv_conndata.data.reject.reason == cm_rej_code_usr_rej) {
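+                        /* Decode the reject private data: a little-endian
+                         * 32-bit magic, a little-endian 16-bit protocol
+                         * version and a one-byte reject reason (presumably
+                         * mirroring what kibnal_reject() packs on the
+                         * passive side). */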
+                        unsigned char *bytes =
+                                cv->cv_conndata.data.reject.priv_data;
+                        int   magic   = (bytes[0]) |
+                                        (bytes[1] << 8) |
+                                        (bytes[2] << 16) |
+                                        (bytes[3] << 24);
+                        int   version = (bytes[4]) |
+                                        (bytes[5] << 8);
+                        int   why     = (bytes[6]);
+
+                        /* Expected proto/version: the peer simply refused us
+                         * (or ran out of resources) */
+                        if (magic == IBNAL_MSG_MAGIC &&
+                            version == conn->ibc_version) {
+                                CERROR("conn -> %s rejected: fatal error %d\n",
+                                       libcfs_nid2str(peer->ibp_nid), why);
+
+                                if (why == IBNAL_REJECT_CONN_RACE) 
+                                        kibnal_reconnect(conn, -EALREADY);
+                                else
+                                        kibnal_connreq_done(conn, 1, -ECONNREFUSED);
+                                return;
+                        }
+                        
+                        /* Fail unless it's worth retrying with an old proto
+                         * version */
+                        if (!(magic == IBNAL_MSG_MAGIC &&
+                              version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD &&
+                              conn->ibc_version == IBNAL_MSG_VERSION)) {
+                                CERROR("conn -> %s rejected: bad protocol "
+                                       "magic/ver %08x/%x why %d\n",
+                                       libcfs_nid2str(peer->ibp_nid),
+                                       magic, version, why);
+
+                                kibnal_connreq_done(conn, 1, -ECONNREFUSED);
+                                return;
+                        }
 
-                CWARN ("conn -> "LPX64" stale: retrying\n", peer->ibp_nid);
+                        conn->ibc_version = version;
+                        CWARN ("Connection to %s refused: "
+                               "retrying with old protocol version 0x%x\n", 
+                               libcfs_nid2str(peer->ibp_nid), version);
 
-                cep = cm_create_cep(cm_cep_transp_rc);
-                if (cep == NULL) {
-                        CERROR("Can't create new CEP\n");
-                        kibnal_connreq_done(conn, 1, -ENOMEM);
+                        kibnal_reconnect(conn, -ECONNREFUSED);
                         return;
-                }
-
-                cmrc = cm_cancel(conn->ibc_cep);
-                LASSERT (cmrc == cm_stat_success);
-                cmrc = cm_destroy_cep(conn->ibc_cep);
-                LASSERT (cmrc == cm_stat_success);
-
-                conn->ibc_cep = cep;
+                } else if (cv->cv_conndata.data.reject.reason == 
+                           cm_rej_code_stale_conn) {
+                        
+                        CWARN ("conn -> %s stale: retrying\n", 
+                               libcfs_nid2str(peer->ibp_nid));
 
-                /* retry connect */
-                kibnal_set_conn_state(conn, IBNAL_CONN_ACTIVE_ARP);
-                kibnal_connect_conn(conn);
-                return;
+                        kibnal_reconnect(conn, -ESTALE);
+                        return;
+                } else {
+                        CDEBUG(D_NETERROR, "conn -> %s rejected: reason %d\n",
+                               libcfs_nid2str(peer->ibp_nid),
+                               cv->cv_conndata.data.reject.reason);
+                        kibnal_connreq_done(conn, 1, -ECONNREFUSED);
+                        return;
+                }
+                /* NOT REACHED */
         }
 
-        CERROR("conn -> "LPX64" failed: %d\n", peer->ibp_nid,
-               cv->cv_conndata.status);
+        CDEBUG(D_NETERROR, "conn -> %s failed: %d\n", 
+               libcfs_nid2str(peer->ibp_nid), cv->cv_conndata.status);
         kibnal_connreq_done(conn, 1, -ECONNABORTED);
 }
 
@@ -2689,54 +3024,50 @@ kibnal_arp_done (kib_conn_t *conn)
         LASSERT (peer->ibp_arp_count > 0);
         
         if (cv->cv_arprc != ibat_stat_ok) {
-                write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
-                peer->ibp_arp_count--;
-                if (peer->ibp_arp_count == 0) {
-                        /* final ARP attempt failed */
-                        write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
-                                                flags);
-                        CERROR("Arp "LPX64"@%u.%u.%u.%u failed: %d\n", 
-                               peer->ibp_nid, HIPQUAD(peer->ibp_ip), 
-                               cv->cv_arprc);
-                } else {
-                        /* Retry ARP: ibp_connecting++ so terminating conn
-                         * doesn't end peer's connection attempt */
-                        peer->ibp_connecting++;
-                        write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
-                                                flags);
-                        CWARN("Arp "LPX64"@%u.%u.%u.%u failed: %d "
-                              "(%d attempts left)\n", 
-                              peer->ibp_nid, HIPQUAD(peer->ibp_ip), 
-                              cv->cv_arprc, peer->ibp_arp_count);
-
-                        kibnal_schedule_peer_arp(peer);
-                }
-                kibnal_connreq_done(conn, 1, -ENETUNREACH);
-                return;
+                CDEBUG(D_NETERROR, "Arp %s @ %u.%u.%u.%u failed: %d\n", 
+                       libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip),
+                       cv->cv_arprc);
+                goto failed;
         }
 
         if ((arp->mask & IBAT_PRI_PATH_VALID) != 0) {
-                CDEBUG(D_NET, "Got valid path for "LPX64"\n", peer->ibp_nid);
+                CDEBUG(D_NET, "Got valid path for %s\n",
+                       libcfs_nid2str(peer->ibp_nid));
 
                 *path = *arp->primary_path;
 
                 vvrc = base_gid2port_num(kibnal_data.kib_hca, &path->sgid,
                                          &cv->cv_port);
-                LASSERT (vvrc == vv_return_ok);
+                if (vvrc != vv_return_ok) {
+                        CWARN("base_gid2port_num failed for %s @ %u.%u.%u.%u: %d\n", 
+                              libcfs_nid2str(peer->ibp_nid),
+                              HIPQUAD(peer->ibp_ip), vvrc);
+                        goto failed;
+                }
 
                 vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
                                      &path->sgid, &cv->cv_sgid_index);
-                LASSERT (vvrc == vv_return_ok);
+                if (vvrc != vv_return_ok) {
+                        CWARN("gid2gid_index failed for %s @ %u.%u.%u.%u: %d\n", 
+                              libcfs_nid2str(peer->ibp_nid),
+                              HIPQUAD(peer->ibp_ip), vvrc);
+                        goto failed;
+                }
 
                 vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
                                        path->pkey, &cv->cv_pkey_index);
-                LASSERT (vvrc == vv_return_ok);
+                if (vvrc != vv_return_ok) {
+                        CWARN("pkey2pkey_index failed for %s @ %u.%u.%u.%u: %d\n", 
+                              libcfs_nid2str(peer->ibp_nid), 
+                              HIPQUAD(peer->ibp_ip), vvrc);
+                        goto failed;
+                }
 
                 path->mtu = IBNAL_IB_MTU;
 
         } else if ((arp->mask & IBAT_LID_VALID) != 0) {
-                CWARN("Creating new path record for "LPX64"@%u.%u.%u.%u\n",
-                      peer->ibp_nid, HIPQUAD(peer->ibp_ip));
+                CWARN("Creating new path record for %s @ %u.%u.%u.%u\n",
+                      libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip));
 
                 cv->cv_pkey_index = IBNAL_PKEY_IDX;
                 cv->cv_sgid_index = IBNAL_SGID_IDX;
@@ -2746,11 +3077,21 @@ kibnal_arp_done (kib_conn_t *conn)
 
                 vvrc = port_num2base_gid(kibnal_data.kib_hca, cv->cv_port,
                                          &path->sgid);
-                LASSERT (vvrc == vv_return_ok);
+                if (vvrc != vv_return_ok) {
+                        CWARN("port_num2base_gid failed for %s @ %u.%u.%u.%u: %d\n",
+                              libcfs_nid2str(peer->ibp_nid),
+                              HIPQUAD(peer->ibp_ip), vvrc);
+                        goto failed;
+                }
 
                 vvrc = port_num2base_lid(kibnal_data.kib_hca, cv->cv_port,
                                          &path->slid);
-                LASSERT (vvrc == vv_return_ok);
+                if (vvrc != vv_return_ok) {
+                        CWARN("port_num2base_lid failed for %s @ %u.%u.%u.%u: %d\n",
+                              libcfs_nid2str(peer->ibp_nid),
+                              HIPQUAD(peer->ibp_ip), vvrc);
+                        goto failed;
+                }
 
                 path->dgid          = arp->gid;
                 path->sl            = IBNAL_SERVICE_LEVEL;
@@ -2761,10 +3102,9 @@ kibnal_arp_done (kib_conn_t *conn)
                 path->pkey          = IBNAL_PKEY;
                 path->traffic_class = IBNAL_TRAFFIC_CLASS;
         } else {
-                CERROR("Can't Arp "LPX64"@%u.%u.%u.%u: no PATH or LID\n", 
-                       peer->ibp_nid, HIPQUAD(peer->ibp_ip));
-                kibnal_connreq_done(conn, 1, -ENETUNREACH);
-                return;
+                CWARN("Arp for %s @ %u.%u.%u.%u returned neither PATH nor LID\n",
+                      libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip));
+                goto failed;
         }
 
         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
@@ -2774,28 +3114,53 @@ kibnal_arp_done (kib_conn_t *conn)
 
         /* do the actual connection request */
         kibnal_connect_conn(conn);
+        return;
+
+ failed:
+        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
+        peer->ibp_arp_count--;
+        if (peer->ibp_arp_count == 0) {
+                /* final ARP attempt failed */
+                write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
+                                        flags);
+                CDEBUG(D_NETERROR, "Arp %s @ %u.%u.%u.%u failed (final attempt)\n", 
+                       libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip));
+        } else {
+                /* Retry ARP: ibp_connecting++ so terminating conn
+                 * doesn't end peer's connection attempt */
+                peer->ibp_connecting++;
+                write_unlock_irqrestore(&kibnal_data.kib_global_lock, 
+                                        flags);
+                CDEBUG(D_NETERROR, "Arp %s @ %u.%u.%u.%u failed (%d attempts left)\n",
+                       libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip), 
+                       peer->ibp_arp_count);
+                
+                kibnal_schedule_peer_arp(peer);
+        }
+        kibnal_connreq_done(conn, 1, -ENETUNREACH);
 }
 
 void
 kibnal_arp_callback (ibat_stat_t arprc, ibat_arp_data_t *arp_data, void *arg)
 {
         /* CAVEAT EMPTOR: tasklet context */
-        kib_conn_t      *conn = (kib_conn_t *)arg;
-        kib_peer_t      *peer = conn->ibc_peer;
-        unsigned long    flags;
+        kib_peer_t *peer;
+        kib_conn_t *conn = (kib_conn_t *)arg;
+
+        LASSERT (conn != NULL);
+        LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
+
+        peer = conn->ibc_peer;
 
         if (arprc != ibat_stat_ok)
-                CERROR("Arp "LPX64"@%u.%u.%u.%u failed: %d\n",
-                       peer->ibp_nid, HIPQUAD(peer->ibp_ip), arprc);
+                CDEBUG(D_NETERROR, "Arp %s @ %u.%u.%u.%u failed: %d\n",
+                       libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip), arprc);
         else
-                CDEBUG(D_NET, "Arp "LPX64"@%u.%u.%u.%u OK: LID %s PATH %s\n",
-                       peer->ibp_nid, HIPQUAD(peer->ibp_ip), 
+                CDEBUG(D_NET, "Arp %s @ %u.%u.%u.%u OK: LID %s PATH %s\n",
+                       libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip), 
                        (arp_data->mask & IBAT_LID_VALID) == 0 ? "invalid" : "valid",
                        (arp_data->mask & IBAT_PRI_PATH_VALID) == 0 ? "invalid" : "valid");
 
-        LASSERT (conn != NULL);
-        LASSERT (conn->ibc_state == IBNAL_CONN_ACTIVE_ARP);
-
         conn->ibc_connvars->cv_arprc = arprc;
         if (arprc == ibat_stat_ok)
                 conn->ibc_connvars->cv_arp = *arp_data;
@@ -2818,18 +3183,18 @@ kibnal_arp_peer (kib_peer_t *peer)
 
         cep = cm_create_cep(cm_cep_transp_rc);
         if (cep == NULL) {
-                CERROR ("Can't create cep for conn->"LPX64"\n",
-                        peer->ibp_nid);
-                kibnal_peer_connect_failed(peer, 1);
+                CERROR ("Can't create cep for conn->%s\n",
+                        libcfs_nid2str(peer->ibp_nid));
+                kibnal_peer_connect_failed(peer, 1, -ENOMEM);
                 return;
         }
 
         conn = kibnal_create_conn(cep);
         if (conn == NULL) {
-                CERROR ("Can't allocate conn->"LPX64"\n",
-                        peer->ibp_nid);
+                CERROR ("Can't allocate conn->%s\n",
+                        libcfs_nid2str(peer->ibp_nid));
                 cm_destroy_cep(cep);
-                kibnal_peer_connect_failed(peer, 1);
+                kibnal_peer_connect_failed(peer, 1, -ENOMEM);
                 return;
         }
 
@@ -2865,39 +3230,41 @@ kibnal_arp_peer (kib_peer_t *peer)
 }
 
 int
-kibnal_conn_timed_out (kib_conn_t *conn)
+kibnal_check_txs (kib_conn_t *conn, struct list_head *txs)
 {
         kib_tx_t          *tx;
         struct list_head  *ttmp;
+        int                timed_out = 0;
 
         spin_lock(&conn->ibc_lock);
 
-        list_for_each (ttmp, &conn->ibc_tx_queue) {
+        list_for_each (ttmp, txs) {
                 tx = list_entry (ttmp, kib_tx_t, tx_list);
 
-                LASSERT (tx->tx_queued);
-
-                if (time_after_eq (jiffies, tx->tx_deadline)) {
-                        spin_unlock(&conn->ibc_lock);
-                        return 1;
+                if (txs == &conn->ibc_active_txs) {
+                        LASSERT (!tx->tx_queued);
+                        LASSERT (tx->tx_waiting || tx->tx_sending != 0);
+                } else {
+                        LASSERT (tx->tx_queued);
                 }
-        }
-
-        list_for_each (ttmp, &conn->ibc_active_txs) {
-                tx = list_entry (ttmp, kib_tx_t, tx_list);
-
-                LASSERT (!tx->tx_queued);
-                LASSERT (tx->tx_waiting ||
-                         tx->tx_sending != 0);
 
                 if (time_after_eq (jiffies, tx->tx_deadline)) {
-                        spin_unlock(&conn->ibc_lock);
-                        return 1;
+                        timed_out = 1;
+                        break;
                 }
         }
 
         spin_unlock(&conn->ibc_lock);
-        return 0;
+        return timed_out;
+}
+
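+/* A connection has timed out if any tx on any of its send queues
+ * (ibc_tx_queue, ibc_tx_queue_rsrvd, ibc_tx_queue_nocred) or on its
+ * active list has passed its deadline. */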
+int
+kibnal_conn_timed_out (kib_conn_t *conn)
+{
+        return  kibnal_check_txs(conn, &conn->ibc_tx_queue) ||
+                kibnal_check_txs(conn, &conn->ibc_tx_queue_rsrvd) ||
+                kibnal_check_txs(conn, &conn->ibc_tx_queue_nocred) ||
+                kibnal_check_txs(conn, &conn->ibc_active_txs);
 }
 
 void
@@ -2938,11 +3305,11 @@ kibnal_check_conns (int idx)
                         
                         kibnal_conn_addref(conn); /* 1 ref for me... */
 
-                        read_unlock_irqrestore(&kibnal_data.kib_global_lock, 
+                        read_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                                flags);
 
-                        CERROR("Timed out RDMA with "LPX64"\n",
-                               peer->ibp_nid);
+                        CERROR("Timed out RDMA with %s\n",
+                               libcfs_nid2str(peer->ibp_nid));
 
                         kibnal_close_conn (conn, -ETIMEDOUT);
                         kibnal_conn_decref(conn); /* ...until here */
@@ -2990,7 +3357,7 @@ kibnal_disconnect_conn (kib_conn_t *conn)
         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
 
         cm_cancel(conn->ibc_cep);
-        kibnal_pause(HZ/10);
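+        /* give the CM callback ~100ms to run before concluding it will
+         * never happen */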
+        cfs_pause(cfs_time_seconds(1)/10);
 
         if (!conn->ibc_disconnect)              /* CM callback will never happen now */
                 kibnal_conn_decref(conn);
@@ -3015,13 +3382,13 @@ kibnal_connd (void *arg)
         int                peer_index = 0;
         unsigned long      deadline = jiffies;
         
-        kportal_daemonize ("kibnal_connd");
-        kportal_blockallsigs ();
+        cfs_daemonize ("kibnal_connd");
+        cfs_block_allsigs ();
 
         init_waitqueue_entry (&wait, current);
         kibnal_data.kib_connd = current;
 
-        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
+        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
 
         while (!kibnal_data.kib_shutdown) {
 
@@ -3049,7 +3416,7 @@ kibnal_connd (void *arg)
                         dropped_lock = 1;
 
                         kibnal_recv_connreq(pcr->pcr_cep, &pcr->pcr_cmreq);
-                        PORTAL_FREE(pcr, sizeof(*pcr));
+                        LIBCFS_FREE(pcr, sizeof(*pcr));
 
                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                 }
@@ -3120,9 +3487,9 @@ kibnal_connd (void *arg)
                          * connection within (n+1)/n times the timeout
                          * interval. */
 
-                        if (kibnal_tunables.kib_io_timeout > n * p)
+                        if (*kibnal_tunables.kib_timeout > n * p)
                                 chunk = (chunk * n * p) / 
-                                        kibnal_tunables.kib_io_timeout;
+                                        *kibnal_tunables.kib_timeout;
                         if (chunk == 0)
                                 chunk = 1;
 
@@ -3169,8 +3536,6 @@ kibnal_cq_callback (unsigned long unused_context)
 {
         unsigned long    flags;
 
-        CDEBUG(D_NET, "!!\n");
-
         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
         kibnal_data.kib_ready = 1;
         wake_up(&kibnal_data.kib_sched_waitq);
@@ -3192,8 +3557,8 @@ kibnal_scheduler(void *arg)
         int             busy_loops = 0;
 
         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
-        kportal_daemonize(name);
-        kportal_blockallsigs();
+        cfs_daemonize(name);
+        cfs_block_allsigs();
 
         init_waitqueue_entry(&wait, current);
 
@@ -3286,8 +3651,8 @@ kibnal_scheduler(void *arg)
                                  * I give a scheduler on another CPU a chance
                                  * to get the final SEND completion, so the tx
                                  * descriptor can get freed as I inspect it. */
-                                CERROR ("RDMA failed: %d\n", 
-                                        wc.completion_status);
+                                CDEBUG(D_NETERROR, "RDMA failed: %d\n", 
+                                       wc.completion_status);
                                 break;
 
                         default:
@@ -3301,7 +3666,7 @@ kibnal_scheduler(void *arg)
                 /* Nothing to do; sleep... */
 
                 set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
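+                /* exclusive wait: a CQ completion need only wake one idle
+                 * scheduler, not every scheduler thread */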
+                add_wait_queue_exclusive(&kibnal_data.kib_sched_waitq, &wait);
                 spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                        flags);
 
@@ -3317,13 +3682,3 @@ kibnal_scheduler(void *arg)
         kibnal_thread_fini();
         return (0);
 }
-
-
-lib_nal_t kibnal_lib = {
-        .libnal_data = &kibnal_data,      /* NAL private data */
-        .libnal_send = kibnal_send,
-        .libnal_send_pages = kibnal_send_pages,
-        .libnal_recv = kibnal_recv,
-        .libnal_recv_pages = kibnal_recv_pages,
-        .libnal_dist = kibnal_dist
-};