Whamcloud - gitweb
LU-1617 build: skip generated files in .gitignore
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
index f8999e0..f792de7 100644 (file)
@@ -1,32 +1,52 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
+/*
+ * GPL HEADER START
  *
  *
- * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
- *   Author: PJ Kirner <pjkirner@clusterfs.com>
- *           E Barton <eeb@bartonsoftware.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
  *
- *   This file is confidential source code owned by Cluster File Systems.
- *   No viewing, modification, compilation, redistribution, or any other
- *   form of use is permitted except through a signed license agreement.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
  *
- *   If you have not signed such an agreement, then you have no rights to
- *   this file.  Please destroy it immediately and contact CFS.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
  *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/klnds/ptllnd/ptllnd_peer.c
+ *
+ * Author: PJ Kirner <pjkirner@clusterfs.com>
+ * Author: E Barton <eeb@bartonsoftware.com>
  */
 
 #include "ptllnd.h"
 #include <libcfs/list.h>
 
 static int
  */
 
 #include "ptllnd.h"
 #include <libcfs/list.h>
 
 static int
-kptllnd_count_queue(struct list_head *q)
+kptllnd_count_queue(cfs_list_t *q)
 {
 {
-        struct list_head *e;
-        int               n = 0;
-        
-        list_for_each(e, q) {
+        cfs_list_t *e;
+        int         n = 0;
+
+        cfs_list_for_each(e, q) {
                 n++;
         }
 
                 n++;
         }
 
@@ -34,38 +54,37 @@ kptllnd_count_queue(struct list_head *q)
 }
 
 int
 }
 
 int
-kptllnd_get_peer_info(int index, 
+kptllnd_get_peer_info(int index,
                       lnet_process_id_t *id,
                       int *state, int *sent_hello,
                       int *refcount, __u64 *incarnation,
                       __u64 *next_matchbits, __u64 *last_matchbits_seen,
                       int *nsendq, int *nactiveq,
                       lnet_process_id_t *id,
                       int *state, int *sent_hello,
                       int *refcount, __u64 *incarnation,
                       __u64 *next_matchbits, __u64 *last_matchbits_seen,
                       int *nsendq, int *nactiveq,
-                      int *credits, int *outstanding_credits) 
+                      int *credits, int *outstanding_credits)
 {
 {
-        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
+        cfs_rwlock_t     *g_lock = &kptllnd_data.kptl_peer_rw_lock;
         unsigned long     flags;
         unsigned long     flags;
-        struct list_head *ptmp;
+        cfs_list_t       *ptmp;
         kptl_peer_t      *peer;
         int               i;
         int               rc = -ENOENT;
 
         kptl_peer_t      *peer;
         int               i;
         int               rc = -ENOENT;
 
-        read_lock_irqsave(g_lock, flags);
+        cfs_read_lock_irqsave(g_lock, flags);
 
         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
 
         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
-                
-                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
-                        peer = list_entry(ptmp, kptl_peer_t, peer_list);
+                cfs_list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
+                        peer = cfs_list_entry(ptmp, kptl_peer_t, peer_list);
 
                         if (index-- > 0)
                                 continue;
 
                         if (index-- > 0)
                                 continue;
-                        
+
                         *id          = peer->peer_id;
                         *state       = peer->peer_state;
                         *sent_hello  = peer->peer_sent_hello;
                         *id          = peer->peer_id;
                         *state       = peer->peer_state;
                         *sent_hello  = peer->peer_sent_hello;
-                        *refcount    = atomic_read(&peer->peer_refcount);
+                        *refcount    = cfs_atomic_read(&peer->peer_refcount);
                         *incarnation = peer->peer_incarnation;
 
                         *incarnation = peer->peer_incarnation;
 
-                        spin_lock(&peer->peer_lock);
+                        cfs_spin_lock(&peer->peer_lock);
 
                         *next_matchbits      = peer->peer_next_matchbits;
                         *last_matchbits_seen = peer->peer_last_matchbits_seen;
 
                         *next_matchbits      = peer->peer_next_matchbits;
                         *last_matchbits_seen = peer->peer_last_matchbits_seen;
@@ -75,15 +94,15 @@ kptllnd_get_peer_info(int index,
                         *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                         *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
 
                         *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                         *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
 
-                        spin_unlock(&peer->peer_lock);
+                        cfs_spin_unlock(&peer->peer_lock);
 
                         rc = 0;
                         goto out;
                 }
         }
 
                         rc = 0;
                         goto out;
                 }
         }
-        
+
  out:
  out:
-        read_unlock_irqrestore(g_lock, flags);
+        cfs_read_unlock_irqrestore(g_lock, flags);
         return rc;
 }
 
         return rc;
 }
 
@@ -95,13 +114,13 @@ kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
 
         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                  peer->peer_state == PEER_STATE_ACTIVE);
 
         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                  peer->peer_state == PEER_STATE_ACTIVE);
-        
+
         kptllnd_data.kptl_n_active_peers++;
         kptllnd_data.kptl_n_active_peers++;
-        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
+        cfs_atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
 
         /* NB add to HEAD of peer list for MRU order!
          * (see kptllnd_cull_peertable) */
 
         /* NB add to HEAD of peer list for MRU order!
          * (see kptllnd_cull_peertable) */
-        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
+        cfs_list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
 }
 
 void
 }
 
 void
@@ -110,20 +129,20 @@ kptllnd_cull_peertable_locked (lnet_process_id_t pid)
         /* I'm about to add a new peer with this portals ID to the peer table,
          * so (a) this peer should not exist already and (b) I want to leave at
          * most (max_procs_per_nid - 1) peers with this NID in the table. */
         /* I'm about to add a new peer with this portals ID to the peer table,
          * so (a) this peer should not exist already and (b) I want to leave at
          * most (max_procs_per_nid - 1) peers with this NID in the table. */
-        struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
-        int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
-        int                count;
-        struct list_head  *tmp;
-        struct list_head  *nxt;
-        kptl_peer_t       *peer;
-        
+        cfs_list_t   *peers = kptllnd_nid2peerlist(pid.nid);
+        int           cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
+        int           count;
+        cfs_list_t   *tmp;
+        cfs_list_t   *nxt;
+        kptl_peer_t  *peer;
+
         count = 0;
         count = 0;
-        list_for_each_safe (tmp, nxt, peers) {
+        cfs_list_for_each_safe (tmp, nxt, peers) {
                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                  * in MRU order */
                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                  * in MRU order */
-                peer = list_entry(tmp, kptl_peer_t, peer_list);
+                peer = cfs_list_entry(tmp, kptl_peer_t, peer_list);
                         
                         
-                if (peer->peer_id.nid != pid.nid)
+                if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
                         continue;
 
                 LASSERT (peer->peer_id.pid != pid.pid);
                         continue;
 
                 LASSERT (peer->peer_id.pid != pid.pid);
@@ -142,7 +161,7 @@ kptllnd_cull_peertable_locked (lnet_process_id_t pid)
 }
 
 kptl_peer_t *
 }
 
 kptl_peer_t *
-kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
+kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
 {
         unsigned long    flags;
         kptl_peer_t     *peer;
 {
         unsigned long    flags;
         kptl_peer_t     *peer;
@@ -157,35 +176,39 @@ kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
 
         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
 
 
         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
 
-        INIT_LIST_HEAD (&peer->peer_sendq);
-        INIT_LIST_HEAD (&peer->peer_activeq);
-        spin_lock_init (&peer->peer_lock);
+        CFS_INIT_LIST_HEAD (&peer->peer_noops);
+        CFS_INIT_LIST_HEAD (&peer->peer_sendq);
+        CFS_INIT_LIST_HEAD (&peer->peer_activeq);
+        cfs_spin_lock_init (&peer->peer_lock);
 
         peer->peer_state = PEER_STATE_ALLOCATED;
         peer->peer_error = 0;
 
         peer->peer_state = PEER_STATE_ALLOCATED;
         peer->peer_error = 0;
-        peer->peer_last_alive = cfs_time_current();
+        peer->peer_last_alive = 0;
         peer->peer_id = lpid;
         peer->peer_ptlid = ppid;
         peer->peer_credits = 1;                 /* enough for HELLO */
         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
         peer->peer_id = lpid;
         peer->peer_ptlid = ppid;
         peer->peer_credits = 1;                 /* enough for HELLO */
         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
-        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
+        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
+        peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
+        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
+
+        cfs_atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
 
 
-        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
+        cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
 
 
-        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
 
         /* Only increase # peers under lock, to guarantee we dont grow it
          * during shutdown */
 
         /* Only increase # peers under lock, to guarantee we dont grow it
          * during shutdown */
-        if (kptllnd_data.kptl_shutdown) {
-                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
-                                        flags);
+        if (net->net_shutdown) {
+                cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
+                                            flags);
                 LIBCFS_FREE(peer, sizeof(*peer));
                 return NULL;
         }
 
         kptllnd_data.kptl_npeers++;
                 LIBCFS_FREE(peer, sizeof(*peer));
                 return NULL;
         }
 
         kptllnd_data.kptl_npeers++;
-        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
-        
+        cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
         return peer;
 }
 
         return peer;
 }
 
@@ -193,74 +216,59 @@ void
 kptllnd_peer_destroy (kptl_peer_t *peer)
 {
         unsigned long flags;
 kptllnd_peer_destroy (kptl_peer_t *peer)
 {
         unsigned long flags;
-        
+
         CDEBUG(D_NET, "Peer=%p\n", peer);
 
         CDEBUG(D_NET, "Peer=%p\n", peer);
 
-        LASSERT (!in_interrupt());
-        LASSERT (atomic_read(&peer->peer_refcount) == 0);
+        LASSERT (!cfs_in_interrupt());
+        LASSERT (cfs_atomic_read(&peer->peer_refcount) == 0);
         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                  peer->peer_state == PEER_STATE_ZOMBIE);
         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                  peer->peer_state == PEER_STATE_ZOMBIE);
-        LASSERT (list_empty(&peer->peer_sendq));
-        LASSERT (list_empty(&peer->peer_activeq));
+        LASSERT (cfs_list_empty(&peer->peer_noops));
+        LASSERT (cfs_list_empty(&peer->peer_sendq));
+        LASSERT (cfs_list_empty(&peer->peer_activeq));
 
 
-        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+        cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
 
         if (peer->peer_state == PEER_STATE_ZOMBIE)
 
         if (peer->peer_state == PEER_STATE_ZOMBIE)
-                list_del(&peer->peer_list);
+                cfs_list_del(&peer->peer_list);
 
         kptllnd_data.kptl_npeers--;
 
 
         kptllnd_data.kptl_npeers--;
 
-        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+        cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
 
         LIBCFS_FREE (peer, sizeof (*peer));
 }
 
 void
 
         LIBCFS_FREE (peer, sizeof (*peer));
 }
 
 void
-kptllnd_peer_cancel_txs(kptl_peer_t *peer)
+kptllnd_cancel_txlist (cfs_list_t *peerq, cfs_list_t *txs)
 {
 {
-        struct list_head   sendq;
-        struct list_head   activeq;
-        struct list_head  *tmp;
-        struct list_head  *nxt;
-        kptl_tx_t         *tx;
-        unsigned long      flags;
-
-        /* atomically grab all the peer's tx-es... */
+        cfs_list_t  *tmp;
+        cfs_list_t  *nxt;
+        kptl_tx_t   *tx;
 
 
-        spin_lock_irqsave(&peer->peer_lock, flags);
+        cfs_list_for_each_safe (tmp, nxt, peerq) {
+                tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
 
 
-        list_add(&sendq, &peer->peer_sendq);
-        list_del_init(&peer->peer_sendq);
-        list_for_each (tmp, &sendq) {
-                tx = list_entry(tmp, kptl_tx_t, tx_list);
-                tx->tx_active = 0;
-        }
+                cfs_list_del(&tx->tx_list);
+                cfs_list_add_tail(&tx->tx_list, txs);
 
 
-        list_add(&activeq, &peer->peer_activeq);
-        list_del_init(&peer->peer_activeq);
-        list_for_each (tmp, &activeq) {
-                tx = list_entry(tmp, kptl_tx_t, tx_list);
+                tx->tx_status = -EIO;
                 tx->tx_active = 0;
         }
                 tx->tx_active = 0;
         }
+}
 
 
-        spin_unlock_irqrestore(&peer->peer_lock, flags);
+void
+kptllnd_peer_cancel_txs(kptl_peer_t *peer, cfs_list_t *txs)
+{
+        unsigned long   flags;
 
 
-        /* ...then drop the peer's ref on them at leasure.  This will get
-         * kptllnd_tx_fini() to abort outstanding comms if necessary. */
+        cfs_spin_lock_irqsave(&peer->peer_lock, flags);
 
 
-        list_for_each_safe (tmp, nxt, &sendq) {
-                tx = list_entry(tmp, kptl_tx_t, tx_list);
-                list_del(&tx->tx_list);
-                tx->tx_status = -EIO;
-                kptllnd_tx_decref(tx);
-        }
-
-        list_for_each_safe (tmp, nxt, &activeq) {
-                tx = list_entry(tmp, kptl_tx_t, tx_list);
-                list_del(&tx->tx_list);
-                tx->tx_status = -EIO;
-                kptllnd_tx_decref(tx);
-        }
+        kptllnd_cancel_txlist(&peer->peer_noops, txs);
+        kptllnd_cancel_txlist(&peer->peer_sendq, txs);
+        kptllnd_cancel_txlist(&peer->peer_activeq, txs);
+                
+        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
 }
 
 void
 }
 
 void
@@ -268,77 +276,147 @@ kptllnd_peer_alive (kptl_peer_t *peer)
 {
         /* This is racy, but everyone's only writing cfs_time_current() */
         peer->peer_last_alive = cfs_time_current();
 {
         /* This is racy, but everyone's only writing cfs_time_current() */
         peer->peer_last_alive = cfs_time_current();
-        mb();
+        cfs_mb();
 }
 
 void
 kptllnd_peer_notify (kptl_peer_t *peer)
 {
         unsigned long flags;
 }
 
 void
 kptllnd_peer_notify (kptl_peer_t *peer)
 {
         unsigned long flags;
-        time_t        last_alive = 0;
+        kptl_net_t   *net;
+        kptl_net_t  **nets;
+        int           i = 0;
+        int           nnets = 0;
         int           error = 0;
         int           error = 0;
-        
-        spin_lock_irqsave(&peer->peer_lock, flags);
+        cfs_time_t    last_alive = 0;
+
+        cfs_spin_lock_irqsave(&peer->peer_lock, flags);
 
         if (peer->peer_error != 0) {
                 error = peer->peer_error;
                 peer->peer_error = 0;
 
         if (peer->peer_error != 0) {
                 error = peer->peer_error;
                 peer->peer_error = 0;
-                
-                last_alive = cfs_time_current_sec() - 
-                             cfs_duration_sec(cfs_time_current() - 
-                                              peer->peer_last_alive);
+                last_alive = peer->peer_last_alive;
         }
         }
-        
-        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
 
-        if (error != 0)
-                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
-                             last_alive);
+        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
+
+        if (error == 0)
+                return;
+
+        cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
+        cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
+                nnets++;
+        cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+        if (nnets == 0) /* shutdown in progress */
+                return;
+
+        LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
+        if (nets == NULL) {
+                CERROR("Failed to allocate nets[%d]\n", nnets);
+                return;
+        }
+        memset(nets, 0, nnets * sizeof(*nets));
+
+        cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
+        i = 0;
+        cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
+                LASSERT (i < nnets);
+                nets[i] = net;
+                kptllnd_net_addref(net);
+                i++;
+        }
+        cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
+
+        for (i = 0; i < nnets; i++) {
+                lnet_nid_t peer_nid;
+
+                net = nets[i];
+                if (net == NULL)
+                        break;
+
+                if (!net->net_shutdown) {
+                        peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
+                                                       peer->peer_ptlid.nid);
+                        lnet_notify(net->net_ni, peer_nid, 0, last_alive);
+                }
+
+                kptllnd_net_decref(net);
+        }
+
+        LIBCFS_FREE(nets, nnets * sizeof(*nets));
 }
 
 void
 kptllnd_handle_closing_peers ()
 {
         unsigned long           flags;
 }
 
 void
 kptllnd_handle_closing_peers ()
 {
         unsigned long           flags;
+        cfs_list_t              txs;
         kptl_peer_t            *peer;
         kptl_peer_t            *peer;
-        struct list_head       *tmp;
-        struct list_head       *nxt;
+        cfs_list_t             *tmp;
+        cfs_list_t             *nxt;
+        kptl_tx_t              *tx;
         int                     idle;
 
         /* Check with a read lock first to avoid blocking anyone */
 
         int                     idle;
 
         /* Check with a read lock first to avoid blocking anyone */
 
-        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
-        idle = list_empty(&kptllnd_data.kptl_closing_peers);
-        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+        cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+        idle = cfs_list_empty(&kptllnd_data.kptl_closing_peers) &&
+               cfs_list_empty(&kptllnd_data.kptl_zombie_peers);
+        cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
 
         if (idle)
                 return;
 
 
         if (idle)
                 return;
 
-        /* Scan the closing peers and cancel their txs.
-         * NB only safe while there is only a single watchdog */
+        CFS_INIT_LIST_HEAD(&txs);
+
+        cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
+         * ref removes it from this list, so I musn't drop the lock while
+         * scanning it. */
+        cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
+                peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
+
+                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
+
+                kptllnd_peer_cancel_txs(peer, &txs);
+        }
 
 
-        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
+         * I'm the only one removing from this list, but peers can be added on
+         * the end any time I drop the lock. */
 
 
-        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
-                peer = list_entry (tmp, kptl_peer_t, peer_list);
+        cfs_list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
+                peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
 
                 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
 
 
                 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
 
-                list_del(&peer->peer_list);
-                list_add_tail(&peer->peer_list,
-                              &kptllnd_data.kptl_zombie_peers);
+                cfs_list_del(&peer->peer_list);
+                cfs_list_add_tail(&peer->peer_list,
+                                  &kptllnd_data.kptl_zombie_peers);
                 peer->peer_state = PEER_STATE_ZOMBIE;
 
                 peer->peer_state = PEER_STATE_ZOMBIE;
 
-                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+                cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
+                                            flags);
 
                 kptllnd_peer_notify(peer);
 
                 kptllnd_peer_notify(peer);
-                kptllnd_peer_cancel_txs(peer);
+                kptllnd_peer_cancel_txs(peer, &txs);
                 kptllnd_peer_decref(peer);
 
                 kptllnd_peer_decref(peer);
 
-                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+                cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
         }
 
         }
 
-        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+        cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+        /* Drop peer's ref on all cancelled txs.  This will get
+         * kptllnd_tx_fini() to abort outstanding comms if necessary. */
+
+        cfs_list_for_each_safe (tmp, nxt, &txs) {
+                tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
+                cfs_list_del(&tx->tx_list);
+                kptllnd_tx_decref(tx);
+        }
 }
 
 void
 }
 
 void
@@ -350,34 +428,31 @@ kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
 
         case PEER_STATE_WAITING_HELLO:
         case PEER_STATE_ACTIVE:
 
         case PEER_STATE_WAITING_HELLO:
         case PEER_STATE_ACTIVE:
+                /* Ensure new peers see a new incarnation of me */
+                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
+                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
+                        kptllnd_data.kptl_incarnation++;
+
                 /* Removing from peer table */
                 kptllnd_data.kptl_n_active_peers--;
                 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
 
                 /* Removing from peer table */
                 kptllnd_data.kptl_n_active_peers--;
                 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
 
-                list_del(&peer->peer_list);
+                cfs_list_del(&peer->peer_list);
                 kptllnd_peer_unreserve_buffers();
 
                 peer->peer_error = why; /* stash 'why' only on first close */
                 kptllnd_peer_unreserve_buffers();
 
                 peer->peer_error = why; /* stash 'why' only on first close */
+                peer->peer_state = PEER_STATE_CLOSING;
 
                 /* Schedule for immediate attention, taking peer table's ref */
 
                 /* Schedule for immediate attention, taking peer table's ref */
-                list_add_tail(&peer->peer_list, 
-                              &kptllnd_data.kptl_closing_peers);
-                wake_up(&kptllnd_data.kptl_watchdog_waitq);
+                cfs_list_add_tail(&peer->peer_list,
+                                 &kptllnd_data.kptl_closing_peers);
+                cfs_waitq_signal(&kptllnd_data.kptl_watchdog_waitq);
                 break;
 
         case PEER_STATE_ZOMBIE:
                 break;
 
         case PEER_STATE_ZOMBIE:
-                /* Schedule for attention at next timeout */
-                kptllnd_peer_addref(peer);
-                list_del(&peer->peer_list);
-                list_add_tail(&peer->peer_list, 
-                              &kptllnd_data.kptl_closing_peers);
-                break;
-                
         case PEER_STATE_CLOSING:
                 break;
         }
         case PEER_STATE_CLOSING:
                 break;
         }
-
-        peer->peer_state = PEER_STATE_CLOSING;
 }
 
 void
 }
 
 void
@@ -385,16 +460,16 @@ kptllnd_peer_close(kptl_peer_t *peer, int why)
 {
         unsigned long      flags;
 
 {
         unsigned long      flags;
 
-        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+        cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
         kptllnd_peer_close_locked(peer, why);
         kptllnd_peer_close_locked(peer, why);
-        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+        cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
 }
 
 int
 kptllnd_peer_del(lnet_process_id_t id)
 {
 }
 
 int
 kptllnd_peer_del(lnet_process_id_t id)
 {
-        struct list_head  *ptmp;
-        struct list_head  *pnxt;
+        cfs_list_t        *ptmp;
+        cfs_list_t        *pnxt;
         kptl_peer_t       *peer;
         int                lo;
         int                hi;
         kptl_peer_t       *peer;
         int                lo;
         int                hi;
@@ -407,34 +482,36 @@ kptllnd_peer_del(lnet_process_id_t id)
          * wildcard (LNET_NID_ANY) then look at all of the buckets
          */
         if (id.nid != LNET_NID_ANY) {
          * wildcard (LNET_NID_ANY) then look at all of the buckets
          */
         if (id.nid != LNET_NID_ANY) {
-                struct list_head *l = kptllnd_nid2peerlist(id.nid);
-                
+                cfs_list_t *l = kptllnd_nid2peerlist(id.nid);
+
                 lo = hi =  l - kptllnd_data.kptl_peers;
         } else {
                 if (id.pid != LNET_PID_ANY)
                         return -EINVAL;
                 lo = hi =  l - kptllnd_data.kptl_peers;
         } else {
                 if (id.pid != LNET_PID_ANY)
                         return -EINVAL;
-                
+
                 lo = 0;
                 hi = kptllnd_data.kptl_peer_hash_size - 1;
         }
 
 again:
                 lo = 0;
                 hi = kptllnd_data.kptl_peer_hash_size - 1;
         }
 
 again:
-        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+        cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
 
         for (i = lo; i <= hi; i++) {
 
         for (i = lo; i <= hi; i++) {
-                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
-                        peer = list_entry (ptmp, kptl_peer_t, peer_list);
+                cfs_list_for_each_safe (ptmp, pnxt,
+                                        &kptllnd_data.kptl_peers[i]) {
+                        peer = cfs_list_entry (ptmp, kptl_peer_t, peer_list);
 
                         if (!(id.nid == LNET_NID_ANY || 
 
                         if (!(id.nid == LNET_NID_ANY || 
-                              (peer->peer_id.nid == id.nid &&
+                              (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
                                (id.pid == LNET_PID_ANY || 
                                 peer->peer_id.pid == id.pid))))
                                 continue;
 
                         kptllnd_peer_addref(peer); /* 1 ref for me... */
 
                                (id.pid == LNET_PID_ANY || 
                                 peer->peer_id.pid == id.pid))))
                                 continue;
 
                         kptllnd_peer_addref(peer); /* 1 ref for me... */
 
-                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
-                                               flags);
+                        cfs_read_unlock_irqrestore(&kptllnd_data. \
+                                                   kptl_peer_rw_lock,
+                                                   flags);
 
                         kptllnd_peer_close(peer, 0);
                         kptllnd_peer_decref(peer); /* ...until here */
 
                         kptllnd_peer_close(peer, 0);
                         kptllnd_peer_decref(peer); /* ...until here */
@@ -446,21 +523,38 @@ again:
                 }
         }
 
                 }
         }
 
-        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+        cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
 
         return (rc);
 }
 
 void
 
         return (rc);
 }
 
 void
-kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx)
+kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
 {
         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
 {
         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
-        ptl_handle_md_t  rdma_mdh = PTL_INVALID_HANDLE;
-        ptl_handle_md_t  msg_mdh = PTL_INVALID_HANDLE;
-        ptl_handle_me_t  meh;
+        unsigned long flags;
+
+        cfs_spin_lock_irqsave(&peer->peer_lock, flags);
+
+        /* Ensure HELLO is sent first */
+        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
+                cfs_list_add(&tx->tx_list, &peer->peer_noops);
+        else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
+                cfs_list_add(&tx->tx_list, &peer->peer_sendq);
+        else
+                cfs_list_add_tail(&tx->tx_list, &peer->peer_sendq);
+
+        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
+}
+
+
+void
+kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
+{
+        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
+        ptl_handle_md_t  msg_mdh;
         ptl_md_t         md;
         ptl_err_t        prc;
         ptl_md_t         md;
         ptl_err_t        prc;
-        unsigned long    flags;
 
         LASSERT (!tx->tx_idle);
         LASSERT (!tx->tx_active);
 
         LASSERT (!tx->tx_idle);
         LASSERT (!tx->tx_active);
@@ -472,204 +566,278 @@ kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx)
 
         kptllnd_set_tx_peer(tx, peer);
 
 
         kptllnd_set_tx_peer(tx, peer);
 
-        if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
-            tx->tx_type == TX_TYPE_GET_REQUEST) {
-
-                spin_lock_irqsave(&peer->peer_lock, flags);
-
-                /* Assume 64-bit matchbits can't wrap */
-                LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
-                tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
-                        peer->peer_next_matchbits++;
-                        
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
-
-                prc = PtlMEAttach(kptllnd_data.kptl_nih,
-                                  *kptllnd_tunables.kptl_portal,
-                                  peer->peer_ptlid,
-                                  tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
-                                  0,             /* ignore bits */
-                                  PTL_UNLINK,
-                                  PTL_INS_BEFORE,
-                                  &meh);
-                if (prc != PTL_OK) {
-                        CERROR("PtlMEAttach(%s) failed: %d\n",
-                               libcfs_id2str(peer->peer_id), prc);
-                        goto failed;
-                }
-
-                prc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, &rdma_mdh);
-                if (prc != PTL_OK) {
-                        CERROR("PtlMDAttach(%s) failed: %d\n",
-                               libcfs_id2str(tx->tx_peer->peer_id), prc);
-                        prc = PtlMEUnlink(meh);
-                        LASSERT(prc == PTL_OK);
-                        rdma_mdh = PTL_INVALID_HANDLE;
-                        goto failed;
-                }
-
-                /* I'm not racing with the event callback here.  It's a bug if
-                 * there's an event on the MD I just attached before I actually
-                 * send the RDMA request message which the event callback
-                 * catches by asserting 'rdma_mdh' is valid. */
-        }
-
         memset(&md, 0, sizeof(md));
         memset(&md, 0, sizeof(md));
-        
-        md.start = tx->tx_msg;
-        md.length = tx->tx_msg->ptlm_nob;
-        md.threshold = 1;
+
+        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
         md.options = PTL_MD_OP_PUT |
                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                      PTL_MD_EVENT_START_DISABLE;
         md.user_ptr = &tx->tx_msg_eventarg;
         md.eq_handle = kptllnd_data.kptl_eqh;
 
         md.options = PTL_MD_OP_PUT |
                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                      PTL_MD_EVENT_START_DISABLE;
         md.user_ptr = &tx->tx_msg_eventarg;
         md.eq_handle = kptllnd_data.kptl_eqh;
 
+        if (nfrag == 0) {
+                md.start = tx->tx_msg;
+                md.length = tx->tx_msg->ptlm_nob;
+        } else {
+                LASSERT (nfrag > 1);
+                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
+
+                md.start = tx->tx_frags;
+                md.length = nfrag;
+                md.options |= PTL_MD_IOVEC;
+        }
+
         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
         if (prc != PTL_OK) {
         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
         if (prc != PTL_OK) {
-                msg_mdh = PTL_INVALID_HANDLE;
-                goto failed;
+                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
+                       libcfs_id2str(peer->peer_id),
+                       kptllnd_errtype2str(prc), prc);
+                tx->tx_status = -EIO;
+                kptllnd_tx_decref(tx);
+                return;
         }
         }
-        
-        spin_lock_irqsave(&peer->peer_lock, flags);
 
 
-        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
+
+        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ);
         tx->tx_active = 1;
         tx->tx_active = 1;
-        tx->tx_rdma_mdh = rdma_mdh;
         tx->tx_msg_mdh = msg_mdh;
         tx->tx_msg_mdh = msg_mdh;
+        kptllnd_queue_tx(peer, tx);
+}
 
 
-       /* Ensure HELLO is sent first */
-       if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
-               list_add(&tx->tx_list, &peer->peer_sendq);
-       else
-               list_add_tail(&tx->tx_list, &peer->peer_sendq);
+/* NB "restarts" comes from peer_sendq of a single peer */
+void
+kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target,
+                     cfs_list_t *restarts)
+{
+        kptl_tx_t   *tx;
+        kptl_tx_t   *tmp;
+        kptl_peer_t *peer;
 
 
-        spin_unlock_irqrestore(&peer->peer_lock, flags);
-        return;
-        
- failed:
-        spin_lock_irqsave(&peer->peer_lock, flags);
+        LASSERT (!cfs_list_empty(restarts));
 
 
-        tx->tx_status = -EIO;
-        tx->tx_rdma_mdh = rdma_mdh;
-        tx->tx_msg_mdh = msg_mdh;
+        if (kptllnd_find_target(net, target, &peer) != 0)
+                peer = NULL;
 
 
-        spin_unlock_irqrestore(&peer->peer_lock, flags);
+        cfs_list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
+                LASSERT (tx->tx_peer != NULL);
+                LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
+                         tx->tx_type == TX_TYPE_PUT_REQUEST ||
+                         tx->tx_type == TX_TYPE_SMALL_MESSAGE);
 
 
-        kptllnd_tx_decref(tx);
+                cfs_list_del_init(&tx->tx_list);
+
+                if (peer == NULL ||
+                    tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
+                        kptllnd_tx_decref(tx);
+                        continue;
+                }
+
+                LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
+                tx->tx_status = 0;
+                tx->tx_active = 1;
+                kptllnd_peer_decref(tx->tx_peer);
+                tx->tx_peer = NULL;
+                kptllnd_set_tx_peer(tx, peer);
+                kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
+        }
+
+        if (peer == NULL)
+                return;
+
+        kptllnd_peer_check_sends(peer);
+        kptllnd_peer_decref(peer);
+}
+
+static inline int
+kptllnd_peer_send_noop (kptl_peer_t *peer)
+{
+        if (!peer->peer_sent_hello ||
+            peer->peer_credits == 0 ||
+            !cfs_list_empty(&peer->peer_noops) ||
+            peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
+                return 0;
+
+        /* No tx to piggyback NOOP onto or no credit to send a tx */
+        return (cfs_list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
 }
 
 void
 kptllnd_peer_check_sends (kptl_peer_t *peer)
 {
 }
 
 void
 kptllnd_peer_check_sends (kptl_peer_t *peer)
 {
-
+        ptl_handle_me_t  meh;
         kptl_tx_t       *tx;
         int              rc;
         kptl_tx_t       *tx;
         int              rc;
+        int              msg_type;
         unsigned long    flags;
 
         unsigned long    flags;
 
-        LASSERT(!in_interrupt());
+        LASSERT(!cfs_in_interrupt());
 
 
-        spin_lock_irqsave(&peer->peer_lock, flags);
+        cfs_spin_lock_irqsave(&peer->peer_lock, flags);
 
 
-        if (list_empty(&peer->peer_sendq) &&
-            peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
-            peer->peer_credits != 0) {
+        peer->peer_retry_noop = 0;
 
 
+        if (kptllnd_peer_send_noop(peer)) {
                 /* post a NOOP to return credits */
                 /* post a NOOP to return credits */
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
+                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
 
                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                 if (tx == NULL) {
                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
                                libcfs_id2str(peer->peer_id));
                 } else {
 
                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                 if (tx == NULL) {
                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
                                libcfs_id2str(peer->peer_id));
                 } else {
-                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
-                        kptllnd_post_tx(peer, tx);
+                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
+                                         peer->peer_id, 0);
+                        kptllnd_post_tx(peer, tx, 0);
                 }
 
                 }
 
-                spin_lock_irqsave(&peer->peer_lock, flags);
+                cfs_spin_lock_irqsave(&peer->peer_lock, flags);
+                peer->peer_retry_noop = (tx == NULL);
         }
 
         }
 
-        while (!list_empty(&peer->peer_sendq)) {
-                tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);
+        for (;;) {
+                if (!cfs_list_empty(&peer->peer_noops)) {
+                        LASSERT (peer->peer_sent_hello);
+                        tx = cfs_list_entry(peer->peer_noops.next,
+                                            kptl_tx_t, tx_list);
+                } else if (!cfs_list_empty(&peer->peer_sendq)) {
+                        tx = cfs_list_entry(peer->peer_sendq.next,
+                                            kptl_tx_t, tx_list);
+                } else {
+                        /* nothing to send right now */
+                        break;
+                }
 
                 LASSERT (tx->tx_active);
                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
 
                 LASSERT (tx->tx_active);
                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
-                LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
-                         !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
+                LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
 
                 LASSERT (peer->peer_outstanding_credits >= 0);
 
                 LASSERT (peer->peer_outstanding_credits >= 0);
-                LASSERT (peer->peer_outstanding_credits <= 
-                         *kptllnd_tunables.kptl_peercredits);
+                LASSERT (peer->peer_sent_credits >= 0);
+                LASSERT (peer->peer_sent_credits +
+                         peer->peer_outstanding_credits <=
+                         *kptllnd_tunables.kptl_peertxcredits);
                 LASSERT (peer->peer_credits >= 0);
                 LASSERT (peer->peer_credits >= 0);
-                LASSERT (peer->peer_credits <= 
-                         *kptllnd_tunables.kptl_peercredits);
-
-               /* Ensure HELLO is sent first */
-               if (!peer->peer_sent_hello) {
-                       if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
-                               break;
-                       peer->peer_sent_hello = 1;
-               }
+
+                msg_type = tx->tx_msg->ptlm_type;
+
+                /* Ensure HELLO is sent first */
+                if (!peer->peer_sent_hello) {
+                        LASSERT (cfs_list_empty(&peer->peer_noops));
+                        if (msg_type != PTLLND_MSG_TYPE_HELLO)
+                                break;
+                        peer->peer_sent_hello = 1;
+                }
 
                 if (peer->peer_credits == 0) {
 
                 if (peer->peer_credits == 0) {
-                        CDEBUG(D_NETTRACE, "%s[%d/%d]: no credits for %p\n",
-                               libcfs_id2str(peer->peer_id),
-                               peer->peer_credits, peer->peer_outstanding_credits, tx);
+                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
+                               libcfs_id2str(peer->peer_id), 
+                               peer->peer_credits,
+                               peer->peer_outstanding_credits, 
+                               peer->peer_sent_credits, 
+                               kptllnd_msgtype2str(msg_type), tx);
                         break;
                 }
 
                         break;
                 }
 
-                /* Don't use the last credit unless I've got credits to
-                 * return */
+                /* Last/Initial credit reserved for NOOP/HELLO */
                 if (peer->peer_credits == 1 &&
                 if (peer->peer_credits == 1 &&
-                    peer->peer_outstanding_credits == 0) {
-                        CDEBUG(D_NETTRACE, "%s[%d/%d]: not using last credit for %p\n",
-                               libcfs_id2str(peer->peer_id),
-                               peer->peer_credits, peer->peer_outstanding_credits, tx);
+                    msg_type != PTLLND_MSG_TYPE_HELLO &&
+                    msg_type != PTLLND_MSG_TYPE_NOOP) {
+                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
+                               "not using last credit for %s[%p]\n",
+                               libcfs_id2str(peer->peer_id), 
+                               peer->peer_credits,
+                               peer->peer_outstanding_credits,
+                               peer->peer_sent_credits,
+                               kptllnd_msgtype2str(msg_type), tx);
                         break;
                 }
 
                         break;
                 }
 
-                list_del(&tx->tx_list);
+                cfs_list_del(&tx->tx_list);
 
                 /* Discard any NOOP I queued if I'm not at the high-water mark
                  * any more or more messages have been queued */
 
                 /* Discard any NOOP I queued if I'm not at the high-water mark
                  * any more or more messages have been queued */
-                if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
-                    (!list_empty(&peer->peer_sendq) ||
-                     peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
-
+                if (msg_type == PTLLND_MSG_TYPE_NOOP &&
+                    !kptllnd_peer_send_noop(peer)) {
                         tx->tx_active = 0;
 
                         tx->tx_active = 0;
 
-                        spin_unlock_irqrestore(&peer->peer_lock, flags);
+                        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
 
                         CDEBUG(D_NET, "%s: redundant noop\n", 
                                libcfs_id2str(peer->peer_id));
                         kptllnd_tx_decref(tx);
 
 
                         CDEBUG(D_NET, "%s: redundant noop\n", 
                                libcfs_id2str(peer->peer_id));
                         kptllnd_tx_decref(tx);
 
-                        spin_lock_irqsave(&peer->peer_lock, flags);
+                        cfs_spin_lock_irqsave(&peer->peer_lock, flags);
                         continue;
                 }
 
                         continue;
                 }
 
-                /* fill last-minute msg header fields */
+                /* fill last-minute msg fields */
                 kptllnd_msg_pack(tx->tx_msg, peer);
 
                 kptllnd_msg_pack(tx->tx_msg, peer);
 
+                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
+                    tx->tx_type == TX_TYPE_GET_REQUEST) {
+                        /* peer_next_matchbits must be known good */
+                        LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
+                        /* Assume 64-bit matchbits can't wrap */
+                        LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
+                        tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
+                                peer->peer_next_matchbits++;
+                }
+
+                peer->peer_sent_credits += peer->peer_outstanding_credits;
                 peer->peer_outstanding_credits = 0;
                 peer->peer_credits--;
 
                 peer->peer_outstanding_credits = 0;
                 peer->peer_credits--;
 
-                CDEBUG(D_NETTRACE, "%s[%d/%d]: %s tx=%p nob=%d cred=%d\n",
-                       libcfs_id2str(peer->peer_id),
-                       peer->peer_credits, peer->peer_outstanding_credits,
-                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
-                       tx, tx->tx_msg->ptlm_nob,
+                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
+                       libcfs_id2str(peer->peer_id), peer->peer_credits,
+                       peer->peer_outstanding_credits, peer->peer_sent_credits,
+                       kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
                        tx->tx_msg->ptlm_credits);
 
                        tx->tx_msg->ptlm_credits);
 
-                list_add_tail(&tx->tx_list, &peer->peer_activeq);
+                cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
 
                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
 
 
                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
 
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
+                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
+
+                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
+                    tx->tx_type == TX_TYPE_GET_REQUEST) {
+                        /* Post bulk now we have safe matchbits */
+                        rc = PtlMEAttach(kptllnd_data.kptl_nih,
+                                         *kptllnd_tunables.kptl_portal,
+                                         peer->peer_ptlid,
+                                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
+                                         0,             /* ignore bits */
+                                         PTL_UNLINK,
+                                         PTL_INS_BEFORE,
+                                         &meh);
+                        if (rc != PTL_OK) {
+                                CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
+                                       libcfs_id2str(peer->peer_id),
+                                       kptllnd_errtype2str(rc), rc);
+                                goto failed;
+                        }
+
+                        rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
+                                         &tx->tx_rdma_mdh);
+                        if (rc != PTL_OK) {
+                                CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
+                                       libcfs_id2str(tx->tx_peer->peer_id),
+                                       kptllnd_errtype2str(rc), rc);
+                                rc = PtlMEUnlink(meh);
+                                LASSERT(rc == PTL_OK);
+                                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
+                                goto failed;
+                        }
+                        /* I'm not racing with the event callback here.  It's a
+                         * bug if there's an event on the MD I just attached
+                         * before I actually send the RDMA request message -
+                         * probably matchbits re-used in error. */
+                }
+
+                tx->tx_tposted = jiffies;       /* going on the wire */
 
                 rc = PtlPut (tx->tx_msg_mdh,
 
                 rc = PtlPut (tx->tx_msg_mdh,
-                             PTL_NOACK_REQ,
+                             tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                              peer->peer_ptlid,
                              *kptllnd_tunables.kptl_portal,
                              0,                 /* acl cookie */
                              peer->peer_ptlid,
                              *kptllnd_tunables.kptl_portal,
                              0,                 /* acl cookie */
@@ -677,115 +845,158 @@ kptllnd_peer_check_sends (kptl_peer_t *peer)
                              0,                 /* offset */
                              0);                /* header data */
                 if (rc != PTL_OK) {
                              0,                 /* offset */
                              0);                /* header data */
                 if (rc != PTL_OK) {
-                        CERROR("PtlPut %s error %d\n",
-                               libcfs_id2str(peer->peer_id), rc);
-
-                        /* Nuke everything (including this tx) */
-                        kptllnd_peer_close(peer, -EIO);
-                        return;
+                        CERROR("PtlPut %s error %s(%d)\n",
+                               libcfs_id2str(peer->peer_id),
+                               kptllnd_errtype2str(rc), rc);
+                        goto failed;
                 }
 
                 kptllnd_tx_decref(tx);          /* drop my ref */
 
                 }
 
                 kptllnd_tx_decref(tx);          /* drop my ref */
 
-                spin_lock_irqsave(&peer->peer_lock, flags);
+                cfs_spin_lock_irqsave(&peer->peer_lock, flags);
         }
 
         }
 
-        spin_unlock_irqrestore(&peer->peer_lock, flags);
+        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
+        return;
+
+ failed:
+        /* Nuke everything (including tx we were trying) */
+        kptllnd_peer_close(peer, -EIO);
+        kptllnd_tx_decref(tx);
 }
 
 kptl_tx_t *
 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
 {
         kptl_tx_t         *tx;
 }
 
 kptl_tx_t *
 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
 {
         kptl_tx_t         *tx;
-        struct list_head  *tmp;
-        unsigned long      flags;
+        cfs_list_t        *ele;
 
 
-        spin_lock_irqsave(&peer->peer_lock, flags);
+        cfs_list_for_each(ele, &peer->peer_sendq) {
+                tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
 
 
-        list_for_each(tmp, &peer->peer_sendq) {
-                tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
-
-                if (time_after_eq(jiffies, tx->tx_deadline)) {
+                if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
                         kptllnd_tx_addref(tx);
                         kptllnd_tx_addref(tx);
-                        spin_unlock_irqrestore(&peer->peer_lock, flags);
                         return tx;
                 }
         }
 
                         return tx;
                 }
         }
 
-        list_for_each(tmp, &peer->peer_activeq) {
-                tx = list_entry(peer->peer_activeq.next, kptl_tx_t, tx_list);
+        cfs_list_for_each(ele, &peer->peer_activeq) {
+                tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
 
 
-                if (time_after_eq(jiffies, tx->tx_deadline)) {
+                if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
                         kptllnd_tx_addref(tx);
                         kptllnd_tx_addref(tx);
-                        spin_unlock_irqrestore(&peer->peer_lock, flags);
                         return tx;
                 }
         }
 
                         return tx;
                 }
         }
 
-        spin_unlock_irqrestore(&peer->peer_lock, flags);
         return NULL;
 }
 
 
 void
         return NULL;
 }
 
 
 void
-kptllnd_peer_check_bucket (int idx)
+kptllnd_peer_check_bucket (int idx, int stamp)
 {
 {
-        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
-        struct list_head  *ptmp;
+        cfs_list_t        *peers = &kptllnd_data.kptl_peers[idx];
         kptl_peer_t       *peer;
         kptl_peer_t       *peer;
-        kptl_tx_t         *tx;
         unsigned long      flags;
         unsigned long      flags;
-        int                nsend;
-        int                nactive;
 
 
-        CDEBUG(D_NET, "Bucket=%d\n", idx);
+        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
 
  again:
         /* NB. Shared lock while I just look */
 
  again:
         /* NB. Shared lock while I just look */
-        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+        cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
 
 
-        list_for_each (ptmp, peers) {
-                peer = list_entry (ptmp, kptl_peer_t, peer_list);
+        cfs_list_for_each_entry (peer, peers, peer_list) {
+                kptl_tx_t *tx;
+                int        check_sends;
+                int        c = -1, oc = -1, sc = -1;
+                int        nsend = -1, nactive = -1;
+                int        sent_hello = -1, state = -1;
 
 
-                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d\n",
-                       libcfs_id2str(peer->peer_id),
-                       peer->peer_credits, peer->peer_outstanding_credits);
+                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
+                       libcfs_id2str(peer->peer_id), peer->peer_credits, 
+                       peer->peer_outstanding_credits, peer->peer_sent_credits);
+
+                cfs_spin_lock(&peer->peer_lock);
 
 
-                /* In case we have enough credits to return via a
-                 * NOOP, but there were no non-blocking tx descs
-                 * free to do it last time... */
-                kptllnd_peer_check_sends(peer);
+                if (peer->peer_check_stamp == stamp) {
+                        /* checked already this pass */
+                        cfs_spin_unlock(&peer->peer_lock);
+                        continue;
+                }
 
 
+                peer->peer_check_stamp = stamp;
                 tx = kptllnd_find_timed_out_tx(peer);
                 tx = kptllnd_find_timed_out_tx(peer);
-                if (tx == NULL)
+                check_sends = peer->peer_retry_noop;
+
+                if (tx != NULL) {
+                        c  = peer->peer_credits;
+                        sc = peer->peer_sent_credits;
+                        oc = peer->peer_outstanding_credits;
+                        state      = peer->peer_state;
+                        sent_hello = peer->peer_sent_hello;
+                        nsend   = kptllnd_count_queue(&peer->peer_sendq);
+                        nactive = kptllnd_count_queue(&peer->peer_activeq);
+                }
+
+                cfs_spin_unlock(&peer->peer_lock);
+
+                if (tx == NULL && !check_sends)
                         continue;
 
                 kptllnd_peer_addref(peer); /* 1 ref for me... */
 
                         continue;
 
                 kptllnd_peer_addref(peer); /* 1 ref for me... */
 
-                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
-                                       flags);
+                cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
+                                           flags);
 
 
-                spin_lock_irqsave(&peer->peer_lock, flags);
-                nsend = kptllnd_count_queue(&peer->peer_sendq);
-                nactive = kptllnd_count_queue(&peer->peer_activeq);
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
+                if (tx == NULL) { /* nothing timed out */
+                        kptllnd_peer_check_sends(peer);
+                        kptllnd_peer_decref(peer); /* ...until here or... */
 
 
-                LCONSOLE_ERROR("Timing out %s: please check Portals\n",
-                               libcfs_id2str(peer->peer_id));
+                        /* rescan after dropping the lock */
+                        goto again;
+                }
 
 
-                CERROR("%s timed out: cred %d outstanding %d sendq %d "
-                       "activeq %d Tx %s (%s%s%s) status %d T/O %ds\n",
-                       libcfs_id2str(peer->peer_id),
-                       peer->peer_credits, peer->peer_outstanding_credits,
-                       nsend, nactive, kptllnd_tx_typestr(tx->tx_type),
+                LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
+                                   libcfs_id2str(peer->peer_id),
+                                   (tx->tx_tposted == 0) ?
+                                   "no free peer buffers" :
+                                   "please check Portals");
+
+               if (tx->tx_tposted) {
+                       CERROR("Could not send to %s after %ds (sent %lds ago); "
+                               "check Portals for possible issues\n",
+                               libcfs_id2str(peer->peer_id),
+                               *kptllnd_tunables.kptl_timeout,
+                               cfs_duration_sec(jiffies - tx->tx_tposted));
+                } else if (state < PEER_STATE_ACTIVE) {
+                        CERROR("Could not connect %s (%d) after %ds; "
+                               "peer might be down\n",
+                               libcfs_id2str(peer->peer_id), state,
+                               *kptllnd_tunables.kptl_timeout);
+               } else {
+                       CERROR("Could not get credits for %s after %ds; "
+                               "possible Lustre networking issues\n",
+                       libcfs_id2str(peer->peer_id),
+                       *kptllnd_tunables.kptl_timeout);
+               }
+
+                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
+                       "state %d, sent_hello %d, sendq %d, activeq %d "
+                       "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
+                       libcfs_id2str(peer->peer_id), c, oc, sc,
+                       state, sent_hello, nsend, nactive,
+                       tx, kptllnd_tx_typestr(tx->tx_type),
+                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                        tx->tx_active ? "A" : "",
                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                        "" : "M",
                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                        "" : "D",
                        tx->tx_active ? "A" : "",
                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                        "" : "M",
                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                        "" : "D",
-                       tx->tx_status, *kptllnd_tunables.kptl_timeout);
-
-                kptllnd_dump_ptltrace();
+                       tx->tx_status,
+                       (tx->tx_tposted == 0) ? "not " : "",
+                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
+                       *kptllnd_tunables.kptl_timeout);
 
                 kptllnd_tx_decref(tx);
 
 
                 kptllnd_tx_decref(tx);
 
@@ -796,33 +1007,33 @@ kptllnd_peer_check_bucket (int idx)
                 goto again;
         }
 
                 goto again;
         }
 
-        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+        cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
 }
 
 kptl_peer_t *
 kptllnd_id2peer_locked (lnet_process_id_t id)
 {
 }
 
 kptl_peer_t *
 kptllnd_id2peer_locked (lnet_process_id_t id)
 {
-        struct list_head *peers = kptllnd_nid2peerlist(id.nid);
-        struct list_head *tmp;
+        cfs_list_t       *peers = kptllnd_nid2peerlist(id.nid);
+        cfs_list_t       *tmp;
         kptl_peer_t      *peer;
 
         kptl_peer_t      *peer;
 
-        list_for_each (tmp, peers) {
-
-                peer = list_entry (tmp, kptl_peer_t, peer_list);
+        cfs_list_for_each (tmp, peers) {
+                peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
 
                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
                         peer->peer_state == PEER_STATE_ACTIVE);
 
                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
                         peer->peer_state == PEER_STATE_ACTIVE);
-                
-                if (peer->peer_id.nid != id.nid ||
-                    peer->peer_id.pid != id.pid)
+
+                /* NB logical LNet peers share one kptl_peer_t */
+                if (peer->peer_id.pid != id.pid ||
+                    LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
                         continue;
 
                 kptllnd_peer_addref(peer);
 
                 CDEBUG(D_NET, "%s -> %s (%d)\n",
                         continue;
 
                 kptllnd_peer_addref(peer);
 
                 CDEBUG(D_NET, "%s -> %s (%d)\n",
-                       libcfs_id2str(id), 
+                       libcfs_id2str(id),
                        kptllnd_ptlid2str(peer->peer_ptlid),
                        kptllnd_ptlid2str(peer->peer_ptlid),
-                       atomic_read (&peer->peer_refcount));
+                       cfs_atomic_read (&peer->peer_refcount));
                 return peer;
         }
 
                 return peer;
         }
 
@@ -832,19 +1043,19 @@ kptllnd_id2peer_locked (lnet_process_id_t id)
 void
 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
 {
 void
 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
 {
-        LCONSOLE_ERROR("%s %s overflows the peer table[%d]: "
-                       "messages may be dropped\n",
-                       str, libcfs_id2str(id),
-                       kptllnd_data.kptl_n_active_peers);
-        LCONSOLE_ERROR("Please correct by increasing "
-                       "'max_nodes' or 'max_procs_per_node'\n");
+        LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
+                           "messages may be dropped\n",
+                           str, libcfs_id2str(id),
+                           kptllnd_data.kptl_n_active_peers);
+        LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
+                           "'max_nodes' or 'max_procs_per_node'\n");
 }
 
 __u64
 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
 {
 }
 
 __u64
 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
 {
-        kptl_peer_t            *peer;
-        struct list_head       *tmp;
+        kptl_peer_t  *peer;
+        cfs_list_t   *tmp;
 
         /* Find the last matchbits I saw this new peer using.  Note..
            A. This peer cannot be in the peer table - she's new!
 
         /* Find the last matchbits I saw this new peer using.  Note..
            A. This peer cannot be in the peer table - she's new!
@@ -859,30 +1070,30 @@ kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
         /* peer's last matchbits can't change after it comes out of the peer
          * table, so first match is fine */
 
         /* peer's last matchbits can't change after it comes out of the peer
          * table, so first match is fine */
 
-        list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
-                peer = list_entry (tmp, kptl_peer_t, peer_list);
+        cfs_list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
+                peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
 
 
-                if (peer->peer_id.nid == lpid.nid &&
+                if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
                     peer->peer_id.pid == lpid.pid)
                         return peer->peer_last_matchbits_seen;
         }
                     peer->peer_id.pid == lpid.pid)
                         return peer->peer_last_matchbits_seen;
         }
-        
-        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
-                peer = list_entry (tmp, kptl_peer_t, peer_list);
 
 
-                if (peer->peer_id.nid == lpid.nid &&
+        cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
+                peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
+
+                if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
                     peer->peer_id.pid == lpid.pid)
                         return peer->peer_last_matchbits_seen;
         }
                     peer->peer_id.pid == lpid.pid)
                         return peer->peer_last_matchbits_seen;
         }
-        
+
         return PTL_RESERVED_MATCHBITS;
 }
 
 kptl_peer_t *
         return PTL_RESERVED_MATCHBITS;
 }
 
 kptl_peer_t *
-kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
-                           kptl_msg_t       *msg)
+kptllnd_peer_handle_hello (kptl_net_t *net,
+                           ptl_process_id_t initiator, kptl_msg_t *msg)
 {
 {
-        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
+        cfs_rwlock_t       *g_lock = &kptllnd_data.kptl_peer_rw_lock;
         kptl_peer_t        *peer;
         kptl_peer_t        *new_peer;
         lnet_process_id_t   lpid;
         kptl_peer_t        *peer;
         kptl_peer_t        *new_peer;
         lnet_process_id_t   lpid;
@@ -919,26 +1130,21 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                return NULL;
        }
        
                return NULL;
        }
        
-        if (msg->ptlm_u.hello.kptlhm_max_msg_size !=
-            *kptllnd_tunables.kptl_max_msg_size) {
-                CERROR("max message size MUST be equal for all peers: "
-                       "got %d expected %d from %s\n",
+        if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
+                CERROR("%s: max message size %d < MIN %d",
+                       libcfs_id2str(lpid),
                        msg->ptlm_u.hello.kptlhm_max_msg_size,
                        msg->ptlm_u.hello.kptlhm_max_msg_size,
-                       *kptllnd_tunables.kptl_max_msg_size,
-                       libcfs_id2str(lpid));
+                       PTLLND_MIN_BUFFER_SIZE);
                 return NULL;
         }
 
                 return NULL;
         }
 
-        if (msg->ptlm_credits + 1 != *kptllnd_tunables.kptl_peercredits) {
-                CERROR("peercredits MUST be equal on all peers: "
-                       "got %d expected %d from %s\n",
-                       msg->ptlm_credits + 1,
-                       *kptllnd_tunables.kptl_peercredits,
-                       libcfs_id2str(lpid));
+        if (msg->ptlm_credits <= 1) {
+                CERROR("Need more than 1+%d credits from %s\n",
+                       msg->ptlm_credits, libcfs_id2str(lpid));
                 return NULL;
         }
         
                 return NULL;
         }
         
-        write_lock_irqsave(g_lock, flags);
+        cfs_write_lock_irqsave(g_lock, flags);
 
         peer = kptllnd_id2peer_locked(lpid);
         if (peer != NULL) {
 
         peer = kptllnd_id2peer_locked(lpid);
         if (peer != NULL) {
@@ -946,21 +1152,50 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                         /* Completing HELLO handshake */
                         LASSERT(peer->peer_incarnation == 0);
 
                         /* Completing HELLO handshake */
                         LASSERT(peer->peer_incarnation == 0);
 
+                        if (msg->ptlm_dststamp != 0 &&
+                            msg->ptlm_dststamp != peer->peer_myincarnation) {
+                                cfs_write_unlock_irqrestore(g_lock, flags);
+
+                                CERROR("Ignoring HELLO from %s: unexpected "
+                                       "dststamp "LPX64" ("LPX64" wanted)\n",
+                                       libcfs_id2str(lpid),
+                                       msg->ptlm_dststamp,
+                                       peer->peer_myincarnation);
+                                kptllnd_peer_decref(peer);
+                                return NULL;
+                        }
+                        
+                        /* Concurrent initiation or response to my HELLO */
                         peer->peer_state = PEER_STATE_ACTIVE;
                         peer->peer_incarnation = msg->ptlm_srcstamp;
                         peer->peer_next_matchbits = safe_matchbits;
                         peer->peer_state = PEER_STATE_ACTIVE;
                         peer->peer_incarnation = msg->ptlm_srcstamp;
                         peer->peer_next_matchbits = safe_matchbits;
-
-                        write_unlock_irqrestore(g_lock, flags);
+                        peer->peer_max_msg_size =
+                                msg->ptlm_u.hello.kptlhm_max_msg_size;
+                        
+                        cfs_write_unlock_irqrestore(g_lock, flags);
                         return peer;
                 }
 
                         return peer;
                 }
 
-                /* remove old incarnation of this peer */
+                if (msg->ptlm_dststamp != 0 &&
+                    msg->ptlm_dststamp <= peer->peer_myincarnation) {
+                        cfs_write_unlock_irqrestore(g_lock, flags);
+
+                        CERROR("Ignoring stale HELLO from %s: "
+                               "dststamp "LPX64" (current "LPX64")\n",
+                               libcfs_id2str(lpid),
+                               msg->ptlm_dststamp,
+                               peer->peer_myincarnation);
+                        kptllnd_peer_decref(peer);
+                        return NULL;
+                }
+
+                /* Brand new connection attempt: remove old incarnation */
                 kptllnd_peer_close_locked(peer, 0);
         }
 
         kptllnd_cull_peertable_locked(lpid);
 
                 kptllnd_peer_close_locked(peer, 0);
         }
 
         kptllnd_cull_peertable_locked(lpid);
 
-        write_unlock_irqrestore(g_lock, flags);
+        cfs_write_unlock_irqrestore(g_lock, flags);
 
         if (peer != NULL) {
                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
 
         if (peer != NULL) {
                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
@@ -969,6 +1204,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                        msg->ptlm_srcstamp, peer->peer_incarnation);
 
                 kptllnd_peer_decref(peer);
                        msg->ptlm_srcstamp, peer->peer_incarnation);
 
                 kptllnd_peer_decref(peer);
+                peer = NULL;
         }
 
         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
         }
 
         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
@@ -979,9 +1215,9 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
         }
 
         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
         }
 
         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
-                         sizeof(kptl_hello_msg_t));
+                         lpid, sizeof(kptl_hello_msg_t));
 
 
-        new_peer = kptllnd_peer_allocate(lpid, initiator);
+        new_peer = kptllnd_peer_allocate(net, lpid, initiator);
         if (new_peer == NULL) {
                 kptllnd_tx_decref(hello_tx);
                 return NULL;
         if (new_peer == NULL) {
                 kptllnd_tx_decref(hello_tx);
                 return NULL;
@@ -997,21 +1233,41 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                 return NULL;
         }
 
                 return NULL;
         }
 
-        write_lock_irqsave(g_lock, flags);
+        cfs_write_lock_irqsave(g_lock, flags);
+
+ again:
+        if (net->net_shutdown) {
+                cfs_write_unlock_irqrestore(g_lock, flags);
+
+                CERROR ("Shutdown started, refusing connection from %s\n",
+                        libcfs_id2str(lpid));
+                kptllnd_peer_unreserve_buffers();
+                kptllnd_peer_decref(new_peer);
+                kptllnd_tx_decref(hello_tx);
+                return NULL;
+        }
 
         peer = kptllnd_id2peer_locked(lpid);
         if (peer != NULL) {
                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
 
         peer = kptllnd_id2peer_locked(lpid);
         if (peer != NULL) {
                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
-                        /* An outgoing message instantiated 'peer' for me and
-                        * presumably provoked this reply */
-                        CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid));
+                        /* An outgoing message instantiated 'peer' for me */
                         LASSERT(peer->peer_incarnation == 0);
 
                         peer->peer_state = PEER_STATE_ACTIVE;
                         peer->peer_incarnation = msg->ptlm_srcstamp;
                         peer->peer_next_matchbits = safe_matchbits;
                         LASSERT(peer->peer_incarnation == 0);
 
                         peer->peer_state = PEER_STATE_ACTIVE;
                         peer->peer_incarnation = msg->ptlm_srcstamp;
                         peer->peer_next_matchbits = safe_matchbits;
+                        peer->peer_max_msg_size =
+                                msg->ptlm_u.hello.kptlhm_max_msg_size;
+
+                        cfs_write_unlock_irqrestore(g_lock, flags);
+
+                        CWARN("Outgoing instantiated peer %s\n",
+                              libcfs_id2str(lpid));
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
+
+                        cfs_write_unlock_irqrestore(g_lock, flags);
+
                        /* WOW!  Somehow this peer completed the HELLO
                         * handshake while I slept.  I guess I could have slept
                         * while it rebooted and sent a new HELLO, so I'll fail
                        /* WOW!  Somehow this peer completed the HELLO
                         * handshake while I slept.  I guess I could have slept
                         * while it rebooted and sent a new HELLO, so I'll fail
@@ -1020,8 +1276,6 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                        kptllnd_peer_decref(peer);
                        peer = NULL;
                }
                        kptllnd_peer_decref(peer);
                        peer = NULL;
                }
-               
-                write_unlock_irqrestore(g_lock, flags);
 
                 kptllnd_peer_unreserve_buffers();
                 kptllnd_peer_decref(new_peer);
 
                 kptllnd_peer_unreserve_buffers();
                 kptllnd_peer_decref(new_peer);
@@ -1032,7 +1286,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
         if (kptllnd_data.kptl_n_active_peers ==
             kptllnd_data.kptl_expected_peers) {
                 /* peer table full */
         if (kptllnd_data.kptl_n_active_peers ==
             kptllnd_data.kptl_expected_peers) {
                 /* peer table full */
-                write_unlock_irqrestore(g_lock, flags);
+                cfs_write_unlock_irqrestore(g_lock, flags);
 
                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
 
 
                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
 
@@ -1046,8 +1300,9 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                         return NULL;
                 }
                 
                         return NULL;
                 }
                 
-                write_lock_irqsave(g_lock, flags);
+                cfs_write_lock_irqsave(g_lock, flags);
                 kptllnd_data.kptl_expected_peers++;
                 kptllnd_data.kptl_expected_peers++;
+                goto again;
         }
 
         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
         }
 
         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
@@ -1060,10 +1315,12 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
         new_peer->peer_incarnation = msg->ptlm_srcstamp;
         new_peer->peer_next_matchbits = safe_matchbits;
         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
         new_peer->peer_incarnation = msg->ptlm_srcstamp;
         new_peer->peer_next_matchbits = safe_matchbits;
         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
+        new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
 
 
+        LASSERT (!net->net_shutdown);
         kptllnd_peer_add_peertable_locked(new_peer);
 
         kptllnd_peer_add_peertable_locked(new_peer);
 
-        write_unlock_irqrestore(g_lock, flags);
+        cfs_write_unlock_irqrestore(g_lock, flags);
 
        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */
 
        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */
@@ -1071,99 +1328,88 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
                libcfs_id2str(new_peer->peer_id), hello_tx);
 
         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
                libcfs_id2str(new_peer->peer_id), hello_tx);
 
-        kptllnd_post_tx(new_peer, hello_tx);
+        kptllnd_post_tx(new_peer, hello_tx, 0);
         kptllnd_peer_check_sends(new_peer);
 
         return new_peer;
 }
 
 void
         kptllnd_peer_check_sends(new_peer);
 
         return new_peer;
 }
 
 void
-kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
+kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
 {
 {
-        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
+        kptllnd_post_tx(peer, tx, nfrag);
+        kptllnd_peer_check_sends(peer);
+}
+
+int
+kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
+                    kptl_peer_t **peerp)
+{
+        cfs_rwlock_t     *g_lock = &kptllnd_data.kptl_peer_rw_lock;
         ptl_process_id_t  ptl_id;
         ptl_process_id_t  ptl_id;
-        kptl_peer_t      *peer;
-        kptl_peer_t      *new_peer = NULL;
-        kptl_tx_t        *hello_tx = NULL;
+        kptl_peer_t      *new_peer;
+        kptl_tx_t        *hello_tx;
         unsigned long     flags;
         int               rc;
         __u64             last_matchbits_seen;
 
         unsigned long     flags;
         int               rc;
         __u64             last_matchbits_seen;
 
-        LASSERT (tx->tx_lnet_msg != NULL);
-        LASSERT (tx->tx_peer == NULL);
-
         /* I expect to find the peer, so I only take a read lock... */
         /* I expect to find the peer, so I only take a read lock... */
-        read_lock_irqsave(g_lock, flags);
-        peer = kptllnd_id2peer_locked(target);
-        read_unlock_irqrestore(g_lock, flags);
+        cfs_read_lock_irqsave(g_lock, flags);
+        *peerp = kptllnd_id2peer_locked(target);
+        cfs_read_unlock_irqrestore(g_lock, flags);
+
+        if (*peerp != NULL)
+                return 0;
 
 
-        if (peer != NULL) {
-                goto post;
-        }
-        
         if ((target.pid & LNET_PID_USERFLAG) != 0) {
                 CWARN("Refusing to create a new connection to %s "
                       "(non-kernel peer)\n", libcfs_id2str(target));
         if ((target.pid & LNET_PID_USERFLAG) != 0) {
                 CWARN("Refusing to create a new connection to %s "
                       "(non-kernel peer)\n", libcfs_id2str(target));
-                tx->tx_status = -EHOSTUNREACH;
-                goto failed;
+                return -EHOSTUNREACH;
         }
 
         }
 
-        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
-         * the same portals PID */
+        /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
+         * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
 
         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
 
-        write_lock_irqsave(g_lock, flags);
-
-        peer = kptllnd_id2peer_locked(target);
-        if (peer != NULL) {
-                write_unlock_irqrestore(g_lock, flags);
-                goto post;
-        }
-        
-        kptllnd_cull_peertable_locked(target);
-
-        write_unlock_irqrestore(g_lock, flags);
-                
         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
         if (hello_tx == NULL) {
                 CERROR("Unable to allocate connect message for %s\n",
                        libcfs_id2str(target));
         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
         if (hello_tx == NULL) {
                 CERROR("Unable to allocate connect message for %s\n",
                        libcfs_id2str(target));
-                tx->tx_status = -ENOMEM;
-                goto failed;
+                return -ENOMEM;
         }
 
         }
 
+        hello_tx->tx_acked = 1;
         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
-                         sizeof(kptl_hello_msg_t));
+                         target, sizeof(kptl_hello_msg_t));
 
 
-        new_peer = kptllnd_peer_allocate(target, ptl_id);
+        new_peer = kptllnd_peer_allocate(net, target, ptl_id);
         if (new_peer == NULL) {
         if (new_peer == NULL) {
-                tx->tx_status = -ENOMEM;
-                goto failed;
+                rc = -ENOMEM;
+                goto unwind_0;
         }
 
         rc = kptllnd_peer_reserve_buffers();
         }
 
         rc = kptllnd_peer_reserve_buffers();
-        if (rc != 0) {
-                tx->tx_status = rc;
-                goto failed;
-        }
-
-        write_lock_irqsave(g_lock, flags);
+        if (rc != 0)
+                goto unwind_1;
 
 
-        peer = kptllnd_id2peer_locked(target);
-        if (peer != NULL) {                     /* someone else beat me to it */
-                write_unlock_irqrestore(g_lock, flags);
+        cfs_write_lock_irqsave(g_lock, flags);
+ again:
+        /* Called only in lnd_send which can't happen after lnd_shutdown */
+        LASSERT (!net->net_shutdown);
 
 
-                kptllnd_peer_unreserve_buffers();
-                kptllnd_peer_decref(new_peer);
-                kptllnd_tx_decref(hello_tx);
-                goto post;
+        *peerp = kptllnd_id2peer_locked(target);
+        if (*peerp != NULL) {
+                cfs_write_unlock_irqrestore(g_lock, flags);
+                goto unwind_2;
         }
         }
-                
+
+        kptllnd_cull_peertable_locked(target);
+
         if (kptllnd_data.kptl_n_active_peers ==
             kptllnd_data.kptl_expected_peers) {
                 /* peer table full */
         if (kptllnd_data.kptl_n_active_peers ==
             kptllnd_data.kptl_expected_peers) {
                 /* peer table full */
-                write_unlock_irqrestore(g_lock, flags);
+                cfs_write_unlock_irqrestore(g_lock, flags);
 
                 kptllnd_peertable_overflow_msg("Connection to ", target);
 
 
                 kptllnd_peertable_overflow_msg("Connection to ", target);
 
@@ -1171,12 +1417,12 @@ kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
                 if (rc != 0) {
                         CERROR("Can't create connection to %s\n",
                                libcfs_id2str(target));
                 if (rc != 0) {
                         CERROR("Can't create connection to %s\n",
                                libcfs_id2str(target));
-                        kptllnd_peer_unreserve_buffers();
-                        tx->tx_status = -ENOMEM;
-                        goto failed;
+                        rc = -ENOMEM;
+                        goto unwind_2;
                 }
                 }
-                write_lock_irqsave(g_lock, flags);
+                cfs_write_lock_irqsave(g_lock, flags);
                 kptllnd_data.kptl_expected_peers++;
                 kptllnd_data.kptl_expected_peers++;
+                goto again;
         }
 
         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
         }
 
         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
@@ -1184,37 +1430,32 @@ kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                 *kptllnd_tunables.kptl_max_msg_size;
         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                 *kptllnd_tunables.kptl_max_msg_size;
-                
+
         new_peer->peer_state = PEER_STATE_WAITING_HELLO;
         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
         new_peer->peer_state = PEER_STATE_WAITING_HELLO;
         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
-        
+
         kptllnd_peer_add_peertable_locked(new_peer);
 
         kptllnd_peer_add_peertable_locked(new_peer);
 
-        write_unlock_irqrestore(g_lock, flags);
+        cfs_write_unlock_irqrestore(g_lock, flags);
 
 
-       /* NB someone else could get in now and post a message before I post
-        * the HELLO, but post_tx/check_sends take care of that! */
+        /* NB someone else could get in now and post a message before I post
+         * the HELLO, but post_tx/check_sends take care of that! */
 
         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
                libcfs_id2str(new_peer->peer_id), hello_tx);
 
 
         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
                libcfs_id2str(new_peer->peer_id), hello_tx);
 
-        peer = new_peer;
-        kptllnd_post_tx(peer, hello_tx);
+        kptllnd_post_tx(new_peer, hello_tx, 0);
+        kptllnd_peer_check_sends(new_peer);
 
 
- post:
-        kptllnd_post_tx(peer, tx);
-        kptllnd_peer_check_sends(peer);
-        kptllnd_peer_decref(peer);
-        return;
-        
- failed:
-        if (hello_tx != NULL)
-                kptllnd_tx_decref(hello_tx);
+        *peerp = new_peer;
+        return 0;
 
 
-        if (new_peer != NULL)
-                kptllnd_peer_decref(new_peer);
+ unwind_2:
+        kptllnd_peer_unreserve_buffers();
+ unwind_1:
+        kptllnd_peer_decref(new_peer);
+ unwind_0:
+        kptllnd_tx_decref(hello_tx);
 
 
-        LASSERT (tx->tx_status != 0);
-        kptllnd_tx_decref(tx);
-        
+        return rc;
 }
 }