Whamcloud - gitweb
* Print portals error string in ptllnd warnings/errors
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_rx_buf.c
index b83ab51..ce21e95 100644 (file)
@@ -268,7 +268,8 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
                          PTL_INS_AFTER,
                          &meh);
         if (rc != PTL_OK) {
-                CERROR("PtlMeAttach rxb failed %d\n", rc);
+                CERROR("PtlMeAttach rxb failed %s(%d)\n",
+                       kptllnd_errtype2str(rc), rc);
                 goto failed;
         }
 
@@ -296,7 +297,8 @@ kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
                 return;
         }
         
-        CERROR("PtlMDAttach rxb failed %d\n", rc);
+        CERROR("PtlMDAttach rxb failed %s(%d)\n",
+               kptllnd_errtype2str(rc), rc);
         rc = PtlMEUnlink(meh);
         LASSERT(rc == PTL_OK);
 
@@ -345,12 +347,14 @@ kptllnd_rx_done(kptl_rx_t *rx)
                 spin_lock_irqsave(&peer->peer_lock, flags);
 
                 peer->peer_outstanding_credits++;
-                LASSERT (peer->peer_outstanding_credits <=
+                LASSERT (peer->peer_outstanding_credits +
+                         peer->peer_sent_credits <=
                          *kptllnd_tunables.kptl_peercredits);
 
-                CDEBUG(D_NETTRACE, "%s[%d/%d]: rx %p done\n",
-                       libcfs_id2str(peer->peer_id),
-                       peer->peer_credits, peer->peer_outstanding_credits, rx);
+                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
+                       libcfs_id2str(peer->peer_id), peer->peer_credits,
+                       peer->peer_outstanding_credits, peer->peer_sent_credits,
+                       rx);
 
                 spin_unlock_irqrestore(&peer->peer_lock, flags);
 
@@ -378,10 +382,11 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
         unlinked = ev->type == PTL_EVENT_UNLINK;
 #endif
 
-        CDEBUG(D_NET, "RXB Callback %s(%d) rxb=%p id=%s unlink=%d rc %d\n",
-               kptllnd_evtype2str(ev->type), ev->type, rxb, 
+        CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
                kptllnd_ptlid2str(ev->initiator), 
-               unlinked, ev->ni_fail_type);
+               kptllnd_evtype2str(ev->type), ev->type, rxb, 
+               kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
+               unlinked);
 
         LASSERT (!rxb->rxb_idle);
         LASSERT (ev->md.start == rxb->rxb_buffer);
@@ -392,14 +397,15 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
         LASSERT (ev->type == PTL_EVENT_UNLINK ||
                  ev->match_bits == LNET_MSG_MATCHBITS);
 
-        if (ev->ni_fail_type != PTL_NI_OK)
-                CERROR("event type %d, status %d from %s\n",
-                       ev->type, ev->ni_fail_type,
-                       kptllnd_ptlid2str(ev->initiator));
+        if (ev->ni_fail_type != PTL_NI_OK) {
+                CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%dn",
+                       kptllnd_ptlid2str(ev->initiator),
+                       kptllnd_evtype2str(ev->type), ev->type, rxb,
+                       kptllnd_errtype2str(ev->ni_fail_type),
+                       ev->ni_fail_type, unlinked);
 
-        if (ev->type == PTL_EVENT_PUT_END &&
-            ev->ni_fail_type == PTL_NI_OK &&
-            !rxbp->rxbp_shutdown) {
+        } else if (ev->type == PTL_EVENT_PUT_END &&
+                   !rxbp->rxbp_shutdown) {
 
                 /* rxbp_shutdown sampled without locking!  I only treat it as a
                  * hint since shutdown can start while rx's are queued on
@@ -409,7 +415,7 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
                  * odd-length message will misalign subsequent messages and
                  * force the fixup below...  */
                 if ((ev->mlength & 7) != 0)
-                        CWARN("Message from %s has odd length %d: "
+                        CWARN("Message from %s has odd length %llu: "
                               "probable version incompatibility\n",
                               kptllnd_ptlid2str(ev->initiator),
                               ev->mlength);
@@ -443,6 +449,7 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
                         }
 
                         rx->rx_initiator = ev->initiator;
+                        rx->rx_treceived = jiffies;
 #ifdef CRAY_XT3
                         rx->rx_uid = ev->uid;
 #endif
@@ -488,8 +495,9 @@ kptllnd_nak (kptl_rx_t *rx)
 
         rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
         if (rc != PTL_OK) {
-                CWARN("Can't NAK %s: bind failed %d\n",
-                      kptllnd_ptlid2str(rx->rx_initiator), rc);
+                CWARN("Can't NAK %s: bind failed %s(%d)\n",
+                      kptllnd_ptlid2str(rx->rx_initiator),
+                      kptllnd_errtype2str(rc), rc);
                 return;
         }
 
@@ -498,8 +506,9 @@ kptllnd_nak (kptl_rx_t *rx)
                     LNET_MSG_MATCHBITS, 0, 0);
 
         if (rc != PTL_OK)
-                CWARN("Can't NAK %s: put failed %d\n",
-                      kptllnd_ptlid2str(rx->rx_initiator), rc);
+                CWARN("Can't NAK %s: put failed %s(%d)\n",
+                      kptllnd_ptlid2str(rx->rx_initiator),
+                      kptllnd_errtype2str(rc), rc);
 }
 
 void
@@ -508,7 +517,6 @@ kptllnd_rx_parse(kptl_rx_t *rx)
         kptl_msg_t             *msg = rx->rx_msg;
         kptl_peer_t            *peer;
         int                     rc;
-        int                     credits;
         unsigned long           flags;
         lnet_process_id_t       srcid;
 
@@ -543,8 +551,11 @@ kptllnd_rx_parse(kptl_rx_t *rx)
         srcid.nid = msg->ptlm_srcnid;
         srcid.pid = msg->ptlm_srcpid;
 
-        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p\n", libcfs_id2str(srcid),
-               kptllnd_msgtype2str(msg->ptlm_type), msg->ptlm_credits, rx);
+        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
+               libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
+               msg->ptlm_credits, rx, rx->rx_rxb, 
+               jiffies - rx->rx_treceived,
+               cfs_duration_sec(jiffies - rx->rx_treceived));
 
         if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
                 CERROR("Bad source id %s from %s\n",
@@ -581,11 +592,8 @@ kptllnd_rx_parse(kptl_rx_t *rx)
 
         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                 peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
-                if (peer == NULL) {
-                        CWARN("No peer for %s\n",
-                              kptllnd_ptlid2str(rx->rx_initiator));
+                if (peer == NULL)
                         goto rx_done;
-                }
         } else {
                 peer = kptllnd_id2peer(srcid);
                 if (peer == NULL) {
@@ -596,57 +604,62 @@ kptllnd_rx_parse(kptl_rx_t *rx)
                         goto rx_done;
                 }
 
-                /* Ignore anything else while I'm waiting for HELLO */
-                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
+                /* Ignore anything apart from HELLO while I'm waiting for it and
+                 * any messages for a previous incarnation of the connection */
+                if (peer->peer_state == PEER_STATE_WAITING_HELLO ||
+                    msg->ptlm_dststamp < peer->peer_myincarnation) {
                         kptllnd_peer_decref(peer);
                         goto rx_done;
                 }
+
+                if (msg->ptlm_srcstamp != peer->peer_incarnation) {
+                        CERROR("%s: Unexpected srcstamp "LPX64" "
+                               "("LPX64" expected)\n",
+                               libcfs_id2str(peer->peer_id),
+                               msg->ptlm_srcstamp,
+                               peer->peer_incarnation);
+                        rc = -EPROTO;
+                        goto failed;
+                }
+
+                if (msg->ptlm_dststamp != peer->peer_myincarnation) {
+                        CERROR("%s: Unexpected dststamp "LPX64" "
+                               "("LPX64" expected)\n",
+                               libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
+                               peer->peer_myincarnation);
+                        rc = -EPROTO;
+                        goto failed;
+                }
         }
 
         LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
                  msg->ptlm_srcpid == peer->peer_id.pid);
 
-        if (msg->ptlm_srcstamp != peer->peer_incarnation) {
-                CERROR("Stale rx from %s srcstamp "LPX64" expected "LPX64"\n",
-                       libcfs_id2str(peer->peer_id),
-                       msg->ptlm_srcstamp,
-                       peer->peer_incarnation);
-                rc = -EPROTO;
-                goto failed;
-        }
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
-        if (msg->ptlm_dststamp != kptllnd_data.kptl_incarnation &&
-            (msg->ptlm_type != PTLLND_MSG_TYPE_HELLO || /* HELLO sends a */
-             msg->ptlm_dststamp != 0)) {                /* zero dststamp */
-                CERROR("Stale rx from %s dststamp "LPX64" expected "LPX64"\n",
-                       libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
-                       kptllnd_data.kptl_incarnation);
-                rc = -EPROTO;
+        /* Check peer only sends when I've sent her credits */
+        if (peer->peer_sent_credits == 0) {
+                int  c = peer->peer_credits;
+                int oc = peer->peer_outstanding_credits;
+                int sc = peer->peer_sent_credits;
+                
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
+
+                CERROR("%s: buffer overrun [%d/%d+%d]\n",
+                       libcfs_id2str(peer->peer_id), c, sc, oc);
                 goto failed;
         }
+        peer->peer_sent_credits--;
 
-        if (msg->ptlm_credits != 0) {
-                spin_lock_irqsave(&peer->peer_lock, flags);
+        /* No check for credit overflow - the peer may post new
+         * buffers after the startup handshake. */
+        peer->peer_credits += msg->ptlm_credits;
 
-                if (peer->peer_credits + msg->ptlm_credits >
-                    *kptllnd_tunables.kptl_peercredits) {
-                        credits = peer->peer_credits;
-                        spin_unlock_irqrestore(&peer->peer_lock, flags);
-                        
-                        CERROR("Credit overflow from %s: %d + %d > %d\n",
-                               libcfs_id2str(peer->peer_id),
-                               credits, msg->ptlm_credits,
-                               *kptllnd_tunables.kptl_peercredits);
-                        rc = -EPROTO;
-                        goto failed;
-                }
-                               
-                peer->peer_credits += msg->ptlm_credits;
-
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
+        /* See if something can go out now that credits have come in */
+        if (msg->ptlm_credits != 0)
                 kptllnd_peer_check_sends(peer);
-        }
 
         /* ptllnd-level protocol correct - rx takes my ref on peer and increments
          * peer_outstanding_credits when it completes */