Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_lib-winnt.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/socklnd/socklnd_lib-winnt.c
37  *
38  * windows socknal library
39  */
40
41 #include "socklnd.h"
42
43 # if CONFIG_SYSCTL && !CFS_SYSFS_MODULE_PARM
44 static ctl_table ksocknal_ctl_table[18];
45
46 ctl_table ksocknal_top_ctl_table[] = {
47         {200, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
48         { 0 }
49 };
50
51 int
52 ksocknal_lib_tunables_init () 
53 {
54             int    i = 0;
55             int    j = 1;
56         
57         ksocknal_ctl_table[i++] = (ctl_table)
58                 {j++, "timeout", ksocknal_tunables.ksnd_timeout, 
59                  sizeof (int), 0644, NULL, &proc_dointvec};
60         ksocknal_ctl_table[i++] = (ctl_table)
61                 {j++, "credits", ksocknal_tunables.ksnd_credits, 
62                  sizeof (int), 0444, NULL, &proc_dointvec};
63         ksocknal_ctl_table[i++] = (ctl_table)
64                 {j++, "peer_credits", ksocknal_tunables.ksnd_peertxcredits, 
65                  sizeof (int), 0444, NULL, &proc_dointvec};
66         ksocknal_ctl_table[i++] = (ctl_table)
67                 {j++, "peer_buffer_credits", ksocknal_tunables.ksnd_peerrtrcredits, 
68                  sizeof (int), 0444, NULL, &proc_dointvec};
69         ksocknal_ctl_table[i++] = (ctl_table)
70                 {j++, "nconnds", ksocknal_tunables.ksnd_nconnds, 
71                  sizeof (int), 0444, NULL, &proc_dointvec};
72         ksocknal_ctl_table[i++] = (ctl_table)
73                 {j++, "min_reconnectms", ksocknal_tunables.ksnd_min_reconnectms, 
74                  sizeof (int), 0444, NULL, &proc_dointvec};
75         ksocknal_ctl_table[i++] = (ctl_table)
76                 {j++, "max_reconnectms", ksocknal_tunables.ksnd_max_reconnectms, 
77                  sizeof (int), 0444, NULL, &proc_dointvec};
78         ksocknal_ctl_table[i++] = (ctl_table)
79                 {j++, "eager_ack", ksocknal_tunables.ksnd_eager_ack, 
80                  sizeof (int), 0644, NULL, &proc_dointvec};
81 #if SOCKNAL_ZC
82         ksocknal_ctl_table[i++] = (ctl_table)
83                 {j++, "zero_copy", ksocknal_tunables.ksnd_zc_min_payload, 
84                  sizeof (int), 0644, NULL, &proc_dointvec};
85 #endif
86         ksocknal_ctl_table[i++] = (ctl_table)
87                 {j++, "typed", ksocknal_tunables.ksnd_typed_conns, 
88                  sizeof (int), 0444, NULL, &proc_dointvec};
89         ksocknal_ctl_table[i++] = (ctl_table)
90                 {j++, "min_bulk", ksocknal_tunables.ksnd_min_bulk, 
91                  sizeof (int), 0644, NULL, &proc_dointvec};
92         ksocknal_ctl_table[i++] = (ctl_table)
93                 {j++, "buffer_size", ksocknal_tunables.ksnd_buffer_size, 
94                  sizeof(int), 0644, NULL, &proc_dointvec};
95         ksocknal_ctl_table[i++] = (ctl_table)
96                 {j++, "nagle", ksocknal_tunables.ksnd_nagle, 
97                  sizeof(int), 0644, NULL, &proc_dointvec};
98 #ifdef CPU_AFFINITY
99         ksocknal_ctl_table[i++] = (ctl_table)
100                 {j++, "irq_affinity", ksocknal_tunables.ksnd_irq_affinity, 
101                  sizeof(int), 0644, NULL, &proc_dointvec};
102 #endif
103         ksocknal_ctl_table[i++] = (ctl_table)
104                 {j++, "keepalive_idle", ksocknal_tunables.ksnd_keepalive_idle, 
105                  sizeof(int), 0644, NULL, &proc_dointvec};
106         ksocknal_ctl_table[i++] = (ctl_table)
107                 {j++, "keepalive_count", ksocknal_tunables.ksnd_keepalive_count, 
108                  sizeof(int), 0644, NULL, &proc_dointvec};
109         ksocknal_ctl_table[i++] = (ctl_table)
110                 {j++, "keepalive_intvl", ksocknal_tunables.ksnd_keepalive_intvl, 
111                  sizeof(int), 0644, NULL, &proc_dointvec};
112
113         LASSERT (j == i+1);
114         LASSERT (i < sizeof(ksocknal_ctl_table)/sizeof(ksocknal_ctl_table[0]));
115
116         ksocknal_tunables.ksnd_sysctl =
117                 register_sysctl_table(ksocknal_top_ctl_table, 0);
118
119         if (ksocknal_tunables.ksnd_sysctl == NULL)
120                 CWARN("Can't setup /proc tunables\n");
121
122         return 0;
123 }
124
125 void
126 ksocknal_lib_tunables_fini () 
127 {
128         if (ksocknal_tunables.ksnd_sysctl != NULL)
129                 unregister_sysctl_table(ksocknal_tunables.ksnd_sysctl); 
130 }
131 #else
132 int
133 ksocknal_lib_tunables_init () 
134 {
135         return 0;
136 }
137
138 void 
139 ksocknal_lib_tunables_fini ()
140 {
141 }
142 #endif
143
144 void
145 ksocknal_lib_bind_irq (unsigned int irq)
146 {
147 }
148
149 int
150 ksocknal_lib_get_conn_addrs (ksock_conn_t *conn)
151 {
152         int rc = libcfs_sock_getaddr(conn->ksnc_sock, 1,
153                                      &conn->ksnc_ipaddr, &conn->ksnc_port);
154
155         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
156         LASSERT (!conn->ksnc_closing);
157
158         if (rc != 0) {
159                 CERROR ("Error %d getting sock peer IP\n", rc);
160                 return rc;
161         }
162
163         rc = libcfs_sock_getaddr(conn->ksnc_sock, 0,
164                                  &conn->ksnc_myipaddr, NULL);
165         if (rc != 0) {
166                 CERROR ("Error %d getting sock local IP\n", rc);
167                 return rc;
168         }
169
170         return 0;
171 }
172
173 unsigned int
174 ksocknal_lib_sock_irq (struct socket *sock)
175 {
176     return 0;
177 }
178
179 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
180 static struct page *
181 ksocknal_kvaddr_to_page (unsigned long vaddr)
182 {
183         struct page *page;
184
185         if (vaddr >= VMALLOC_START &&
186             vaddr < VMALLOC_END)
187                 page = vmalloc_to_page ((void *)vaddr);
188 #ifdef CONFIG_HIGHMEM
189         else if (vaddr >= PKMAP_BASE &&
190                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
191                 page = vmalloc_to_page ((void *)vaddr);
192                 /* in 2.4 ^ just walks the page tables */
193 #endif
194         else
195                 page = virt_to_page (vaddr);
196
197         if (page == NULL ||
198             !VALID_PAGE (page))
199                 return (NULL);
200
201         return (page);
202 }
203 #endif
204
205 /*
206  * ks_lock_iovs
207  *   Lock the i/o vector buffers into MDL structure
208  *
209  * Arguments:
210  *   iov:  the array of i/o vectors
211  *   niov: number of i/o vectors to be locked
212  *   len:  the real length of the iov vectors
213  *
214  * Return Value:
215  *   ksock_mdl_t *: the Mdl of the locked buffers or
216  *         NULL pointer in failure case
217  *
218  * Notes: 
219  *   N/A
220  */
221
222 ksock_mdl_t *
223 ks_lock_iovs(
224     IN struct iovec  *iov,
225     IN int            niov,
226     IN int            recving,
227     IN int *          len )
228 {
229     int             rc = 0;
230
231     int             i = 0;
232     int             total = 0;
233     ksock_mdl_t *   mdl = NULL;
234     ksock_mdl_t *   tail = NULL;
235
236     LASSERT(iov != NULL);
237     LASSERT(niov > 0);
238     LASSERT(len != NULL);
239
240     for (i=0; i < niov; i++) {
241
242         ksock_mdl_t * Iovec = NULL;
243             
244         rc = ks_lock_buffer(
245                 iov[i].iov_base,
246                 FALSE,
247                 iov[i].iov_len,
248                 recving ? IoWriteAccess : IoReadAccess,
249                 &Iovec );
250
251         if (rc < 0) {
252             break;
253         }
254
255         if (tail) {
256             tail->Next = Iovec;
257         } else {
258             mdl = Iovec;
259         }
260
261         tail = Iovec;
262
263         total +=iov[i].iov_len;
264     }
265
266     if (rc >= 0) {
267         *len = total;
268     } else {
269         if (mdl) {
270             ks_release_mdl(mdl, FALSE);
271             mdl = NULL;
272         }
273     }
274
275     return mdl;
276 }
277
278 /*
279  * ks_lock_kiovs
280  *   Lock the kiov pages into MDL structure
281  *
282  * Arguments:
283  *   kiov:  the array of kiov pages
284  *   niov:  number of kiov to be locked
285  *   len:   the real length of the kiov arrary
286  *
287  * Return Value:
288  *   PMDL: the Mdl of the locked buffers or NULL
289  *         pointer in failure case
290  *
291  * Notes: 
292  *   N/A
293  */
294 ksock_mdl_t *
295 ks_lock_kiovs(
296     IN lnet_kiov_t *  kiov,
297     IN int            nkiov,
298     IN int            recving,
299     IN int *          len )
300 {
301     int             rc = 0;
302     int             i = 0;
303     int             total = 0;
304     ksock_mdl_t *   mdl = NULL;
305     ksock_mdl_t *   tail = NULL;
306
307     LASSERT(kiov != NULL);
308     LASSERT(nkiov > 0);
309     LASSERT(len != NULL);
310
311     for (i=0; i < nkiov; i++) {
312
313         ksock_mdl_t *        Iovec = NULL;
314
315
316         //
317         //  Lock the kiov page into Iovec Â¡Â­
318         //
319
320         rc = ks_lock_buffer(
321                 (PUCHAR)kiov[i].kiov_page->addr + 
322                      kiov[i].kiov_offset,
323                 FALSE,
324                 kiov[i].kiov_len,
325                 recving ? IoWriteAccess : IoReadAccess,
326                 &Iovec
327             );
328
329         if (rc < 0) {
330             break;
331         }
332
333         //
334         // Attach the Iovec to the mdl chain
335         //
336
337         if (tail) {
338             tail->Next = Iovec;
339         } else {
340             mdl = Iovec;
341         }
342
343         tail = Iovec;
344
345         total += kiov[i].kiov_len;
346
347     }
348
349     if (rc >= 0) {
350         *len = total;
351     } else {
352         if (mdl) {
353             ks_release_mdl(mdl, FALSE);
354             mdl = NULL;
355         }
356     }
357
358     return mdl;
359 }
360
361
362 int
363 ksocknal_lib_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
364 {
365         struct socket *sock = conn->ksnc_sock;
366 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
367         unsigned long  vaddr = (unsigned long)iov->iov_base
368         int            offset = vaddr & (PAGE_SIZE - 1);
369         int            zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
370         struct page   *page;
371 #endif
372         int            nob;
373         int            rc;
374         ksock_mdl_t *  mdl;
375
376         /* NB we can't trust socket ops to either consume our iovs
377          * or leave them alone. */
378
379 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
380         if (tx->tx_zc_capable &&
381             (sock->sk->sk_route_caps & NETIF_F_SG) &&
382             (sock->sk->sk_route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
383             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
384                 int msgflg = MSG_DONTWAIT;
385
386                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
387                        (void *)vaddr, page, page_address(page), offset, zcsize);
388
389                 if (!list_empty (&conn->ksnc_tx_queue) ||
390                     zcsize < tx->tx_resid)
391                         msgflg |= MSG_MORE;
392
393                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
394         } else
395 #endif
396         {
397                 /* lock the whole tx iovs into a single mdl chain */
398                 mdl = ks_lock_iovs(tx->tx_iov, tx->tx_niov, FALSE, &nob);
399
400                 if (mdl) {
401                         /* send the total mdl chain */
402                         rc = ks_send_mdl( conn->ksnc_sock, tx, mdl, nob, 
403                                     (!list_empty (&conn->ksnc_tx_queue) || nob < tx->tx_resid) ? 
404                                     (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT);
405                 } else {
406                         rc = -ENOMEM;
407                 }
408         }
409
410             return rc;
411 }
412
413 int
414 ksocknal_lib_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
415 {
416         struct socket *sock = conn->ksnc_sock;
417         lnet_kiov_t    *kiov = tx->tx_kiov;
418         int            rc;
419         int            nob;
420         ksock_mdl_t *  mdl;
421
422         /* NB we can't trust socket ops to either consume our iovs
423          * or leave them alone. */
424
425 #if SOCKNAL_ZC
426         if (tx->tx_zc_capable &&
427             (sock->sk->sk_route_caps & NETIF_F_SG) &&
428             (sock->sk->sk_route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
429                 struct page   *page = kiov->kiov_page;
430                 int            offset = kiov->kiov_offset;
431                 int            fragsize = kiov->kiov_len;
432                 int            msgflg = MSG_DONTWAIT;
433
434                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
435                                page, offset, kiov->kiov_len);
436
437                 if (!list_empty(&conn->ksnc_tx_queue) ||
438                     fragsize < tx->tx_resid)
439                         msgflg |= MSG_MORE;
440
441                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
442                                        &tx->tx_zccd);
443         } else
444 #endif
445         {
446                 /* lock the whole tx kiovs into a single mdl chain */
447                 mdl = ks_lock_kiovs(tx->tx_kiov, tx->tx_nkiov, FALSE, &nob);
448
449                 if (mdl) {
450                         /* send the total mdl chain */
451                         rc = ks_send_mdl(
452                                     conn->ksnc_sock, tx, mdl, nob,
453                                     (!list_empty(&conn->ksnc_tx_queue) || nob < tx->tx_resid) ?
454                                     (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT);
455                 } else {
456                         rc = -ENOMEM;
457                 }
458         }
459
460             return rc;
461 }
462
463
464 int
465 ksocknal_lib_recv_iov (ksock_conn_t *conn)
466 {
467         struct iovec *iov = conn->ksnc_rx_iov;
468         int           rc;
469         int           size;
470         ksock_mdl_t * mdl;
471
472         /* lock the whole tx iovs into a single mdl chain */
473         mdl = ks_lock_iovs(iov, conn->ksnc_rx_niov, TRUE, &size);
474
475         if (!mdl) {
476             return (-ENOMEM);
477         }
478         
479         LASSERT (size <= conn->ksnc_rx_nob_wanted);
480
481         /* try to request data for the whole mdl chain */
482         rc = ks_recv_mdl (conn->ksnc_sock, mdl, size, MSG_DONTWAIT);
483
484         return rc;
485 }
486
487 int
488 ksocknal_lib_recv_kiov (ksock_conn_t *conn)
489 {
490         lnet_kiov_t  *kiov = conn->ksnc_rx_kiov;
491         int           size;
492         int           rc;
493         ksock_mdl_t * mdl;
494
495         /* NB we can't trust socket ops to either consume our iovs
496          * or leave them alone, so we only receive 1 frag at a time. */
497         LASSERT (conn->ksnc_rx_nkiov > 0);
498
499         /* lock the whole tx kiovs into a single mdl chain */
500         mdl = ks_lock_kiovs(kiov, conn->ksnc_rx_nkiov, TRUE, &size);
501
502         if (!mdl) {
503             rc = -ENOMEM;
504             return (rc);
505         }
506         
507         LASSERT (size <= conn->ksnc_rx_nob_wanted);
508
509         /* try to request data for the whole mdl chain */
510         rc = ks_recv_mdl (conn->ksnc_sock, mdl, size, MSG_DONTWAIT);
511
512         return rc;
513 }
514
515 void
516 ksocknal_lib_eager_ack (ksock_conn_t *conn)
517 {
518         __u32   option = 1;
519         int     rc = 0;
520                 
521         rc = ks_set_tcp_option(
522                 conn->ksnc_sock, TCP_SOCKET_NODELAY,
523                 &option, sizeof(option) );
524         if (rc != 0) {
525                 CERROR("Can't disable nagle: %d\n", rc);
526         }
527 }
528
529 int
530 ksocknal_lib_get_conn_tunables (ksock_conn_t *conn, int *txmem, int *rxmem, int *nagle)
531 {
532         ksock_tconn_t * tconn = conn->ksnc_sock;
533         int             len;
534         int             rc;
535
536         ks_get_tconn (tconn);
537         
538         *txmem = *rxmem = 0;
539
540         len = sizeof(*nagle);
541
542         rc = ks_get_tcp_option(
543                     tconn, TCP_SOCKET_NODELAY,
544                     (__u32 *)nagle, &len);
545
546         ks_put_tconn (tconn);
547
548         printk("ksocknal_get_conn_tunables: nodelay = %d rc = %d\n", *nagle, rc);
549
550         if (rc == 0)
551                 *nagle = !*nagle;
552         else
553                 *txmem = *rxmem = *nagle = 0;
554                 
555         return (rc);
556 }
557
558 int
559 ksocknal_lib_buffersize (int current_sz, int tunable_sz)
560 {
561             /* ensure >= SOCKNAL_MIN_BUFFER */
562             if (current_sz < SOCKNAL_MIN_BUFFER)
563                         return MAX(SOCKNAL_MIN_BUFFER, tunable_sz);
564
565             if (tunable_sz > SOCKNAL_MIN_BUFFER)
566                         return tunable_sz;
567         
568             /* leave alone */
569             return 0;
570 }
571
572 int
573 ksocknal_lib_setup_sock (struct socket *sock)
574 {
575         int             rc;
576
577         int             keep_idle;
578         int             keep_count;
579         int             keep_intvl;
580         int             keep_alive;
581
582         __u32           option;
583
584         /* set the window size */
585
586 #if 0
587         tconn->kstc_snd_wnd = ksocknal_tunables.ksnd_buffer_size;
588         tconn->kstc_rcv_wnd = ksocknal_tunables.ksnd_buffer_size;
589 #endif
590
591         /* disable nagle */
592         if (!ksocknal_tunables.ksnd_nagle) {
593                 option = 1;
594                 
595                 rc = ks_set_tcp_option(
596                             sock, TCP_SOCKET_NODELAY,
597                             &option, sizeof (option));
598                 if (rc != 0) {
599                         printk ("Can't disable nagle: %d\n", rc);
600                         return (rc);
601                 }
602         }
603
604         /* snapshot tunables */
605         keep_idle  = *ksocknal_tunables.ksnd_keepalive_idle;
606         keep_count = *ksocknal_tunables.ksnd_keepalive_count;
607         keep_intvl = *ksocknal_tunables.ksnd_keepalive_intvl;
608         
609         keep_alive = (keep_idle > 0 && keep_count > 0 && keep_intvl > 0);
610
611         option = (__u32)(keep_alive ? 1 : 0);
612
613         rc = ks_set_tcp_option(
614                     sock, TCP_SOCKET_KEEPALIVE,
615                     &option, sizeof (option));
616         if (rc != 0) {
617                 CERROR ("Can't disable nagle: %d\n", rc);
618                 return (rc);
619         }
620
621         return (0);
622 }
623
624 void
625 ksocknal_lib_push_conn (ksock_conn_t *conn)
626 {
627         ksock_tconn_t * tconn;
628         __u32           nagle;
629         __u32           val = 1;
630         int             rc;
631
632         tconn = conn->ksnc_sock;
633
634         ks_get_tconn(tconn);
635
636         spin_lock(&tconn->kstc_lock);
637         if (tconn->kstc_type == kstt_sender) {
638             nagle = tconn->sender.kstc_info.nagle;
639             tconn->sender.kstc_info.nagle = 0;
640         } else {
641             LASSERT(tconn->kstc_type == kstt_child);
642             nagle = tconn->child.kstc_info.nagle;
643             tconn->child.kstc_info.nagle = 0;
644         }
645
646         spin_unlock(&tconn->kstc_lock);
647
648         val = 1;
649         rc = ks_set_tcp_option(
650                     tconn,
651                     TCP_SOCKET_NODELAY,
652                     &(val),
653                     sizeof(__u32)
654                     );
655
656         LASSERT (rc == 0);
657         spin_lock(&tconn->kstc_lock);
658
659         if (tconn->kstc_type == kstt_sender) {
660             tconn->sender.kstc_info.nagle = nagle;
661         } else {
662             LASSERT(tconn->kstc_type == kstt_child);
663             tconn->child.kstc_info.nagle = nagle;
664         }
665         spin_unlock(&tconn->kstc_lock);
666
667         ks_put_tconn(tconn);
668 }
669
670 /* @mode: 0: receiving mode / 1: sending mode */
671 void
672 ksocknal_sched_conn (ksock_conn_t *conn, int mode, ksock_tx_t *tx)
673 {
674         int             flags;
675         ksock_sched_t * sched;
676         ENTRY;
677
678         /* interleave correctly with closing sockets... */
679         read_lock (&ksocknal_data.ksnd_global_lock);
680
681         sched = conn->ksnc_scheduler;
682
683         spin_lock_irqsave (&sched->kss_lock, flags);
684
685         if (mode) { /* transmission can continue ... */ 
686
687 #error "This is out of date - we should be calling ksocknal_write_callback()"
688                 conn->ksnc_tx_ready = 1;
689
690                 if (tx) {
691                     /* Incomplete send: place tx on HEAD of tx_queue */
692                     list_add (&tx->tx_list, &conn->ksnc_tx_queue);
693                 }
694
695                 if ( !conn->ksnc_tx_scheduled &&
696                      !list_empty(&conn->ksnc_tx_queue)) {  //packets to send
697                         list_add_tail (&conn->ksnc_tx_list,
698                                        &sched->kss_tx_conns);
699                         conn->ksnc_tx_scheduled = 1;
700                         /* extra ref for scheduler */
701                         atomic_inc (&conn->ksnc_conn_refcount);
702
703                         cfs_waitq_signal (&sched->kss_waitq);
704                 }
705         } else {    /* receiving can continue ... */
706
707                 conn->ksnc_rx_ready = 1;
708
709                 if ( !conn->ksnc_rx_scheduled) {  /* not being progressed */
710                         list_add_tail(&conn->ksnc_rx_list,
711                                       &sched->kss_rx_conns);
712                         conn->ksnc_rx_scheduled = 1;
713                         /* extra ref for scheduler */
714                         atomic_inc (&conn->ksnc_conn_refcount);
715
716                         cfs_waitq_signal (&sched->kss_waitq);
717                 }
718         }
719
720         spin_unlock_irqrestore (&sched->kss_lock, flags);
721         read_unlock (&ksocknal_data.ksnd_global_lock);
722
723         EXIT;
724 }
725
726 void ksocknal_schedule_callback(struct socket*sock, int mode, void * tx, ulong_ptr bytes)
727 {
728     ksock_conn_t * conn = (ksock_conn_t *) sock->kstc_conn;
729
730     if (mode) {
731         ksocknal_sched_conn(conn, mode, tx);
732     } else {
733         if ( CAN_BE_SCHED(bytes, (ulong_ptr)conn->ksnc_rx_nob_wanted )) {
734             ksocknal_sched_conn(conn, mode, tx);
735         }
736     }
737 }
738
739 extern void
740 ksocknal_tx_launched (ksock_tx_t *tx);
741
742 void
743 ksocknal_fini_sending(ksock_tcpx_fini_t *tcpx)
744 {
745     ksocknal_tx_launched(tcpx->tx);
746     cfs_free(tcpx);
747 }
748
749 void *
750 ksocknal_update_tx(
751     struct socket*  tconn,
752     void *          txp,
753     ulong_ptr       rc
754     )
755 {
756     ksock_tx_t *    tx = (ksock_tx_t *)txp;
757
758     /*
759      *  the transmission was done, we need update the tx
760      */
761
762     LASSERT(tx->tx_resid >= (int)rc);
763     tx->tx_resid -= (int)rc;
764
765     /*
766      *  just partial of tx is sent out, we need update
767      *  the fields of tx and schedule later transmission.
768      */
769
770     if (tx->tx_resid) {
771
772         if (tx->tx_niov > 0) {
773
774             /* if there's iov, we need process iov first */
775             while (rc > 0 ) {
776                 if (rc < tx->tx_iov->iov_len) {
777                     /* didn't send whole iov entry... */
778                     tx->tx_iov->iov_base = 
779                         (char *)(tx->tx_iov->iov_base) + rc;
780                     tx->tx_iov->iov_len -= rc;
781                     rc = 0;
782                  } else {
783                     /* the whole of iov was sent out */
784                     rc -= tx->tx_iov->iov_len;
785                     tx->tx_iov++;
786                     tx->tx_niov--;
787                 }
788             }
789
790         } else {
791
792             /* now we need process the kiov queues ... */
793
794             while (rc > 0 ) {
795
796                 if (rc < tx->tx_kiov->kiov_len) {
797                     /* didn't send whole kiov entry... */
798                     tx->tx_kiov->kiov_offset += rc;
799                     tx->tx_kiov->kiov_len -= rc;
800                     rc = 0;
801                 } else {
802                     /* whole kiov was sent out */
803                     rc -= tx->tx_kiov->kiov_len;
804                     tx->tx_kiov++;
805                     tx->tx_nkiov--;
806                 }
807             }
808         }
809
810     } else {
811
812         ksock_tcpx_fini_t * tcpx = 
813                 cfs_alloc(sizeof(ksock_tcpx_fini_t), CFS_ALLOC_ZERO);
814
815         ASSERT(tx->tx_resid == 0);
816
817         if (!tcpx) {
818
819             ksocknal_tx_launched (tx);
820
821         } else {
822
823             tcpx->tx = tx;
824             ExInitializeWorkItem(
825                     &(tcpx->item), 
826                     ksocknal_fini_sending,
827                     tcpx
828             );
829             ExQueueWorkItem(
830                     &(tcpx->item),
831                     CriticalWorkQueue
832                     );
833         }
834
835         tx = NULL;
836     }
837
838     return (void *)tx;
839 }
840
841 void
842 ksocknal_lib_save_callback(struct socket *sock, ksock_conn_t *conn)
843 {
844 }
845
846 void
847 ksocknal_lib_set_callback(struct socket *sock,  ksock_conn_t *conn)
848 {
849         sock->kstc_conn      = conn;
850         sock->kstc_sched_cb  = ksocknal_schedule_callback;
851         sock->kstc_update_tx = ksocknal_update_tx;
852 }
853
854 void
855 ksocknal_lib_reset_callback(struct socket *sock, ksock_conn_t *conn)
856 {
857         sock->kstc_conn      = NULL;
858         sock->kstc_sched_cb  = NULL;
859         sock->kstc_update_tx = NULL;
860 }