Whamcloud - gitweb
* bug 11659 fix - finer-grained peerstamps for ptllnd connection
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_rx_buf.c
index b83ab51..ad0f05d 100644 (file)
@@ -344,6 +344,9 @@ kptllnd_rx_done(kptl_rx_t *rx)
                 /* Update credits (after I've decref-ed the buffer) */
                 spin_lock_irqsave(&peer->peer_lock, flags);
 
+                peer->peer_active_rxs--;
+                LASSERT (peer->peer_active_rxs >= 0);
+
                 peer->peer_outstanding_credits++;
                 LASSERT (peer->peer_outstanding_credits <=
                          *kptllnd_tunables.kptl_peercredits);
@@ -581,11 +584,8 @@ kptllnd_rx_parse(kptl_rx_t *rx)
 
         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                 peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
-                if (peer == NULL) {
-                        CWARN("No peer for %s\n",
-                              kptllnd_ptlid2str(rx->rx_initiator));
+                if (peer == NULL)
                         goto rx_done;
-                }
         } else {
                 peer = kptllnd_id2peer(srcid);
                 if (peer == NULL) {
@@ -596,61 +596,74 @@ kptllnd_rx_parse(kptl_rx_t *rx)
                         goto rx_done;
                 }
 
-                /* Ignore anything else while I'm waiting for HELLO */
-                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
+                /* Ignore anything apart from HELLO while I'm waiting for it and
+                 * any messages for a previous incarnation of the connection */
+                if (peer->peer_state == PEER_STATE_WAITING_HELLO ||
+                    msg->ptlm_dststamp < peer->peer_myincarnation) {
                         kptllnd_peer_decref(peer);
                         goto rx_done;
                 }
+
+                if (msg->ptlm_srcstamp != peer->peer_incarnation) {
+                        CERROR("%s: Unexpected srcstamp "LPX64" "
+                               "("LPX64" expected)\n",
+                               libcfs_id2str(peer->peer_id),
+                               msg->ptlm_srcstamp,
+                               peer->peer_incarnation);
+                        rc = -EPROTO;
+                        goto failed;
+                }
+
+                if (msg->ptlm_dststamp != peer->peer_myincarnation) {
+                        CERROR("%s: Unexpected dststamp "LPX64" "
+                               "("LPX64" expected)\n",
+                               libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
+                               peer->peer_myincarnation);
+                        rc = -EPROTO;
+                        goto failed;
+                }
         }
 
         LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
                  msg->ptlm_srcpid == peer->peer_id.pid);
 
-        if (msg->ptlm_srcstamp != peer->peer_incarnation) {
-                CERROR("Stale rx from %s srcstamp "LPX64" expected "LPX64"\n",
+        spin_lock_irqsave(&peer->peer_lock, flags);
+
+        if (peer->peer_active_rxs == *kptllnd_tunables.kptl_peercredits) {
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
+                        
+                CERROR("Message overflow from %s: handling %d already\n",
                        libcfs_id2str(peer->peer_id),
-                       msg->ptlm_srcstamp,
-                       peer->peer_incarnation);
+                       *kptllnd_tunables.kptl_peercredits);
                 rc = -EPROTO;
                 goto failed;
         }
+        
+        if (msg->ptlm_credits != 0 &&
+            peer->peer_credits + msg->ptlm_credits >
+            *kptllnd_tunables.kptl_peercredits) {
+                credits = peer->peer_credits;
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
 
-        if (msg->ptlm_dststamp != kptllnd_data.kptl_incarnation &&
-            (msg->ptlm_type != PTLLND_MSG_TYPE_HELLO || /* HELLO sends a */
-             msg->ptlm_dststamp != 0)) {                /* zero dststamp */
-                CERROR("Stale rx from %s dststamp "LPX64" expected "LPX64"\n",
-                       libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
-                       kptllnd_data.kptl_incarnation);
+                CERROR("Credit overflow from %s: %d + %d > %d\n",
+                       libcfs_id2str(peer->peer_id),
+                       credits, msg->ptlm_credits,
+                       *kptllnd_tunables.kptl_peercredits);
                 rc = -EPROTO;
                 goto failed;
         }
 
-        if (msg->ptlm_credits != 0) {
-                spin_lock_irqsave(&peer->peer_lock, flags);
-
-                if (peer->peer_credits + msg->ptlm_credits >
-                    *kptllnd_tunables.kptl_peercredits) {
-                        credits = peer->peer_credits;
-                        spin_unlock_irqrestore(&peer->peer_lock, flags);
-                        
-                        CERROR("Credit overflow from %s: %d + %d > %d\n",
-                               libcfs_id2str(peer->peer_id),
-                               credits, msg->ptlm_credits,
-                               *kptllnd_tunables.kptl_peercredits);
-                        rc = -EPROTO;
-                        goto failed;
-                }
-                               
-                peer->peer_credits += msg->ptlm_credits;
+        /* ptllnd-level protocol correct: account credits */
+        peer->peer_credits += msg->ptlm_credits;
+        peer->peer_active_rxs++;
 
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
+        /* See if something can go out now that credits have come in */
+        if (msg->ptlm_credits != 0)
                 kptllnd_peer_check_sends(peer);
-        }
 
-        /* ptllnd-level protocol correct - rx takes my ref on peer and increments
-         * peer_outstanding_credits when it completes */
-        rx->rx_peer = peer;
+        rx->rx_peer = peer;                /* do buffer accounting on rxdone */
         kptllnd_peer_alive(peer);
 
         switch (msg->ptlm_type) {