Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_lib-linux.c
index b7e2f49..8595e4c 100644 (file)
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #include "socklnd.h"
 
-# if CONFIG_SYSCTL && !CFS_SYSFS_MODULE_PARM
-static ctl_table ksocknal_ctl_table[21];
+# if defined(CONFIG_SYSCTL) && !CFS_SYSFS_MODULE_PARM
+
+#ifndef HAVE_SYSCTL_UNNUMBERED /* each tunable gets its own numeric ctl_name */
+
+enum {
+        SOCKLND_TIMEOUT = 1,    /* ids count up from 1 in declaration order */
+        SOCKLND_CREDITS,
+        SOCKLND_PEER_CREDITS,
+        SOCKLND_NCONNDS,
+        SOCKLND_RECONNECTS_MIN,
+        SOCKLND_RECONNECTS_MAX,
+        SOCKLND_EAGER_ACK,
+        SOCKLND_ZERO_COPY,
+        SOCKLND_TYPED,
+        SOCKLND_BULK_MIN,
+        SOCKLND_RX_BUFFER_SIZE,
+        SOCKLND_TX_BUFFER_SIZE,
+        SOCKLND_NAGLE,
+        SOCKLND_IRQ_AFFINITY,
+        SOCKLND_ROUND_ROBIN,
+        SOCKLND_KEEPALIVE,
+        SOCKLND_KEEPALIVE_IDLE,
+        SOCKLND_KEEPALIVE_COUNT,
+        SOCKLND_KEEPALIVE_INTVL,
+        SOCKLND_BACKOFF_INIT,
+        SOCKLND_BACKOFF_MAX,
+        SOCKLND_PROTOCOL,
+        SOCKLND_ZERO_COPY_RECV,
+        SOCKLND_ZERO_COPY_RECV_MIN_NFRAGS
+};
+#else /* HAVE_SYSCTL_UNNUMBERED: every tunable uses CTL_UNNUMBERED */
+
+#define SOCKLND_TIMEOUT         CTL_UNNUMBERED
+#define SOCKLND_CREDITS         CTL_UNNUMBERED
+#define SOCKLND_PEER_CREDITS    CTL_UNNUMBERED
+#define SOCKLND_NCONNDS         CTL_UNNUMBERED
+#define SOCKLND_RECONNECTS_MIN  CTL_UNNUMBERED
+#define SOCKLND_RECONNECTS_MAX  CTL_UNNUMBERED
+#define SOCKLND_EAGER_ACK       CTL_UNNUMBERED
+#define SOCKLND_ZERO_COPY       CTL_UNNUMBERED
+#define SOCKLND_TYPED           CTL_UNNUMBERED
+#define SOCKLND_BULK_MIN        CTL_UNNUMBERED
+#define SOCKLND_RX_BUFFER_SIZE  CTL_UNNUMBERED
+#define SOCKLND_TX_BUFFER_SIZE  CTL_UNNUMBERED
+#define SOCKLND_NAGLE           CTL_UNNUMBERED
+#define SOCKLND_IRQ_AFFINITY    CTL_UNNUMBERED
+#define SOCKLND_ROUND_ROBIN     CTL_UNNUMBERED
+#define SOCKLND_KEEPALIVE       CTL_UNNUMBERED
+#define SOCKLND_KEEPALIVE_IDLE  CTL_UNNUMBERED
+#define SOCKLND_KEEPALIVE_COUNT CTL_UNNUMBERED
+#define SOCKLND_KEEPALIVE_INTVL CTL_UNNUMBERED
+#define SOCKLND_BACKOFF_INIT    CTL_UNNUMBERED
+#define SOCKLND_BACKOFF_MAX     CTL_UNNUMBERED
+#define SOCKLND_PROTOCOL        CTL_UNNUMBERED
+#define SOCKLND_ZERO_COPY_RECV  CTL_UNNUMBERED
+#define SOCKLND_ZERO_COPY_RECV_MIN_NFRAGS CTL_UNNUMBERED
+#endif /* HAVE_SYSCTL_UNNUMBERED */
+
+/*
+ * Tunables exported below the "socknal" sysctl directory (see
+ * ksocknal_top_ctl_table).  Every entry exposes one int-sized
+ * ksocknal_tunables field through proc_dointvec: mode 0444 entries are
+ * read-only, mode 0644 entries can be changed at runtime.  The entries
+ * guarded by CPU_AFFINITY, SOCKNAL_BACKOFF and SOCKNAL_VERSION_DEBUG are
+ * only present when the matching option is compiled in.  Each procname
+ * must be unique within the table; the zeroed entry terminates it.
+ */
+static cfs_sysctl_table_t ksocknal_ctl_table[] = {
+        {
+                .ctl_name = SOCKLND_TIMEOUT,
+                .procname = "timeout",
+                .data     = &ksocknal_tunables.ksnd_timeout,
+                .maxlen   = sizeof (int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_CREDITS,
+                .procname = "credits",
+                .data     = &ksocknal_tunables.ksnd_credits,
+                .maxlen   = sizeof (int),
+                .mode     = 0444,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_PEER_CREDITS,
+                .procname = "peer_credits",
+                .data     = &ksocknal_tunables.ksnd_peercredits,
+                .maxlen   = sizeof (int),
+                .mode     = 0444,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_NCONNDS,
+                .procname = "nconnds",
+                .data     = &ksocknal_tunables.ksnd_nconnds,
+                .maxlen   = sizeof (int),
+                .mode     = 0444,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_RECONNECTS_MIN,
+                .procname = "min_reconnectms",
+                .data     = &ksocknal_tunables.ksnd_min_reconnectms,
+                .maxlen   = sizeof (int),
+                .mode     = 0444,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_RECONNECTS_MAX,
+                .procname = "max_reconnectms",
+                .data     = &ksocknal_tunables.ksnd_max_reconnectms,
+                .maxlen   = sizeof (int),
+                .mode     = 0444,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_EAGER_ACK,
+                .procname = "eager_ack",
+                .data     = &ksocknal_tunables.ksnd_eager_ack,
+                .maxlen   = sizeof (int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_ZERO_COPY,
+                .procname = "zero_copy",
+                .data     = &ksocknal_tunables.ksnd_zc_min_payload,
+                .maxlen   = sizeof (int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_ZERO_COPY_RECV,
+                .procname = "zero_copy_recv",
+                .data     = &ksocknal_tunables.ksnd_zc_recv,
+                .maxlen   = sizeof (int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
 
-ctl_table ksocknal_top_ctl_table[] = {
-        {200, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
+        {
+                /* NB needs its own name: it must not shadow "zero_copy_recv" */
+                .ctl_name = SOCKLND_ZERO_COPY_RECV_MIN_NFRAGS,
+                .procname = "zero_copy_recv_min_nfrags",
+                .data     = &ksocknal_tunables.ksnd_zc_recv_min_nfrags,
+                .maxlen   = sizeof (int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_TYPED,
+                .procname = "typed",
+                .data     = &ksocknal_tunables.ksnd_typed_conns,
+                .maxlen   = sizeof (int),
+                .mode     = 0444,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_BULK_MIN,
+                .procname = "min_bulk",
+                .data     = &ksocknal_tunables.ksnd_min_bulk,
+                .maxlen   = sizeof (int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_RX_BUFFER_SIZE,
+                .procname = "rx_buffer_size",
+                .data     = &ksocknal_tunables.ksnd_rx_buffer_size,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_TX_BUFFER_SIZE,
+                .procname = "tx_buffer_size",
+                .data     = &ksocknal_tunables.ksnd_tx_buffer_size,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_NAGLE,
+                .procname = "nagle",
+                .data     = &ksocknal_tunables.ksnd_nagle,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+#ifdef CPU_AFFINITY
+        {
+                .ctl_name = SOCKLND_IRQ_AFFINITY,
+                .procname = "irq_affinity",
+                .data     = &ksocknal_tunables.ksnd_irq_affinity,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+#endif
+        {
+                .ctl_name = SOCKLND_ROUND_ROBIN,
+                .procname = "round_robin",
+                .data     = &ksocknal_tunables.ksnd_round_robin,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_KEEPALIVE,
+                .procname = "keepalive",
+                .data     = &ksocknal_tunables.ksnd_keepalive,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_KEEPALIVE_IDLE,
+                .procname = "keepalive_idle",
+                .data     = &ksocknal_tunables.ksnd_keepalive_idle,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_KEEPALIVE_COUNT,
+                .procname = "keepalive_count",
+                .data     = &ksocknal_tunables.ksnd_keepalive_count,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_KEEPALIVE_INTVL,
+                .procname = "keepalive_intvl",
+                .data     = &ksocknal_tunables.ksnd_keepalive_intvl,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+#ifdef SOCKNAL_BACKOFF
+        {
+                .ctl_name = SOCKLND_BACKOFF_INIT,
+                .procname = "backoff_init",
+                .data     = &ksocknal_tunables.ksnd_backoff_init,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_BACKOFF_MAX,
+                .procname = "backoff_max",
+                .data     = &ksocknal_tunables.ksnd_backoff_max,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+#endif
+#if SOCKNAL_VERSION_DEBUG
+        {
+                .ctl_name = SOCKLND_PROTOCOL,
+                .procname = "protocol",
+                .data     = &ksocknal_tunables.ksnd_protocol,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+#endif
+        {0}
+};
+
+
+cfs_sysctl_table_t ksocknal_top_ctl_table[] = { /* root: one "socknal" dir */
+        {
+                .ctl_name = CTL_SOCKLND,
+                .procname = "socknal",
+                .data     = NULL,               /* directory: no value of its own */
+                .maxlen   = 0,
+                .mode     = 0555,
+                .child    = ksocknal_ctl_table  /* the tunables live below it */
+        },
         { 0 }
 };
 
 int
 ksocknal_lib_tunables_init ()
 {
-       int    i = 0;
-       int    j = 1;
-
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "timeout", ksocknal_tunables.ksnd_timeout,
-                sizeof (int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "credits", ksocknal_tunables.ksnd_credits,
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "peer_credits", ksocknal_tunables.ksnd_peercredits,
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "nconnds", ksocknal_tunables.ksnd_nconnds,
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "min_reconnectms", ksocknal_tunables.ksnd_min_reconnectms,
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "max_reconnectms", ksocknal_tunables.ksnd_max_reconnectms,
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "eager_ack", ksocknal_tunables.ksnd_eager_ack,
-                sizeof (int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "zero_copy", ksocknal_tunables.ksnd_zc_min_frag,
-                sizeof (int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "typed", ksocknal_tunables.ksnd_typed_conns,
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "min_bulk", ksocknal_tunables.ksnd_min_bulk,
-                sizeof (int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "rx_buffer_size", ksocknal_tunables.ksnd_rx_buffer_size,
-                sizeof(int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "tx_buffer_size", ksocknal_tunables.ksnd_tx_buffer_size,
-                sizeof(int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "nagle", ksocknal_tunables.ksnd_nagle,
-                sizeof(int), 0644, NULL, &proc_dointvec};
-#if CPU_AFFINITY
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "irq_affinity", ksocknal_tunables.ksnd_irq_affinity,
-                sizeof(int), 0644, NULL, &proc_dointvec};
-#endif
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "keepalive_idle", ksocknal_tunables.ksnd_keepalive_idle,
-                sizeof(int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "keepalive_count", ksocknal_tunables.ksnd_keepalive_count,
-                sizeof(int), 0644, NULL, &proc_dointvec};
-       ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "keepalive_intvl", ksocknal_tunables.ksnd_keepalive_intvl,
-                sizeof(int), 0644, NULL, &proc_dointvec};
-#ifdef SOCKNAL_BACKOFF
-        ksocknal_ctl_table[i++] = (ctl_table)
-                {j++, "backoff_init", ksocknal_tunables.ksnd_backoff_init,
-                sizeof(int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-                {j++, "backoff_max", ksocknal_tunables.ksnd_backoff_max,
-                sizeof(int), 0644, NULL, &proc_dointvec};
+        if (!*ksocknal_tunables.ksnd_typed_conns) {
+                int rc = -EINVAL;       /* untyped connections are refused... */
+#if SOCKNAL_VERSION_DEBUG
+                if (*ksocknal_tunables.ksnd_protocol < 3)
+                        rc = 0;         /* ...unless a debug build asks for protocol < 3 */
 #endif
+                if (rc != 0) {
+                        CERROR("Protocol V3.x MUST have typed connections\n");
+                        return rc;
+                }
+        }
 
-       LASSERT (j == i+1);
-       LASSERT (i < sizeof(ksocknal_ctl_table)/sizeof(ksocknal_ctl_table[0]));
+        if (*ksocknal_tunables.ksnd_zc_recv_min_nfrags < 2)
+                *ksocknal_tunables.ksnd_zc_recv_min_nfrags = 2; /* floor */
+        if (*ksocknal_tunables.ksnd_zc_recv_min_nfrags > LNET_MAX_IOV)
+                *ksocknal_tunables.ksnd_zc_recv_min_nfrags = LNET_MAX_IOV; /* ceiling */
 
         ksocknal_tunables.ksnd_sysctl =
-                register_sysctl_table(ksocknal_top_ctl_table, 0);
+                cfs_register_sysctl_table(ksocknal_top_ctl_table, 0);
 
         if (ksocknal_tunables.ksnd_sysctl == NULL)
-               CWARN("Can't setup /proc tunables\n");
+                CWARN("Can't setup /proc tunables\n"); /* warn only: not fatal */
 
-       return 0;
+        return 0;
 }
 
 void
 ksocknal_lib_tunables_fini ()
 {
         if (ksocknal_tunables.ksnd_sysctl != NULL)
-                unregister_sysctl_table(ksocknal_tunables.ksnd_sysctl);
+                cfs_unregister_sysctl_table(ksocknal_tunables.ksnd_sysctl); /* undo init */
 }
 #else
 int
 ksocknal_lib_tunables_init ()
 {
-       return 0;
+        return 0;       /* sysctl table not compiled in: nothing to set up */
 }
 
 void
@@ -114,7 +385,7 @@ ksocknal_lib_tunables_fini ()
 void
 ksocknal_lib_bind_irq (unsigned int irq)
 {
-#if (defined(CONFIG_SMP) && CPU_AFFINITY)
+#if (defined(CONFIG_SMP) && defined(CPU_AFFINITY))
         int              bind;
         int              cpu;
         char             cmdline[64];
@@ -133,13 +404,13 @@ ksocknal_lib_bind_irq (unsigned int irq)
 
         info = &ksocknal_data.ksnd_irqinfo[irq];
 
-        write_lock_bh (&ksocknal_data.ksnd_global_lock);
+        cfs_write_lock_bh (&ksocknal_data.ksnd_global_lock);
 
         LASSERT (info->ksni_valid);
         bind = !info->ksni_bound;
         info->ksni_bound = 1;
 
-        write_unlock_bh (&ksocknal_data.ksnd_global_lock);
+        cfs_write_unlock_bh (&ksocknal_data.ksnd_global_lock);
 
         if (!bind)                              /* bound already */
                 return;
@@ -149,7 +420,7 @@ ksocknal_lib_bind_irq (unsigned int irq)
                   "echo %d > /proc/irq/%u/smp_affinity", 1 << cpu, irq);
 
         LCONSOLE_INFO("Binding irq %u to CPU %d with cmd: %s\n",
-                     irq, cpu, cmdline);
+                      irq, cpu, cmdline);
 
         /* FIXME: Find a better method of setting IRQ affinity...
          */
@@ -162,8 +433,8 @@ int
 ksocknal_lib_get_conn_addrs (ksock_conn_t *conn)
 {
         int rc = libcfs_sock_getaddr(conn->ksnc_sock, 1,
-                                    &conn->ksnc_ipaddr,
-                                    &conn->ksnc_port);
+                                     &conn->ksnc_ipaddr,
+                                     &conn->ksnc_port);
 
         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
         LASSERT (!conn->ksnc_closing);
@@ -174,7 +445,7 @@ ksocknal_lib_get_conn_addrs (ksock_conn_t *conn)
         }
 
         rc = libcfs_sock_getaddr(conn->ksnc_sock, 0,
-                                &conn->ksnc_myipaddr, NULL);
+                                 &conn->ksnc_myipaddr, NULL);
         if (rc != 0) {
                 CERROR ("Error %d getting sock local IP\n", rc);
                 return rc;
@@ -187,7 +458,7 @@ unsigned int
 ksocknal_lib_sock_irq (struct socket *sock)
 {
         int                irq = 0;
-#if CPU_AFFINITY
+#ifdef CPU_AFFINITY
         struct dst_entry  *dst;
 
         if (!*ksocknal_tunables.ksnd_irq_affinity)
@@ -210,10 +481,13 @@ ksocknal_lib_sock_irq (struct socket *sock)
 }
 
 int
-ksocknal_lib_zc_capable(struct socket *sock)
+ksocknal_lib_zc_capable(ksock_conn_t *conn)
 {
-        int  caps = sock->sk->sk_route_caps;
-        
+        int  caps = conn->ksnc_sock->sk->sk_route_caps;
+
+        if (conn->ksnc_proto == &ksocknal_protocol_v1x)
+                return 0;
+
         /* ZC if the socket supports scatter/gather and doesn't need software
          * checksums */
         return ((caps & NETIF_F_SG) != 0 &&
@@ -242,7 +516,7 @@ ksocknal_lib_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
                 struct iovec   *scratchiov = &scratch;
                 unsigned int    niov = 1;
 #else
-                struct iovec   *scratchiov = conn->ksnc_tx_scratch_iov;
+                struct iovec   *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
                 unsigned int    niov = tx->tx_niov;
 #endif
                 struct msghdr msg = {
@@ -270,23 +544,25 @@ ksocknal_lib_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
                 rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
         }
-       return rc;
+        return rc;
 }
 
 int
 ksocknal_lib_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
-        lnet_kiov_t    *kiov = tx->tx_kiov;
+        lnet_kiov_t   *kiov = tx->tx_kiov;
         int            rc;
         int            nob;
 
+        /* Not NOOP message */
+        LASSERT (tx->tx_lnetmsg != NULL);
+
         /* NB we can't trust socket ops to either consume our iovs
          * or leave them alone. */
-
-        if (kiov->kiov_len >= *ksocknal_tunables.ksnd_zc_min_frag &&
-            tx->tx_msg.ksm_zc_req_cookie != 0) {
+        if (tx->tx_msg.ksm_zc_cookies[0] != 0) {
                 /* Zero copy is enabled */
+                struct sock   *sk = sock->sk;
                 struct page   *page = kiov->kiov_page;
                 int            offset = kiov->kiov_offset;
                 int            fragsize = kiov->kiov_len;
@@ -299,7 +575,12 @@ ksocknal_lib_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
                     fragsize < tx->tx_resid)
                         msgflg |= MSG_MORE;
 
-                rc = tcp_sendpage(sock, page, offset, fragsize, msgflg);
+                if (sk->sk_prot->sendpage != NULL) {
+                        rc = sk->sk_prot->sendpage(sk, page,
+                                                   offset, fragsize, msgflg);
+                } else {
+                        rc = tcp_sendpage(sock, page, offset, fragsize, msgflg);
+                }
         } else {
 #if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
                 struct iovec  scratch;
@@ -309,7 +590,7 @@ ksocknal_lib_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
 #ifdef CONFIG_HIGHMEM
 #warning "XXX risk of kmap deadlock on multiple frags..."
 #endif
-                struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
+                struct iovec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
                 unsigned int  niov = tx->tx_nkiov;
 #endif
                 struct msghdr msg = {
@@ -341,7 +622,7 @@ ksocknal_lib_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
                 for (i = 0; i < niov; i++)
                         kunmap(kiov[i].kiov_page);
         }
-       return rc;
+        return rc;
 }
 
 void
@@ -370,7 +651,7 @@ ksocknal_lib_recv_iov (ksock_conn_t *conn)
         struct iovec *scratchiov = &scratch;
         unsigned int  niov = 1;
 #else
-        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        struct iovec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
         unsigned int  niov = conn->ksnc_rx_niov;
 #endif
         struct iovec *iov = conn->ksnc_rx_iov;
@@ -420,36 +701,82 @@ ksocknal_lib_recv_iov (ksock_conn_t *conn)
                         fragnob = iov[i].iov_len;
                         if (fragnob > sum)
                                 fragnob = sum;
-                
-                        conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum, 
+
+                        conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
                                                            iov[i].iov_base, fragnob);
                 }
                 conn->ksnc_msg.ksm_csum = saved_csum;
         }
 
-       return rc;
+        return rc;
+}
+
+/* Release a mapping returned by ksocknal_lib_kiov_vmap(); a NULL address
+ * (no mapping was made) is ignored. */
+static void
+ksocknal_lib_kiov_vunmap(void *addr)
+{
+        if (addr != NULL)
+                vunmap(addr);
+}
+
+/*
+ * Try to present the pages behind kiov[0..niov-1] as a single virtually
+ * contiguous buffer, so the receive can use one iovec.
+ *
+ * \param kiov   page fragments to receive into
+ * \param niov   number of fragments in \a kiov
+ * \param iov    filled in with the mapped base address and total length
+ * \param pages  scratch array with room for \a niov page pointers, or NULL
+ *
+ * \retval the vmap() address (release it with ksocknal_lib_kiov_vunmap())
+ * \retval NULL if no mapping was made; \a iov is left untouched
+ */
+static void *
+ksocknal_lib_kiov_vmap(lnet_kiov_t *kiov, int niov,
+                       struct iovec *iov, struct page **pages)
+{
+        void             *vaddr;
+        int               total = 0;
+        int               idx;
+
+        /* feature switched off, or the caller has no scratch page array */
+        if (!*ksocknal_tunables.ksnd_zc_recv || pages == NULL)
+                return NULL;
+
+        LASSERT (niov <= LNET_MAX_IOV);
+
+        /* fewer fragments than the hard minimum or the tunable one */
+        if (niov < 2)
+                return NULL;
+        if (niov < *ksocknal_tunables.ksnd_zc_recv_min_nfrags)
+                return NULL;
+
+        for (idx = 0; idx < niov; idx++) {
+                lnet_kiov_t *frag = &kiov[idx];
+
+                /* only the first fragment may start inside its page */
+                if (frag->kiov_offset != 0 && idx > 0)
+                        return NULL;
+
+                /* only the last fragment may stop short of the page end */
+                if (frag->kiov_offset + frag->kiov_len != CFS_PAGE_SIZE &&
+                    idx < niov - 1)
+                        return NULL;
+
+                pages[idx] = frag->kiov_page;
+                total += frag->kiov_len;
+        }
+
+        vaddr = vmap(pages, niov, VM_MAP, PAGE_KERNEL);
+        if (vaddr == NULL)
+                return NULL;
+
+        /* data starts at the first fragment's offset into the mapping */
+        iov->iov_base = vaddr + kiov[0].kiov_offset;
+        iov->iov_len = total;
+
+        return vaddr;
+}
 
 int
 ksocknal_lib_recv_kiov (ksock_conn_t *conn)
 {
 #if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
-        struct iovec  scratch;
-        struct iovec *scratchiov = &scratch;
-        unsigned int  niov = 1;
+        struct iovec   scratch;
+        struct iovec  *scratchiov = &scratch;
+        struct page  **pages      = NULL;
+        unsigned int   niov       = 1;
 #else
 #ifdef CONFIG_HIGHMEM
 #warning "XXX risk of kmap deadlock on multiple frags..."
 #endif
-        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
-        unsigned int  niov = conn->ksnc_rx_nkiov;
+        struct iovec  *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
+        struct page  **pages      = conn->ksnc_scheduler->kss_rx_scratch_pgs;
+        unsigned int   niov       = conn->ksnc_rx_nkiov;
 #endif
         lnet_kiov_t   *kiov = conn->ksnc_rx_kiov;
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
                 .msg_iov        = scratchiov,
-                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
@@ -459,15 +786,25 @@ ksocknal_lib_recv_kiov (ksock_conn_t *conn)
         int          i;
         int          rc;
         void        *base;
+        void        *addr;
         int          sum;
         int          fragnob;
 
         /* NB we can't trust socket ops to either consume our iovs
          * or leave them alone. */
-        for (nob = i = 0; i < niov; i++) {
-                scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
-                nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+        if ((addr = ksocknal_lib_kiov_vmap(kiov, niov, scratchiov, pages)) != NULL) {
+                nob = scratchiov[0].iov_len;
+                msg.msg_iovlen = 1;
+
+        } else {
+                for (nob = i = 0; i < niov; i++) {
+                        nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+                        scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
+                                                 kiov[i].kiov_offset;
+                }
+                msg.msg_iovlen = niov;
         }
+
         LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
@@ -487,20 +824,26 @@ ksocknal_lib_recv_kiov (ksock_conn_t *conn)
                         fragnob = kiov[i].kiov_len;
                         if (fragnob > sum)
                                 fragnob = sum;
-                
+
                         conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
                                                            base, fragnob);
 
                         kunmap(kiov[i].kiov_page);
                 }
         }
-        for (i = 0; i < niov; i++)
-                kunmap(kiov[i].kiov_page);
 
-       return (rc);
+        if (addr != NULL) {
+                ksocknal_lib_kiov_vunmap(addr);
+        } else {
+                for (i = 0; i < niov; i++)
+                        kunmap(kiov[i].kiov_page);
+        }
+
+        return (rc);
 }
 
-void ksocknal_lib_csum_tx(ksock_tx_t *tx)
+void
+ksocknal_lib_csum_tx(ksock_tx_t *tx)
 {
         int          i;
         __u32        csum;
@@ -553,13 +896,13 @@ ksocknal_lib_get_conn_tunables (ksock_conn_t *conn, int *txmem, int *rxmem, int
                 return (-ESHUTDOWN);
         }
 
-       rc = libcfs_sock_getbuf(sock, txmem, rxmem);
+        rc = libcfs_sock_getbuf(sock, txmem, rxmem);
         if (rc == 0) {
                 len = sizeof(*nagle);
-               set_fs(KERNEL_DS);
+                set_fs(KERNEL_DS);
                 rc = sock->ops->getsockopt(sock, SOL_TCP, TCP_NODELAY,
                                            (char *)nagle, &len);
-               set_fs(oldmm);
+                set_fs(oldmm);
         }
 
         ksocknal_connsock_decref(conn);
@@ -624,20 +967,23 @@ ksocknal_lib_setup_sock (struct socket *sock)
                 }
         }
 
-       rc = libcfs_sock_setbuf(sock,
+        rc = libcfs_sock_setbuf(sock,
                                 *ksocknal_tunables.ksnd_tx_buffer_size,
                                 *ksocknal_tunables.ksnd_rx_buffer_size);
-       if (rc != 0) {
-               CERROR ("Can't set buffer tx %d, rx %d buffers: %d\n",
+        if (rc != 0) {
+                CERROR ("Can't set buffer tx %d, rx %d buffers: %d\n",
                         *ksocknal_tunables.ksnd_tx_buffer_size,
                         *ksocknal_tunables.ksnd_rx_buffer_size, rc);
-               return (rc);
-       }
+                return (rc);
+        }
 
 /* TCP_BACKOFF_* sockopt tunables unsupported in stock kernels */
 #ifdef SOCKNAL_BACKOFF
         if (*ksocknal_tunables.ksnd_backoff_init > 0) {
                 option = *ksocknal_tunables.ksnd_backoff_init;
+#ifdef SOCKNAL_BACKOFF_MS
+                option *= 1000;
+#endif
 
                 set_fs (KERNEL_DS);
                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_BACKOFF_INIT,
@@ -652,6 +998,9 @@ ksocknal_lib_setup_sock (struct socket *sock)
 
         if (*ksocknal_tunables.ksnd_backoff_max > 0) {
                 option = *ksocknal_tunables.ksnd_backoff_max;
+#ifdef SOCKNAL_BACKOFF_MS
+                option *= 1000;
+#endif
 
                 set_fs (KERNEL_DS);
                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_BACKOFF_MAX,
@@ -785,16 +1134,16 @@ ksocknal_data_ready (struct sock *sk, int n)
 
         /* interleave correctly with closing sockets... */
         LASSERT(!in_irq());
-        read_lock (&ksocknal_data.ksnd_global_lock);
+        cfs_read_lock (&ksocknal_data.ksnd_global_lock);
 
         conn = sk->sk_user_data;
         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
                 sk->sk_data_ready (sk, n);
         } else
-               ksocknal_read_callback(conn);
+                ksocknal_read_callback(conn);
 
-        read_unlock (&ksocknal_data.ksnd_global_lock);
+        cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
 
         EXIT;
 }
@@ -808,7 +1157,7 @@ ksocknal_write_space (struct sock *sk)
 
         /* interleave correctly with closing sockets... */
         LASSERT(!in_irq());
-        read_lock (&ksocknal_data.ksnd_global_lock);
+        cfs_read_lock (&ksocknal_data.ksnd_global_lock);
 
         conn = sk->sk_user_data;
         wspace = SOCKNAL_WSPACE(sk);
@@ -827,53 +1176,113 @@ ksocknal_write_space (struct sock *sk)
                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
                 sk->sk_write_space (sk);
 
-                read_unlock (&ksocknal_data.ksnd_global_lock);
+                cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
                 return;
         }
 
         if (wspace >= min_wpace) {              /* got enough space */
-               ksocknal_write_callback(conn);
+                ksocknal_write_callback(conn);
 
-               /* Clear SOCK_NOSPACE _after_ ksocknal_write_callback so the
-                * ENOMEM check in ksocknal_transmit is race-free (think about
-                * it). */
+                /* Clear SOCK_NOSPACE _after_ ksocknal_write_callback so the
+                 * ENOMEM check in ksocknal_transmit is race-free (think about
+                 * it). */
 
                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
         }
 
-        read_unlock (&ksocknal_data.ksnd_global_lock);
+        cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
 }
 
 void
 ksocknal_lib_save_callback(struct socket *sock, ksock_conn_t *conn)
 {
-       conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
-       conn->ksnc_saved_write_space = sock->sk->sk_write_space;
+        conn->ksnc_saved_data_ready = sock->sk->sk_data_ready; /* Stash the
+         * socket's current data_ready/write_space handlers in conn;
+         * ksocknal_lib_reset_callback() copies them back to the socket. */
+        conn->ksnc_saved_write_space = sock->sk->sk_write_space;
 }
 
 void
 ksocknal_lib_set_callback(struct socket *sock,  ksock_conn_t *conn)
 {
-       sock->sk->sk_user_data = conn;
-       sock->sk->sk_data_ready = ksocknal_data_ready;
-       sock->sk->sk_write_space = ksocknal_write_space;
-       return;
+        sock->sk->sk_user_data = conn;  /* ksocknal_data_ready() and
+                                         * ksocknal_write_space() fetch
+                                         * conn from sk_user_data */
+        sock->sk->sk_data_ready = ksocknal_data_ready;
+        sock->sk->sk_write_space = ksocknal_write_space;
+        return;
 }
 
 void
 ksocknal_lib_reset_callback(struct socket *sock, ksock_conn_t *conn)
 {
-       /* Remove conn's network callbacks.
-        * NB I _have_ to restore the callback, rather than storing a noop,
-        * since the socket could survive past this module being unloaded!! */
-       sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
-       sock->sk->sk_write_space = conn->ksnc_saved_write_space;
-
-       /* A callback could be in progress already; they hold a read lock
-        * on ksnd_global_lock (to serialise with me) and NOOP if
-        * sk_user_data is NULL. */
-       sock->sk->sk_user_data = NULL;
-
-       return ;
+        /* Remove conn's network callbacks by putting back the handlers
+         * that ksocknal_lib_save_callback() stored in conn.
+         * NB I _have_ to restore the callback, rather than storing a noop,
+         * since the socket could survive past this module being unloaded!! */
+        sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
+        sock->sk->sk_write_space = conn->ksnc_saved_write_space;
+
+        /* A callback could be in progress already; they hold a read lock
+         * on ksnd_global_lock (to serialise with me) and NOOP if
+         * sk_user_data is NULL.
+         * When ksocknal_data_ready()/ksocknal_write_space() see a NULL
+         * sk_user_data they LASSERT that the socket's handler is no longer
+         * themselves and then call it, which is why the handlers are
+         * restored above before sk_user_data is cleared here.
+         * NOTE(review): presumably the caller holds ksnd_global_lock for
+         * writing -- confirm against the call sites. */
+        sock->sk->sk_user_data = NULL;
+
+        return ;
+}
+
+int
+ksocknal_lib_memory_pressure(ksock_conn_t *conn)
+{       /* Returns -ENOMEM when neither SOCK_NOSPACE (on conn->ksnc_sock)
+         * nor conn->ksnc_tx_ready is set, and 0 otherwise.  Both flags are
+         * tested while holding the kss_lock of conn's scheduler. */
+        int            rc = 0;
+        ksock_sched_t *sched;
+        
+        sched = conn->ksnc_scheduler;
+        cfs_spin_lock_bh (&sched->kss_lock);
+        
+        if (!SOCK_TEST_NOSPACE(conn->ksnc_sock) &&
+            !conn->ksnc_tx_ready) {
+                /* SOCK_NOSPACE is set when the socket fills
+                 * and cleared in the write_space callback
+                 * (which also sets ksnc_tx_ready).  If
+                 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
+                 * zero, I didn't fill the socket and
+                 * write_space won't reschedule me, so I
+                 * return -ENOMEM to get my caller to retry
+                 * after a timeout */
+                rc = -ENOMEM;
+        }
+        
+        cfs_spin_unlock_bh (&sched->kss_lock);
+
+        return rc;
 }
 
+__u64
+ksocknal_lib_new_incarnation(void)
+{
+        struct timeval tv;
+
+        /* The incarnation number is the time this module loaded and it
+         * identifies this particular instance of the socknal.  Hopefully
+         * we won't be able to reboot more frequently than 1MHz for the
+         * foreseeable future :)
+         *
+         * Concretely, the value returned is the time reported by
+         * do_gettimeofday() at the moment of this call, expressed in
+         * microseconds: tv_sec * 1000000 + tv_usec.  tv_sec is widened to
+         * __u64 before the multiply so the arithmetic is done in 64 bits. */
+
+        do_gettimeofday(&tv);
+
+        return (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
+}
+
+int
+ksocknal_lib_bind_thread_to_cpu(int id)
+{       /* Restrict the calling thread ('current') to a single CPU.
+         * 'id' is first translated by ksocknal_sched2cpu() (presumably a
+         * scheduler index -> CPU number mapping; confirm at its definition).
+         * Returns 0 if that CPU is online (the result of set_cpus_allowed()
+         * is not checked), -1 if it is not online.  When the build lacks
+         * CONFIG_SMP or CPU_AFFINITY nothing is done and 0 is returned. */
+#if defined(CONFIG_SMP) && defined(CPU_AFFINITY)
+        id = ksocknal_sched2cpu(id);
+        if (cpu_online(id)) {
+                cpumask_t m = CPU_MASK_NONE;    /* start from an empty mask */
+                cpu_set(id, m);                 /* ...and add only this CPU */
+                set_cpus_allowed(current, m);
+                return 0;
+        }
+
+        return -1;                      /* target CPU is not online */
+
+#else   /* !CONFIG_SMP || !CPU_AFFINITY */
+        return 0;
+#endif  /* CONFIG_SMP && CPU_AFFINITY */
+}