/* Update credits (after I've decref-ed the buffer) */
spin_lock_irqsave(&peer->peer_lock, flags);
+ peer->peer_active_rxs--;
+ LASSERT (peer->peer_active_rxs >= 0);
+
peer->peer_outstanding_credits++;
LASSERT (peer->peer_outstanding_credits <=
*kptllnd_tunables.kptl_peercredits);
if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
- if (peer == NULL) {
- CWARN("No peer for %s\n",
- kptllnd_ptlid2str(rx->rx_initiator));
+ if (peer == NULL)
goto rx_done;
- }
} else {
peer = kptllnd_id2peer(srcid);
if (peer == NULL) {
goto rx_done;
}
- /* Ignore anything else while I'm waiting for HELLO */
- if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
+ /* Ignore anything apart from HELLO while I'm waiting for it and
+ * any messages for a previous incarnation of the connection */
+ if (peer->peer_state == PEER_STATE_WAITING_HELLO ||
+ msg->ptlm_dststamp < peer->peer_myincarnation) {
kptllnd_peer_decref(peer);
goto rx_done;
}
+
+ if (msg->ptlm_srcstamp != peer->peer_incarnation) {
+ CERROR("%s: Unexpected srcstamp "LPX64" "
+ "("LPX64" expected)\n",
+ libcfs_id2str(peer->peer_id),
+ msg->ptlm_srcstamp,
+ peer->peer_incarnation);
+ rc = -EPROTO;
+ goto failed;
+ }
+
+ if (msg->ptlm_dststamp != peer->peer_myincarnation) {
+ CERROR("%s: Unexpected dststamp "LPX64" "
+ "("LPX64" expected)\n",
+ libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
+ peer->peer_myincarnation);
+ rc = -EPROTO;
+ goto failed;
+ }
}
LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
msg->ptlm_srcpid == peer->peer_id.pid);
- if (msg->ptlm_srcstamp != peer->peer_incarnation) {
- CERROR("Stale rx from %s srcstamp "LPX64" expected "LPX64"\n",
+ spin_lock_irqsave(&peer->peer_lock, flags);
+
+ if (peer->peer_active_rxs == *kptllnd_tunables.kptl_peercredits) {
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
+
+ CERROR("Message overflow from %s: handling %d already\n",
libcfs_id2str(peer->peer_id),
- msg->ptlm_srcstamp,
- peer->peer_incarnation);
+ *kptllnd_tunables.kptl_peercredits);
rc = -EPROTO;
goto failed;
}
+
+ if (msg->ptlm_credits != 0 &&
+ peer->peer_credits + msg->ptlm_credits >
+ *kptllnd_tunables.kptl_peercredits) {
+ credits = peer->peer_credits;
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
- if (msg->ptlm_dststamp != kptllnd_data.kptl_incarnation &&
- (msg->ptlm_type != PTLLND_MSG_TYPE_HELLO || /* HELLO sends a */
- msg->ptlm_dststamp != 0)) { /* zero dststamp */
- CERROR("Stale rx from %s dststamp "LPX64" expected "LPX64"\n",
- libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
- kptllnd_data.kptl_incarnation);
+ CERROR("Credit overflow from %s: %d + %d > %d\n",
+ libcfs_id2str(peer->peer_id),
+ credits, msg->ptlm_credits,
+ *kptllnd_tunables.kptl_peercredits);
rc = -EPROTO;
goto failed;
}
- if (msg->ptlm_credits != 0) {
- spin_lock_irqsave(&peer->peer_lock, flags);
-
- if (peer->peer_credits + msg->ptlm_credits >
- *kptllnd_tunables.kptl_peercredits) {
- credits = peer->peer_credits;
- spin_unlock_irqrestore(&peer->peer_lock, flags);
-
- CERROR("Credit overflow from %s: %d + %d > %d\n",
- libcfs_id2str(peer->peer_id),
- credits, msg->ptlm_credits,
- *kptllnd_tunables.kptl_peercredits);
- rc = -EPROTO;
- goto failed;
- }
-
- peer->peer_credits += msg->ptlm_credits;
+ /* ptllnd-level protocol correct: account credits */
+ peer->peer_credits += msg->ptlm_credits;
+ peer->peer_active_rxs++;
- spin_unlock_irqrestore(&peer->peer_lock, flags);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
+ /* See if something can go out now that credits have come in */
+ if (msg->ptlm_credits != 0)
kptllnd_peer_check_sends(peer);
- }
- /* ptllnd-level protocol correct - rx takes my ref on peer and increments
- * peer_outstanding_credits when it completes */
- rx->rx_peer = peer;
+ rx->rx_peer = peer; /* do buffer accounting on rxdone */
kptllnd_peer_alive(peer);
switch (msg->ptlm_type) {