}
void
+kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
+{
+ /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
+ unsigned long flags;
+
+ spin_lock_irqsave(&peer->peer_lock, flags);
+
+ /* Ensure HELLO is sent first */
+ if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
+ list_add(&tx->tx_list, &peer->peer_noops);
+ else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
+ list_add(&tx->tx_list, &peer->peer_sendq);
+ else
+ list_add_tail(&tx->tx_list, &peer->peer_sendq);
+
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
+}
+
+
+void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
/* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
ptl_handle_md_t msg_mdh;
ptl_md_t md;
ptl_err_t prc;
- unsigned long flags;
LASSERT (!tx->tx_idle);
LASSERT (!tx->tx_active);
return;
}
- spin_lock_irqsave(&peer->peer_lock, flags);
tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
tx->tx_active = 1;
tx->tx_msg_mdh = msg_mdh;
+ kptllnd_queue_tx(peer, tx);
+}
- /* Ensure HELLO is sent first */
- if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
- list_add(&tx->tx_list, &peer->peer_noops);
- else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
- list_add(&tx->tx_list, &peer->peer_sendq);
- else
- list_add_tail(&tx->tx_list, &peer->peer_sendq);
+/* NB "restarts" comes from peer_sendq of a single peer */
+void
+kptllnd_restart_txs (lnet_process_id_t target, struct list_head *restarts)
+{
+ kptl_tx_t *tx;
+ kptl_tx_t *tmp;
+ kptl_peer_t *peer;
- spin_unlock_irqrestore(&peer->peer_lock, flags);
+ LASSERT (!list_empty(restarts));
+
+ if (kptllnd_find_target(&peer, target) != 0)
+ peer = NULL;
+
+ list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
+ LASSERT (tx->tx_peer != NULL);
+ LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
+ tx->tx_type == TX_TYPE_PUT_REQUEST ||
+ tx->tx_type == TX_TYPE_SMALL_MESSAGE);
+
+ list_del_init(&tx->tx_list);
+
+ if (peer == NULL ||
+ tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
+ kptllnd_tx_decref(tx);
+ continue;
+ }
+
+ LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
+ tx->tx_status = 0;
+ tx->tx_active = 1;
+ kptllnd_peer_decref(tx->tx_peer);
+ tx->tx_peer = NULL;
+ kptllnd_set_tx_peer(tx, peer);
+ kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
+ }
+
+ if (peer == NULL)
+ return;
+
+ kptllnd_peer_check_sends(peer);
+ kptllnd_peer_decref(peer);
}
static inline int
return -ENOMEM;
}
+ hello_tx->tx_acked = 1;
kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
sizeof(kptl_hello_msg_t));
kptllnd_rx_parse(kptl_rx_t *rx)
{
kptl_msg_t *msg = rx->rx_msg;
+ int rc = 0;
int post_credit = PTLLND_POSTRX_PEER_CREDIT;
kptl_peer_t *peer;
- int rc;
+ struct list_head txs;
unsigned long flags;
lnet_process_id_t srcid;
LASSERT (rx->rx_peer == NULL);
+ INIT_LIST_HEAD(&txs);
+
if ((rx->rx_nob >= 4 &&
(msg->ptlm_magic == LNET_PROTO_MAGIC ||
msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
if (peer == NULL)
goto rx_done;
- CWARN("NAK from %s (%s)\n",
- libcfs_id2str(srcid),
+ CWARN("NAK from %s (%d:%s)\n",
+ libcfs_id2str(srcid), peer->peer_state,
kptllnd_ptlid2str(rx->rx_initiator));
+ /* NB can't nuke new peer - bug 17546 comment 31 */
+ if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
+ CDEBUG(D_NET, "Stale NAK from %s(%s): WAITING_HELLO\n",
+ libcfs_id2str(srcid),
+ kptllnd_ptlid2str(rx->rx_initiator));
+ kptllnd_peer_decref(peer);
+ goto rx_done;
+ }
+
rc = -EPROTO;
goto failed;
}
} else {
peer = kptllnd_id2peer(srcid);
if (peer == NULL) {
- CWARN("NAK %s: no connection; peer must reconnect\n",
+ CWARN("NAK %s: no connection, %s must reconnect\n",
+ kptllnd_msgtype2str(msg->ptlm_type),
libcfs_id2str(srcid));
/* NAK to make the peer reconnect */
kptllnd_nak(rx);
goto rx_done;
}
- /* Ignore anything apart from HELLO while I'm waiting for it and
- * any messages for a previous incarnation of the connection */
- if (peer->peer_state == PEER_STATE_WAITING_HELLO ||
- msg->ptlm_dststamp < peer->peer_myincarnation) {
+ /* Ignore any messages for a previous incarnation of me */
+ if (msg->ptlm_dststamp < peer->peer_myincarnation) {
kptllnd_peer_decref(peer);
goto rx_done;
}
- if (msg->ptlm_srcstamp != peer->peer_incarnation) {
- CERROR("%s: Unexpected srcstamp "LPX64" "
+ if (msg->ptlm_dststamp != peer->peer_myincarnation) {
+ CERROR("%s: Unexpected dststamp "LPX64" "
"("LPX64" expected)\n",
- libcfs_id2str(peer->peer_id),
- msg->ptlm_srcstamp,
- peer->peer_incarnation);
+ libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
+ peer->peer_myincarnation);
rc = -EPROTO;
goto failed;
}
- if (msg->ptlm_dststamp != peer->peer_myincarnation) {
- CERROR("%s: Unexpected dststamp "LPX64" "
+ if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
+ /* recoverable error - restart txs */
+ spin_lock_irqsave(&peer->peer_lock, flags);
+ kptllnd_cancel_txlist(&peer->peer_sendq, &txs);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
+
+ CWARN("NAK %s: Unexpected %s message\n",
+ libcfs_id2str(srcid),
+ kptllnd_msgtype2str(msg->ptlm_type));
+ kptllnd_nak(rx);
+ rc = -EPROTO;
+ goto failed;
+ }
+
+ if (msg->ptlm_srcstamp != peer->peer_incarnation) {
+ CERROR("%s: Unexpected srcstamp "LPX64" "
"("LPX64" expected)\n",
- libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
- peer->peer_myincarnation);
+ libcfs_id2str(srcid),
+ msg->ptlm_srcstamp,
+ peer->peer_incarnation);
rc = -EPROTO;
goto failed;
}
CERROR("%s: buffer overrun [%d/%d+%d]\n",
libcfs_id2str(peer->peer_id), c, sc, oc);
+ rc = -EPROTO;
goto failed;
}
peer->peer_sent_credits--;
if (rc >= 0) /* kptllnd_recv owns 'rx' now */
return;
goto failed;
- }
+ }
failed:
+ LASSERT (rc != 0);
kptllnd_peer_close(peer, rc);
if (rx->rx_peer == NULL) /* drop ref on peer */
kptllnd_peer_decref(peer); /* unless rx_done will */
+ if (!list_empty(&txs))
+ kptllnd_restart_txs(srcid, &txs);
rx_done:
kptllnd_rx_done(rx, post_credit);
}