Whamcloud - gitweb
LU-17000 utils: handle_yaml_no_op() has wrong signature
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_lib.c
index 023ca82..2b9dbc5 100644 (file)
@@ -1,34 +1,12 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.gnu.org/licenses/gpl-2.0.html
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: GPL-2.0
+
+/* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
  * Copyright (c) 2011, 2017, Intel Corporation.
  */
  * Use is subject to license terms.
  *
  * Copyright (c) 2011, 2017, Intel Corporation.
  */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- */
+
+/* This file is part of Lustre, http://www.lustre.org/ */
 
 #include "socklnd.h"
 
 
 #include "socklnd.h"
 
@@ -42,14 +20,14 @@ ksocknal_lib_get_conn_addrs(struct ksock_conn *conn)
        LASSERT(!conn->ksnc_closing);
 
        if (rc != 0) {
        LASSERT(!conn->ksnc_closing);
 
        if (rc != 0) {
-               CERROR("Error %d getting sock peer_ni IP\n", rc);
+               CERROR("Error getting sock peer_ni IP: rc = %d\n", rc);
                return rc;
        }
 
        rc = lnet_sock_getaddr(conn->ksnc_sock, false,
                               &conn->ksnc_myaddr);
        if (rc != 0) {
                return rc;
        }
 
        rc = lnet_sock_getaddr(conn->ksnc_sock, false,
                               &conn->ksnc_myaddr);
        if (rc != 0) {
-               CERROR("Error %d getting sock local IP\n", rc);
+               CERROR("Error getting sock local IP: rc = %d\n", rc);
                return rc;
        }
 
                return rc;
        }
 
@@ -65,7 +43,8 @@ ksocknal_lib_zc_capable(struct ksock_conn *conn)
                return 0;
 
        /* ZC if the socket supports scatter/gather and doesn't need software
                return 0;
 
        /* ZC if the socket supports scatter/gather and doesn't need software
-        * checksums */
+        * checksums
+        */
        return ((caps & NETIF_F_SG) != 0 && (caps & NETIF_F_CSUM_MASK) != 0);
 }
 
        return ((caps & NETIF_F_SG) != 0 && (caps & NETIF_F_CSUM_MASK) != 0);
 }
 
@@ -84,12 +63,12 @@ ksocknal_lib_send_hdr(struct ksock_conn *conn, struct ksock_tx *tx,
                ksocknal_lib_csum_tx(tx);
 
        /* NB we can't trust socket ops to either consume our iovs
                ksocknal_lib_csum_tx(tx);
 
        /* NB we can't trust socket ops to either consume our iovs
-        * or leave them alone. */
-
+        * or leave them alone.
+        */
        {
 #if SOCKNAL_SINGLE_FRAG_TX
                struct kvec scratch;
        {
 #if SOCKNAL_SINGLE_FRAG_TX
                struct kvec scratch;
-               struct kvec *scratchiov = &scratch;
+               scratchiov = &scratch;
                unsigned int niov = 1;
 #else
                unsigned int niov = tx->tx_niov;
                unsigned int niov = 1;
 #else
                unsigned int niov = tx->tx_niov;
@@ -110,42 +89,63 @@ ksocknal_lib_send_hdr(struct ksock_conn *conn, struct ksock_tx *tx,
        return rc;
 }
 
        return rc;
 }
 
+static int
+ksocknal_lib_sendpage(struct socket *sock, struct bio_vec *kiov,
+                     int nkiov, int msgflg)
+{
+#ifdef MSG_SPLICE_PAGES
+       struct msghdr msg = {.msg_flags = msgflg | MSG_SPLICE_PAGES};
+       int i;
+
+       iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, kiov, 1, kiov->bv_len);
+       for (i = 0; i < nkiov; i++) {
+               if (!sendpage_ok(kiov[i].bv_page)) {
+                       msg.msg_flags &= ~MSG_SPLICE_PAGES;
+                       break;
+               }
+       }
+
+       return sock_sendmsg(sock, &msg);
+#else
+       struct sock   *sk = sock->sk;
+       struct page *page = kiov->bv_page;
+       int offset = kiov->bv_offset;
+       int fragsize = kiov->bv_len;
+
+       return sk->sk_prot->sendpage(sk, page, offset, fragsize, msgflg);
+#endif
+}
+
 int
 ksocknal_lib_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx,
                       struct kvec *scratchiov)
 {
        struct socket *sock = conn->ksnc_sock;
        struct bio_vec *kiov = tx->tx_kiov;
 int
 ksocknal_lib_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx,
                       struct kvec *scratchiov)
 {
        struct socket *sock = conn->ksnc_sock;
        struct bio_vec *kiov = tx->tx_kiov;
-       int            rc;
-       int            nob;
+       int rc;
+       int nob;
 
        /* Not NOOP message */
        LASSERT(tx->tx_lnetmsg != NULL);
 
 
        /* Not NOOP message */
        LASSERT(tx->tx_lnetmsg != NULL);
 
-       /* NB we can't trust socket ops to either consume our iovs
-        * or leave them alone. */
+       /* can't trust socket ops to consume our iovs or leave them alone */
        if (tx->tx_msg.ksm_zc_cookies[0] != 0) {
                /* Zero copy is enabled */
        if (tx->tx_msg.ksm_zc_cookies[0] != 0) {
                /* Zero copy is enabled */
-               struct sock   *sk = sock->sk;
-               struct page   *page = kiov->bv_page;
-               int            offset = kiov->bv_offset;
-               int            fragsize = kiov->bv_len;
-               int            msgflg = MSG_DONTWAIT;
+               int msgflg = MSG_DONTWAIT;
 
                CDEBUG(D_NET, "page %p + offset %x for %d\n",
 
                CDEBUG(D_NET, "page %p + offset %x for %d\n",
-                              page, offset, kiov->bv_len);
+                      kiov->bv_page, kiov->bv_offset, kiov->bv_len);
 
                if (!list_empty(&conn->ksnc_tx_queue) ||
 
                if (!list_empty(&conn->ksnc_tx_queue) ||
-                   fragsize < tx->tx_resid)
+                   kiov->bv_len < tx->tx_resid)
                        msgflg |= MSG_MORE;
 
                        msgflg |= MSG_MORE;
 
-               rc = sk->sk_prot->sendpage(sk, page,
-                                          offset, fragsize, msgflg);
+               rc = ksocknal_lib_sendpage(sock, kiov, tx->tx_nkiov, msgflg);
        } else {
 #if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
        } else {
 #if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
-               struct kvec     scratch;
-               struct kvec   *scratchiov = &scratch;
-               unsigned int    niov = 1;
+               struct kvec scratch;
+               scratchiov = &scratch;
+               unsigned int niov = 1;
 #else
 #ifdef CONFIG_HIGHMEM
 #warning "XXX risk of kmap deadlock on multiple frags..."
 #else
 #ifdef CONFIG_HIGHMEM
 #warning "XXX risk of kmap deadlock on multiple frags..."
@@ -153,7 +153,7 @@ ksocknal_lib_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx,
                unsigned int  niov = tx->tx_nkiov;
 #endif
                struct msghdr msg = { .msg_flags = MSG_DONTWAIT };
                unsigned int  niov = tx->tx_nkiov;
 #endif
                struct msghdr msg = { .msg_flags = MSG_DONTWAIT };
-               int           i;
+               int i;
 
                for (nob = i = 0; i < niov; i++) {
                        scratchiov[i].iov_base = kmap(kiov[i].bv_page) +
 
                for (nob = i = 0; i < niov; i++) {
                        scratchiov[i].iov_base = kmap(kiov[i].bv_page) +
@@ -182,7 +182,6 @@ ksocknal_lib_eager_ack(struct ksock_conn *conn)
         * think I'm about to send something it could piggy-back the ACK on,
         * introducing delay in completing zero-copy sends in my peer_ni.
         */
         * think I'm about to send something it could piggy-back the ACK on,
         * introducing delay in completing zero-copy sends in my peer_ni.
         */
-
        tcp_sock_set_quickack(sock->sk, 1);
 }
 
        tcp_sock_set_quickack(sock->sk, 1);
 }
 
@@ -191,84 +190,86 @@ ksocknal_lib_recv_iov(struct ksock_conn *conn, struct kvec *scratchiov)
 {
 #if SOCKNAL_SINGLE_FRAG_RX
        struct kvec  scratch;
 {
 #if SOCKNAL_SINGLE_FRAG_RX
        struct kvec  scratch;
-       struct kvec *scratchiov = &scratch;
-       unsigned int  niov = 1;
+       scratchiov = &scratch;
+       unsigned int niov = 1;
 #else
 #else
-       unsigned int  niov = conn->ksnc_rx_niov;
+       unsigned int niov = conn->ksnc_rx_niov;
 #endif
        struct kvec *iov = conn->ksnc_rx_iov;
        struct msghdr msg = {
 #endif
        struct kvec *iov = conn->ksnc_rx_iov;
        struct msghdr msg = {
-               .msg_flags      = 0
+               .msg_flags = 0
        };
        };
-        int          nob;
-        int          i;
-        int          rc;
-        int          fragnob;
-        int          sum;
-        __u32        saved_csum;
-
-        /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone. */
-        LASSERT (niov > 0);
-
-        for (nob = i = 0; i < niov; i++) {
-                scratchiov[i] = iov[i];
-                nob += scratchiov[i].iov_len;
-        }
-        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
+       int nob;
+       int i;
+       int rc;
+       int fragnob;
+       int sum;
+       __u32 saved_csum;
+
+       /* NB we can't trust socket ops to either consume our iovs
+        * or leave them alone.
+        */
+       LASSERT(niov > 0);
+
+       for (nob = i = 0; i < niov; i++) {
+               scratchiov[i] = iov[i];
+               nob += scratchiov[i].iov_len;
+       }
+       LASSERT(nob <= conn->ksnc_rx_nob_wanted);
 
        rc = kernel_recvmsg(conn->ksnc_sock, &msg, scratchiov, niov, nob,
                            MSG_DONTWAIT);
 
 
        rc = kernel_recvmsg(conn->ksnc_sock, &msg, scratchiov, niov, nob,
                            MSG_DONTWAIT);
 
-        saved_csum = 0;
-        if (conn->ksnc_proto == &ksocknal_protocol_v2x) {
-                saved_csum = conn->ksnc_msg.ksm_csum;
-                conn->ksnc_msg.ksm_csum = 0;
-        }
+       saved_csum = 0;
+       if (conn->ksnc_proto == &ksocknal_protocol_v2x) {
+               saved_csum = conn->ksnc_msg.ksm_csum;
+               conn->ksnc_msg.ksm_csum = 0;
+       }
 
 
-        if (saved_csum != 0) {
-                /* accumulate checksum */
-                for (i = 0, sum = rc; sum > 0; i++, sum -= fragnob) {
-                        LASSERT (i < niov);
+       if (saved_csum != 0) {
+               /* accumulate checksum */
+               for (i = 0, sum = rc; sum > 0; i++, sum -= fragnob) {
+                       LASSERT(i < niov);
 
 
-                        fragnob = iov[i].iov_len;
-                        if (fragnob > sum)
-                                fragnob = sum;
+                       fragnob = iov[i].iov_len;
+                       if (fragnob > sum)
+                               fragnob = sum;
 
 
-                        conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
-                                                           iov[i].iov_base, fragnob);
-                }
-                conn->ksnc_msg.ksm_csum = saved_csum;
-        }
+                       conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
+                                                          iov[i].iov_base,
+                                                          fragnob);
+               }
+               conn->ksnc_msg.ksm_csum = saved_csum;
+       }
 
 
-        return rc;
+       return rc;
 }
 
 static void
 ksocknal_lib_kiov_vunmap(void *addr)
 {
 }
 
 static void
 ksocknal_lib_kiov_vunmap(void *addr)
 {
-        if (addr == NULL)
-                return;
+       if (addr == NULL)
+               return;
 
 
-        vunmap(addr);
+       vunmap(addr);
 }
 
 static void *
 ksocknal_lib_kiov_vmap(struct bio_vec *kiov, int niov,
                       struct kvec *iov, struct page **pages)
 {
 }
 
 static void *
 ksocknal_lib_kiov_vmap(struct bio_vec *kiov, int niov,
                       struct kvec *iov, struct page **pages)
 {
-        void             *addr;
-        int               nob;
-        int               i;
+       void *addr;
+       int nob;
+       int i;
 
 
-        if (!*ksocknal_tunables.ksnd_zc_recv || pages == NULL)
-                return NULL;
+       if (!*ksocknal_tunables.ksnd_zc_recv || pages == NULL)
+               return NULL;
 
 
-        LASSERT (niov <= LNET_MAX_IOV);
+       LASSERT(niov <= LNET_MAX_IOV);
 
 
-        if (niov < 2 ||
-            niov < *ksocknal_tunables.ksnd_zc_recv_min_nfrags)
-                return NULL;
+       if (niov < 2 ||
+           niov < *ksocknal_tunables.ksnd_zc_recv_min_nfrags)
+               return NULL;
 
        for (nob = i = 0; i < niov; i++) {
                if ((kiov[i].bv_offset != 0 && i > 0) ||
 
        for (nob = i = 0; i < niov; i++) {
                if ((kiov[i].bv_offset != 0 && i > 0) ||
@@ -296,9 +297,9 @@ ksocknal_lib_recv_kiov(struct ksock_conn *conn, struct page **pages,
 {
 #if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
        struct kvec   scratch;
 {
 #if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
        struct kvec   scratch;
-       struct kvec  *scratchiov = &scratch;
-        struct page  **pages      = NULL;
-        unsigned int   niov       = 1;
+       scratchiov = &scratch;
+       pages      = NULL;
+       unsigned int   niov       = 1;
 #else
 #ifdef CONFIG_HIGHMEM
 #warning "XXX risk of kmap deadlock on multiple frags..."
 #else
 #ifdef CONFIG_HIGHMEM
 #warning "XXX risk of kmap deadlock on multiple frags..."
@@ -309,18 +310,20 @@ ksocknal_lib_recv_kiov(struct ksock_conn *conn, struct page **pages,
        struct msghdr msg = {
                .msg_flags      = 0
        };
        struct msghdr msg = {
                .msg_flags      = 0
        };
-        int          nob;
-        int          i;
-        int          rc;
-        void        *base;
-        void        *addr;
-        int          sum;
-        int          fragnob;
+       int nob;
+       int i;
+       int rc;
+       void *base;
+       void *addr;
+       int sum;
+       int fragnob;
        int n;
 
        int n;
 
-        /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone. */
-       if ((addr = ksocknal_lib_kiov_vmap(kiov, niov, scratchiov, pages)) != NULL) {
+       /* NB we can't trust socket ops to either consume our iovs
+        * or leave them alone.
+        */
+       addr = ksocknal_lib_kiov_vmap(kiov, niov, scratchiov, pages);
+       if (addr != NULL) {
                nob = scratchiov[0].iov_len;
                n = 1;
 
                nob = scratchiov[0].iov_len;
                n = 1;
 
@@ -333,7 +336,7 @@ ksocknal_lib_recv_kiov(struct ksock_conn *conn, struct page **pages,
                n = niov;
        }
 
                n = niov;
        }
 
-       LASSERT (nob <= conn->ksnc_rx_nob_wanted);
+       LASSERT(nob <= conn->ksnc_rx_nob_wanted);
 
        rc = kernel_recvmsg(conn->ksnc_sock, &msg, scratchiov, n, nob,
                            MSG_DONTWAIT);
 
        rc = kernel_recvmsg(conn->ksnc_sock, &msg, scratchiov, n, nob,
                            MSG_DONTWAIT);
@@ -372,15 +375,15 @@ ksocknal_lib_recv_kiov(struct ksock_conn *conn, struct page **pages,
 void
 ksocknal_lib_csum_tx(struct ksock_tx *tx)
 {
 void
 ksocknal_lib_csum_tx(struct ksock_tx *tx)
 {
-        int          i;
-        __u32        csum;
-        void        *base;
+       int i;
+       __u32 csum;
+       void *base;
 
        LASSERT(tx->tx_hdr.iov_base == (void *)&tx->tx_msg);
        LASSERT(tx->tx_conn != NULL);
        LASSERT(tx->tx_conn->ksnc_proto == &ksocknal_protocol_v2x);
 
 
        LASSERT(tx->tx_hdr.iov_base == (void *)&tx->tx_msg);
        LASSERT(tx->tx_conn != NULL);
        LASSERT(tx->tx_conn->ksnc_proto == &ksocknal_protocol_v2x);
 
-        tx->tx_msg.ksm_csum = 0;
+       tx->tx_msg.ksm_csum = 0;
 
        csum = ksocknal_csum(~0, (void *)tx->tx_hdr.iov_base,
                             tx->tx_hdr.iov_len);
 
        csum = ksocknal_csum(~0, (void *)tx->tx_hdr.iov_base,
                             tx->tx_hdr.iov_len);
@@ -394,16 +397,17 @@ ksocknal_lib_csum_tx(struct ksock_tx *tx)
                kunmap(tx->tx_kiov[i].bv_page);
        }
 
                kunmap(tx->tx_kiov[i].bv_page);
        }
 
-        if (*ksocknal_tunables.ksnd_inject_csum_error) {
-                csum++;
-                *ksocknal_tunables.ksnd_inject_csum_error = 0;
-        }
+       if (*ksocknal_tunables.ksnd_inject_csum_error) {
+               csum++;
+               *ksocknal_tunables.ksnd_inject_csum_error = 0;
+       }
 
 
-        tx->tx_msg.ksm_csum = csum;
+       tx->tx_msg.ksm_csum = csum;
 }
 
 int
 }
 
 int
-ksocknal_lib_get_conn_tunables(struct ksock_conn *conn, int *txmem, int *rxmem, int *nagle)
+ksocknal_lib_get_conn_tunables(struct ksock_conn *conn, int *txmem,
+                              int *rxmem, int *nagle)
 {
        struct socket *sock = conn->ksnc_sock;
        struct tcp_sock *tp = tcp_sk(sock->sk);
 {
        struct socket *sock = conn->ksnc_sock;
        struct tcp_sock *tp = tcp_sk(sock->sk);
@@ -422,12 +426,11 @@ ksocknal_lib_get_conn_tunables(struct ksock_conn *conn, int *txmem, int *rxmem,
 
        ksocknal_connsock_decref(conn);
 
 
        ksocknal_connsock_decref(conn);
 
-
        return 0;
 }
 
 int
        return 0;
 }
 
 int
-ksocknal_lib_setup_sock (struct socket *sock)
+ksocknal_lib_setup_sock(struct socket *sock, struct lnet_ni *ni)
 {
        int rc;
        int keep_idle;
 {
        int rc;
        int keep_idle;
@@ -435,6 +438,7 @@ ksocknal_lib_setup_sock (struct socket *sock)
        int keep_count;
        int do_keepalive;
        struct tcp_sock *tp = tcp_sk(sock->sk);
        int keep_count;
        int do_keepalive;
        struct tcp_sock *tp = tcp_sk(sock->sk);
+       struct lnet_ioctl_config_socklnd_tunables *lndtun;
 
        sock->sk->sk_allocation = GFP_NOFS;
 
 
        sock->sk->sk_allocation = GFP_NOFS;
 
@@ -497,6 +501,7 @@ ksocknal_lib_setup_sock (struct socket *sock)
         */
        {
                int option = (do_keepalive ? 1 : 0);
         */
        {
                int option = (do_keepalive ? 1 : 0);
+
                kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
                                  (char *)&option, sizeof(option));
        }
                kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
                                  (char *)&option, sizeof(option));
        }
@@ -530,7 +535,11 @@ ksocknal_lib_setup_sock (struct socket *sock)
                return rc;
        }
 
                return rc;
        }
 
-       return (0);
+       lndtun = &ni->ni_lnd_tunables.lnd_tun_u.lnd_sock;
+       if (lndtun->lnd_tos >= 0)
+               ip_sock_set_tos(sock->sk, lndtun->lnd_tos);
+
+       return 0;
 }
 
 void
 }
 
 void
@@ -575,11 +584,10 @@ ksocknal_data_ready(struct sock *sk, int n)
 #endif
 {
        struct ksock_conn  *conn;
 #endif
 {
        struct ksock_conn  *conn;
-       ENTRY;
 
 
-        /* interleave correctly with closing sockets... */
-        LASSERT(!in_irq());
-       read_lock(&ksocknal_data.ksnd_global_lock);
+       /* interleave correctly with closing sockets... */
+       LASSERT(!in_irq());
+       read_lock_bh(&ksocknal_data.ksnd_global_lock);
 
        conn = sk->sk_user_data;
        if (conn == NULL) {     /* raced with ksocknal_terminate_conn */
 
        conn = sk->sk_user_data;
        if (conn == NULL) {     /* raced with ksocknal_terminate_conn */
@@ -592,52 +600,49 @@ ksocknal_data_ready(struct sock *sk, int n)
        } else
                ksocknal_read_callback(conn);
 
        } else
                ksocknal_read_callback(conn);
 
-       read_unlock(&ksocknal_data.ksnd_global_lock);
-
-       EXIT;
+       read_unlock_bh(&ksocknal_data.ksnd_global_lock);
 }
 
 static void
 }
 
 static void
-ksocknal_write_space (struct sock *sk)
+ksocknal_write_space(struct sock *sk)
 {
 {
-       struct ksock_conn  *conn;
-        int            wspace;
-        int            min_wpace;
+       struct ksock_conn *conn;
+       int wspace;
+       int min_wpace;
 
 
-        /* interleave correctly with closing sockets... */
-        LASSERT(!in_irq());
+       /* interleave correctly with closing sockets... */
+       LASSERT(!in_irq());
        read_lock(&ksocknal_data.ksnd_global_lock);
 
        read_lock(&ksocknal_data.ksnd_global_lock);
 
-        conn = sk->sk_user_data;
+       conn = sk->sk_user_data;
        wspace = sk_stream_wspace(sk);
        min_wpace = sk_stream_min_wspace(sk);
 
        wspace = sk_stream_wspace(sk);
        min_wpace = sk_stream_min_wspace(sk);
 
-        CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
-               sk, wspace, min_wpace, conn,
-               (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
-                                      " ready" : " blocked"),
-               (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
-                                      " scheduled" : " idle"),
+       CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
+              sk, wspace, min_wpace, conn,
+              (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
+                                     " ready" : " blocked"),
+              (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
+                                     " scheduled" : " idle"),
               (conn == NULL) ? "" : (list_empty(&conn->ksnc_tx_queue) ?
               (conn == NULL) ? "" : (list_empty(&conn->ksnc_tx_queue) ?
-                                      " empty" : " queued"));
+                                     " empty" : " queued"));
 
 
-        if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
-                LASSERT (sk->sk_write_space != &ksocknal_write_space);
-                sk->sk_write_space (sk);
+       if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
+               LASSERT(sk->sk_write_space != &ksocknal_write_space);
+               sk->sk_write_space(sk);
 
                read_unlock(&ksocknal_data.ksnd_global_lock);
 
                read_unlock(&ksocknal_data.ksnd_global_lock);
-                return;
-        }
-
-        if (wspace >= min_wpace) {              /* got enough space */
-                ksocknal_write_callback(conn);
+               return;
+       }
 
 
-                /* Clear SOCK_NOSPACE _after_ ksocknal_write_callback so the
-                 * ENOMEM check in ksocknal_transmit is race-free (think about
-                 * it). */
+       if (wspace >= min_wpace) { /* got enough space */
+               ksocknal_write_callback(conn);
 
 
-                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
-        }
+               /* Clear SOCK_NOSPACE _after_ ksocknal_write_callback so the
+                * ENOMEM check in ksocknal_transmit is race-free (think!).
+                */
+               clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+       }
 
        read_unlock(&ksocknal_data.ksnd_global_lock);
 }
 
        read_unlock(&ksocknal_data.ksnd_global_lock);
 }
@@ -645,56 +650,54 @@ ksocknal_write_space (struct sock *sk)
 void
 ksocknal_lib_save_callback(struct socket *sock, struct ksock_conn *conn)
 {
 void
 ksocknal_lib_save_callback(struct socket *sock, struct ksock_conn *conn)
 {
-        conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
-        conn->ksnc_saved_write_space = sock->sk->sk_write_space;
+       conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
+       conn->ksnc_saved_write_space = sock->sk->sk_write_space;
 }
 
 void
 ksocknal_lib_set_callback(struct socket *sock,  struct ksock_conn *conn)
 {
 }
 
 void
 ksocknal_lib_set_callback(struct socket *sock,  struct ksock_conn *conn)
 {
-        sock->sk->sk_user_data = conn;
-        sock->sk->sk_data_ready = ksocknal_data_ready;
-        sock->sk->sk_write_space = ksocknal_write_space;
+       sock->sk->sk_user_data = conn;
+       sock->sk->sk_data_ready = ksocknal_data_ready;
+       sock->sk->sk_write_space = ksocknal_write_space;
 }
 
 void
 ksocknal_lib_reset_callback(struct socket *sock, struct ksock_conn *conn)
 {
 }
 
 void
 ksocknal_lib_reset_callback(struct socket *sock, struct ksock_conn *conn)
 {
-        /* Remove conn's network callbacks.
-         * NB I _have_ to restore the callback, rather than storing a noop,
-         * since the socket could survive past this module being unloaded!! */
-        sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
-        sock->sk->sk_write_space = conn->ksnc_saved_write_space;
-
-        /* A callback could be in progress already; they hold a read lock
-         * on ksnd_global_lock (to serialise with me) and NOOP if
-         * sk_user_data is NULL. */
-        sock->sk->sk_user_data = NULL;
-
-        return ;
+       /* Remove conn's network callbacks.
+        * NB I _have_ to restore the callback, rather than storing a noop,
+        * since the socket could survive past this module being unloaded!!
+        */
+       sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
+       sock->sk->sk_write_space = conn->ksnc_saved_write_space;
+
+       /* A callback could be in progress already; they hold a read lock
+        * on ksnd_global_lock (to serialise with me) and NOOP if
+        * sk_user_data is NULL.
+        */
+       sock->sk->sk_user_data = NULL;
 }
 
 int
 ksocknal_lib_memory_pressure(struct ksock_conn *conn)
 {
 }
 
 int
 ksocknal_lib_memory_pressure(struct ksock_conn *conn)
 {
-       int            rc = 0;
        struct ksock_sched *sched;
        struct ksock_sched *sched;
+       int rc = 0;
 
        sched = conn->ksnc_scheduler;
        spin_lock_bh(&sched->kss_lock);
 
        if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
 
        sched = conn->ksnc_scheduler;
        spin_lock_bh(&sched->kss_lock);
 
        if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
-            !conn->ksnc_tx_ready) {
-                /* SOCK_NOSPACE is set when the socket fills
-                 * and cleared in the write_space callback
-                 * (which also sets ksnc_tx_ready).  If
-                 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
-                 * zero, I didn't fill the socket and
-                 * write_space won't reschedule me, so I
-                 * return -ENOMEM to get my caller to retry
-                 * after a timeout */
-                rc = -ENOMEM;
-        }
+           !conn->ksnc_tx_ready) {
+               /* SOCK_NOSPACE is set when the socket fills and cleared in the
+                * write_space callback (which also sets ksnc_tx_ready). If
+                * SOCK_NOSPACE and ksnc_tx_ready are BOTH zero, I didn't fill
+                * the socket and write_space won't reschedule me, so I return
+                * -ENOMEM to get my caller to retry after a timeout
+                */
+               rc = -ENOMEM;
+       }
 
        spin_unlock_bh(&sched->kss_lock);
 
 
        spin_unlock_bh(&sched->kss_lock);