* disengage the socket from its callbacks and close it.
* ksnc_refcount will eventually hit zero, and then the reaper will
* destroy it. */
- ksock_peer_t *peer = conn->ksnc_peer;
- ksock_sched_t *sched = conn->ksnc_scheduler;
- int failed = 0;
+ ksock_peer_t *peer = conn->ksnc_peer;
+ ksock_sched_t *sched = conn->ksnc_scheduler;
+ int failed = 0;
+ struct list_head *tmp;
+ struct list_head *nxt;
+ ksock_tx_t *tx;
+ LIST_HEAD (zlist);
LASSERT(conn->ksnc_closing);
spin_unlock_bh (&sched->kss_lock);
spin_lock(&peer->ksnp_lock);
- if (!list_empty(&peer->ksnp_zc_req_list)) {
- struct list_head *tmp;
- struct list_head *nxt;
- ksock_tx_t *tx;
- LIST_HEAD (zlist);
- list_for_each_safe(tmp, nxt, &peer->ksnp_zc_req_list) {
- tx = list_entry(tmp, ksock_tx_t, tx_zc_list);
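+ /* Strip this conn's blocked ZC requests off ksnp_zc_req_list while
+  * holding ksnp_lock; they are completed on the private zlist below,
+  * after the lock is dropped. */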
+ list_for_each_safe(tmp, nxt, &peer->ksnp_zc_req_list) {
+ tx = list_entry(tmp, ksock_tx_t, tx_zc_list);
- if (tx->tx_conn != conn)
- continue;
- list_del(&tx->tx_zc_list);
- /* tell scheduler it's deleted */
- tx->tx_msg.ksm_zc_req_cookie = 0;
- list_add(&tx->tx_zc_list, &zlist);
- }
- spin_unlock(&peer->ksnp_lock);
+ if (tx->tx_conn != conn)
+ continue;
- list_for_each_safe(tmp, nxt, &zlist) {
- tx = list_entry(tmp, ksock_tx_t, tx_zc_list);
- list_del(&tx->tx_zc_list);
- ksocknal_tx_decref(tx);
- }
- } else {
- spin_unlock(&peer->ksnp_lock);
+ LASSERT (tx->tx_msg.ksm_zc_req_cookie != 0);
+
+ tx->tx_msg.ksm_zc_req_cookie = 0;
+ list_del(&tx->tx_zc_list);
+ list_add(&tx->tx_zc_list, &zlist);
+ }
+
+ spin_unlock(&peer->ksnp_lock);
+
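+ /* Complete the orphaned requests outside ksnp_lock: each decref here
+  * drops the reference taken in ksocknal_check_zc_req(). */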
+ list_for_each_safe(tmp, nxt, &zlist) {
+ tx = list_entry(tmp, ksock_tx_t, tx_zc_list);
+
+ list_del(&tx->tx_zc_list);
+ ksocknal_tx_decref(tx);
}
/* serialise with callbacks */
}
}
-int
-ksocknal_zc_req(ksock_tx_t *tx)
+static void
+ksocknal_check_zc_req(ksock_tx_t *tx)
{
+ ksock_conn_t *conn = tx->tx_conn;
+ ksock_peer_t *peer = conn->ksnc_peer;
lnet_kiov_t *kiov = tx->tx_kiov;
int nkiov = tx->tx_nkiov;
- if (!tx->tx_conn->ksnc_zc_capable)
- return 0;
+ /* Set tx_msg.ksm_zc_req_cookie to a unique non-zero cookie and add tx
+  * to ksnp_zc_req_list if some fragment of this message should be sent
+  * zero-copy. The peer will send an ACK containing this cookie when it
+  * has received the message, telling us we can signal completion.
+  * tx_msg.ksm_zc_req_cookie remains non-zero while tx is on
+  * ksnp_zc_req_list. */
+ if (conn->ksnc_proto != &ksocknal_protocol_v2x ||
+ !conn->ksnc_zc_capable)
+ return;
+
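+ /* Zero-copy only pays off for large fragments: look for at least one
+  * fragment that meets the zc_min_frag threshold. */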
while (nkiov > 0) {
if (kiov->kiov_len >= *ksocknal_tunables.ksnd_zc_min_frag)
- return 1;
+ break;
--nkiov;
++kiov;
- }
-
- return 0;
-}
-
-static void
-ksocknal_queue_zc_req(ksock_tx_t *tx)
-{
- ksock_peer_t *peer = tx->tx_conn->ksnc_peer;
+ }
- /* assign cookie and queue tx to pending list, it will be
- * released while getting ack, see ksocknal_handle_zc_ack() */
+ if (nkiov == 0)
+ return;
+
+ /* Assign a cookie and queue tx on the pending list; the extra reference
+  * taken below is released when a matching ACK arrives. See
+  * ksocknal_handle_zc_ack(). */
- ksocknal_tx_addref(tx); /* +1 ref */
+ ksocknal_tx_addref(tx);
spin_lock(&peer->ksnp_lock);
+ LASSERT (tx->tx_msg.ksm_zc_req_cookie == 0);
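+ /* A zero cookie means "no ACK expected"; ksnp_zc_next_cookie is only
+  * advanced under ksnp_lock, so each queued tx gets a unique cookie. */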
tx->tx_msg.ksm_zc_req_cookie = peer->ksnp_zc_next_cookie++;
list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list);
spin_unlock(&peer->ksnp_lock);
}
static void
-ksocknal_dequeue_zc_req(ksock_tx_t *tx)
+ksocknal_unzc_req(ksock_tx_t *tx)
{
ksock_peer_t *peer = tx->tx_conn->ksnc_peer;
spin_lock(&peer->ksnp_lock);
- if (tx->tx_msg.ksm_zc_req_cookie != 0) {
- /* not deleted by ksocknal_terminate_conn() */
- list_del(&tx->tx_zc_list);
+ if (tx->tx_msg.ksm_zc_req_cookie == 0) {
+ /* Not waiting for an ACK */
+ spin_unlock(&peer->ksnp_lock);
+ return;
}
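+ /* Still on ksnp_zc_req_list: detach it and zero the cookie under
+  * ksnp_lock so ksocknal_handle_zc_ack() can't match it again. */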
+ tx->tx_msg.ksm_zc_req_cookie = 0;
+ list_del(&tx->tx_zc_list);
+
spin_unlock(&peer->ksnp_lock);
- if (tx->tx_msg.ksm_zc_req_cookie != 0)
- ksocknal_tx_decref(tx); /* -1 ref */
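+ /* drop the ref taken in ksocknal_check_zc_req() */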
+ ksocknal_tx_decref(tx);
}
+
int
ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
{
int rc;
- if (conn->ksnc_proto == &ksocknal_protocol_v2x &&
- tx->tx_msg.ksm_zc_req_cookie == 0 &&
- ksocknal_zc_req(tx)) {
- /* wait for ACK */
- ksocknal_queue_zc_req(tx);
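+ /* Make the ZC decision at most once per tx: tx_checked_zc stops us
+  * queueing a second ACK request if this tx comes through here again. */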
+ if (!tx->tx_checked_zc) {
+ tx->tx_checked_zc = 1;
+ ksocknal_check_zc_req(tx);
}
rc = ksocknal_transmit (conn, tx);
libcfs_id2str(conn->ksnc_peer->ksnp_id),
HIPQUAD(conn->ksnc_ipaddr),
conn->ksnc_port);
- } else {
- /* closed, dequeue the ZC request if needed */
- ksocknal_dequeue_zc_req(tx);
- }
+ }
+
+ ksocknal_unzc_req(tx);
/* it's not an error if conn is being closed */
ksocknal_close_conn_and_siblings (conn,
                                  (rc == -ENETDOWN) ? 0 : rc);
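+ /* tx has just been queued on this conn; force ksocknal_process_transmit()
+  * to make a fresh zero-copy decision for it. */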
+ tx->tx_checked_zc = 0;
conn->ksnc_proto->pro_pack(tx);
/* Ensure the frags we've been given EXACTLY match the number of
if (tx->tx_msg.ksm_zc_req_cookie != cookie)
continue;
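+ /* Matched: zero the cookie under ksnp_lock so a duplicate ACK can't
+  * find this tx again, then drop our ZC reference. */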
+ tx->tx_msg.ksm_zc_req_cookie = 0;
list_del(&tx->tx_zc_list);
+
spin_unlock(&peer->ksnp_lock);
ksocknal_tx_decref(tx);
-
return 0;
}
spin_unlock(&peer->ksnp_lock);