__u64 ibm_srcstamp; /* sender's incarnation */
__u64 ibm_dstnid; /* destination's NID */
__u64 ibm_dststamp; /* destination's incarnation */
+ __u64 ibm_seq; /* sequence number */
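+ /* NB per-connection sequence; checked against ibc_rxseq on receive */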
union {
kib_connparams_t connparams;
struct kib_conn *tx_conn; /* owning conn */
int tx_mapped; /* mapped for RDMA? */
int tx_sending; /* # tx callbacks outstanding */
+ int tx_queued; /* queued for sending */
int tx_waiting; /* waiting for peer */
int tx_status; /* completion status */
unsigned long tx_deadline; /* completion deadline */
struct kib_peer *ibc_peer; /* owning peer */
struct list_head ibc_list; /* stash on peer's conn list */
__u64 ibc_incarnation; /* which instance of the peer */
+ __u64 ibc_txseq; /* tx sequence number */
+ __u64 ibc_rxseq; /* rx sequence number */
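+ /* NB the tx/rx sequence counters let a receiver detect lost or misordered completions */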
atomic_t ibc_refcount; /* # users */
int ibc_state; /* what's happening */
atomic_t ibc_nob; /* # bytes buffered */
extern kib_tunables_t kibnal_tunables;
extern void kibnal_init_msg(kib_msg_t *msg, int type, int body_nob);
-extern void kibnal_pack_msg(kib_msg_t *msg, int credits,
- ptl_nid_t dstnid, __u64 dststamp);
+extern void kibnal_pack_msg(kib_msg_t *msg, int credits, ptl_nid_t dstnid,
+ __u64 dststamp, __u64 seq);
extern int kibnal_unpack_msg(kib_msg_t *msg, int nob);
extern kib_peer_t *kibnal_create_peer(ptl_nid_t nid);
extern void kibnal_destroy_peer(kib_peer_t *peer);
/* CAVEAT EMPTOR: tx takes caller's ref on conn */
LASSERT (tx->tx_nwrq > 0); /* work items set up */
+ LASSERT (!tx->tx_queued); /* not queued for sending already */
+
if (tx->tx_conn == NULL) {
kibnal_conn_addref(conn);
tx->tx_conn = conn;
LASSERT (tx->tx_conn == conn);
LASSERT (tx->tx_msg->ibm_type == IBNAL_MSG_PUT_DONE);
}
+ tx->tx_queued = 1;
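+ /* tx_queued stays set until the tx is dequeued for posting or the conn is aborted */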
tx->tx_deadline = jiffies + kibnal_tunables.kib_io_timeout * HZ;
list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
}
int i;
LASSERT (!in_interrupt());
- LASSERT (tx->tx_sending == 0); /* mustn't be awaiting callback */
+ LASSERT (!tx->tx_queued); /* mustn't be queued for sending */
+ LASSERT (tx->tx_sending == 0); /* mustn't be awaiting sent callback */
LASSERT (!tx->tx_waiting); /* mustn't be awaiting peer response */
#if !IBNAL_WHOLE_MEM
LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
#endif
LASSERT (tx->tx_nwrq == 0);
+ LASSERT (!tx->tx_queued);
LASSERT (tx->tx_sending == 0);
LASSERT (!tx->tx_waiting);
LASSERT (tx->tx_status == 0);
list_for_each(tmp, &conn->ibc_active_txs) {
kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
+ LASSERT (!tx->tx_queued);
LASSERT (tx->tx_sending != 0 || tx->tx_waiting);
if (tx->tx_cookie != cookie)
tx->tx_waiting = 0;
- idle = tx->tx_sending == 0;
+ idle = !tx->tx_queued && (tx->tx_sending == 0);
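+ /* NB a tx re-queued to send PUT_DONE isn't idle even when tx_sending reaches 0 */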
if (idle)
list_del(&tx->tx_list);
LASSERT (tx->tx_waiting);
/* CAVEAT EMPTOR: I could be racing with tx_complete, but...
* (a) I can overwrite tx_msg since my peer has received it!
- * (b) while tx_waiting is set, tx_complete() won't touch it.
- */
+ * (b) tx_waiting is still set, which tells tx_complete() it isn't done. */
tx->tx_nwrq = 0; /* overwrite PUT_REQ */
}
void
-kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob)
+kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob, __u64 rxseq)
{
kib_msg_t *msg = rx->rx_msg;
kib_conn_t *conn = rx->rx_conn;
goto failed;
}
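+ /* NB rxseq was assigned in CQ completion order, before this rx was handed off */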
+ if (msg->ibm_seq != rxseq) {
+ CERROR ("Out-of-sequence rx from "LPX64
+ ": got "LPD64" but expected "LPD64"\n",
+ conn->ibc_peer->ibp_nid, msg->ibm_seq, rxseq);
+ goto failed;
+ }
+
/* racing with connection establishment/teardown! */
if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
while (!list_empty (&conn->ibc_tx_queue)) {
tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);
+ LASSERT (tx->tx_queued);
/* We rely on this for QP sizing */
LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);
}
list_del (&tx->tx_list);
+ tx->tx_queued = 0;
+
+ /* NB don't drop ibc_lock before bumping tx_sending */
if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
(!list_empty(&conn->ibc_tx_queue) ||
}
kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits,
- conn->ibc_peer->ibp_nid, conn->ibc_incarnation);
+ conn->ibc_peer->ibp_nid, conn->ibc_incarnation,
+ conn->ibc_txseq);
+ conn->ibc_txseq++;
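+ /* NB ibc_lock is still held, so the sequence number matches posting order */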
conn->ibc_outstanding_credits = 0;
conn->ibc_nsends_posted++;
conn->ibc_credits--;
CDEBUG(D_NET, "tx %p conn %p sending %d nwrq %d vvrc %d\n",
tx, conn, tx->tx_sending, tx->tx_nwrq, vvrc);
- LASSERT (tx->tx_sending != 0);
+ LASSERT (tx->tx_sending > 0);
if (failed &&
tx->tx_status == 0 &&
conn->ibc_state == IBNAL_CONN_ESTABLISHED)
- CERROR ("Tx completion to "LPX64" failed: %d\n",
- conn->ibc_peer->ibp_nid, vvrc);
+ CERROR("tx -> "LPX64" type %x cookie "LPX64
+ "sending %d waiting %d: failed %d\n",
+ conn->ibc_peer->ibp_nid, tx->tx_msg->ibm_type,
+ tx->tx_cookie, tx->tx_sending, tx->tx_waiting, vvrc);
spin_lock(&conn->ibc_lock);
}
idle = (tx->tx_sending == 0) && /* This is the final callback */
- !tx->tx_waiting; /* Not waiting for peer */
+ !tx->tx_waiting && /* Not waiting for peer */
+ !tx->tx_queued; /* Not re-queued (PUT_DONE) */
if (idle)
list_del(&tx->tx_list);
if (peer == NULL) {
read_unlock_irqrestore(g_lock, flags);
tx->tx_status = -EHOSTUNREACH;
+ tx->tx_waiting = 0;
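+ /* clear tx_waiting so kibnal_tx_done()'s LASSERT is satisfied */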
kibnal_tx_done (tx);
return;
}
if (peer == NULL) {
write_unlock_irqrestore(g_lock, flags);
tx->tx_status = -EHOSTUNREACH;
+ tx->tx_waiting = 0;
kibnal_tx_done (tx);
return;
}
if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
write_unlock_irqrestore(g_lock, flags);
tx->tx_status = -EHOSTUNREACH;
+ tx->tx_waiting = 0;
kibnal_tx_done (tx);
return;
}
* connection to be finished off by the connd. Otherwise the connd is
* already dealing with it (either to set it up or tear it down).
* Caller holds kib_global_lock exclusively in irq context */
- kib_peer_t *peer = conn->ibc_peer;
-
+ kib_peer_t *peer = conn->ibc_peer;
+ struct list_head *tmp;
+
LASSERT (error != 0 || conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
if (error != 0 && conn->ibc_comms_error == 0)
if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
return; /* already being handled */
- CDEBUG (error == 0 ? D_NET : D_ERROR,
- "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
+ spin_lock(&conn->ibc_lock);
+
+ if (error == 0 &&
+ list_empty(&conn->ibc_tx_queue) &&
+ list_empty(&conn->ibc_active_txs)) {
+ CDEBUG(D_NET, "closing conn to "LPX64
+ " rx# "LPD64" tx# "LPD64"\n",
+ peer->ibp_nid, conn->ibc_txseq, conn->ibc_rxseq);
+ } else {
+ CERROR("Closing conn to "LPX64": error %d%s%s"
+ " rx# "LPD64" tx# "LPD64"\n",
+ peer->ibp_nid, error,
+ list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
+ list_empty(&conn->ibc_active_txs) ? "" : "(waiting)",
+ conn->ibc_txseq, conn->ibc_rxseq);
+
+ list_for_each(tmp, &conn->ibc_tx_queue) {
+ kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
+
+ CERROR(" queued tx type %x cookie "LPX64
+ " sending %d waiting %d ticks %ld/%d\n",
+ tx->tx_msg->ibm_type, tx->tx_cookie,
+ tx->tx_sending, tx->tx_waiting,
+ (long)(tx->tx_deadline - jiffies), HZ);
+ }
+
+ list_for_each(tmp, &conn->ibc_active_txs) {
+ kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
+
+ CERROR(" active tx type %x cookie "LPX64
+ " sending %d waiting %d ticks %ld/%d\n",
+ tx->tx_msg->ibm_type, tx->tx_cookie,
+ tx->tx_sending, tx->tx_waiting,
+ (long)(tx->tx_deadline - jiffies), HZ);
+ }
+ }
+
+ spin_unlock(&conn->ibc_lock);
/* connd takes ibc_list's ref */
list_del (&conn->ibc_list);
list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
tx = list_entry (tmp, kib_tx_t, tx_list);
+ LASSERT (tx->tx_queued);
+
tx->tx_status = -ECONNABORTED;
+ tx->tx_queued = 0;
tx->tx_waiting = 0;
if (tx->tx_sending != 0)
list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
tx = list_entry (tmp, kib_tx_t, tx_list);
+ LASSERT (!tx->tx_queued);
LASSERT (tx->tx_waiting ||
tx->tx_sending != 0);
txmsg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
txmsg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
txmsg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
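+ /* NB connection-setup (connparams) messages aren't sequenced; pass seq 0 */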
- kibnal_pack_msg(txmsg, 0, rxmsg->ibm_srcnid, rxmsg->ibm_srcstamp);
+ kibnal_pack_msg(txmsg, 0, rxmsg->ibm_srcnid, rxmsg->ibm_srcstamp, 0);
kibnal_set_conn_state(conn, IBNAL_CONN_PASSIVE_WAIT);
msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
msg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
msg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
- kibnal_pack_msg(msg, 0, peer->ibp_nid, 0);
+ kibnal_pack_msg(msg, 0, peer->ibp_nid, 0, 0);
CDEBUG(D_NET, "Connecting %p to "LPX64"\n", conn, peer->ibp_nid);
list_for_each (ttmp, &conn->ibc_tx_queue) {
tx = list_entry (ttmp, kib_tx_t, tx_list);
+ LASSERT (tx->tx_queued);
+
if (time_after_eq (jiffies, tx->tx_deadline)) {
spin_unlock(&conn->ibc_lock);
return 1;
list_for_each (ttmp, &conn->ibc_active_txs) {
tx = list_entry (ttmp, kib_tx_t, tx_list);
+ LASSERT (!tx->tx_queued);
LASSERT (tx->tx_waiting ||
tx->tx_sending != 0);
vv_return_t vvrc;
vv_return_t vvrc2;
unsigned long flags;
+ kib_rx_t *rx;
+ __u64 rxseq = 0;
int busy_loops = 0;
snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
vv_next_solicit_unsolicit_event);
LASSERT (vvrc2 == vv_return_ok);
}
-
+
+ if (vvrc == vv_return_ok &&
+ kibnal_wreqid2type(wc.wr_id) == IBNAL_WID_RX) {
+ rx = (kib_rx_t *)kibnal_wreqid2ptr(wc.wr_id);
+
+ /* Grab the RX sequence number NOW before
+ * anyone else can get an RX completion */
+ rxseq = rx->rx_conn->ibc_rxseq++;
+ }
+
spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
/* give up ownership of completion polling */
kibnal_data.kib_checking_cq = 0;
kibnal_rx_complete(
(kib_rx_t *)kibnal_wreqid2ptr(wc.wr_id),
wc.completion_status,
- wc.num_bytes_transfered);
+ wc.num_bytes_transfered,
+ rxseq);
break;
case IBNAL_WID_TX: