Whamcloud - gitweb
* fix for 5809: vibnal tx_sending race
authoreeb <eeb>
Thu, 10 Mar 2005 16:25:36 +0000 (16:25 +0000)
committereeb <eeb>
Thu, 10 Mar 2005 16:25:36 +0000 (16:25 +0000)
lnet/klnds/viblnd/viblnd.c
lnet/klnds/viblnd/viblnd.h
lnet/klnds/viblnd/viblnd_cb.c

index 8e5cfe7..d54d4c8 100644 (file)
@@ -75,7 +75,8 @@ kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
 }
 
 void
-kibnal_pack_msg(kib_msg_t *msg, int credits, ptl_nid_t dstnid, __u64 dststamp)
+kibnal_pack_msg(kib_msg_t *msg, int credits, ptl_nid_t dstnid, 
+                __u64 dststamp, __u64 seq)
 {
         /* CAVEAT EMPTOR! all message fields not set here should have been
          * initialised previously. */
@@ -89,6 +90,7 @@ kibnal_pack_msg(kib_msg_t *msg, int credits, ptl_nid_t dstnid, __u64 dststamp)
         msg->ibm_srcstamp = kibnal_data.kib_incarnation;
         msg->ibm_dstnid   = dstnid;
         msg->ibm_dststamp = dststamp;
+        msg->ibm_seq      = seq;
 #if IBNAL_CKSUM
         /* NB ibm_cksum zero while computing cksum */
         msg->ibm_cksum    = kibnal_cksum(msg, msg->ibm_nob);
@@ -158,6 +160,7 @@ kibnal_unpack_msg(kib_msg_t *msg, int nob)
                 __swab64s(&msg->ibm_srcstamp);
                 __swab64s(&msg->ibm_dstnid);
                 __swab64s(&msg->ibm_dststamp);
+                __swab64s(&msg->ibm_seq);
         }
         
         if (msg->ibm_srcnid == PTL_NID_ANY) {
index 1026995..c19a903 100644 (file)
@@ -339,6 +339,7 @@ typedef struct
         __u64             ibm_srcstamp;         /* sender's incarnation */
         __u64             ibm_dstnid;           /* destination's NID */
         __u64             ibm_dststamp;         /* destination's incarnation */
+        __u64             ibm_seq;              /* sequence number */
 
         union {
                 kib_connparams_t      connparams;
@@ -397,6 +398,7 @@ typedef struct kib_tx                           /* transmit message */
         struct kib_conn          *tx_conn;      /* owning conn */
         int                       tx_mapped;    /* mapped for RDMA? */
         int                       tx_sending;   /* # tx callbacks outstanding */
+        int                       tx_queued;    /* queued for sending */
         int                       tx_waiting;   /* waiting for peer */
         int                       tx_status;    /* completion status */
         unsigned long             tx_deadline;  /* completion deadline */
@@ -456,6 +458,8 @@ typedef struct kib_conn
         struct kib_peer    *ibc_peer;           /* owning peer */
         struct list_head    ibc_list;           /* stash on peer's conn list */
         __u64               ibc_incarnation;    /* which instance of the peer */
+        __u64               ibc_txseq;          /* tx sequence number */
+        __u64               ibc_rxseq;          /* rx sequence number */
         atomic_t            ibc_refcount;       /* # users */
         int                 ibc_state;          /* what's happening */
         atomic_t            ibc_nob;            /* # bytes buffered */
@@ -510,8 +514,8 @@ extern kib_data_t      kibnal_data;
 extern kib_tunables_t  kibnal_tunables;
 
 extern void kibnal_init_msg(kib_msg_t *msg, int type, int body_nob);
-extern void kibnal_pack_msg(kib_msg_t *msg, int credits, 
-                            ptl_nid_t dstnid, __u64 dststamp);
+extern void kibnal_pack_msg(kib_msg_t *msg, int credits, ptl_nid_t dstnid,
+                            __u64 dststamp, __u64 seq);
 extern int kibnal_unpack_msg(kib_msg_t *msg, int nob);
 extern kib_peer_t *kibnal_create_peer(ptl_nid_t nid);
 extern void kibnal_destroy_peer(kib_peer_t *peer);
@@ -613,6 +617,8 @@ kibnal_queue_tx_locked (kib_tx_t *tx, kib_conn_t *conn)
         /* CAVEAT EMPTOR: tx takes caller's ref on conn */
 
         LASSERT (tx->tx_nwrq > 0);              /* work items set up */
+        LASSERT (!tx->tx_queued);               /* not queued for sending already */
+
         if (tx->tx_conn == NULL) {
                 kibnal_conn_addref(conn);
                 tx->tx_conn = conn;
@@ -620,6 +626,7 @@ kibnal_queue_tx_locked (kib_tx_t *tx, kib_conn_t *conn)
                 LASSERT (tx->tx_conn == conn);
                 LASSERT (tx->tx_msg->ibm_type == IBNAL_MSG_PUT_DONE);
         }
+        tx->tx_queued = 1;
         tx->tx_deadline = jiffies + kibnal_tunables.kib_io_timeout * HZ;
         list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
 }
index 477d190..54c34d6 100644 (file)
@@ -31,7 +31,8 @@ kibnal_tx_done (kib_tx_t *tx)
         int              i;
 
         LASSERT (!in_interrupt());
-        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
+        LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
+        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
         LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */
 
 #if !IBNAL_WHOLE_MEM
@@ -129,6 +130,7 @@ kibnal_get_idle_tx (int may_block)
                 LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
 #endif
                 LASSERT (tx->tx_nwrq == 0);
+                LASSERT (!tx->tx_queued);
                 LASSERT (tx->tx_sending == 0);
                 LASSERT (!tx->tx_waiting);
                 LASSERT (tx->tx_status == 0);
@@ -236,6 +238,7 @@ kibnal_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
         list_for_each(tmp, &conn->ibc_active_txs) {
                 kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
                 
+                LASSERT (!tx->tx_queued);
                 LASSERT (tx->tx_sending != 0 || tx->tx_waiting);
 
                 if (tx->tx_cookie != cookie)
@@ -286,7 +289,7 @@ kibnal_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
         
         tx->tx_waiting = 0;
 
-        idle = tx->tx_sending == 0;
+        idle = !tx->tx_queued && (tx->tx_sending == 0);
         if (idle)
                 list_del(&tx->tx_list);
 
@@ -387,8 +390,7 @@ kibnal_handle_rx (kib_rx_t *rx)
                 LASSERT (tx->tx_waiting);
                 /* CAVEAT EMPTOR: I could be racing with tx_complete, but...
                  * (a) I can overwrite tx_msg since my peer has received it!
-                 * (b) while tx_waiting is set, tx_complete() won't touch it.
-                 */
+                 * (b) tx_waiting set tells tx_complete() it's not done. */
 
                 tx->tx_nwrq = 0;                /* overwrite PUT_REQ */
 
@@ -436,7 +438,7 @@ kibnal_handle_rx (kib_rx_t *rx)
 }
 
 void
-kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob)
+kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob, __u64 rxseq)
 {
         kib_msg_t    *msg = rx->rx_msg;
         kib_conn_t   *conn = rx->rx_conn;
@@ -472,6 +474,13 @@ kibnal_rx_complete (kib_rx_t *rx, vv_comp_status_t vvrc, int nob)
                 goto failed;
         }
 
+        if (msg->ibm_seq != rxseq) {
+                CERROR ("Out-of-sequence rx from "LPX64
+                        ": got "LPD64" but expected "LPD64"\n",
+                        conn->ibc_peer->ibp_nid, msg->ibm_seq, rxseq);
+                goto failed;
+        }
+
         /* racing with connection establishment/teardown! */
 
         if (conn->ibc_state < IBNAL_CONN_ESTABLISHED) {
@@ -897,6 +906,7 @@ kibnal_check_sends (kib_conn_t *conn)
         while (!list_empty (&conn->ibc_tx_queue)) {
                 tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);
 
+                LASSERT (tx->tx_queued);
                 /* We rely on this for QP sizing */
                 LASSERT (tx->tx_nwrq > 0 && tx->tx_nwrq <= 1 + IBNAL_MAX_RDMA_FRAGS);
 
@@ -925,6 +935,9 @@ kibnal_check_sends (kib_conn_t *conn)
                 }
                 
                 list_del (&tx->tx_list);
+                tx->tx_queued = 0;
+
+                /* NB don't drop ibc_lock before bumping tx_sending */
 
                 if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                     (!list_empty(&conn->ibc_tx_queue) ||
@@ -939,8 +952,10 @@ kibnal_check_sends (kib_conn_t *conn)
                 }
 
                 kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits,
-                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);
+                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation,
+                                conn->ibc_txseq);
 
+                conn->ibc_txseq++;
                 conn->ibc_outstanding_credits = 0;
                 conn->ibc_nsends_posted++;
                 conn->ibc_credits--;
@@ -1018,13 +1033,15 @@ kibnal_tx_complete (kib_tx_t *tx, vv_comp_status_t vvrc)
         CDEBUG(D_NET, "tx %p conn %p sending %d nwrq %d vvrc %d\n", 
                tx, conn, tx->tx_sending, tx->tx_nwrq, vvrc);
 
-        LASSERT (tx->tx_sending != 0);
+        LASSERT (tx->tx_sending > 0);
 
         if (failed &&
             tx->tx_status == 0 &&
             conn->ibc_state == IBNAL_CONN_ESTABLISHED)
-                CERROR ("Tx completion to "LPX64" failed: %d\n", 
-                        conn->ibc_peer->ibp_nid, vvrc);
+                CERROR("tx -> "LPX64" type %x cookie "LPX64
+                       "sending %d waiting %d: failed %d\n", 
+                       conn->ibc_peer->ibp_nid, tx->tx_msg->ibm_type, 
+                       tx->tx_cookie, tx->tx_sending, tx->tx_waiting, vvrc);
 
         spin_lock(&conn->ibc_lock);
 
@@ -1040,7 +1057,8 @@ kibnal_tx_complete (kib_tx_t *tx, vv_comp_status_t vvrc)
         }
         
         idle = (tx->tx_sending == 0) &&         /* This is the final callback */
-               !tx->tx_waiting;                 /* Not waiting for peer */
+               !tx->tx_waiting &&               /* Not waiting for peer */
+               !tx->tx_queued;                  /* Not re-queued (PUT_DONE) */
         if (idle)
                 list_del(&tx->tx_list);
 
@@ -1222,6 +1240,7 @@ kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
         if (peer == NULL) {
                 read_unlock_irqrestore(g_lock, flags);
                 tx->tx_status = -EHOSTUNREACH;
+                tx->tx_waiting = 0;
                 kibnal_tx_done (tx);
                 return;
         }
@@ -1244,6 +1263,7 @@ kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
         if (peer == NULL) {
                 write_unlock_irqrestore(g_lock, flags);
                 tx->tx_status = -EHOSTUNREACH;
+                tx->tx_waiting = 0;
                 kibnal_tx_done (tx);
                 return;
         }
@@ -1263,6 +1283,7 @@ kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
                 if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
                         write_unlock_irqrestore(g_lock, flags);
                         tx->tx_status = -EHOSTUNREACH;
+                        tx->tx_waiting = 0;
                         kibnal_tx_done (tx);
                         return;
                 }
@@ -1698,8 +1719,9 @@ kibnal_close_conn_locked (kib_conn_t *conn, int error)
          * connection to be finished off by the connd.  Otherwise the connd is
          * already dealing with it (either to set it up or tear it down).
          * Caller holds kib_global_lock exclusively in irq context */
-        kib_peer_t   *peer = conn->ibc_peer;
-
+        kib_peer_t       *peer = conn->ibc_peer;
+        struct list_head *tmp;
+        
         LASSERT (error != 0 || conn->ibc_state >= IBNAL_CONN_ESTABLISHED);
 
         if (error != 0 && conn->ibc_comms_error == 0)
@@ -1708,8 +1730,44 @@ kibnal_close_conn_locked (kib_conn_t *conn, int error)
         if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                 return; /* already being handled  */
 
-        CDEBUG (error == 0 ? D_NET : D_ERROR,
-                "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
+        spin_lock(&conn->ibc_lock);
+        
+        if (error == 0 &&
+            list_empty(&conn->ibc_tx_queue) &&
+            list_empty(&conn->ibc_active_txs)) {
+                CDEBUG(D_NET, "closing conn to "LPX64
+                       " rx# "LPD64" tx# "LPD64"\n", 
+                       peer->ibp_nid, conn->ibc_txseq, conn->ibc_rxseq);
+        } else {
+                CERROR("Closing conn to "LPX64": error %d%s%s"
+                       " rx# "LPD64" tx# "LPD64"\n",
+                       peer->ibp_nid, error,
+                       list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
+                       list_empty(&conn->ibc_active_txs) ? "" : "(waiting)",
+                       conn->ibc_txseq, conn->ibc_rxseq);
+
+                list_for_each(tmp, &conn->ibc_tx_queue) {
+                        kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
+                        
+                        CERROR("   queued tx type %x cookie "LPX64
+                               " sending %d waiting %d ticks %ld/%d\n", 
+                               tx->tx_msg->ibm_type, tx->tx_cookie, 
+                               tx->tx_sending, tx->tx_waiting,
+                               (long)(tx->tx_deadline - jiffies), HZ);
+                }
+
+                list_for_each(tmp, &conn->ibc_active_txs) {
+                        kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);
+                        
+                        CERROR("   active tx type %x cookie "LPX64
+                               " sending %d waiting %d ticks %ld/%d\n", 
+                               tx->tx_msg->ibm_type, tx->tx_cookie, 
+                               tx->tx_sending, tx->tx_waiting,
+                               (long)(tx->tx_deadline - jiffies), HZ);
+                }
+        }
+
+        spin_unlock(&conn->ibc_lock);
 
         /* connd takes ibc_list's ref */
         list_del (&conn->ibc_list);
@@ -1791,7 +1849,10 @@ kibnal_conn_disconnected(kib_conn_t *conn)
         list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
                 tx = list_entry (tmp, kib_tx_t, tx_list);
 
+                LASSERT (tx->tx_queued);
+
                 tx->tx_status = -ECONNABORTED;
+                tx->tx_queued = 0;
                 tx->tx_waiting = 0;
                 
                 if (tx->tx_sending != 0)
@@ -1804,6 +1865,7 @@ kibnal_conn_disconnected(kib_conn_t *conn)
         list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
                 tx = list_entry (tmp, kib_tx_t, tx_list);
 
+                LASSERT (!tx->tx_queued);
                 LASSERT (tx->tx_waiting ||
                          tx->tx_sending != 0);
 
@@ -2283,7 +2345,7 @@ kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
         txmsg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
         txmsg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
         txmsg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
-        kibnal_pack_msg(txmsg, 0, rxmsg->ibm_srcnid, rxmsg->ibm_srcstamp);
+        kibnal_pack_msg(txmsg, 0, rxmsg->ibm_srcnid, rxmsg->ibm_srcstamp, 0);
         
         kibnal_set_conn_state(conn, IBNAL_CONN_PASSIVE_WAIT);
         
@@ -2404,7 +2466,7 @@ kibnal_connect_conn (kib_conn_t *conn)
         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
         msg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
         msg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
-        kibnal_pack_msg(msg, 0, peer->ibp_nid, 0);
+        kibnal_pack_msg(msg, 0, peer->ibp_nid, 0, 0);
         
         CDEBUG(D_NET, "Connecting %p to "LPX64"\n", conn, peer->ibp_nid);
 
@@ -2766,6 +2828,8 @@ kibnal_conn_timed_out (kib_conn_t *conn)
         list_for_each (ttmp, &conn->ibc_tx_queue) {
                 tx = list_entry (ttmp, kib_tx_t, tx_list);
 
+                LASSERT (tx->tx_queued);
+
                 if (time_after_eq (jiffies, tx->tx_deadline)) {
                         spin_unlock(&conn->ibc_lock);
                         return 1;
@@ -2775,6 +2839,7 @@ kibnal_conn_timed_out (kib_conn_t *conn)
         list_for_each (ttmp, &conn->ibc_active_txs) {
                 tx = list_entry (ttmp, kib_tx_t, tx_list);
 
+                LASSERT (!tx->tx_queued);
                 LASSERT (tx->tx_waiting ||
                          tx->tx_sending != 0);
 
@@ -3075,6 +3140,8 @@ kibnal_scheduler(void *arg)
         vv_return_t     vvrc;
         vv_return_t     vvrc2;
         unsigned long   flags;
+        kib_rx_t       *rx;
+        __u64           rxseq = 0;
         int             busy_loops = 0;
 
         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
@@ -3114,7 +3181,16 @@ kibnal_scheduler(void *arg)
                                         vv_next_solicit_unsolicit_event);
                                 LASSERT (vvrc2 == vv_return_ok);
                         }
-                        
+
+                        if (vvrc == vv_return_ok &&
+                            kibnal_wreqid2type(wc.wr_id) == IBNAL_WID_RX) {
+                                rx = (kib_rx_t *)kibnal_wreqid2ptr(wc.wr_id);
+
+                                /* Grab the RX sequence number NOW before
+                                 * anyone else can get an RX completion */
+                                rxseq = rx->rx_conn->ibc_rxseq++;
+                        }
+
                         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
                         /* give up ownership of completion polling */
                         kibnal_data.kib_checking_cq = 0;
@@ -3137,7 +3213,8 @@ kibnal_scheduler(void *arg)
                                 kibnal_rx_complete(
                                         (kib_rx_t *)kibnal_wreqid2ptr(wc.wr_id),
                                         wc.completion_status,
-                                        wc.num_bytes_transfered);
+                                        wc.num_bytes_transfered,
+                                        rxseq);
                                 break;
 
                         case IBNAL_WID_TX: