}
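+/* Move every tx on 'peerq' onto 'txs', failing it with -EIO and
+ * marking it inactive.  Caller must hold peer->peer_lock. */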
void
-kptllnd_peer_cancel_txs(kptl_peer_t *peer)
+kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
{
- struct list_head sendq;
- struct list_head activeq;
struct list_head *tmp;
struct list_head *nxt;
kptl_tx_t *tx;
- unsigned long flags;
-
- /* atomically grab all the peer's tx-es... */
- spin_lock_irqsave(&peer->peer_lock, flags);
-
- list_add(&sendq, &peer->peer_sendq);
- list_del_init(&peer->peer_sendq);
- list_for_each (tmp, &sendq) {
+ list_for_each_safe (tmp, nxt, peerq) {
tx = list_entry(tmp, kptl_tx_t, tx_list);
- tx->tx_active = 0;
- }
- list_add(&activeq, &peer->peer_activeq);
- list_del_init(&peer->peer_activeq);
- list_for_each (tmp, &activeq) {
- tx = list_entry(tmp, kptl_tx_t, tx_list);
+ list_del(&tx->tx_list);
+ list_add_tail(&tx->tx_list, txs);
+
+ tx->tx_status = -EIO;
tx->tx_active = 0;
}
+}
- spin_unlock_irqrestore(&peer->peer_lock, flags);
-
- /* ...then drop the peer's ref on them at leasure. This will get
- * kptllnd_tx_fini() to abort outstanding comms if necessary. */
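+/* Atomically collect all the peer's queued and active txs on 'txs';
+ * the caller drops their refs outside the lock, which gets
+ * kptllnd_tx_fini() to abort outstanding comms if necessary. */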
+void
+kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
+{
+ unsigned long flags;
- list_for_each_safe (tmp, nxt, &sendq) {
- tx = list_entry(tmp, kptl_tx_t, tx_list);
- list_del(&tx->tx_list);
- tx->tx_status = -EIO;
- kptllnd_tx_decref(tx);
- }
+ spin_lock_irqsave(&peer->peer_lock, flags);
- list_for_each_safe (tmp, nxt, &activeq) {
- tx = list_entry(tmp, kptl_tx_t, tx_list);
- list_del(&tx->tx_list);
- tx->tx_status = -EIO;
- kptllnd_tx_decref(tx);
- }
+ kptllnd_cancel_txlist(&peer->peer_sendq, txs);
+ kptllnd_cancel_txlist(&peer->peer_activeq, txs);
+
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
}
void
kptllnd_handle_closing_peers ()
{
unsigned long flags;
+ struct list_head txs;
kptl_peer_t *peer;
struct list_head *tmp;
struct list_head *nxt;
+ kptl_tx_t *tx;
int idle;
/* Check with a read lock first to avoid blocking anyone */
read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
- idle = list_empty(&kptllnd_data.kptl_closing_peers);
+ idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
+ list_empty(&kptllnd_data.kptl_zombie_peers);
read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
if (idle)
return;
- /* Scan the closing peers and cancel their txs.
- * NB only safe while there is only a single watchdog */
+ INIT_LIST_HEAD(&txs);
write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+ /* Cancel txs on all zombie peers. NB anyone dropping the last peer
+ * ref removes it from this list, so I mustn't drop the lock while
+ * scanning it. */
+ list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
+ peer = list_entry (tmp, kptl_peer_t, peer_list);
+
+ LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
+
+ kptllnd_peer_cancel_txs(peer, &txs);
+ }
+
+ /* Notify LNET and cancel txs on closing (i.e. newly closed) peers. NB
+ * I'm the only one removing from this list, but peers can be added on
+ * the end any time I drop the lock. */
+
list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
peer = list_entry (tmp, kptl_peer_t, peer_list);
write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
kptllnd_peer_notify(peer);
- kptllnd_peer_cancel_txs(peer);
+ kptllnd_peer_cancel_txs(peer, &txs);
kptllnd_peer_decref(peer);
write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
}
write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+ /* Drop peer's ref on all cancelled txs. This will get
+ * kptllnd_tx_fini() to abort outstanding comms if necessary. */
+
+ list_for_each_safe (tmp, nxt, &txs) {
+ tx = list_entry(tmp, kptl_tx_t, tx_list);
+ list_del(&tx->tx_list);
+ kptllnd_tx_decref(tx);
+ }
}
void
kptllnd_peer_unreserve_buffers();
peer->peer_error = why; /* stash 'why' only on first close */
+ peer->peer_state = PEER_STATE_CLOSING;
/* Schedule for immediate attention, taking peer table's ref */
list_add_tail(&peer->peer_list,
break;
case PEER_STATE_ZOMBIE:
- /* Schedule for attention at next timeout */
- kptllnd_peer_addref(peer);
- list_del(&peer->peer_list);
- list_add_tail(&peer->peer_list,
- &kptllnd_data.kptl_closing_peers);
- break;
-
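+ /* already on a list kptllnd_handle_closing_peers() scans */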
case PEER_STATE_CLOSING:
break;
}
-
- peer->peer_state = PEER_STATE_CLOSING;
}
void
spin_lock_irqsave(&peer->peer_lock, flags);
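+ /* assume no NOOP retry is needed; set again below if we need a
+ * NOOP but can't get a tx for it */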
+ peer->peer_retry_noop = 0;
+
if (list_empty(&peer->peer_sendq) &&
peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
peer->peer_credits != 0) {
}
spin_lock_irqsave(&peer->peer_lock, flags);
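+ /* remember a NOOP is still owed if no tx desc was free for it */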
+ peer->peer_retry_noop = (tx == NULL);
}
while (!list_empty(&peer->peer_sendq)) {
{
kptl_tx_t *tx;
struct list_head *tmp;
- unsigned long flags;
-
- spin_lock_irqsave(&peer->peer_lock, flags);
list_for_each(tmp, &peer->peer_sendq) {
tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
if (time_after_eq(jiffies, tx->tx_deadline)) {
kptllnd_tx_addref(tx);
- spin_unlock_irqrestore(&peer->peer_lock, flags);
return tx;
}
}
if (time_after_eq(jiffies, tx->tx_deadline)) {
kptllnd_tx_addref(tx);
- spin_unlock_irqrestore(&peer->peer_lock, flags);
return tx;
}
}
- spin_unlock_irqrestore(&peer->peer_lock, flags);
return NULL;
}
void
-kptllnd_peer_check_bucket (int idx)
+kptllnd_peer_check_bucket (int idx, int stamp)
{
struct list_head *peers = &kptllnd_data.kptl_peers[idx];
struct list_head *ptmp;
unsigned long flags;
int nsend;
int nactive;
+ int check_sends;
- CDEBUG(D_NET, "Bucket=%d\n", idx);
+ CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
again:
/* NB. Shared lock while I just look */
libcfs_id2str(peer->peer_id), peer->peer_credits,
peer->peer_outstanding_credits, peer->peer_sent_credits);
- /* In case we have enough credits to return via a
- * NOOP, but there were no non-blocking tx descs
- * free to do it last time... */
- kptllnd_peer_check_sends(peer);
+ spin_lock(&peer->peer_lock);
+
+ if (peer->peer_check_stamp == stamp) {
+ /* checked already this pass */
+ spin_unlock(&peer->peer_lock);
+ continue;
+ }
+ peer->peer_check_stamp = stamp;
tx = kptllnd_find_timed_out_tx(peer);
- if (tx == NULL)
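+ /* must run check_sends if a NOOP couldn't be sent last time */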
+ check_sends = peer->peer_retry_noop;
+
+ spin_unlock(&peer->peer_lock);
+
+ if (tx == NULL && !check_sends)
continue;
kptllnd_peer_addref(peer); /* 1 ref for me... */
- read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
- flags);
+ read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+
+ if (tx == NULL) { /* nothing timed out */
+ kptllnd_peer_check_sends(peer);
+ kptllnd_peer_decref(peer); /* ...until here or... */
+
+ /* rescan after dropping the lock */
+ goto again;
+ }
spin_lock_irqsave(&peer->peer_lock, flags);
nsend = kptllnd_count_queue(&peer->peer_sendq);
if (peer != NULL) {
if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
/* An outgoing message instantiated 'peer' for me */
- CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid));
LASSERT(peer->peer_incarnation == 0);
peer->peer_state = PEER_STATE_ACTIVE;
peer->peer_next_matchbits = safe_matchbits;
peer->peer_max_msg_size =
msg->ptlm_u.hello.kptlhm_max_msg_size;
+
+ write_unlock_irqrestore(g_lock, flags);
+
+ CWARN("Outgoing instantiated peer %s\n",
+ libcfs_id2str(lpid));
} else {
LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
+
+ write_unlock_irqrestore(g_lock, flags);
+
/* WOW! Somehow this peer completed the HELLO
* handshake while I slept. I guess I could have slept
* while it rebooted and sent a new HELLO, so I'll fail
kptllnd_peer_decref(peer);
peer = NULL;
}
-
- write_unlock_irqrestore(g_lock, flags);
kptllnd_peer_unreserve_buffers();
kptllnd_peer_decref(new_peer);