Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_lib-winnt.c
index 43d16da..9631483 100755 (executable)
 
 #include "socklnd.h"
 
-# if CONFIG_SYSCTL && !CFS_SYSFS_MODULE_PARM
-static ctl_table ksocknal_ctl_table[18];
+# if defined(CONFIG_SYSCTL) && !CFS_SYSFS_MODULE_PARM
+static cfs_sysctl_table_t ksocknal_ctl_table[21];
 
-ctl_table ksocknal_top_ctl_table[] = {
-        {200, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
+cfs_sysctl_table_t ksocknal_top_ctl_table[] = {
+        {
+                /* ctl_name */  200,
+                /* procname */  "socknal",
+                /* data     */  NULL,
+                /* maxlen   */  0,
+                /* mode     */  0555,
+                /* child    */  ksocknal_ctl_table
+        },
         { 0 }
 };
 
 int
-ksocknal_lib_tunables_init () 
+ksocknal_lib_tunables_init ()
 {
-           int    i = 0;
-           int    j = 1;
-       
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "timeout", ksocknal_tunables.ksnd_timeout, 
-                sizeof (int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "credits", ksocknal_tunables.ksnd_credits, 
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "peer_credits", ksocknal_tunables.ksnd_peercredits, 
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "nconnds", ksocknal_tunables.ksnd_nconnds, 
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "min_reconnectms", ksocknal_tunables.ksnd_min_reconnectms, 
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "max_reconnectms", ksocknal_tunables.ksnd_max_reconnectms, 
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "eager_ack", ksocknal_tunables.ksnd_eager_ack, 
-                sizeof (int), 0644, NULL, &proc_dointvec};
-#if SOCKNAL_ZC
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "zero_copy", ksocknal_tunables.ksnd_zc_min_frag, 
-                sizeof (int), 0644, NULL, &proc_dointvec};
-#endif
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "typed", ksocknal_tunables.ksnd_typed_conns, 
-                sizeof (int), 0444, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "min_bulk", ksocknal_tunables.ksnd_min_bulk, 
-                sizeof (int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "buffer_size", ksocknal_tunables.ksnd_buffer_size, 
-                sizeof(int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "nagle", ksocknal_tunables.ksnd_nagle, 
-                sizeof(int), 0644, NULL, &proc_dointvec};
+        int    i = 0;
+        int    j = 1;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "timeout";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_timeout;
+        ksocknal_ctl_table[i].maxlen   = sizeof (int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "credits";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_credits;
+        ksocknal_ctl_table[i].maxlen   = sizeof (int);
+        ksocknal_ctl_table[i].mode     = 0444;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "peer_credits";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_peercredits;
+        ksocknal_ctl_table[i].maxlen   = sizeof (int);
+        ksocknal_ctl_table[i].mode     = 0444;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "nconnds";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_nconnds;
+        ksocknal_ctl_table[i].maxlen   = sizeof (int);
+        ksocknal_ctl_table[i].mode     = 0444;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "min_reconnectms";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_min_reconnectms;
+        ksocknal_ctl_table[i].maxlen   = sizeof (int);
+        ksocknal_ctl_table[i].mode     = 0444;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "max_reconnectms";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_max_reconnectms;
+        ksocknal_ctl_table[i].maxlen   = sizeof (int);
+        ksocknal_ctl_table[i].mode     = 0444;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "eager_ack";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_eager_ack;
+        ksocknal_ctl_table[i].maxlen   = sizeof (int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "zero_copy";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_zc_min_payload;
+        ksocknal_ctl_table[i].maxlen   = sizeof (int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "typed";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_typed_conns;
+        ksocknal_ctl_table[i].maxlen   = sizeof (int);
+        ksocknal_ctl_table[i].mode     = 0444;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "min_bulk";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_min_bulk;
+        ksocknal_ctl_table[i].maxlen   = sizeof (int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "rx_buffer_size";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_rx_buffer_size;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "tx_buffer_size";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_tx_buffer_size;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "nagle";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_nagle;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "round_robin";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_round_robin;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
 #ifdef CPU_AFFINITY
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "irq_affinity", ksocknal_tunables.ksnd_irq_affinity, 
-                sizeof(int), 0644, NULL, &proc_dointvec};
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "irq_affinity";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_irq_affinity;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+#endif
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "keepalive_idle";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_keepalive_idle;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "keepalive_count";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_keepalive_count;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "keepalive_intvl";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_keepalive_intvl;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+#ifdef SOCKNAL_BACKOFF
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "backoff_init";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_backoff_init;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "backoff_max";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_backoff_max;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
 #endif
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "keepalive_idle", ksocknal_tunables.ksnd_keepalive_idle, 
-                sizeof(int), 0644, NULL, &proc_dointvec};
-        ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "keepalive_count", ksocknal_tunables.ksnd_keepalive_count, 
-                sizeof(int), 0644, NULL, &proc_dointvec};
-       ksocknal_ctl_table[i++] = (ctl_table)
-               {j++, "keepalive_intvl", ksocknal_tunables.ksnd_keepalive_intvl, 
-                sizeof(int), 0644, NULL, &proc_dointvec};
-
-       LASSERT (j == i+1);
-       LASSERT (i < sizeof(ksocknal_ctl_table)/sizeof(ksocknal_ctl_table[0]));
+
+#if SOCKNAL_VERSION_DEBUG
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "protocol";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_protocol;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+#endif
+
+        LASSERT (j == i + 1);
+        LASSERT (i <= sizeof(ksocknal_ctl_table)/sizeof(ksocknal_ctl_table[0]));
 
         ksocknal_tunables.ksnd_sysctl =
-                register_sysctl_table(ksocknal_top_ctl_table, 0);
+                cfs_register_sysctl_table(ksocknal_top_ctl_table, 0);
 
         if (ksocknal_tunables.ksnd_sysctl == NULL)
-               CWARN("Can't setup /proc tunables\n");
+                CWARN("Can't setup /proc tunables\n");
 
-       return 0;
+        return 0;
 }
 
 void
-ksocknal_lib_tunables_fini () 
+ksocknal_lib_tunables_fini ()
 {
         if (ksocknal_tunables.ksnd_sysctl != NULL)
-                unregister_sysctl_table(ksocknal_tunables.ksnd_sysctl);        
+                cfs_unregister_sysctl_table(ksocknal_tunables.ksnd_sysctl);
 }
 #else
 int
-ksocknal_lib_tunables_init () 
+ksocknal_lib_tunables_init ()
 {
-       return 0;
+        return 0;
 }
 
-void 
+void
 ksocknal_lib_tunables_fini ()
 {
 }
-#endif
+#endif /* # if CONFIG_SYSCTL && !CFS_SYSFS_MODULE_PARM */
 
 void
 ksocknal_lib_bind_irq (unsigned int irq)
@@ -147,7 +276,8 @@ int
 ksocknal_lib_get_conn_addrs (ksock_conn_t *conn)
 {
         int rc = libcfs_sock_getaddr(conn->ksnc_sock, 1,
-                                    &conn->ksnc_ipaddr, &conn->ksnc_port);
+                                     &conn->ksnc_ipaddr,
+                                     &conn->ksnc_port);
 
         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
         LASSERT (!conn->ksnc_closing);
@@ -170,241 +300,33 @@ ksocknal_lib_get_conn_addrs (ksock_conn_t *conn)
 unsigned int
 ksocknal_lib_sock_irq (struct socket *sock)
 {
-    return 0;
-}
-
-#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
-static struct page *
-ksocknal_kvaddr_to_page (unsigned long vaddr)
-{
-        struct page *page;
-
-        if (vaddr >= VMALLOC_START &&
-            vaddr < VMALLOC_END)
-                page = vmalloc_to_page ((void *)vaddr);
-#ifdef CONFIG_HIGHMEM
-        else if (vaddr >= PKMAP_BASE &&
-                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
-                page = vmalloc_to_page ((void *)vaddr);
-                /* in 2.4 ^ just walks the page tables */
-#endif
-        else
-                page = virt_to_page (vaddr);
-
-        if (page == NULL ||
-            !VALID_PAGE (page))
-                return (NULL);
-
-        return (page);
-}
-#endif
-
-/*
- * ks_lock_iovs
- *   Lock the i/o vector buffers into MDL structure
- *
- * Arguments:
- *   iov:  the array of i/o vectors
- *   niov: number of i/o vectors to be locked
- *   len:  the real length of the iov vectors
- *
- * Return Value:
- *   ksock_mdl_t *: the Mdl of the locked buffers or
- *         NULL pointer in failure case
- *
- * Notes: 
- *   N/A
- */
-
-ksock_mdl_t *
-ks_lock_iovs(
-    IN struct iovec  *iov,
-    IN int            niov,
-    IN int            recving,
-    IN int *          len )
-{
-    int             rc = 0;
-
-    int             i = 0;
-    int             total = 0;
-    ksock_mdl_t *   mdl = NULL;
-    ksock_mdl_t *   tail = NULL;
-
-    LASSERT(iov != NULL);
-    LASSERT(niov > 0);
-    LASSERT(len != NULL);
-
-    for (i=0; i < niov; i++) {
-
-        ksock_mdl_t * Iovec = NULL;
-            
-        rc = ks_lock_buffer(
-                iov[i].iov_base,
-                FALSE,
-                iov[i].iov_len,
-                recving ? IoWriteAccess : IoReadAccess,
-                &Iovec );
-
-        if (rc < 0) {
-            break;
-        }
-
-        if (tail) {
-            tail->Next = Iovec;
-        } else {
-            mdl = Iovec;
-        }
-
-        tail = Iovec;
-
-        total +=iov[i].iov_len;
-    }
-
-    if (rc >= 0) {
-        *len = total;
-    } else {
-        if (mdl) {
-            ks_release_mdl(mdl, FALSE);
-            mdl = NULL;
-        }
-    }
-
-    return mdl;
-}
-
-/*
- * ks_lock_kiovs
- *   Lock the kiov pages into MDL structure
- *
- * Arguments:
- *   kiov:  the array of kiov pages
- *   niov:  number of kiov to be locked
- *   len:   the real length of the kiov arrary
- *
- * Return Value:
- *   PMDL: the Mdl of the locked buffers or NULL
- *         pointer in failure case
- *
- * Notes: 
- *   N/A
- */
-ksock_mdl_t *
-ks_lock_kiovs(
-    IN lnet_kiov_t *  kiov,
-    IN int            nkiov,
-    IN int            recving,
-    IN int *          len )
-{
-    int             rc = 0;
-    int             i = 0;
-    int             total = 0;
-    ksock_mdl_t *   mdl = NULL;
-    ksock_mdl_t *   tail = NULL;
-
-    LASSERT(kiov != NULL);
-    LASSERT(nkiov > 0);
-    LASSERT(len != NULL);
-
-    for (i=0; i < nkiov; i++) {
-
-        ksock_mdl_t *        Iovec = NULL;
-
-
-        //
-        //  Lock the kiov page into Iovec ¡­
-        //
-
-        rc = ks_lock_buffer(
-                (PUCHAR)kiov[i].kiov_page->addr + 
-                     kiov[i].kiov_offset,
-                FALSE,
-                kiov[i].kiov_len,
-                recving ? IoWriteAccess : IoReadAccess,
-                &Iovec
-            );
-
-        if (rc < 0) {
-            break;
-        }
-
-        //
-        // Attach the Iovec to the mdl chain
-        //
-
-        if (tail) {
-            tail->Next = Iovec;
-        } else {
-            mdl = Iovec;
-        }
-
-        tail = Iovec;
-
-        total += kiov[i].kiov_len;
-
-    }
-
-    if (rc >= 0) {
-        *len = total;
-    } else {
-        if (mdl) {
-            ks_release_mdl(mdl, FALSE);
-            mdl = NULL;
-        }
-    }
-
-    return mdl;
+        return 0;
 }
 
-
 int
 ksocknal_lib_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
-#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
-        unsigned long  vaddr = (unsigned long)iov->iov_base
-        int            offset = vaddr & (PAGE_SIZE - 1);
-        int            zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
-        struct page   *page;
-#endif
+
         int            nob;
         int            rc;
-        ksock_mdl_t *  mdl;
+        int            flags;
 
-        /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone. */
 
-#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
-        if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
-            (sock->sk->sk_route_caps & NETIF_F_SG) &&
-            (sock->sk->sk_route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
-            (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
-                int msgflg = MSG_DONTWAIT;
+        if (*ksocknal_tunables.ksnd_enable_csum        && /* checksum enabled */
+            conn->ksnc_proto == &ksocknal_protocol_v2x && /* V2.x connection  */
+            tx->tx_nob == tx->tx_resid                 && /* frist sending    */
+            tx->tx_msg.ksm_csum == 0)                     /* not checksummed  */
+                ksocknal_lib_csum_tx(tx);
 
-                CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
-                       (void *)vaddr, page, page_address(page), offset, zcsize);
+        nob = ks_query_iovs_length(tx->tx_iov, tx->tx_niov);
+        flags = (!list_empty (&conn->ksnc_tx_queue) || nob < tx->tx_resid) ? 
+                (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT;
+        rc = ks_send_iovs(sock, tx->tx_iov, tx->tx_niov, flags, 0);
 
-                if (!list_empty (&conn->ksnc_tx_queue) ||
-                    zcsize < tx->tx_resid)
-                        msgflg |= MSG_MORE;
-
-                rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
-        } else
-#endif
-        {
-                /* lock the whole tx iovs into a single mdl chain */
-                mdl = ks_lock_iovs(tx->tx_iov, tx->tx_niov, FALSE, &nob);
-
-                if (mdl) {
-                        /* send the total mdl chain */
-                        rc = ks_send_mdl( conn->ksnc_sock, tx, mdl, nob, 
-                                    (!list_empty (&conn->ksnc_tx_queue) || nob < tx->tx_resid) ? 
-                                    (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT);
-                } else {
-                        rc = -ENOMEM;
-                }
-        }
-
-           return rc;
+        KsPrint((4, "ksocknal_lib_send_iov: conn %p sock %p rc %d\n",
+                     conn, sock, rc));
+        return rc;
 }
 
 int
@@ -414,70 +336,63 @@ ksocknal_lib_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
         lnet_kiov_t    *kiov = tx->tx_kiov;
         int            rc;
         int            nob;
-        ksock_mdl_t *  mdl;
+        int            nkiov;
+        int            flags;
 
-        /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone. */
-
-#if SOCKNAL_ZC
-        if (kiov->kiov_len >= *ksocknal_tunables.ksnd_zc_min_frag &&
-            (sock->sk->sk_route_caps & NETIF_F_SG) &&
-            (sock->sk->sk_route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
-                struct page   *page = kiov->kiov_page;
-                int            offset = kiov->kiov_offset;
-                int            fragsize = kiov->kiov_len;
-                int            msgflg = MSG_DONTWAIT;
-
-                CDEBUG(D_NET, "page %p + offset %x for %d\n",
-                               page, offset, kiov->kiov_len);
-
-                if (!list_empty(&conn->ksnc_tx_queue) ||
-                    fragsize < tx->tx_resid)
-                        msgflg |= MSG_MORE;
-
-                rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
-                                       &tx->tx_zccd);
-        } else
-#endif
-        {
-                /* lock the whole tx kiovs into a single mdl chain */
-                mdl = ks_lock_kiovs(tx->tx_kiov, tx->tx_nkiov, FALSE, &nob);
-
-                if (mdl) {
-                        /* send the total mdl chain */
-                        rc = ks_send_mdl(
-                                    conn->ksnc_sock, tx, mdl, nob,
-                                    (!list_empty(&conn->ksnc_tx_queue) || nob < tx->tx_resid) ?
-                                    (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT);
-                } else {
-                        rc = -ENOMEM;
-                }
-        }
+        nkiov = tx->tx_nkiov;
+        nob = ks_query_kiovs_length(tx->tx_kiov, nkiov);
+        flags = (!list_empty (&conn->ksnc_tx_queue) || nob < tx->tx_resid) ? 
+                (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT;
+        rc = ks_send_kiovs(sock, tx->tx_kiov, nkiov, flags, 0);
 
-           return rc;
+        KsPrint((4, "ksocknal_lib_send_kiov: conn %p sock %p rc %d\n",
+                    conn, sock, rc));
+        return rc;
 }
 
-
 int
 ksocknal_lib_recv_iov (ksock_conn_t *conn)
 {
         struct iovec *iov = conn->ksnc_rx_iov;
         int           rc;
         int           size;
-        ksock_mdl_t * mdl;
 
-        /* lock the whole tx iovs into a single mdl chain */
-        mdl = ks_lock_iovs(iov, conn->ksnc_rx_niov, TRUE, &size);
+        /* receive payload from tsdu queue */
+        rc = ks_recv_iovs (conn->ksnc_sock, iov, conn->ksnc_rx_niov,
+                           MSG_DONTWAIT, 0);
 
-        if (!mdl) {
-            return (-ENOMEM);
-        }
-        
-        LASSERT (size <= conn->ksnc_rx_nob_wanted);
+        /* calcuate package checksum */
+        if (rc > 0) {
+
+                int     i;
+                int     fragnob;
+                int     sum;
+                __u32   saved_csum = 0;
+
+                if (conn->ksnc_proto == &ksocknal_protocol_v2x) {
+                        saved_csum = conn->ksnc_msg.ksm_csum;
+                        conn->ksnc_msg.ksm_csum = 0;
+                }
 
-        /* try to request data for the whole mdl chain */
-        rc = ks_recv_mdl (conn->ksnc_sock, mdl, size, MSG_DONTWAIT);
+                if (saved_csum != 0) {
 
+                        /* accumulate checksum */
+                        for (i = 0, sum = rc; sum > 0; i++, sum -= fragnob) {
+                                LASSERT (i < conn->ksnc_rx_niov);
+
+                                fragnob = iov[i].iov_len;
+                                if (fragnob > sum)
+                                        fragnob = sum;
+
+                                conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
+                                                                   iov[i].iov_base, fragnob);
+                        }
+                        conn->ksnc_msg.ksm_csum = saved_csum;
+                }
+        }
+
+        KsPrint((4, "ksocknal_lib_recv_iov: conn %p sock %p rc %d.\n",
+                    conn, conn->ksnc_sock, rc));
         return rc;
 }
 
@@ -485,27 +400,39 @@ int
 ksocknal_lib_recv_kiov (ksock_conn_t *conn)
 {
         lnet_kiov_t  *kiov = conn->ksnc_rx_kiov;
-        int           size;
         int           rc;
-        ksock_mdl_t * mdl;
 
         /* NB we can't trust socket ops to either consume our iovs
          * or leave them alone, so we only receive 1 frag at a time. */
         LASSERT (conn->ksnc_rx_nkiov > 0);
 
-        /* lock the whole tx kiovs into a single mdl chain */
-        mdl = ks_lock_kiovs(kiov, conn->ksnc_rx_nkiov, TRUE, &size);
+        /* receive payload from tsdu queue */
+        rc = ks_recv_kiovs (conn->ksnc_sock, kiov, conn->ksnc_rx_nkiov,
+                            MSG_DONTWAIT, 0);
 
-        if (!mdl) {
-            rc = -ENOMEM;
-            return (rc);
-        }
-        
-        LASSERT (size <= conn->ksnc_rx_nob_wanted);
+        if (rc > 0 && conn->ksnc_msg.ksm_csum != 0) {
+
+                int          i;
+                char        *base;
+                int          sum;
+                int          fragnob;
+
+                for (i = 0, sum = rc; sum > 0; i++, sum -= fragnob) {
 
-        /* try to request data for the whole mdl chain */
-        rc = ks_recv_mdl (conn->ksnc_sock, mdl, size, MSG_DONTWAIT);
+                        LASSERT (i < conn->ksnc_rx_nkiov);
 
+                        base = (char *)(kiov[i].kiov_page->addr) + kiov[i].kiov_offset;
+                        fragnob = kiov[i].kiov_len;
+                        if (fragnob > sum)
+                                fragnob = sum;
+
+                        conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
+                                                           base, fragnob);
+                }
+        }
+
+        KsPrint((4, "ksocknal_lib_recv_kiov: conn %p sock %p rc %d.\n",
+                    conn, conn->ksnc_sock, rc));
         return rc;
 }
 
@@ -526,23 +453,17 @@ ksocknal_lib_eager_ack (ksock_conn_t *conn)
 int
 ksocknal_lib_get_conn_tunables (ksock_conn_t *conn, int *txmem, int *rxmem, int *nagle)
 {
-        ksock_tconn_t * tconn = conn->ksnc_sock;
+        ks_tconn_t *    tconn = conn->ksnc_sock;
         int             len;
         int             rc;
 
         ks_get_tconn (tconn);
-        
         *txmem = *rxmem = 0;
-
         len = sizeof(*nagle);
-
-        rc = ks_get_tcp_option(
-                    tconn, TCP_SOCKET_NODELAY,
-                    (__u32 *)nagle, &len);
-
+        rc = ks_get_tcp_option(tconn, TCP_SOCKET_NODELAY, (__u32 *)nagle, &len);
         ks_put_tconn (tconn);
 
-        printk("ksocknal_get_conn_tunables: nodelay = %d rc = %d\n", *nagle, rc);
+        KsPrint((2, "ksocknal_get_conn_tunables: nodelay = %d rc = %d\n", *nagle, rc));
 
         if (rc == 0)
                 *nagle = !*nagle;
@@ -553,20 +474,6 @@ ksocknal_lib_get_conn_tunables (ksock_conn_t *conn, int *txmem, int *rxmem, int
 }
 
 int
-ksocknal_lib_buffersize (int current_sz, int tunable_sz)
-{
-           /* ensure >= SOCKNAL_MIN_BUFFER */
-           if (current_sz < SOCKNAL_MIN_BUFFER)
-                       return MAX(SOCKNAL_MIN_BUFFER, tunable_sz);
-
-           if (tunable_sz > SOCKNAL_MIN_BUFFER)
-                       return tunable_sz;
-       
-           /* leave alone */
-           return 0;
-}
-
-int
 ksocknal_lib_setup_sock (struct socket *sock)
 {
         int             rc;
@@ -578,9 +485,8 @@ ksocknal_lib_setup_sock (struct socket *sock)
 
         __u32           option;
 
-        /* set the window size */
-
 #if 0
+        /* set the window size */
         tconn->kstc_snd_wnd = ksocknal_tunables.ksnd_buffer_size;
         tconn->kstc_rcv_wnd = ksocknal_tunables.ksnd_buffer_size;
 #endif
@@ -593,7 +499,7 @@ ksocknal_lib_setup_sock (struct socket *sock)
                             sock, TCP_SOCKET_NODELAY,
                             &option, sizeof (option));
                 if (rc != 0) {
-                        printk ("Can't disable nagle: %d\n", rc);
+                        CERROR ("Can't disable nagle: %d\n", rc);
                         return (rc);
                 }
         }
@@ -621,7 +527,7 @@ ksocknal_lib_setup_sock (struct socket *sock)
 void
 ksocknal_lib_push_conn (ksock_conn_t *conn)
 {
-        ksock_tconn_t * tconn;
+        ks_tconn_t *    tconn;
         __u32           nagle;
         __u32           val = 1;
         int             rc;
@@ -632,12 +538,12 @@ ksocknal_lib_push_conn (ksock_conn_t *conn)
 
         spin_lock(&tconn->kstc_lock);
         if (tconn->kstc_type == kstt_sender) {
-            nagle = tconn->sender.kstc_info.nagle;
-            tconn->sender.kstc_info.nagle = 0;
+                nagle = tconn->sender.kstc_info.nagle;
+                tconn->sender.kstc_info.nagle = 0;
         } else {
-            LASSERT(tconn->kstc_type == kstt_child);
-            nagle = tconn->child.kstc_info.nagle;
-            tconn->child.kstc_info.nagle = 0;
+                LASSERT(tconn->kstc_type == kstt_child);
+                nagle = tconn->child.kstc_info.nagle;
+                tconn->child.kstc_info.nagle = 0;
         }
 
         spin_unlock(&tconn->kstc_lock);
@@ -654,185 +560,78 @@ ksocknal_lib_push_conn (ksock_conn_t *conn)
         spin_lock(&tconn->kstc_lock);
 
         if (tconn->kstc_type == kstt_sender) {
-            tconn->sender.kstc_info.nagle = nagle;
+                tconn->sender.kstc_info.nagle = nagle;
         } else {
-            LASSERT(tconn->kstc_type == kstt_child);
-            tconn->child.kstc_info.nagle = nagle;
+                LASSERT(tconn->kstc_type == kstt_child);
+                tconn->child.kstc_info.nagle = nagle;
         }
         spin_unlock(&tconn->kstc_lock);
-
         ks_put_tconn(tconn);
 }
 
-/* @mode: 0: receiving mode / 1: sending mode */
 void
-ksocknal_sched_conn (ksock_conn_t *conn, int mode, ksock_tx_t *tx)
+ksocknal_lib_csum_tx(ksock_tx_t *tx)
 {
-        int             flags;
-        ksock_sched_t * sched;
-        ENTRY;
-
-        /* interleave correctly with closing sockets... */
-        read_lock (&ksocknal_data.ksnd_global_lock);
-
-        sched = conn->ksnc_scheduler;
+        int          i;
+        __u32        csum;
+        void        *base;
 
-        spin_lock_irqsave (&sched->kss_lock, flags);
+        LASSERT(tx->tx_iov[0].iov_base == (void *)&tx->tx_msg);
+        LASSERT(tx->tx_conn != NULL);
+        LASSERT(tx->tx_conn->ksnc_proto == &ksocknal_protocol_v2x);
 
-        if (mode) { /* transmission can continue ... */ 
+        tx->tx_msg.ksm_csum = 0;
 
-#error "This is out of date - we should be calling ksocknal_write_callback()"
-                conn->ksnc_tx_ready = 1;
+        csum = ksocknal_csum(~0, (void *)tx->tx_iov[0].iov_base,
+                             tx->tx_iov[0].iov_len);
 
-                if (tx) {
-                    /* Incomplete send: place tx on HEAD of tx_queue */
-                    list_add (&tx->tx_list, &conn->ksnc_tx_queue);
-                }
-
-                if ( !conn->ksnc_tx_scheduled &&
-                     !list_empty(&conn->ksnc_tx_queue)) {  //packets to send
-                        list_add_tail (&conn->ksnc_tx_list,
-                                       &sched->kss_tx_conns);
-                        conn->ksnc_tx_scheduled = 1;
-                        /* extra ref for scheduler */
-                        atomic_inc (&conn->ksnc_conn_refcount);
-
-                        cfs_waitq_signal (&sched->kss_waitq);
-                }
-        } else {    /* receiving can continue ... */
-
-                conn->ksnc_rx_ready = 1;
+        if (tx->tx_kiov != NULL) {
+                for (i = 0; i < tx->tx_nkiov; i++) {
+                        base = (PUCHAR)(tx->tx_kiov[i].kiov_page->addr) +
+                               tx->tx_kiov[i].kiov_offset;
 
-                if ( !conn->ksnc_rx_scheduled) {  /* not being progressed */
-                        list_add_tail(&conn->ksnc_rx_list,
-                                      &sched->kss_rx_conns);
-                        conn->ksnc_rx_scheduled = 1;
-                        /* extra ref for scheduler */
-                        atomic_inc (&conn->ksnc_conn_refcount);
-
-                        cfs_waitq_signal (&sched->kss_waitq);
+                        csum = ksocknal_csum(csum, base, tx->tx_kiov[i].kiov_len);
                 }
+        } else {
+                for (i = 1; i < tx->tx_niov; i++)
+                        csum = ksocknal_csum(csum, tx->tx_iov[i].iov_base,
+                                             tx->tx_iov[i].iov_len);
         }
 
-        spin_unlock_irqrestore (&sched->kss_lock, flags);
-        read_unlock (&ksocknal_data.ksnd_global_lock);
+        if (*ksocknal_tunables.ksnd_inject_csum_error) {
+                csum++;
+                *ksocknal_tunables.ksnd_inject_csum_error = 0;
+        }
 
-        EXIT;
+        tx->tx_msg.ksm_csum = csum;
 }
 
-void ksocknal_schedule_callback(struct socket*sock, int mode, void * tx, ulong_ptr bytes)
+void ksocknal_schedule_callback(struct socket*sock, int mode)
 {
-    ksock_conn_t * conn = (ksock_conn_t *) sock->kstc_conn;
+        ksock_conn_t * conn = (ksock_conn_t *) sock->kstc_conn;
 
-    if (mode) {
-        ksocknal_sched_conn(conn, mode, tx);
-    } else {
-        if ( CAN_BE_SCHED(bytes, (ulong_ptr)conn->ksnc_rx_nob_wanted )) {
-            ksocknal_sched_conn(conn, mode, tx);
+        read_lock (&ksocknal_data.ksnd_global_lock);
+        if (mode) {
+                ksocknal_write_callback(conn);
+        } else {
+                ksocknal_read_callback(conn);
         }
-    }
+        read_unlock (&ksocknal_data.ksnd_global_lock);
 }
 
-extern void
-ksocknal_tx_launched (ksock_tx_t *tx);
-
 void
-ksocknal_fini_sending(ksock_tcpx_fini_t *tcpx)
-{
-    ksocknal_tx_launched(tcpx->tx);
-    cfs_free(tcpx);
-}
-
-void *
-ksocknal_update_tx(
-    struct socket*  tconn,
-    void *          txp,
-    ulong_ptr       rc
-    )
+ksocknal_tx_fini_callback(ksock_conn_t * conn, ksock_tx_t * tx)
 {
-    ksock_tx_t *    tx = (ksock_tx_t *)txp;
-
-    /*
-     *  the transmission was done, we need update the tx
-     */
-
-    LASSERT(tx->tx_resid >= (int)rc);
-    tx->tx_resid -= (int)rc;
-
-    /*
-     *  just partial of tx is sent out, we need update
-     *  the fields of tx and schedule later transmission.
-     */
-
-    if (tx->tx_resid) {
-
-        if (tx->tx_niov > 0) {
-
-            /* if there's iov, we need process iov first */
-            while (rc > 0 ) {
-                if (rc < tx->tx_iov->iov_len) {
-                    /* didn't send whole iov entry... */
-                    tx->tx_iov->iov_base = 
-                        (char *)(tx->tx_iov->iov_base) + rc;
-                    tx->tx_iov->iov_len -= rc;
-                    rc = 0;
-                 } else {
-                    /* the whole of iov was sent out */
-                    rc -= tx->tx_iov->iov_len;
-                    tx->tx_iov++;
-                    tx->tx_niov--;
-                }
-            }
-
-        } else {
-
-            /* now we need process the kiov queues ... */
-
-            while (rc > 0 ) {
-
-                if (rc < tx->tx_kiov->kiov_len) {
-                    /* didn't send whole kiov entry... */
-                    tx->tx_kiov->kiov_offset += rc;
-                    tx->tx_kiov->kiov_len -= rc;
-                    rc = 0;
-                } else {
-                    /* whole kiov was sent out */
-                    rc -= tx->tx_kiov->kiov_len;
-                    tx->tx_kiov++;
-                    tx->tx_nkiov--;
-                }
-            }
+        /* remove tx/conn from conn's outgoing queue */
+        spin_lock_bh (&conn->ksnc_scheduler->kss_lock);
+        list_del(&tx->tx_list);
+        if (list_empty(&conn->ksnc_tx_queue)) {
+                list_del (&conn->ksnc_tx_list);
         }
+        spin_unlock_bh (&conn->ksnc_scheduler->kss_lock);
 
-    } else {
-
-        ksock_tcpx_fini_t * tcpx = 
-                cfs_alloc(sizeof(ksock_tcpx_fini_t), CFS_ALLOC_ZERO);
-
-        ASSERT(tx->tx_resid == 0);
-
-        if (!tcpx) {
-
-            ksocknal_tx_launched (tx);
-
-        } else {
-
-            tcpx->tx = tx;
-            ExInitializeWorkItem(
-                    &(tcpx->item), 
-                    ksocknal_fini_sending,
-                    tcpx
-            );
-            ExQueueWorkItem(
-                    &(tcpx->item),
-                    CriticalWorkQueue
-                    );
-        }
-
-        tx = NULL;
-    }
-
-    return (void *)tx;
+        /* complete send; tx -ref */
+        ksocknal_tx_decref (tx);
 }
 
 void
@@ -845,7 +644,6 @@ ksocknal_lib_set_callback(struct socket *sock,  ksock_conn_t *conn)
 {
         sock->kstc_conn      = conn;
         sock->kstc_sched_cb  = ksocknal_schedule_callback;
-        sock->kstc_update_tx = ksocknal_update_tx;
 }
 
 void
@@ -853,5 +651,10 @@ ksocknal_lib_reset_callback(struct socket *sock, ksock_conn_t *conn)
 {
         sock->kstc_conn      = NULL;
         sock->kstc_sched_cb  = NULL;
-        sock->kstc_update_tx = NULL;
+}
+
+int
+ksocknal_lib_zc_capable(ksock_conn_t *conn)
+{
+        return 0;
 }