Whamcloud - gitweb
Severity : major
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_rx_buf.c
index ad0f05d..364540b 100644 (file)
@@ -344,16 +344,15 @@ kptllnd_rx_done(kptl_rx_t *rx)
                 /* Update credits (after I've decref-ed the buffer) */
                 spin_lock_irqsave(&peer->peer_lock, flags);
 
-                peer->peer_active_rxs--;
-                LASSERT (peer->peer_active_rxs >= 0);
-
                 peer->peer_outstanding_credits++;
-                LASSERT (peer->peer_outstanding_credits <=
+                LASSERT (peer->peer_outstanding_credits +
+                         peer->peer_sent_credits <=
                          *kptllnd_tunables.kptl_peercredits);
 
-                CDEBUG(D_NETTRACE, "%s[%d/%d]: rx %p done\n",
-                       libcfs_id2str(peer->peer_id),
-                       peer->peer_credits, peer->peer_outstanding_credits, rx);
+                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
+                       libcfs_id2str(peer->peer_id), peer->peer_credits,
+                       peer->peer_outstanding_credits, peer->peer_sent_credits,
+                       rx);
 
                 spin_unlock_irqrestore(&peer->peer_lock, flags);
 
@@ -381,10 +380,11 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
         unlinked = ev->type == PTL_EVENT_UNLINK;
 #endif
 
-        CDEBUG(D_NET, "RXB Callback %s(%d) rxb=%p id=%s unlink=%d rc %d\n",
-               kptllnd_evtype2str(ev->type), ev->type, rxb, 
+        CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
                kptllnd_ptlid2str(ev->initiator), 
-               unlinked, ev->ni_fail_type);
+               kptllnd_evtype2str(ev->type), ev->type, rxb, 
+               kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
+               unlinked);
 
         LASSERT (!rxb->rxb_idle);
         LASSERT (ev->md.start == rxb->rxb_buffer);
@@ -396,9 +396,11 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
                  ev->match_bits == LNET_MSG_MATCHBITS);
 
         if (ev->ni_fail_type != PTL_NI_OK)
-                CERROR("event type %d, status %d from %s\n",
-                       ev->type, ev->ni_fail_type,
-                       kptllnd_ptlid2str(ev->initiator));
+                CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%dn",
+                       kptllnd_ptlid2str(ev->initiator),
+                       kptllnd_evtype2str(ev->type), ev->type, rxb,
+                       kptllnd_errtype2str(ev->ni_fail_type),
+                       ev->ni_fail_type, unlinked);
 
         if (ev->type == PTL_EVENT_PUT_END &&
             ev->ni_fail_type == PTL_NI_OK &&
@@ -446,6 +448,7 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
                         }
 
                         rx->rx_initiator = ev->initiator;
+                        rx->rx_treceived = jiffies;
 #ifdef CRAY_XT3
                         rx->rx_uid = ev->uid;
 #endif
@@ -511,7 +514,6 @@ kptllnd_rx_parse(kptl_rx_t *rx)
         kptl_msg_t             *msg = rx->rx_msg;
         kptl_peer_t            *peer;
         int                     rc;
-        int                     credits;
         unsigned long           flags;
         lnet_process_id_t       srcid;
 
@@ -546,8 +548,9 @@ kptllnd_rx_parse(kptl_rx_t *rx)
         srcid.nid = msg->ptlm_srcnid;
         srcid.pid = msg->ptlm_srcpid;
 
-        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p\n", libcfs_id2str(srcid),
-               kptllnd_msgtype2str(msg->ptlm_type), msg->ptlm_credits, rx);
+        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks\n",
+               libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
+               msg->ptlm_credits, rx, rx->rx_rxb, jiffies - rx->rx_treceived);
 
         if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
                 CERROR("Bad source id %s from %s\n",
@@ -629,33 +632,23 @@ kptllnd_rx_parse(kptl_rx_t *rx)
 
         spin_lock_irqsave(&peer->peer_lock, flags);
 
-        if (peer->peer_active_rxs == *kptllnd_tunables.kptl_peercredits) {
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
-                        
-                CERROR("Message overflow from %s: handling %d already\n",
-                       libcfs_id2str(peer->peer_id),
-                       *kptllnd_tunables.kptl_peercredits);
-                rc = -EPROTO;
-                goto failed;
-        }
-        
-        if (msg->ptlm_credits != 0 &&
-            peer->peer_credits + msg->ptlm_credits >
-            *kptllnd_tunables.kptl_peercredits) {
-                credits = peer->peer_credits;
+        /* Check peer only sends when I've sent her credits */
+        if (peer->peer_sent_credits == 0) {
+                int  c = peer->peer_credits;
+                int oc = peer->peer_outstanding_credits;
+                int sc = peer->peer_sent_credits;
+                
                 spin_unlock_irqrestore(&peer->peer_lock, flags);
 
-                CERROR("Credit overflow from %s: %d + %d > %d\n",
-                       libcfs_id2str(peer->peer_id),
-                       credits, msg->ptlm_credits,
-                       *kptllnd_tunables.kptl_peercredits);
-                rc = -EPROTO;
+                CERROR("%s: buffer overrun [%d/%d+%d]\n",
+                       libcfs_id2str(peer->peer_id), c, sc, oc);
                 goto failed;
         }
+        peer->peer_sent_credits--;
 
-        /* ptllnd-level protocol correct: account credits */
+        /* No check for credit overflow - the peer may post new
+         * buffers after the startup handshake. */
         peer->peer_credits += msg->ptlm_credits;
-        peer->peer_active_rxs++;
 
         spin_unlock_irqrestore(&peer->peer_lock, flags);
 
@@ -663,7 +656,9 @@ kptllnd_rx_parse(kptl_rx_t *rx)
         if (msg->ptlm_credits != 0)
                 kptllnd_peer_check_sends(peer);
 
-        rx->rx_peer = peer;                /* do buffer accounting on rxdone */
+        /* ptllnd-level protocol correct - rx takes my ref on peer and increments
+         * peer_outstanding_credits when it completes */
+        rx->rx_peer = peer;
         kptllnd_peer_alive(peer);
 
         switch (msg->ptlm_type) {