Whamcloud - gitweb
i=liangzhen
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_lib-winnt.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2006 Cluster File Systems, Inc, All rights reserved.
5  * Author: Matt Wu
6  *
7  * This file is part of Lustre, http://www.lustre.org.
8  *
9  * This Lustre Software is proprietary - please refer to the license
10  * agreement you received with your software.
11  *
12  * windows socknal library
13  *
14  */ 
15
16 #include "socklnd.h"
17
18 # if CONFIG_SYSCTL && !CFS_SYSFS_MODULE_PARM
19 static ctl_table ksocknal_ctl_table[18];
20
21 ctl_table ksocknal_top_ctl_table[] = {
22         {200, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
23         { 0 }
24 };
25
26 int
27 ksocknal_lib_tunables_init () 
28 {
29             int    i = 0;
30             int    j = 1;
31         
32         ksocknal_ctl_table[i++] = (ctl_table)
33                 {j++, "timeout", ksocknal_tunables.ksnd_timeout, 
34                  sizeof (int), 0644, NULL, &proc_dointvec};
35         ksocknal_ctl_table[i++] = (ctl_table)
36                 {j++, "credits", ksocknal_tunables.ksnd_credits, 
37                  sizeof (int), 0444, NULL, &proc_dointvec};
38         ksocknal_ctl_table[i++] = (ctl_table)
39                 {j++, "peer_credits", ksocknal_tunables.ksnd_peercredits, 
40                  sizeof (int), 0444, NULL, &proc_dointvec};
41         ksocknal_ctl_table[i++] = (ctl_table)
42                 {j++, "nconnds", ksocknal_tunables.ksnd_nconnds, 
43                  sizeof (int), 0444, NULL, &proc_dointvec};
44         ksocknal_ctl_table[i++] = (ctl_table)
45                 {j++, "min_reconnectms", ksocknal_tunables.ksnd_min_reconnectms, 
46                  sizeof (int), 0444, NULL, &proc_dointvec};
47         ksocknal_ctl_table[i++] = (ctl_table)
48                 {j++, "max_reconnectms", ksocknal_tunables.ksnd_max_reconnectms, 
49                  sizeof (int), 0444, NULL, &proc_dointvec};
50         ksocknal_ctl_table[i++] = (ctl_table)
51                 {j++, "eager_ack", ksocknal_tunables.ksnd_eager_ack, 
52                  sizeof (int), 0644, NULL, &proc_dointvec};
53 #if SOCKNAL_ZC
54         ksocknal_ctl_table[i++] = (ctl_table)
55                 {j++, "zero_copy", ksocknal_tunables.ksnd_zc_min_frag, 
56                  sizeof (int), 0644, NULL, &proc_dointvec};
57 #endif
58         ksocknal_ctl_table[i++] = (ctl_table)
59                 {j++, "typed", ksocknal_tunables.ksnd_typed_conns, 
60                  sizeof (int), 0444, NULL, &proc_dointvec};
61         ksocknal_ctl_table[i++] = (ctl_table)
62                 {j++, "min_bulk", ksocknal_tunables.ksnd_min_bulk, 
63                  sizeof (int), 0644, NULL, &proc_dointvec};
64         ksocknal_ctl_table[i++] = (ctl_table)
65                 {j++, "buffer_size", ksocknal_tunables.ksnd_buffer_size, 
66                  sizeof(int), 0644, NULL, &proc_dointvec};
67         ksocknal_ctl_table[i++] = (ctl_table)
68                 {j++, "nagle", ksocknal_tunables.ksnd_nagle, 
69                  sizeof(int), 0644, NULL, &proc_dointvec};
70 #ifdef CPU_AFFINITY
71         ksocknal_ctl_table[i++] = (ctl_table)
72                 {j++, "irq_affinity", ksocknal_tunables.ksnd_irq_affinity, 
73                  sizeof(int), 0644, NULL, &proc_dointvec};
74 #endif
75         ksocknal_ctl_table[i++] = (ctl_table)
76                 {j++, "keepalive_idle", ksocknal_tunables.ksnd_keepalive_idle, 
77                  sizeof(int), 0644, NULL, &proc_dointvec};
78         ksocknal_ctl_table[i++] = (ctl_table)
79                 {j++, "keepalive_count", ksocknal_tunables.ksnd_keepalive_count, 
80                  sizeof(int), 0644, NULL, &proc_dointvec};
81         ksocknal_ctl_table[i++] = (ctl_table)
82                 {j++, "keepalive_intvl", ksocknal_tunables.ksnd_keepalive_intvl, 
83                  sizeof(int), 0644, NULL, &proc_dointvec};
84
85         LASSERT (j == i+1);
86         LASSERT (i < sizeof(ksocknal_ctl_table)/sizeof(ksocknal_ctl_table[0]));
87
88         ksocknal_tunables.ksnd_sysctl =
89                 register_sysctl_table(ksocknal_top_ctl_table, 0);
90
91         if (ksocknal_tunables.ksnd_sysctl == NULL)
92                 CWARN("Can't setup /proc tunables\n");
93
94         return 0;
95 }
96
97 void
98 ksocknal_lib_tunables_fini () 
99 {
100         if (ksocknal_tunables.ksnd_sysctl != NULL)
101                 unregister_sysctl_table(ksocknal_tunables.ksnd_sysctl); 
102 }
103 #else
104 int
105 ksocknal_lib_tunables_init () 
106 {
107         return 0;
108 }
109
110 void 
111 ksocknal_lib_tunables_fini ()
112 {
113 }
114 #endif
115
116 void
117 ksocknal_lib_bind_irq (unsigned int irq)
118 {
119 }
120
121 int
122 ksocknal_lib_get_conn_addrs (ksock_conn_t *conn)
123 {
124         int rc = libcfs_sock_getaddr(conn->ksnc_sock, 1,
125                                      &conn->ksnc_ipaddr, &conn->ksnc_port);
126
127         /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
128         LASSERT (!conn->ksnc_closing);
129
130         if (rc != 0) {
131                 CERROR ("Error %d getting sock peer IP\n", rc);
132                 return rc;
133         }
134
135         rc = libcfs_sock_getaddr(conn->ksnc_sock, 0,
136                                  &conn->ksnc_myipaddr, NULL);
137         if (rc != 0) {
138                 CERROR ("Error %d getting sock local IP\n", rc);
139                 return rc;
140         }
141
142         return 0;
143 }
144
145 unsigned int
146 ksocknal_lib_sock_irq (struct socket *sock)
147 {
148     return 0;
149 }
150
151 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
152 static struct page *
153 ksocknal_kvaddr_to_page (unsigned long vaddr)
154 {
155         struct page *page;
156
157         if (vaddr >= VMALLOC_START &&
158             vaddr < VMALLOC_END)
159                 page = vmalloc_to_page ((void *)vaddr);
160 #ifdef CONFIG_HIGHMEM
161         else if (vaddr >= PKMAP_BASE &&
162                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
163                 page = vmalloc_to_page ((void *)vaddr);
164                 /* in 2.4 ^ just walks the page tables */
165 #endif
166         else
167                 page = virt_to_page (vaddr);
168
169         if (page == NULL ||
170             !VALID_PAGE (page))
171                 return (NULL);
172
173         return (page);
174 }
175 #endif
176
177 /*
178  * ks_lock_iovs
179  *   Lock the i/o vector buffers into MDL structure
180  *
181  * Arguments:
182  *   iov:  the array of i/o vectors
183  *   niov: number of i/o vectors to be locked
184  *   len:  the real length of the iov vectors
185  *
186  * Return Value:
187  *   ksock_mdl_t *: the Mdl of the locked buffers or
188  *         NULL pointer in failure case
189  *
190  * Notes: 
191  *   N/A
192  */
193
194 ksock_mdl_t *
195 ks_lock_iovs(
196     IN struct iovec  *iov,
197     IN int            niov,
198     IN int            recving,
199     IN int *          len )
200 {
201     int             rc = 0;
202
203     int             i = 0;
204     int             total = 0;
205     ksock_mdl_t *   mdl = NULL;
206     ksock_mdl_t *   tail = NULL;
207
208     LASSERT(iov != NULL);
209     LASSERT(niov > 0);
210     LASSERT(len != NULL);
211
212     for (i=0; i < niov; i++) {
213
214         ksock_mdl_t * Iovec = NULL;
215             
216         rc = ks_lock_buffer(
217                 iov[i].iov_base,
218                 FALSE,
219                 iov[i].iov_len,
220                 recving ? IoWriteAccess : IoReadAccess,
221                 &Iovec );
222
223         if (rc < 0) {
224             break;
225         }
226
227         if (tail) {
228             tail->Next = Iovec;
229         } else {
230             mdl = Iovec;
231         }
232
233         tail = Iovec;
234
235         total +=iov[i].iov_len;
236     }
237
238     if (rc >= 0) {
239         *len = total;
240     } else {
241         if (mdl) {
242             ks_release_mdl(mdl, FALSE);
243             mdl = NULL;
244         }
245     }
246
247     return mdl;
248 }
249
250 /*
251  * ks_lock_kiovs
252  *   Lock the kiov pages into MDL structure
253  *
254  * Arguments:
255  *   kiov:  the array of kiov pages
256  *   niov:  number of kiov to be locked
257  *   len:   the real length of the kiov arrary
258  *
259  * Return Value:
260  *   PMDL: the Mdl of the locked buffers or NULL
261  *         pointer in failure case
262  *
263  * Notes: 
264  *   N/A
265  */
266 ksock_mdl_t *
267 ks_lock_kiovs(
268     IN lnet_kiov_t *  kiov,
269     IN int            nkiov,
270     IN int            recving,
271     IN int *          len )
272 {
273     int             rc = 0;
274     int             i = 0;
275     int             total = 0;
276     ksock_mdl_t *   mdl = NULL;
277     ksock_mdl_t *   tail = NULL;
278
279     LASSERT(kiov != NULL);
280     LASSERT(nkiov > 0);
281     LASSERT(len != NULL);
282
283     for (i=0; i < nkiov; i++) {
284
285         ksock_mdl_t *        Iovec = NULL;
286
287
288         //
289         //  Lock the kiov page into Iovec Â¡Â­
290         //
291
292         rc = ks_lock_buffer(
293                 (PUCHAR)kiov[i].kiov_page->addr + 
294                      kiov[i].kiov_offset,
295                 FALSE,
296                 kiov[i].kiov_len,
297                 recving ? IoWriteAccess : IoReadAccess,
298                 &Iovec
299             );
300
301         if (rc < 0) {
302             break;
303         }
304
305         //
306         // Attach the Iovec to the mdl chain
307         //
308
309         if (tail) {
310             tail->Next = Iovec;
311         } else {
312             mdl = Iovec;
313         }
314
315         tail = Iovec;
316
317         total += kiov[i].kiov_len;
318
319     }
320
321     if (rc >= 0) {
322         *len = total;
323     } else {
324         if (mdl) {
325             ks_release_mdl(mdl, FALSE);
326             mdl = NULL;
327         }
328     }
329
330     return mdl;
331 }
332
333
334 int
335 ksocknal_lib_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
336 {
337         struct socket *sock = conn->ksnc_sock;
338 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
339         unsigned long  vaddr = (unsigned long)iov->iov_base
340         int            offset = vaddr & (PAGE_SIZE - 1);
341         int            zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
342         struct page   *page;
343 #endif
344         int            nob;
345         int            rc;
346         ksock_mdl_t *  mdl;
347
348         /* NB we can't trust socket ops to either consume our iovs
349          * or leave them alone. */
350
351 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
352         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
353             (sock->sk->sk_route_caps & NETIF_F_SG) &&
354             (sock->sk->sk_route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
355             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
356                 int msgflg = MSG_DONTWAIT;
357
358                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
359                        (void *)vaddr, page, page_address(page), offset, zcsize);
360
361                 if (!list_empty (&conn->ksnc_tx_queue) ||
362                     zcsize < tx->tx_resid)
363                         msgflg |= MSG_MORE;
364
365                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
366         } else
367 #endif
368         {
369                 /* lock the whole tx iovs into a single mdl chain */
370                 mdl = ks_lock_iovs(tx->tx_iov, tx->tx_niov, FALSE, &nob);
371
372                 if (mdl) {
373                         /* send the total mdl chain */
374                         rc = ks_send_mdl( conn->ksnc_sock, tx, mdl, nob, 
375                                     (!list_empty (&conn->ksnc_tx_queue) || nob < tx->tx_resid) ? 
376                                     (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT);
377                 } else {
378                         rc = -ENOMEM;
379                 }
380         }
381
382             return rc;
383 }
384
385 int
386 ksocknal_lib_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
387 {
388         struct socket *sock = conn->ksnc_sock;
389         lnet_kiov_t    *kiov = tx->tx_kiov;
390         int            rc;
391         int            nob;
392         ksock_mdl_t *  mdl;
393
394         /* NB we can't trust socket ops to either consume our iovs
395          * or leave them alone. */
396
397 #if SOCKNAL_ZC
398         if (kiov->kiov_len >= *ksocknal_tunables.ksnd_zc_min_frag &&
399             (sock->sk->sk_route_caps & NETIF_F_SG) &&
400             (sock->sk->sk_route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
401                 struct page   *page = kiov->kiov_page;
402                 int            offset = kiov->kiov_offset;
403                 int            fragsize = kiov->kiov_len;
404                 int            msgflg = MSG_DONTWAIT;
405
406                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
407                                page, offset, kiov->kiov_len);
408
409                 if (!list_empty(&conn->ksnc_tx_queue) ||
410                     fragsize < tx->tx_resid)
411                         msgflg |= MSG_MORE;
412
413                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
414                                        &tx->tx_zccd);
415         } else
416 #endif
417         {
418                 /* lock the whole tx kiovs into a single mdl chain */
419                 mdl = ks_lock_kiovs(tx->tx_kiov, tx->tx_nkiov, FALSE, &nob);
420
421                 if (mdl) {
422                         /* send the total mdl chain */
423                         rc = ks_send_mdl(
424                                     conn->ksnc_sock, tx, mdl, nob,
425                                     (!list_empty(&conn->ksnc_tx_queue) || nob < tx->tx_resid) ?
426                                     (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT);
427                 } else {
428                         rc = -ENOMEM;
429                 }
430         }
431
432             return rc;
433 }
434
435
436 int
437 ksocknal_lib_recv_iov (ksock_conn_t *conn)
438 {
439         struct iovec *iov = conn->ksnc_rx_iov;
440         int           rc;
441         int           size;
442         ksock_mdl_t * mdl;
443
444         /* lock the whole tx iovs into a single mdl chain */
445         mdl = ks_lock_iovs(iov, conn->ksnc_rx_niov, TRUE, &size);
446
447         if (!mdl) {
448             return (-ENOMEM);
449         }
450         
451         LASSERT (size <= conn->ksnc_rx_nob_wanted);
452
453         /* try to request data for the whole mdl chain */
454         rc = ks_recv_mdl (conn->ksnc_sock, mdl, size, MSG_DONTWAIT);
455
456         return rc;
457 }
458
459 int
460 ksocknal_lib_recv_kiov (ksock_conn_t *conn)
461 {
462         lnet_kiov_t  *kiov = conn->ksnc_rx_kiov;
463         int           size;
464         int           rc;
465         ksock_mdl_t * mdl;
466
467         /* NB we can't trust socket ops to either consume our iovs
468          * or leave them alone, so we only receive 1 frag at a time. */
469         LASSERT (conn->ksnc_rx_nkiov > 0);
470
471         /* lock the whole tx kiovs into a single mdl chain */
472         mdl = ks_lock_kiovs(kiov, conn->ksnc_rx_nkiov, TRUE, &size);
473
474         if (!mdl) {
475             rc = -ENOMEM;
476             return (rc);
477         }
478         
479         LASSERT (size <= conn->ksnc_rx_nob_wanted);
480
481         /* try to request data for the whole mdl chain */
482         rc = ks_recv_mdl (conn->ksnc_sock, mdl, size, MSG_DONTWAIT);
483
484         return rc;
485 }
486
487 void
488 ksocknal_lib_eager_ack (ksock_conn_t *conn)
489 {
490         __u32   option = 1;
491         int     rc = 0;
492                 
493         rc = ks_set_tcp_option(
494                 conn->ksnc_sock, TCP_SOCKET_NODELAY,
495                 &option, sizeof(option) );
496         if (rc != 0) {
497                 CERROR("Can't disable nagle: %d\n", rc);
498         }
499 }
500
501 int
502 ksocknal_lib_get_conn_tunables (ksock_conn_t *conn, int *txmem, int *rxmem, int *nagle)
503 {
504         ksock_tconn_t * tconn = conn->ksnc_sock;
505         int             len;
506         int             rc;
507
508         ks_get_tconn (tconn);
509         
510         *txmem = *rxmem = 0;
511
512         len = sizeof(*nagle);
513
514         rc = ks_get_tcp_option(
515                     tconn, TCP_SOCKET_NODELAY,
516                     (__u32 *)nagle, &len);
517
518         ks_put_tconn (tconn);
519
520         printk("ksocknal_get_conn_tunables: nodelay = %d rc = %d\n", *nagle, rc);
521
522         if (rc == 0)
523                 *nagle = !*nagle;
524         else
525                 *txmem = *rxmem = *nagle = 0;
526                 
527         return (rc);
528 }
529
530 int
531 ksocknal_lib_buffersize (int current_sz, int tunable_sz)
532 {
533             /* ensure >= SOCKNAL_MIN_BUFFER */
534             if (current_sz < SOCKNAL_MIN_BUFFER)
535                         return MAX(SOCKNAL_MIN_BUFFER, tunable_sz);
536
537             if (tunable_sz > SOCKNAL_MIN_BUFFER)
538                         return tunable_sz;
539         
540             /* leave alone */
541             return 0;
542 }
543
544 int
545 ksocknal_lib_setup_sock (struct socket *sock)
546 {
547         int             rc;
548
549         int             keep_idle;
550         int             keep_count;
551         int             keep_intvl;
552         int             keep_alive;
553
554         __u32           option;
555
556         /* set the window size */
557
558 #if 0
559         tconn->kstc_snd_wnd = ksocknal_tunables.ksnd_buffer_size;
560         tconn->kstc_rcv_wnd = ksocknal_tunables.ksnd_buffer_size;
561 #endif
562
563         /* disable nagle */
564         if (!ksocknal_tunables.ksnd_nagle) {
565                 option = 1;
566                 
567                 rc = ks_set_tcp_option(
568                             sock, TCP_SOCKET_NODELAY,
569                             &option, sizeof (option));
570                 if (rc != 0) {
571                         printk ("Can't disable nagle: %d\n", rc);
572                         return (rc);
573                 }
574         }
575
576         /* snapshot tunables */
577         keep_idle  = *ksocknal_tunables.ksnd_keepalive_idle;
578         keep_count = *ksocknal_tunables.ksnd_keepalive_count;
579         keep_intvl = *ksocknal_tunables.ksnd_keepalive_intvl;
580         
581         keep_alive = (keep_idle > 0 && keep_count > 0 && keep_intvl > 0);
582
583         option = (__u32)(keep_alive ? 1 : 0);
584
585         rc = ks_set_tcp_option(
586                     sock, TCP_SOCKET_KEEPALIVE,
587                     &option, sizeof (option));
588         if (rc != 0) {
589                 CERROR ("Can't disable nagle: %d\n", rc);
590                 return (rc);
591         }
592
593         return (0);
594 }
595
596 void
597 ksocknal_lib_push_conn (ksock_conn_t *conn)
598 {
599         ksock_tconn_t * tconn;
600         __u32           nagle;
601         __u32           val = 1;
602         int             rc;
603
604         tconn = conn->ksnc_sock;
605
606         ks_get_tconn(tconn);
607
608         spin_lock(&tconn->kstc_lock);
609         if (tconn->kstc_type == kstt_sender) {
610             nagle = tconn->sender.kstc_info.nagle;
611             tconn->sender.kstc_info.nagle = 0;
612         } else {
613             LASSERT(tconn->kstc_type == kstt_child);
614             nagle = tconn->child.kstc_info.nagle;
615             tconn->child.kstc_info.nagle = 0;
616         }
617
618         spin_unlock(&tconn->kstc_lock);
619
620         val = 1;
621         rc = ks_set_tcp_option(
622                     tconn,
623                     TCP_SOCKET_NODELAY,
624                     &(val),
625                     sizeof(__u32)
626                     );
627
628         LASSERT (rc == 0);
629         spin_lock(&tconn->kstc_lock);
630
631         if (tconn->kstc_type == kstt_sender) {
632             tconn->sender.kstc_info.nagle = nagle;
633         } else {
634             LASSERT(tconn->kstc_type == kstt_child);
635             tconn->child.kstc_info.nagle = nagle;
636         }
637         spin_unlock(&tconn->kstc_lock);
638
639         ks_put_tconn(tconn);
640 }
641
642 /* @mode: 0: receiving mode / 1: sending mode */
643 void
644 ksocknal_sched_conn (ksock_conn_t *conn, int mode, ksock_tx_t *tx)
645 {
646         int             flags;
647         ksock_sched_t * sched;
648         ENTRY;
649
650         /* interleave correctly with closing sockets... */
651         read_lock (&ksocknal_data.ksnd_global_lock);
652
653         sched = conn->ksnc_scheduler;
654
655         spin_lock_irqsave (&sched->kss_lock, flags);
656
657         if (mode) { /* transmission can continue ... */ 
658
659 #error "This is out of date - we should be calling ksocknal_write_callback()"
660                 conn->ksnc_tx_ready = 1;
661
662                 if (tx) {
663                     /* Incomplete send: place tx on HEAD of tx_queue */
664                     list_add (&tx->tx_list, &conn->ksnc_tx_queue);
665                 }
666
667                 if ( !conn->ksnc_tx_scheduled && 
668                      !list_empty(&conn->ksnc_tx_queue)) {  //packets to send
669                         list_add_tail (&conn->ksnc_tx_list,
670                                        &sched->kss_tx_conns);
671                         conn->ksnc_tx_scheduled = 1;
672                         /* extra ref for scheduler */
673                         atomic_inc (&conn->ksnc_conn_refcount);
674
675                         cfs_waitq_signal (&sched->kss_waitq);
676                 }
677         } else {    /* receiving can continue ... */
678
679                 conn->ksnc_rx_ready = 1;
680
681                 if ( !conn->ksnc_rx_scheduled) {  /* not being progressed */
682                         list_add_tail(&conn->ksnc_rx_list,
683                                       &sched->kss_rx_conns);
684                         conn->ksnc_rx_scheduled = 1;
685                         /* extra ref for scheduler */
686                         atomic_inc (&conn->ksnc_conn_refcount);
687
688                         cfs_waitq_signal (&sched->kss_waitq);
689                 }
690         }
691
692         spin_unlock_irqrestore (&sched->kss_lock, flags);
693         read_unlock (&ksocknal_data.ksnd_global_lock);
694
695         EXIT;
696 }
697
698 void ksocknal_schedule_callback(struct socket*sock, int mode, void * tx, ulong_ptr bytes)
699 {
700     ksock_conn_t * conn = (ksock_conn_t *) sock->kstc_conn;
701
702     if (mode) {
703         ksocknal_sched_conn(conn, mode, tx);
704     } else {
705         if ( CAN_BE_SCHED(bytes, (ulong_ptr)conn->ksnc_rx_nob_wanted )) {
706             ksocknal_sched_conn(conn, mode, tx);
707         }
708     }
709 }
710
711 extern void
712 ksocknal_tx_launched (ksock_tx_t *tx);
713
714 void
715 ksocknal_fini_sending(ksock_tcpx_fini_t *tcpx)
716 {
717     ksocknal_tx_launched(tcpx->tx);
718     cfs_free(tcpx);
719 }
720
721 void *
722 ksocknal_update_tx(
723     struct socket*  tconn,
724     void *          txp,
725     ulong_ptr       rc
726     )
727 {
728     ksock_tx_t *    tx = (ksock_tx_t *)txp;
729
730     /*
731      *  the transmission was done, we need update the tx
732      */
733
734     LASSERT(tx->tx_resid >= (int)rc);
735     tx->tx_resid -= (int)rc;
736
737     /*
738      *  just partial of tx is sent out, we need update
739      *  the fields of tx and schedule later transmission.
740      */
741
742     if (tx->tx_resid) {
743
744         if (tx->tx_niov > 0) {
745
746             /* if there's iov, we need process iov first */
747             while (rc > 0 ) {
748                 if (rc < tx->tx_iov->iov_len) {
749                     /* didn't send whole iov entry... */
750                     tx->tx_iov->iov_base = 
751                         (char *)(tx->tx_iov->iov_base) + rc;
752                     tx->tx_iov->iov_len -= rc;
753                     rc = 0;
754                  } else {
755                     /* the whole of iov was sent out */
756                     rc -= tx->tx_iov->iov_len;
757                     tx->tx_iov++;
758                     tx->tx_niov--;
759                 }
760             }
761
762         } else {
763
764             /* now we need process the kiov queues ... */
765
766             while (rc > 0 ) {
767
768                 if (rc < tx->tx_kiov->kiov_len) {
769                     /* didn't send whole kiov entry... */
770                     tx->tx_kiov->kiov_offset += rc;
771                     tx->tx_kiov->kiov_len -= rc;
772                     rc = 0;
773                 } else {
774                     /* whole kiov was sent out */
775                     rc -= tx->tx_kiov->kiov_len;
776                     tx->tx_kiov++;
777                     tx->tx_nkiov--;
778                 }
779             }
780         }
781
782     } else {
783
784         ksock_tcpx_fini_t * tcpx = 
785                 cfs_alloc(sizeof(ksock_tcpx_fini_t), CFS_ALLOC_ZERO);
786
787         ASSERT(tx->tx_resid == 0);
788
789         if (!tcpx) {
790
791             ksocknal_tx_launched (tx);
792
793         } else {
794
795             tcpx->tx = tx;
796             ExInitializeWorkItem(
797                     &(tcpx->item), 
798                     ksocknal_fini_sending,
799                     tcpx
800             );
801             ExQueueWorkItem(
802                     &(tcpx->item),
803                     CriticalWorkQueue
804                     );
805         }
806
807         tx = NULL;
808     }
809
810     return (void *)tx;
811 }
812
813 void
814 ksocknal_lib_save_callback(struct socket *sock, ksock_conn_t *conn)
815 {
816 }
817
818 void
819 ksocknal_lib_set_callback(struct socket *sock,  ksock_conn_t *conn)
820 {
821         sock->kstc_conn      = conn;
822         sock->kstc_sched_cb  = ksocknal_schedule_callback;
823         sock->kstc_update_tx = ksocknal_update_tx;
824 }
825
826 void
827 ksocknal_lib_reset_callback(struct socket *sock, ksock_conn_t *conn)
828 {
829         sock->kstc_conn      = NULL;
830         sock->kstc_sched_cb  = NULL;
831         sock->kstc_update_tx = NULL;
832 }
833