Whamcloud - gitweb
LU-11299 lnet: modify lnd notification mechanism
[fs/lustre-release.git] / lnet / klnds / o2iblnd / o2iblnd_cb.c
index de09e6d..c6fb08f 100644 (file)
@@ -107,7 +107,8 @@ kiblnd_txlist_done(struct list_head *txlist, int status,
                /* complete now */
                tx->tx_waiting = 0;
                tx->tx_status = status;
-               tx->tx_hstatus = hstatus;
+               if (hstatus != LNET_MSG_STATUS_OK)
+                       tx->tx_hstatus = hstatus;
                kiblnd_tx_done(tx);
        }
 }
@@ -202,7 +203,12 @@ kiblnd_post_rx(struct kib_rx *rx, int credit)
         * own this rx (and rx::rx_conn) anymore, LU-5678.
         */
        kiblnd_conn_addref(conn);
+#ifdef HAVE_IB_POST_SEND_RECV_CONST
+       rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq,
+                         (const struct ib_recv_wr **)&bad_wrq);
+#else
        rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);
+#endif
        if (unlikely(rc != 0)) {
                CERROR("Can't post rx for %s: %d, bad_wrq: %p\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc, bad_wrq);
@@ -865,6 +871,7 @@ __must_hold(&conn->ibc_lock)
 {
        struct kib_msg *msg = tx->tx_msg;
        struct kib_peer_ni *peer_ni = conn->ibc_peer;
+       struct lnet_ni *ni = peer_ni->ibp_ni;
        int ver = conn->ibc_version;
        int rc;
        int done;
@@ -881,7 +888,7 @@ __must_hold(&conn->ibc_lock)
        LASSERT(conn->ibc_credits <= conn->ibc_queue_depth);
 
        if (conn->ibc_nsends_posted ==
-           conn->ibc_queue_depth) {
+           kiblnd_concurrent_sends(ver, ni)) {
                /* tx completions outstanding... */
                CDEBUG(D_NET, "%s: posted enough\n",
                       libcfs_nid2str(peer_ni->ibp_nid));
@@ -969,7 +976,15 @@ __must_hold(&conn->ibc_lock)
                         libcfs_nid2str(conn->ibc_peer->ibp_nid));
 
                bad = NULL;
-               rc = ib_post_send(conn->ibc_cmid->qp, wr, &bad);
+               if (lnet_send_error_simulation(tx->tx_lntmsg[0], &tx->tx_hstatus))
+                       rc = -EINVAL;
+               else
+#ifdef HAVE_IB_POST_SEND_RECV_CONST
+                       rc = ib_post_send(conn->ibc_cmid->qp, wr,
+                                         (const struct ib_send_wr **)&bad);
+#else
+                       rc = ib_post_send(conn->ibc_cmid->qp, wr, &bad);
+#endif
        }
 
        conn->ibc_last_send = ktime_get();
@@ -1027,7 +1042,7 @@ kiblnd_check_sends_locked(struct kib_conn *conn)
         }
 
        LASSERT(conn->ibc_nsends_posted <=
-               conn->ibc_queue_depth);
+               kiblnd_concurrent_sends(ver, ni));
         LASSERT (!IBLND_OOB_CAPABLE(ver) ||
                  conn->ibc_noops_posted <= IBLND_OOB_MSGS(ver));
         LASSERT (conn->ibc_reserved_credits >= 0);
@@ -1278,6 +1293,21 @@ kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn)
        LASSERT(!tx->tx_queued);        /* not queued for sending already */
        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
 
+       if (conn->ibc_state >= IBLND_CONN_DISCONNECTED) {
+               tx->tx_status = -ECONNABORTED;
+               tx->tx_waiting = 0;
+               if (tx->tx_conn != NULL) {
+                       /* PUT_DONE first attached to conn as a PUT_REQ */
+                       LASSERT(tx->tx_conn == conn);
+                       LASSERT(tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
+                       tx->tx_conn = NULL;
+                       kiblnd_conn_decref(conn);
+               }
+               list_add(&tx->tx_list, &conn->ibc_zombie_txs);
+
+               return;
+       }
+
        timeout_ns = lnet_get_lnd_timeout() * NSEC_PER_SEC;
        tx->tx_queued = 1;
        tx->tx_deadline = ktime_add_ns(ktime_get(), timeout_ns);
@@ -1989,24 +2019,24 @@ kiblnd_peer_alive(struct kib_peer_ni *peer_ni)
 static void
 kiblnd_peer_notify(struct kib_peer_ni *peer_ni)
 {
-        int           error = 0;
+       int           error = 0;
        time64_t last_alive = 0;
-        unsigned long flags;
+       unsigned long flags;
 
        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
 
        if (kiblnd_peer_idle(peer_ni) && peer_ni->ibp_error != 0) {
-                error = peer_ni->ibp_error;
-                peer_ni->ibp_error = 0;
+               error = peer_ni->ibp_error;
+               peer_ni->ibp_error = 0;
 
-                last_alive = peer_ni->ibp_last_alive;
-        }
+               last_alive = peer_ni->ibp_last_alive;
+       }
 
        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
 
-        if (error != 0)
-                lnet_notify(peer_ni->ibp_ni,
-                            peer_ni->ibp_nid, 0, last_alive);
+       if (error != 0)
+               lnet_notify(peer_ni->ibp_ni,
+                           peer_ni->ibp_nid, false, false, last_alive);
 }
 
 void
@@ -2117,7 +2147,7 @@ kiblnd_handle_early_rxs(struct kib_conn *conn)
        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
 }
 
-static void
+void
 kiblnd_abort_txs(struct kib_conn *conn, struct list_head *txs)
 {
        struct list_head         zombies = LIST_HEAD_INIT(zombies);
@@ -2174,9 +2204,11 @@ kiblnd_abort_txs(struct kib_conn *conn, struct list_head *txs)
 
        /*
         * aborting transmits occurs when finalizing the connection.
-        * The connection is finalized on error
+        * The connection is finalized on error.
+        * Passing LNET_MSG_STATUS_OK to txlist_done() will not
+        * override the value already set in tx->tx_hstatus above.
         */
-       kiblnd_txlist_done(&zombies, -ECONNABORTED, -1);
+       kiblnd_txlist_done(&zombies, -ECONNABORTED, LNET_MSG_STATUS_OK);
 }
 
 static void
@@ -2185,13 +2217,13 @@ kiblnd_finalise_conn(struct kib_conn *conn)
        LASSERT (!in_interrupt());
        LASSERT (conn->ibc_state > IBLND_CONN_INIT);
 
-       kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
-
        /* abort_receives moves QP state to IB_QPS_ERR.  This is only required
         * for connections that didn't get as far as being connected, because
         * rdma_disconnect() does this for free. */
        kiblnd_abort_receives(conn);
 
+       kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
+
        /* Complete all tx descs not waiting for sends to complete.
         * NB we should be safe from RDMA now that the QP has changed state */