Whamcloud - gitweb
LU-1617 build: skip generated files in .gitignore
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_rx_buf.c
index b83ab51..f12be7d 100644 (file)
@@ -1,19 +1,39 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
+/*
+ * GPL HEADER START
  *
- * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
- *   Author: PJ Kirner <pjkirner@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   This file is confidential source code owned by Cluster File Systems.
- *   No viewing, modification, compilation, redistribution, or any other
- *   form of use is permitted except through a signed license agreement.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   If you have not signed such an agreement, then you have no rights to
- *   this file.  Please destroy it immediately and contact CFS.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/klnds/ptllnd/ptllnd_rx_buf.c
+ *
+ * Author: PJ Kirner <pjkirner@clusterfs.com>
  */
 
  #include "ptllnd.h"
@@ -22,8 +42,8 @@ void
 kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
 {
         memset(rxbp, 0, sizeof(*rxbp));
-        spin_lock_init(&rxbp->rxbp_lock);
-        INIT_LIST_HEAD(&rxbp->rxbp_list);
+        cfs_spin_lock_init(&rxbp->rxbp_lock);
+        CFS_INIT_LIST_HEAD(&rxbp->rxbp_list);
 }
 
 void
@@ -36,7 +56,7 @@ kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
         LASSERT(!rxb->rxb_posted);
         LASSERT(rxb->rxb_idle);
 
-        list_del(&rxb->rxb_list);
+        cfs_list_del(&rxb->rxb_list);
         rxbp->rxbp_count--;
 
         LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
@@ -58,7 +78,7 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
 
         CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);
 
-        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         for (;;) {
                 if (rxbp->rxbp_shutdown) {
@@ -72,7 +92,7 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
                         break;
                 }
                 
-                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                 
                 LIBCFS_ALLOC(rxb, sizeof(*rxb));
                 LIBCFS_ALLOC(buffer, bufsize);
@@ -85,7 +105,7 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
                         if (buffer != NULL)
                                 LIBCFS_FREE(buffer, bufsize);
                         
-                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+                        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                         rc = -ENOMEM;
                         break;
                 }
@@ -100,33 +120,33 @@ kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
                 rxb->rxb_buffer = buffer;
                 rxb->rxb_mdh = PTL_INVALID_HANDLE;
 
-                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+                cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                 
                 if (rxbp->rxbp_shutdown) {
-                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+                        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                         
                         LIBCFS_FREE(rxb, sizeof(*rxb));
                         LIBCFS_FREE(buffer, bufsize);
 
-                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+                        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                         rc = -ESHUTDOWN;
                         break;
                 }
                 
-                list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
+                cfs_list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
                 rxbp->rxbp_count++;
 
-                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                 
                 kptllnd_rx_buffer_post(rxb);
 
-                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+                cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
         }
 
         if (rc == 0)
                 rxbp->rxbp_reserved += count;
 
-        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         return rc;
 }
@@ -137,12 +157,12 @@ kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
 {
         unsigned long flags;
 
-        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
         rxbp->rxbp_reserved -= count;
 
-        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 }
 
 void
@@ -152,11 +172,11 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
         int                     rc;
         int                     i;
         unsigned long           flags;
-        struct list_head       *tmp;
-        struct list_head       *nxt;
+        cfs_list_t             *tmp;
+        cfs_list_t             *nxt;
         ptl_handle_md_t         mdh;
 
-        /* CAVEAT EMPTOR: I'm racing with everything here!!!  
+        /* CAVEAT EMPTOR: I'm racing with everything here!!!
          *
          * Buffers can still be posted after I set rxbp_shutdown because I
          * can't hold rxbp_lock while I'm posting them.
@@ -167,20 +187,20 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
          * different MD) from when the MD is actually unlinked, to when the
          * event callback tells me it has been unlinked. */
 
-        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         rxbp->rxbp_shutdown = 1;
 
         for (i = 9;; i++) {
-                list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
-                        rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list);
-                
+                cfs_list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
+                        rxb = cfs_list_entry (tmp, kptl_rx_buffer_t, rxb_list);
+
                         if (rxb->rxb_idle) {
-                                spin_unlock_irqrestore(&rxbp->rxbp_lock, 
-                                                       flags);
+                                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock,
+                                                           flags);
                                 kptllnd_rx_buffer_destroy(rxb);
-                                spin_lock_irqsave(&rxbp->rxbp_lock, 
-                                                  flags);
+                                cfs_spin_lock_irqsave(&rxbp->rxbp_lock,
+                                                      flags);
                                 continue;
                         }
 
@@ -188,11 +208,11 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
                         if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
                                 continue;
                         
-                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+                        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
                         rc = PtlMDUnlink(mdh);
 
-                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+                        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                         
 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
                         /* callback clears rxb_mdh and drops net's ref
@@ -208,10 +228,10 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
 #endif
                 }
 
-                if (list_empty(&rxbp->rxbp_list))
+                if (cfs_list_empty(&rxbp->rxbp_list))
                         break;
 
-                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
                 /* Wait a bit for references to be dropped */
                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
@@ -220,10 +240,10 @@ kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
 
                 cfs_pause(cfs_time_seconds(1));
 
-                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+                cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
         }
 
-        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 }
 
 void
@@ -237,7 +257,7 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
         kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
         unsigned long           flags;
 
-        LASSERT (!in_interrupt());
+        LASSERT (!cfs_in_interrupt());
         LASSERT (rxb->rxb_refcount == 0);
         LASSERT (!rxb->rxb_idle);
         LASSERT (!rxb->rxb_posted);
@@ -246,18 +266,18 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
         any.nid = PTL_NID_ANY;
         any.pid = PTL_PID_ANY;
 
-        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         if (rxbp->rxbp_shutdown) {
                 rxb->rxb_idle = 1;
-                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                 return;
         }
 
         rxb->rxb_refcount = 1;                  /* net's ref */
         rxb->rxb_posted = 1;                    /* I'm posting */
         
-        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         rc = PtlMEAttach(kptllnd_data.kptl_nih,
                          *kptllnd_tunables.kptl_portal,
@@ -268,7 +288,8 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
                          PTL_INS_AFTER,
                          &meh);
         if (rc != PTL_OK) {
-                CERROR("PtlMeAttach rxb failed %d\n", rc);
+                CERROR("PtlMeAttach rxb failed %s(%d)\n",
+                       kptllnd_errtype2str(rc), rc);
                 goto failed;
         }
 
@@ -276,7 +297,7 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
          * Setup MD
          */
         md.start = rxb->rxb_buffer;
-        md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
+        md.length = kptllnd_rx_buffer_size();
         md.threshold = PTL_MD_THRESH_INF;
         md.options = PTL_MD_OP_PUT |
                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
@@ -289,23 +310,24 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
 
         rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
         if (rc == PTL_OK) {
-                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+                cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                 if (rxb->rxb_posted)            /* Not auto-unlinked yet!!! */
                         rxb->rxb_mdh = mdh;
-                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                 return;
         }
         
-        CERROR("PtlMDAttach rxb failed %d\n", rc);
+        CERROR("PtlMDAttach rxb failed %s(%d)\n",
+               kptllnd_errtype2str(rc), rc);
         rc = PtlMEUnlink(meh);
         LASSERT(rc == PTL_OK);
 
  failed:
-        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+        cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
         rxb->rxb_posted = 0;
         /* XXX this will just try again immediately */
         kptllnd_rx_buffer_decref_locked(rxb);
-        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+        cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 }
 
 kptl_rx_t *
@@ -329,12 +351,15 @@ kptllnd_rx_alloc(void)
 }
 
 void
-kptllnd_rx_done(kptl_rx_t *rx)
+kptllnd_rx_done(kptl_rx_t *rx, int post_credit)
 {
         kptl_rx_buffer_t *rxb = rx->rx_rxb;
         kptl_peer_t      *peer = rx->rx_peer;
         unsigned long     flags;
 
+        LASSERT (post_credit == PTLLND_POSTRX_NO_CREDIT ||
+                 post_credit == PTLLND_POSTRX_PEER_CREDIT);
+
         CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);
 
         if (rxb != NULL)
@@ -342,17 +367,21 @@ kptllnd_rx_done(kptl_rx_t *rx)
 
         if (peer != NULL) {
                 /* Update credits (after I've decref-ed the buffer) */
-                spin_lock_irqsave(&peer->peer_lock, flags);
+                cfs_spin_lock_irqsave(&peer->peer_lock, flags);
 
-                peer->peer_outstanding_credits++;
-                LASSERT (peer->peer_outstanding_credits <=
-                         *kptllnd_tunables.kptl_peercredits);
+                if (post_credit == PTLLND_POSTRX_PEER_CREDIT)
+                        peer->peer_outstanding_credits++;
 
-                CDEBUG(D_NETTRACE, "%s[%d/%d]: rx %p done\n",
-                       libcfs_id2str(peer->peer_id),
-                       peer->peer_credits, peer->peer_outstanding_credits, rx);
+                LASSERT (peer->peer_outstanding_credits +
+                         peer->peer_sent_credits <=
+                         *kptllnd_tunables.kptl_peertxcredits);
 
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
+                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
+                       libcfs_id2str(peer->peer_id), peer->peer_credits,
+                       peer->peer_outstanding_credits, peer->peer_sent_credits,
+                       rx);
+
+                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
 
                 /* I might have to send back credits */
                 kptllnd_peer_check_sends(peer);
@@ -378,28 +407,29 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
         unlinked = ev->type == PTL_EVENT_UNLINK;
 #endif
 
-        CDEBUG(D_NET, "RXB Callback %s(%d) rxb=%p id=%s unlink=%d rc %d\n",
-               kptllnd_evtype2str(ev->type), ev->type, rxb, 
-               kptllnd_ptlid2str(ev->initiator), 
-               unlinked, ev->ni_fail_type);
+        CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
+               kptllnd_ptlid2str(ev->initiator),
+               kptllnd_evtype2str(ev->type), ev->type, rxb,
+               kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
+               unlinked);
 
         LASSERT (!rxb->rxb_idle);
         LASSERT (ev->md.start == rxb->rxb_buffer);
-        LASSERT (ev->offset + ev->mlength <= 
+        LASSERT (ev->offset + ev->mlength <=
                  PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
-        LASSERT (ev->type == PTL_EVENT_PUT_END || 
+        LASSERT (ev->type == PTL_EVENT_PUT_END ||
                  ev->type == PTL_EVENT_UNLINK);
         LASSERT (ev->type == PTL_EVENT_UNLINK ||
                  ev->match_bits == LNET_MSG_MATCHBITS);
 
-        if (ev->ni_fail_type != PTL_NI_OK)
-                CERROR("event type %d, status %d from %s\n",
-                       ev->type, ev->ni_fail_type,
-                       kptllnd_ptlid2str(ev->initiator));
-
-        if (ev->type == PTL_EVENT_PUT_END &&
-            ev->ni_fail_type == PTL_NI_OK &&
-            !rxbp->rxbp_shutdown) {
+        if (ev->ni_fail_type != PTL_NI_OK) {
+                CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%dn",
+                       kptllnd_ptlid2str(ev->initiator),
+                       kptllnd_evtype2str(ev->type), ev->type, rxb,
+                       kptllnd_errtype2str(ev->ni_fail_type),
+                       ev->ni_fail_type, unlinked);
+        } else if (ev->type == PTL_EVENT_PUT_END &&
+                   !rxbp->rxbp_shutdown) {
 
                 /* rxbp_shutdown sampled without locking!  I only treat it as a
                  * hint since shutdown can start while rx's are queued on
@@ -409,10 +439,10 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
                  * odd-length message will misalign subsequent messages and
                  * force the fixup below...  */
                 if ((ev->mlength & 7) != 0)
-                        CWARN("Message from %s has odd length %d: "
+                        CWARN("Message from %s has odd length "LPU64": "
                               "probable version incompatibility\n",
                               kptllnd_ptlid2str(ev->initiator),
-                              ev->mlength);
+                              (__u64)ev->mlength);
 #endif
                 rx = kptllnd_rx_alloc();
                 if (rx == NULL) {
@@ -430,7 +460,7 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
                                 /* Portals can't force alignment - copy into
                                  * rx_space (avoiding overflow) to fix */
                                 int maxlen = *kptllnd_tunables.kptl_max_msg_size;
-                                
+
                                 rx->rx_rxb = NULL;
                                 rx->rx_nob = MIN(maxlen, ev->mlength);
                                 rx->rx_msg = (kptl_msg_t *)rx->rx_space;
@@ -443,35 +473,33 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
                         }
 
                         rx->rx_initiator = ev->initiator;
-#ifdef CRAY_XT3
-                        rx->rx_uid = ev->uid;
-#endif
+                        rx->rx_treceived = jiffies;
                         /* Queue for attention */
-                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, 
-                                          flags);
+                        cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
+                                              flags);
 
-                        list_add_tail(&rx->rx_list, 
-                                      &kptllnd_data.kptl_sched_rxq);
-                        wake_up(&kptllnd_data.kptl_sched_waitq);
+                        cfs_list_add_tail(&rx->rx_list,
+                                          &kptllnd_data.kptl_sched_rxq);
+                        cfs_waitq_signal(&kptllnd_data.kptl_sched_waitq);
 
-                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, 
-                                               flags);
+                        cfs_spin_unlock_irqrestore(&kptllnd_data. \
+                                                   kptl_sched_lock, flags);
                 }
         }
 
         if (unlinked) {
-                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
+                cfs_spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
                 rxb->rxb_posted = 0;
                 rxb->rxb_mdh = PTL_INVALID_HANDLE;
                 kptllnd_rx_buffer_decref_locked(rxb);
 
-                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
+                cfs_spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
         }
 }
 
 void
-kptllnd_nak (kptl_rx_t *rx)
+kptllnd_nak (ptl_process_id_t dest)
 {
         /* Fire-and-forget a stub message that will let the peer know my
          * protocol magic/version and make her drop/refresh any peer state she
@@ -488,32 +516,57 @@ kptllnd_nak (kptl_rx_t *rx)
 
         rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
         if (rc != PTL_OK) {
-                CWARN("Can't NAK %s: bind failed %d\n",
-                      kptllnd_ptlid2str(rx->rx_initiator), rc);
+                CWARN("Can't NAK %s: bind failed %s(%d)\n",
+                      kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
                 return;
         }
 
-        rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,
+        rc = PtlPut(mdh, PTL_NOACK_REQ, dest,
                     *kptllnd_tunables.kptl_portal, 0,
                     LNET_MSG_MATCHBITS, 0, 0);
+        if (rc != PTL_OK) {
+                CWARN("Can't NAK %s: put failed %s(%d)\n",
+                      kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
+        }
+}
+
+kptl_net_t *
+kptllnd_find_net (lnet_nid_t nid)
+{
+        kptl_net_t *net;
+
+        cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
+        cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
+                LASSERT (!net->net_shutdown);
+
+                if (net->net_ni->ni_nid == nid) {
+                        kptllnd_net_addref(net);
+                        cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
+                        return net;
+                }
+        }
+        cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
 
-        if (rc != PTL_OK)
-                CWARN("Can't NAK %s: put failed %d\n",
-                      kptllnd_ptlid2str(rx->rx_initiator), rc);
+        return NULL;
 }
 
 void
 kptllnd_rx_parse(kptl_rx_t *rx)
 {
         kptl_msg_t             *msg = rx->rx_msg;
+        int                     rc = 0;
+        int                     post_credit = PTLLND_POSTRX_PEER_CREDIT;
+        kptl_net_t             *net = NULL;
         kptl_peer_t            *peer;
-        int                     rc;
-        int                     credits;
+        cfs_list_t              txs;
         unsigned long           flags;
         lnet_process_id_t       srcid;
 
+        LASSERT (!cfs_in_interrupt());
         LASSERT (rx->rx_peer == NULL);
 
+        CFS_INIT_LIST_HEAD(&txs);
+
         if ((rx->rx_nob >= 4 &&
              (msg->ptlm_magic == LNET_PROTO_MAGIC ||
               msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
@@ -529,7 +582,8 @@ kptllnd_rx_parse(kptl_rx_t *rx)
                        (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
                                msg->ptlm_version : __swab16(msg->ptlm_version)),
                         PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
-                kptllnd_nak(rx);
+                /* NB backward compatibility */
+                kptllnd_nak(rx->rx_initiator);
                 goto rx_done;
         }
         
@@ -543,11 +597,14 @@ kptllnd_rx_parse(kptl_rx_t *rx)
         srcid.nid = msg->ptlm_srcnid;
         srcid.pid = msg->ptlm_srcpid;
 
-        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p\n", libcfs_id2str(srcid),
-               kptllnd_msgtype2str(msg->ptlm_type), msg->ptlm_credits, rx);
+        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
+               libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
+               msg->ptlm_credits, rx, rx->rx_rxb, 
+               jiffies - rx->rx_treceived,
+               cfs_duration_sec(jiffies - rx->rx_treceived));
 
-        if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
-                CERROR("Bad source id %s from %s\n",
+        if (kptllnd_lnet2ptlnid(srcid.nid) != rx->rx_initiator.nid) {
+                CERROR("Bad source nid %s from %s\n",
                        libcfs_id2str(srcid),
                        kptllnd_ptlid2str(rx->rx_initiator));
                 goto rx_done;
@@ -558,95 +615,137 @@ kptllnd_rx_parse(kptl_rx_t *rx)
                 if (peer == NULL)
                         goto rx_done;
                 
-                CWARN("NAK from %s (%s)\n",
-                      libcfs_id2str(srcid),
+                CWARN("NAK from %s (%d:%s)\n",
+                      libcfs_id2str(srcid), peer->peer_state,
                       kptllnd_ptlid2str(rx->rx_initiator));
 
+                /* NB can't nuke new peer - bug 17546 comment 31 */
+                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
+                        CDEBUG(D_NET, "Stale NAK from %s(%s): WAITING_HELLO\n",
+                               libcfs_id2str(srcid),
+                               kptllnd_ptlid2str(rx->rx_initiator));
+                        kptllnd_peer_decref(peer);
+                        goto rx_done;
+                }
+
                 rc = -EPROTO;
                 goto failed;
         }
 
-        if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||
-            msg->ptlm_dstpid != the_lnet.ln_pid) {
-                CERROR("Bad dstid %s (expected %s) from %s\n",
+        net = kptllnd_find_net(msg->ptlm_dstnid);
+        if (net == NULL || msg->ptlm_dstpid != the_lnet.ln_pid) {
+                CERROR("Bad dstid %s from %s\n",
                        libcfs_id2str((lnet_process_id_t) {
                                .nid = msg->ptlm_dstnid,
                                .pid = msg->ptlm_dstpid}),
-                       libcfs_id2str((lnet_process_id_t) {
-                               .nid = kptllnd_data.kptl_ni->ni_nid,
-                               .pid = the_lnet.ln_pid}),
                        kptllnd_ptlid2str(rx->rx_initiator));
                 goto rx_done;
         }
 
+        if (LNET_NIDNET(srcid.nid) != LNET_NIDNET(net->net_ni->ni_nid)) {
+                lnet_nid_t nid = LNET_MKNID(LNET_NIDNET(net->net_ni->ni_nid),
+                                            LNET_NIDADDR(srcid.nid));
+                CERROR("Bad source nid %s from %s, %s expected.\n",
+                       libcfs_id2str(srcid),
+                       kptllnd_ptlid2str(rx->rx_initiator),
+                       libcfs_nid2str(nid));
+                goto rx_done;
+        }
+
         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
-                peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
-                if (peer == NULL) {
-                        CWARN("No peer for %s\n",
-                              kptllnd_ptlid2str(rx->rx_initiator));
+                peer = kptllnd_peer_handle_hello(net, rx->rx_initiator, msg);
+                if (peer == NULL)
                         goto rx_done;
-                }
         } else {
                 peer = kptllnd_id2peer(srcid);
                 if (peer == NULL) {
-                        CWARN("NAK %s: no connection; peer must reconnect\n",
+                        CWARN("NAK %s: no connection, %s must reconnect\n",
+                              kptllnd_msgtype2str(msg->ptlm_type),
                               libcfs_id2str(srcid));
                         /* NAK to make the peer reconnect */
-                        kptllnd_nak(rx);
+                        kptllnd_nak(rx->rx_initiator);
                         goto rx_done;
                 }
 
-                /* Ignore anything else while I'm waiting for HELLO */
-                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
+                /* Ignore any messages for a previous incarnation of me */
+                if (msg->ptlm_dststamp < peer->peer_myincarnation) {
                         kptllnd_peer_decref(peer);
                         goto rx_done;
                 }
-        }
 
-        LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
-                 msg->ptlm_srcpid == peer->peer_id.pid);
+                if (msg->ptlm_dststamp != peer->peer_myincarnation) {
+                        CERROR("%s: Unexpected dststamp "LPX64" "
+                               "("LPX64" expected)\n",
+                               libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
+                               peer->peer_myincarnation);
+                        rc = -EPROTO;
+                        goto failed;
+                }
 
-        if (msg->ptlm_srcstamp != peer->peer_incarnation) {
-                CERROR("Stale rx from %s srcstamp "LPX64" expected "LPX64"\n",
-                       libcfs_id2str(peer->peer_id),
-                       msg->ptlm_srcstamp,
-                       peer->peer_incarnation);
-                rc = -EPROTO;
-                goto failed;
+                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
+                        /* recoverable error - restart txs */
+                        cfs_spin_lock_irqsave(&peer->peer_lock, flags);
+                        kptllnd_cancel_txlist(&peer->peer_sendq, &txs);
+                        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
+
+                        CWARN("NAK %s: Unexpected %s message\n",
+                              libcfs_id2str(srcid),
+                              kptllnd_msgtype2str(msg->ptlm_type));
+                        kptllnd_nak(rx->rx_initiator);
+                        rc = -EPROTO;
+                        goto failed;
+                }
+
+                if (msg->ptlm_srcstamp != peer->peer_incarnation) {
+                        CERROR("%s: Unexpected srcstamp "LPX64" "
+                               "("LPX64" expected)\n",
+                               libcfs_id2str(srcid),
+                               msg->ptlm_srcstamp,
+                               peer->peer_incarnation);
+                        rc = -EPROTO;
+                        goto failed;
+                }
         }
 
-        if (msg->ptlm_dststamp != kptllnd_data.kptl_incarnation &&
-            (msg->ptlm_type != PTLLND_MSG_TYPE_HELLO || /* HELLO sends a */
-             msg->ptlm_dststamp != 0)) {                /* zero dststamp */
-                CERROR("Stale rx from %s dststamp "LPX64" expected "LPX64"\n",
-                       libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
-                       kptllnd_data.kptl_incarnation);
+        LASSERTF (LNET_NIDADDR(msg->ptlm_srcnid) ==
+                         LNET_NIDADDR(peer->peer_id.nid), "m %s p %s\n",
+                  libcfs_nid2str(msg->ptlm_srcnid),
+                  libcfs_nid2str(peer->peer_id.nid));
+        LASSERTF (msg->ptlm_srcpid == peer->peer_id.pid, "m %u p %u\n",
+                  msg->ptlm_srcpid, peer->peer_id.pid);
+
+        cfs_spin_lock_irqsave(&peer->peer_lock, flags);
+
+        /* Check peer only sends when I've sent her credits */
+        if (peer->peer_sent_credits == 0) {
+                int  c = peer->peer_credits;
+                int oc = peer->peer_outstanding_credits;
+                int sc = peer->peer_sent_credits;
+
+                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
+
+                CERROR("%s: buffer overrun [%d/%d+%d]\n",
+                       libcfs_id2str(peer->peer_id), c, sc, oc);
                 rc = -EPROTO;
                 goto failed;
         }
+        peer->peer_sent_credits--;
 
-        if (msg->ptlm_credits != 0) {
-                spin_lock_irqsave(&peer->peer_lock, flags);
+        /* No check for credit overflow - the peer may post new
+         * buffers after the startup handshake. */
+        peer->peer_credits += msg->ptlm_credits;
 
-                if (peer->peer_credits + msg->ptlm_credits >
-                    *kptllnd_tunables.kptl_peercredits) {
-                        credits = peer->peer_credits;
-                        spin_unlock_irqrestore(&peer->peer_lock, flags);
-                        
-                        CERROR("Credit overflow from %s: %d + %d > %d\n",
-                               libcfs_id2str(peer->peer_id),
-                               credits, msg->ptlm_credits,
-                               *kptllnd_tunables.kptl_peercredits);
-                        rc = -EPROTO;
-                        goto failed;
-                }
-                               
-                peer->peer_credits += msg->ptlm_credits;
+        /* This ensures the credit taken by NOOP can be returned */
+        if (msg->ptlm_type == PTLLND_MSG_TYPE_NOOP) {
+                peer->peer_outstanding_credits++;
+                post_credit = PTLLND_POSTRX_NO_CREDIT;
+        }
 
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
+        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
 
+        /* See if something can go out now that credits have come in */
+        if (msg->ptlm_credits != 0)
                 kptllnd_peer_check_sends(peer);
-        }
 
         /* ptllnd-level protocol correct - rx takes my ref on peer and increments
          * peer_outstanding_credits when it completes */
@@ -668,12 +767,14 @@ kptllnd_rx_parse(kptl_rx_t *rx)
 
         case PTLLND_MSG_TYPE_IMMEDIATE:
                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
-                rc = lnet_parse(kptllnd_data.kptl_ni,
+                rc = lnet_parse(net->net_ni,
                                 &msg->ptlm_u.immediate.kptlim_hdr,
                                 msg->ptlm_srcnid,
                                 rx, 0);
-                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
+                if (rc >= 0) {                  /* kptllnd_recv owns 'rx' now */
+                        kptllnd_net_decref(net);
                         return;
+                }
                 goto failed;
                 
         case PTLLND_MSG_TYPE_PUT:
@@ -687,28 +788,37 @@ kptllnd_rx_parse(kptl_rx_t *rx)
                          PTL_RESERVED_MATCHBITS);
 
                 /* Update last match bits seen */
-                spin_lock_irqsave(&peer->peer_lock, flags);
+                cfs_spin_lock_irqsave(&peer->peer_lock, flags);
 
                 if (msg->ptlm_u.rdma.kptlrm_matchbits >
                     rx->rx_peer->peer_last_matchbits_seen)
                         rx->rx_peer->peer_last_matchbits_seen =
                                 msg->ptlm_u.rdma.kptlrm_matchbits;
 
-                spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
+                cfs_spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
 
-                rc = lnet_parse(kptllnd_data.kptl_ni,
+                rc = lnet_parse(net->net_ni,
                                 &msg->ptlm_u.rdma.kptlrm_hdr,
                                 msg->ptlm_srcnid,
                                 rx, 1);
-                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */
+                if (rc >= 0) {                  /* kptllnd_recv owns 'rx' now */
+                        kptllnd_net_decref(net);
                         return;
+                }
                 goto failed;
-         }
+        }
 
  failed:
+        LASSERT (rc != 0);
         kptllnd_peer_close(peer, rc);
         if (rx->rx_peer == NULL)                /* drop ref on peer */
                 kptllnd_peer_decref(peer);      /* unless rx_done will */
+        if (!cfs_list_empty(&txs)) {
+                LASSERT (net != NULL);
+                kptllnd_restart_txs(net, srcid, &txs);
+        }
  rx_done:
-        kptllnd_rx_done(rx);
+        if (net != NULL)
+                kptllnd_net_decref(net);
+        kptllnd_rx_done(rx, post_credit);
 }