Whamcloud - gitweb
656a0c5cce68f2c17a1312712182b1dd681595a2
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 int        ksocknal_io_timeout = SOCKNAL_IO_TIMEOUT;
29 #if SOCKNAL_ZC
30 int        ksocknal_do_zc = 1;
31 int        ksocknal_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
32 #endif
33
34 /*
35  *  LIB functions follow
36  *
37  */
38 int
39 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
40               user_ptr src_addr, size_t len)
41 {
42         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
43                nal->ni.nid, (long)len, src_addr, dst_addr);
44
45         memcpy( dst_addr, src_addr, len );
46         return 0;
47 }
48
49 int
50 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
51                void *src_addr, size_t len)
52 {
53         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
54                nal->ni.nid, (long)len, src_addr, dst_addr);
55
56         memcpy( dst_addr, src_addr, len );
57         return 0;
58 }
59
60 int
61 ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
62                          ptl_event_t *ev)
63 {
64         CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
65                nal->ni.nid, eq, ev);
66
67         if (eq->event_callback != NULL)
68                 eq->event_callback(ev);
69
70         return 0;
71 }
72
73 void *
74 ksocknal_malloc(nal_cb_t *nal, size_t len)
75 {
76         void *buf;
77
78         PORTAL_ALLOC(buf, len);
79
80         if (buf != NULL)
81                 memset(buf, 0, len);
82
83         return (buf);
84 }
85
86 void
87 ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
88 {
89         PORTAL_FREE(buf, len);
90 }
91
92 void
93 ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
94 {
95         va_list ap;
96         char msg[256];
97
98         va_start (ap, fmt);
99         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
100         va_end (ap);
101
102         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
103
104         CDEBUG (D_NET, "%s", msg);
105 }
106
107 void
108 ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
109 {
110         ksock_nal_data_t *data = nal->nal_data;
111
112         spin_lock(&data->ksnd_nal_cb_lock);
113 }
114
115 void
116 ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
117 {
118         ksock_nal_data_t *data;
119         data = nal->nal_data;
120
121         spin_unlock(&data->ksnd_nal_cb_lock);
122 }
123
124 int
125 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
126 {
127         /* I would guess that if ksocknal_get_peer (nid) == NULL,
128            and we're not routing, then 'nid' is very distant :) */
129         if ( nal->ni.nid == nid ) {
130                 *dist = 0;
131         } else {
132                 *dist = 1;
133         }
134
135         return 0;
136 }
137
138 ksock_ltx_t *
139 ksocknal_get_ltx (int may_block)
140 {
141         unsigned long flags;
142         ksock_ltx_t *ltx = NULL;
143
144         for (;;) {
145                 spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
146
147                 if (!list_empty (&ksocknal_data.ksnd_idle_ltx_list)) {
148                         ltx = list_entry(ksocknal_data.ksnd_idle_ltx_list.next,
149                                          ksock_ltx_t, ltx_tx.tx_list);
150                         list_del (&ltx->ltx_tx.tx_list);
151                         ksocknal_data.ksnd_active_ltxs++;
152                         break;
153                 }
154
155                 if (!may_block) {
156                         if (!list_empty(&ksocknal_data.ksnd_idle_nblk_ltx_list)) {
157                                 ltx = list_entry(ksocknal_data.ksnd_idle_nblk_ltx_list.next,
158                                                  ksock_ltx_t, ltx_tx.tx_list);
159                                 list_del (&ltx->ltx_tx.tx_list);
160                                 ksocknal_data.ksnd_active_ltxs++;
161                         }
162                         break;
163                 }
164
165                 spin_unlock_irqrestore(&ksocknal_data.ksnd_idle_ltx_lock,
166                                        flags);
167
168                 wait_event (ksocknal_data.ksnd_idle_ltx_waitq,
169                             !list_empty (&ksocknal_data.ksnd_idle_ltx_list));
170         }
171
172         spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
173
174         return (ltx);
175 }
176
177 void
178 ksocknal_put_ltx (ksock_ltx_t *ltx)
179 {
180         unsigned long   flags;
181         
182         spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
183
184         ksocknal_data.ksnd_active_ltxs--;
185         list_add_tail (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
186
187         /* normal tx desc => wakeup anyone blocking for one */
188         if (ltx->ltx_idle == &ksocknal_data.ksnd_idle_ltx_list &&
189             waitqueue_active (&ksocknal_data.ksnd_idle_ltx_waitq))
190                 wake_up (&ksocknal_data.ksnd_idle_ltx_waitq);
191
192         spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
193 }
194
195 #if SOCKNAL_ZC
196 struct page *
197 ksocknal_kvaddr_to_page (unsigned long vaddr)
198 {
199         struct page *page;
200
201         if (vaddr >= VMALLOC_START &&
202             vaddr < VMALLOC_END)
203                 page = vmalloc_to_page ((void *)vaddr);
204 #if CONFIG_HIGHMEM
205         else if (vaddr >= PKMAP_BASE &&
206                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
207                 page = vmalloc_to_page ((void *)vaddr);
208                 /* in 2.4 ^ just walks the page tables */
209 #endif
210         else
211                 page = virt_to_page (vaddr);
212
213         if (page == NULL ||
214             !VALID_PAGE (page))
215                 return (NULL);
216
217         return (page);
218 }
219 #endif
220
221 int
222 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
223 {
224         struct socket *sock = conn->ksnc_sock;
225         struct iovec  *iov = tx->tx_iov;
226         int            fragsize = iov->iov_len;
227         unsigned long  vaddr = (unsigned long)iov->iov_base;
228         int            more = !list_empty (&conn->ksnc_tx_queue) |
229                               (tx->tx_niov > 1) |
230                               (tx->tx_nkiov > 1);
231 #if SOCKNAL_ZC
232         int            offset = vaddr & (PAGE_SIZE - 1);
233         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
234         struct page   *page;
235 #endif
236         int            rc;
237
238         /* NB we can't trust socket ops to either consume our iovs
239          * or leave them alone, so we only send 1 frag at a time. */
240         LASSERT (fragsize <= tx->tx_resid);
241         LASSERT (tx->tx_niov > 0);
242         
243 #if SOCKNAL_ZC
244         if (ksocknal_do_zc &&
245             (sock->sk->route_caps & NETIF_F_SG) &&
246             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
247             zcsize >= ksocknal_zc_min_frag &&
248             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
249                 
250                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
251                        (void *)vaddr, page, page_address(page), offset, zcsize);
252
253                 if (fragsize > zcsize) {
254                         more = 1;
255                         fragsize = zcsize;
256                 }
257
258                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
259                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
260                                        &tx->tx_zccd);
261         } else
262 #endif
263         {
264                 /* NB don't pass tx's iov; sendmsg may or may not update it */
265                 struct iovec fragiov = { .iov_base = (void *)vaddr,
266                                          .iov_len  = fragsize};
267                 struct msghdr msg = {
268                         .msg_name       = NULL,
269                         .msg_namelen    = 0,
270                         .msg_iov        = &fragiov,
271                         .msg_iovlen     = 1,
272                         .msg_control    = NULL,
273                         .msg_controllen = 0,
274                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
275                 };
276                 mm_segment_t oldmm = get_fs();
277                 
278                 set_fs (KERNEL_DS);
279                 rc = sock_sendmsg(sock, &msg, fragsize);
280                 set_fs (oldmm);
281         } 
282
283         if (rc <= 0)
284                 return (rc);
285
286         tx->tx_resid -= rc;
287
288         if (rc < iov->iov_len) {
289                 /* didn't send whole iov entry... */
290                 iov->iov_base = (void *)(vaddr + rc);
291                 iov->iov_len -= rc;
292                 /* ...but did we send everything we tried to send? */
293                 return ((rc == fragsize) ? 1 : -EAGAIN);
294         }
295
296         tx->tx_iov++;
297         tx->tx_niov--;
298         return (1);
299 }
300
301 int
302 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
303 {
304         struct socket *sock = conn->ksnc_sock;
305         ptl_kiov_t    *kiov = tx->tx_kiov;
306         int            fragsize = kiov->kiov_len;
307         struct page   *page = kiov->kiov_page;
308         int            offset = kiov->kiov_offset;
309         int            more = !list_empty (&conn->ksnc_tx_queue) |
310                               (tx->tx_nkiov > 1);
311         int            rc;
312
313         /* NB we can't trust socket ops to either consume our iovs
314          * or leave them alone, so we only send 1 frag at a time. */
315         LASSERT (fragsize <= tx->tx_resid);
316         LASSERT (offset + fragsize <= PAGE_SIZE);
317         LASSERT (tx->tx_niov == 0);
318         LASSERT (tx->tx_nkiov > 0);
319
320 #if SOCKNAL_ZC
321         if (ksocknal_do_zc &&
322             (sock->sk->route_caps & NETIF_F_SG) &&
323             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
324             fragsize >= ksocknal_zc_min_frag) {
325
326                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
327                                page, offset, fragsize);
328
329                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
330                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
331                                        &tx->tx_zccd);
332         } else
333 #endif
334         {
335                 char *addr = ((char *)kmap (page)) + offset;
336                 struct iovec fragiov = {.iov_base = addr,
337                                         .iov_len  = fragsize};
338                 struct msghdr msg = {
339                         .msg_name       = NULL,
340                         .msg_namelen    = 0,
341                         .msg_iov        = &fragiov,
342                         .msg_iovlen     = 1,
343                         .msg_control    = NULL,
344                         .msg_controllen = 0,
345                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
346                 };
347                 mm_segment_t  oldmm = get_fs();
348                 
349                 set_fs (KERNEL_DS);
350                 rc = sock_sendmsg(sock, &msg, fragsize);
351                 set_fs (oldmm);
352                 kunmap (page);
353         }
354
355         if (rc <= 0)
356                 return (rc);
357
358         tx->tx_resid -= rc;
359  
360         if (rc < fragsize) {
361                 /* didn't send whole frag */
362                 kiov->kiov_offset = offset + rc;
363                 kiov->kiov_len    = fragsize - rc;
364                 return (-EAGAIN);
365         }
366
367         /* everything went */
368         LASSERT (rc == fragsize);
369         tx->tx_kiov++;
370         tx->tx_nkiov--;
371         return (1);
372 }
373
374 int
375 ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
376 {
377         /* Return 0 on success, < 0 on error.
378          * caller checks tx_resid to determine progress/completion */
379         int      rc;
380         ENTRY;
381         
382         if (ksocknal_data.ksnd_stall_tx != 0) {
383                 set_current_state (TASK_UNINTERRUPTIBLE);
384                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
385         }
386
387         rc = ksocknal_getconnsock (conn);
388         if (rc != 0)
389                 return (rc);
390
391         for (;;) {
392                 LASSERT (tx->tx_resid != 0);
393
394                 if (conn->ksnc_closing) {
395                         rc = -ESHUTDOWN;
396                         break;
397                 }
398
399                 if (tx->tx_niov != 0)
400                         rc = ksocknal_send_iov (conn, tx);
401                 else
402                         rc = ksocknal_send_kiov (conn, tx);
403
404                 if (rc <= 0) {                  /* error or socket full? */
405                         /* NB: rc == 0 and rc == -EAGAIN both mean try
406                          * again later (linux stack returns -EAGAIN for
407                          * this, but Adaptech TOE returns 0) */
408                         if (rc == -EAGAIN)
409                                 rc = 0;
410                         break;
411                 }
412
413                 if (tx->tx_resid == 0) {        /* sent everything */
414                         rc = 0;
415                         break;
416                 }
417         }
418
419         ksocknal_putconnsock (conn);
420         RETURN (rc);
421 }
422
423 int
424 ksocknal_recv_iov (ksock_conn_t *conn)
425 {
426         struct iovec *iov = conn->ksnc_rx_iov;
427         int           fragsize  = iov->iov_len;
428         unsigned long vaddr = (unsigned long)iov->iov_base;
429         struct iovec  fragiov = { .iov_base = (void *)vaddr,
430                                   .iov_len  = fragsize};
431         struct msghdr msg = {
432                 .msg_name       = NULL,
433                 .msg_namelen    = 0,
434                 .msg_iov        = &fragiov,
435                 .msg_iovlen     = 1,
436                 .msg_control    = NULL,
437                 .msg_controllen = 0,
438                 .msg_flags      = 0
439         };
440         mm_segment_t oldmm = get_fs();
441         int          rc;
442
443         /* NB we can't trust socket ops to either consume our iovs
444          * or leave them alone, so we only receive 1 frag at a time. */
445         LASSERT (conn->ksnc_rx_niov > 0);
446         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
447         
448         set_fs (KERNEL_DS);
449         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
450         /* NB this is just a boolean............................^ */
451         set_fs (oldmm);
452
453         if (rc <= 0)
454                 return (rc);
455
456         conn->ksnc_rx_nob_wanted -= rc;
457         conn->ksnc_rx_nob_left -= rc;
458                 
459         if (rc < fragsize) {
460                 iov->iov_base = (void *)(vaddr + rc);
461                 iov->iov_len = fragsize - rc;
462                 return (-EAGAIN);
463         }
464
465         conn->ksnc_rx_iov++;
466         conn->ksnc_rx_niov--;
467         return (1);
468 }
469
470 int
471 ksocknal_recv_kiov (ksock_conn_t *conn)
472 {
473         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
474         struct page  *page = kiov->kiov_page;
475         int           offset = kiov->kiov_offset;
476         int           fragsize = kiov->kiov_len;
477         unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
478         struct iovec  fragiov = { .iov_base = (void *)vaddr,
479                                   .iov_len  = fragsize};
480         struct msghdr msg = {
481                 .msg_name       = NULL,
482                 .msg_namelen    = 0,
483                 .msg_iov        = &fragiov,
484                 .msg_iovlen     = 1,
485                 .msg_control    = NULL,
486                 .msg_controllen = 0,
487                 .msg_flags      = 0
488         };
489         mm_segment_t oldmm = get_fs();
490         int          rc;
491
492         /* NB we can't trust socket ops to either consume our iovs
493          * or leave them alone, so we only receive 1 frag at a time. */
494         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
495         LASSERT (conn->ksnc_rx_nkiov > 0);
496         LASSERT (offset + fragsize <= PAGE_SIZE);
497         
498         set_fs (KERNEL_DS);
499         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
500         /* NB this is just a boolean............................^ */
501         set_fs (oldmm);
502         kunmap (page);
503         
504         if (rc <= 0)
505                 return (rc);
506         
507         conn->ksnc_rx_nob_wanted -= rc;
508         conn->ksnc_rx_nob_left -= rc;
509                 
510         if (rc < fragsize) {
511                 kiov->kiov_offset = offset + rc;
512                 kiov->kiov_len = fragsize - rc;
513                 return (-EAGAIN);
514         }
515
516         conn->ksnc_rx_kiov++;
517         conn->ksnc_rx_nkiov--;
518         return (1);
519 }
520
521 int
522 ksocknal_recvmsg (ksock_conn_t *conn) 
523 {
524         /* Return 1 on success, 0 on EOF, < 0 on error.
525          * Caller checks ksnc_rx_nob_wanted to determine
526          * progress/completion. */
527         int     rc;
528         ENTRY;
529         
530         if (ksocknal_data.ksnd_stall_rx != 0) {
531                 set_current_state (TASK_UNINTERRUPTIBLE);
532                 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
533         }
534
535         rc = ksocknal_getconnsock (conn);
536         if (rc != 0)
537                 return (rc);
538
539         for (;;) {
540                 if (conn->ksnc_closing) {
541                         rc = -ESHUTDOWN;
542                         break;
543                 }
544                 
545                 if (conn->ksnc_rx_niov != 0)
546                         rc = ksocknal_recv_iov (conn);
547                 else
548                         rc = ksocknal_recv_kiov (conn);
549                 
550                 if (rc <= 0) {
551                         /* error/EOF or partial receive */
552                         if (rc == -EAGAIN)
553                                 rc = 1;
554                         break;
555                 }
556
557                 if (conn->ksnc_rx_nob_wanted == 0) {
558                         rc = 1;
559                         break;
560                 }
561         }
562
563         ksocknal_putconnsock (conn);
564         RETURN (rc);
565 }
566
567 #if SOCKNAL_ZC
568 void
569 ksocknal_zc_callback (zccd_t *zcd)
570 {
571         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
572         ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
573         unsigned long  flags;
574         ENTRY;
575
576         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
577
578         spin_lock_irqsave (&sched->kss_lock, flags);
579
580         list_del (&tx->tx_list); /* remove from kss_zctxpending_list */
581         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
582         if (waitqueue_active (&sched->kss_waitq))
583                 wake_up (&sched->kss_waitq);
584
585         spin_unlock_irqrestore (&sched->kss_lock, flags);
586         EXIT;
587 }
588 #endif
589
590 void
591 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
592 {
593         ksock_ltx_t   *ltx;
594         ENTRY;
595
596         if (tx->tx_conn != NULL) {
597                 /* This tx got queued on a conn; do the accounting... */
598                 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
599 #if SOCKNAL_ZC
600                 /* zero copy completion isn't always from
601                  * process_transmit() so it needs to keep a ref on
602                  * tx_conn... */
603                 if (asynch)
604                         ksocknal_put_conn (tx->tx_conn);
605 #else
606                 LASSERT (!asynch);
607 #endif
608         }
609
610         if (tx->tx_isfwd) {             /* was a forwarded packet? */
611                 kpr_fwd_done (&ksocknal_data.ksnd_router,
612                               KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
613                 EXIT;
614                 return;
615         }
616
617         /* local send */
618         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
619
620         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
621
622         ksocknal_put_ltx (ltx);
623         EXIT;
624 }
625
626 void
627 ksocknal_tx_launched (ksock_tx_t *tx) 
628 {
629 #if SOCKNAL_ZC
630         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
631                 unsigned long  flags;
632                 ksock_conn_t  *conn = tx->tx_conn;
633                 ksock_sched_t *sched = conn->ksnc_scheduler;
634                 
635                 /* zccd skbufs are still in-flight.  First take a ref on
636                  * conn, so it hangs about for ksocknal_tx_done... */
637                 atomic_inc (&conn->ksnc_refcount);
638
639                 /* Stash it for timeout...
640                  * NB We have to hold a lock to stash the tx, and we have
641                  * stash it before we zcc_put(), but we have to _not_ hold
642                  * this lock when we zcc_put(), otherwise we could deadlock
643                  * if it turns out to be the last put. Aaaaarrrrggghhh! */
644                 spin_lock_irqsave (&sched->kss_lock, flags);
645                 list_add_tail (&tx->tx_list, &conn->ksnc_tx_pending);
646                 spin_unlock_irqrestore (&sched->kss_lock, flags);
647                 
648                 /* ...then drop the initial ref on zccd, so the zero copy
649                  * callback can occur */
650                 zccd_put (&tx->tx_zccd);
651                 return;
652         }
653 #endif
654         /* Any zero-copy-ness (if any) has completed; I can complete the
655          * transmit now, avoiding an extra schedule */
656         ksocknal_tx_done (tx, 0);
657 }
658
659 void
660 ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
661 {
662         ksock_conn_t *conn;
663         ksock_tx_t *tx;
664         int         rc;
665
666         LASSERT (!list_empty (&sched->kss_tx_conns));
667         conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list);
668         list_del (&conn->ksnc_tx_list);
669
670         LASSERT (conn->ksnc_tx_scheduled);
671         LASSERT (conn->ksnc_tx_ready);
672         LASSERT (!list_empty (&conn->ksnc_tx_queue));
673         tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list);
674         /* assume transmit will complete now, so dequeue while I've got lock */
675         list_del (&tx->tx_list);
676
677         spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
678
679         LASSERT (tx->tx_resid > 0);
680
681         conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */
682         mb();                   /* => clear BEFORE trying to write */
683
684         rc = ksocknal_sendmsg (conn, tx);
685
686         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
687
688         if (rc != 0) {
689                 if (ksocknal_close_conn_unlocked (conn)) {
690                         /* I'm the first to close */
691                         CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
692                                 conn, rc, conn->ksnc_peer->ksnp_nid,
693                                 conn->ksnc_ipaddr, conn->ksnc_port);
694                 }
695                 ksocknal_tx_launched (tx);
696                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
697
698         } else if (tx->tx_resid == 0) {  
699
700                 /* everything went; assume more can go, and avoid
701                  * write_space locking */
702                 conn->ksnc_tx_ready = 1;
703
704                 ksocknal_tx_launched (tx);
705                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
706         } else {
707                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
708
709                 /* back onto HEAD of tx_queue */
710                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
711         }
712
713         /* no space to write, or nothing to write? */
714         if (!conn->ksnc_tx_ready ||
715             list_empty (&conn->ksnc_tx_queue)) {
716                 /* mark not scheduled */
717                 conn->ksnc_tx_scheduled = 0;
718                 /* drop scheduler's ref */
719                 ksocknal_put_conn (conn);
720         } else {
721                 /* stay scheduled */
722                 list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
723         }
724 }
725
726 void
727 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
728 {
729         unsigned long     flags;
730
731         /* called holding write lock on ksnd_global_lock */
732
733         LASSERT (route->ksnr_conn == NULL);
734         LASSERT (!route->ksnr_deleted && !route->ksnr_connecting);
735         
736         route->ksnr_connecting = 1;
737         atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
738         
739         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
740         
741         list_add_tail (&route->ksnr_connect_list,
742                        &ksocknal_data.ksnd_autoconnectd_routes);
743
744         if (waitqueue_active (&ksocknal_data.ksnd_autoconnectd_waitq))
745                 wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
746         
747         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
748 }
749
750 ksock_peer_t *
751 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
752 {
753         ptl_nid_t     target_nid;
754         int           rc;
755         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
756         
757         if (peer != NULL)
758                 return (peer);
759         
760         if (tx->tx_isfwd) {
761                 CERROR ("Can't send packet to "LPX64
762                         ": routed target is not a peer\n", nid);
763                 return (NULL);
764         }
765         
766         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, &target_nid);
767         if (rc != 0) {
768                 CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
769                 return (NULL);
770         }
771
772         peer = ksocknal_find_peer_locked (target_nid);
773         if (peer != NULL)
774                 return (peer);
775
776         CERROR ("Can't send packet to "LPX64": no peer entry\n", target_nid);
777         return (NULL);
778 }
779
780 ksock_conn_t *
781 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) 
782 {
783         struct list_head *tmp;
784         ksock_conn_t     *conn = NULL;
785
786         /* Find the conn with the shortest tx queue */
787         list_for_each (tmp, &peer->ksnp_conns) {
788                 ksock_conn_t *c = list_entry (tmp, ksock_conn_t, ksnc_list);
789
790                 LASSERT (!c->ksnc_closing);
791                 
792                 if (conn == NULL ||
793                     atomic_read (&conn->ksnc_tx_nob) >
794                     atomic_read (&c->ksnc_tx_nob))
795                         conn = c;
796         }
797
798         return (conn);
799 }
800
801 void
802 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
803 {
804         unsigned long  flags;
805         ksock_sched_t *sched = conn->ksnc_scheduler;
806
807         /* called holding global lock (read or irq-write) */
808
809         CDEBUG (D_NET, "Sending to "LPX64" on port %d\n", 
810                 conn->ksnc_peer->ksnp_nid, conn->ksnc_port);
811
812         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
813         tx->tx_resid = tx->tx_nob;
814         tx->tx_conn = conn;
815
816 #if SOCKNAL_ZC
817         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
818         /* NB this sets 1 ref on zccd, so the callback can only occur after
819          * I've released this ref. */
820 #endif
821
822         spin_lock_irqsave (&sched->kss_lock, flags);
823                 
824         tx->tx_deadline = jiffies_64 + ksocknal_io_timeout;
825         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
826                 
827         if (conn->ksnc_tx_ready &&      /* able to send */
828             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
829                 /* +1 ref for scheduler */
830                 atomic_inc (&conn->ksnc_refcount);
831                 list_add_tail (&conn->ksnc_tx_list, 
832                                &sched->kss_tx_conns);
833                 conn->ksnc_tx_scheduled = 1;
834                 if (waitqueue_active (&sched->kss_waitq))
835                         wake_up (&sched->kss_waitq);
836         }
837
838         spin_unlock_irqrestore (&sched->kss_lock, flags);
839 }
840
841 ksock_route_t *
842 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
843 {
844         struct list_head  *tmp;
845         ksock_route_t     *route;
846         
847         list_for_each (tmp, &peer->ksnp_routes) {
848                 route = list_entry (tmp, ksock_route_t, ksnr_list);
849                 
850                 if (route->ksnr_conn == NULL && /* not connected */
851                     !route->ksnr_connecting &&  /* not connecting */
852                     route->ksnr_timeout <= jiffies_64) /* OK to retry */
853                         return (route);
854         }
855         
856         return (NULL);
857 }
858
859 ksock_route_t *
860 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
861 {
862         struct list_head  *tmp;
863         ksock_route_t     *route;
864
865         list_for_each (tmp, &peer->ksnp_routes) {
866                 route = list_entry (tmp, ksock_route_t, ksnr_list);
867                 
868                 if (route->ksnr_connecting)
869                         return (route);
870         }
871         
872         return (NULL);
873 }
874
875 int
876 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
877 {
878         unsigned long     flags;
879         ksock_peer_t     *peer;
880         ksock_conn_t     *conn;
881         ksock_route_t    *route;
882         rwlock_t         *g_lock;
883         
884         /* Ensure the frags we've been given EXACTLY match the number of
885          * bytes we want to send.  Many TCP/IP stacks disregard any total
886          * size parameters passed to them and just look at the frags. 
887          *
888          * We always expect at least 1 mapped fragment containing the
889          * complete portals header. */
890         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
891                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
892         LASSERT (tx->tx_niov >= 1);
893         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
894
895         CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
896                 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
897                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
898
899         tx->tx_conn = NULL;                     /* only set when assigned a conn */
900
901         g_lock = &ksocknal_data.ksnd_global_lock;
902         read_lock (g_lock);
903         
904         peer = ksocknal_find_target_peer_locked (tx, nid);
905         if (peer == NULL) {
906                 read_unlock (g_lock);
907                 return (PTL_FAIL);
908         }
909
910         /* Any routes need to be connected? (need write lock if so) */
911         if (ksocknal_find_connectable_route_locked (peer) == NULL) {
912                 conn = ksocknal_find_conn_locked (tx, peer);
913                 if (conn != NULL) {
914                         ksocknal_queue_tx_locked (tx, conn);
915                         read_unlock (g_lock);
916                         return (PTL_OK);
917                 }
918         }
919         
920         /* need a write lock now to change peer state... */
921
922         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
923         read_unlock (g_lock);
924         write_lock_irqsave (g_lock, flags);
925         
926         if (peer->ksnp_closing) {               /* peer deleted as I blocked! */
927                 write_unlock_irqrestore (g_lock, flags);
928                 ksocknal_put_peer (peer);
929                 return (PTL_FAIL);
930         }
931         ksocknal_put_peer (peer);               /* drop ref I got above */
932
933         /* I may launch autoconnects, now we're write locked... */
934         while ((route = ksocknal_find_connectable_route_locked (peer)) != NULL)
935                 ksocknal_launch_autoconnect_locked (route);
936
937         conn = ksocknal_find_conn_locked (tx, peer);
938         if (conn != NULL) {
939                 ksocknal_queue_tx_locked (tx, conn);
940                 write_unlock_irqrestore (g_lock, flags);
941                 return (PTL_OK);
942         }
943                 
944         if (ksocknal_find_connecting_route_locked (peer) == NULL) {
945                 /* no routes actually connecting now */
946                 write_unlock_irqrestore (g_lock, flags);
947                 return (PTL_FAIL);
948         }
949
950         list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
951
952         write_unlock_irqrestore (g_lock, flags);
953         return (PTL_OK);
954 }
955
956 ksock_ltx_t *
957 ksocknal_setup_hdr (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
958                     ptl_hdr_t *hdr, int type)
959 {
960         ksock_ltx_t  *ltx;
961
962         /* I may not block for a transmit descriptor if I might block the
963          * receiver, or an interrupt handler. */
964         ltx = ksocknal_get_ltx (!(type == PTL_MSG_ACK ||
965                                   type == PTL_MSG_REPLY ||
966                                   in_interrupt ()));
967         if (ltx == NULL) {
968                 CERROR ("Can't allocate tx desc\n");
969                 return (NULL);
970         }
971
972         /* Init local send packet (storage for hdr, finalize() args) */
973         ltx->ltx_hdr = *hdr;
974         ltx->ltx_private = private;
975         ltx->ltx_cookie = cookie;
976         
977         /* Init common ltx_tx */
978         ltx->ltx_tx.tx_isfwd = 0;
979         ltx->ltx_tx.tx_nob = sizeof (*hdr);
980
981         /* We always have 1 mapped frag for the header */
982         ltx->ltx_tx.tx_niov = 1;
983         ltx->ltx_tx.tx_iov = &ltx->ltx_iov_space.hdr;
984         ltx->ltx_tx.tx_iov[0].iov_base = &ltx->ltx_hdr;
985         ltx->ltx_tx.tx_iov[0].iov_len = sizeof (ltx->ltx_hdr);
986
987         ltx->ltx_tx.tx_kiov  = NULL;
988         ltx->ltx_tx.tx_nkiov = 0;
989
990         return (ltx);
991 }
992
993 int
994 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
995                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
996                unsigned int payload_niov, struct iovec *payload_iov,
997                size_t payload_len)
998 {
999         ksock_ltx_t  *ltx;
1000         int           rc;
1001
1002         /* NB 'private' is different depending on what we're sending.
1003          * Just ignore it until we can rely on it
1004          */
1005
1006         CDEBUG(D_NET,
1007                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64
1008                " pid %d\n", payload_len, payload_niov, nid, pid);
1009
1010         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
1011         if (ltx == NULL)
1012                 return (PTL_FAIL);
1013
1014         /* append the payload_iovs to the one pointing at the header */
1015         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
1016         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1017
1018         memcpy (ltx->ltx_tx.tx_iov + 1, payload_iov,
1019                 payload_niov * sizeof (*payload_iov));
1020         ltx->ltx_tx.tx_niov = 1 + payload_niov;
1021         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
1022
1023         rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
1024         if (rc != PTL_OK)
1025                 ksocknal_put_ltx (ltx);
1026         
1027         return (rc);
1028 }
1029
1030 int
1031 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
1032                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1033                      unsigned int payload_niov, ptl_kiov_t *payload_iov, size_t payload_len)
1034 {
1035         ksock_ltx_t *ltx;
1036         int          rc;
1037
1038         /* NB 'private' is different depending on what we're sending.
1039          * Just ignore it until we can rely on it */
1040
1041         CDEBUG(D_NET,
1042                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64" pid %d\n",
1043                payload_len, payload_niov, nid, pid);
1044
1045         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
1046         if (ltx == NULL)
1047                 return (PTL_FAIL);
1048
1049         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
1050         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1051         
1052         ltx->ltx_tx.tx_kiov = ltx->ltx_iov_space.payload.kiov;
1053         memcpy (ltx->ltx_tx.tx_kiov, payload_iov, 
1054                 payload_niov * sizeof (*payload_iov));
1055         ltx->ltx_tx.tx_nkiov = payload_niov;
1056         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
1057
1058         rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
1059         if (rc != PTL_OK) 
1060                 ksocknal_put_ltx (ltx);
1061                 
1062         return (rc);
1063 }
1064
1065 void
1066 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1067 {
1068         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
1069         ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
1070         int           rc;
1071         
1072         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1073                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1074
1075         /* I'm the gateway; must be the last hop */
1076         if (nid == ksocknal_lib.ni.nid)
1077                 nid = fwd->kprfd_target_nid;
1078
1079         tx->tx_isfwd = 1;                   /* This is a forwarding packet */
1080         tx->tx_nob   = fwd->kprfd_nob;
1081         tx->tx_niov  = fwd->kprfd_niov;
1082         tx->tx_iov   = fwd->kprfd_iov;
1083         tx->tx_nkiov = 0;
1084         tx->tx_kiov  = NULL;
1085         tx->tx_hdr   = (ptl_hdr_t *)fwd->kprfd_iov[0].iov_base;
1086
1087         rc = ksocknal_launch_packet (tx, nid);
1088         if (rc != 0) {
1089                 /* FIXME, could pass a better completion error */
1090                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, -EHOSTUNREACH);
1091         }
1092 }
1093
1094 int
1095 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1096 {
1097         long    pid = kernel_thread (fn, arg, 0);
1098
1099         if (pid < 0)
1100                 return ((int)pid);
1101
1102         atomic_inc (&ksocknal_data.ksnd_nthreads);
1103         return (0);
1104 }
1105
1106 void
1107 ksocknal_thread_fini (void)
1108 {
1109         atomic_dec (&ksocknal_data.ksnd_nthreads);
1110 }
1111
1112 void
1113 ksocknal_fmb_callback (void *arg, int error)
1114 {
1115         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
1116         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
1117         ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
1118         ksock_conn_t      *conn = NULL;
1119         ksock_sched_t     *sched;
1120         unsigned long      flags;
1121
1122         if (error != 0)
1123                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1124                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),
1125                        error);
1126         else
1127                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1128                         NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1129
1130         spin_lock_irqsave (&fmp->fmp_lock, flags);
1131
1132         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1133
1134         if (!list_empty (&fmp->fmp_blocked_conns)) {
1135                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1136                                    ksock_conn_t, ksnc_rx_list);
1137                 list_del (&conn->ksnc_rx_list);
1138         }
1139
1140         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1141
1142         /* drop peer ref taken on init */
1143         ksocknal_put_peer (fmb->fmb_peer);
1144         
1145         if (conn == NULL)
1146                 return;
1147
1148         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1149         LASSERT (conn->ksnc_rx_scheduled);
1150         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1151
1152         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1153
1154         sched = conn->ksnc_scheduler;
1155
1156         spin_lock_irqsave (&sched->kss_lock, flags);
1157
1158         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1159
1160         if (waitqueue_active (&sched->kss_waitq))
1161                 wake_up (&sched->kss_waitq);
1162
1163         spin_unlock_irqrestore (&sched->kss_lock, flags);
1164 }
1165
1166 ksock_fmb_t *
1167 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1168 {
1169         int               payload_nob = conn->ksnc_rx_nob_left;
1170         int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1171         unsigned long     flags;
1172         ksock_fmb_pool_t *pool;
1173         ksock_fmb_t      *fmb;
1174
1175         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1176         LASSERT (ksocknal_data.ksnd_fmbs != NULL);
1177
1178         if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1179                 pool = &ksocknal_data.ksnd_small_fmp;
1180         else
1181                 pool = &ksocknal_data.ksnd_large_fmp;
1182
1183         spin_lock_irqsave (&pool->fmp_lock, flags);
1184
1185         if (!list_empty (&pool->fmp_idle_fmbs)) {
1186                 fmb = list_entry(pool->fmp_idle_fmbs.next,
1187                                  ksock_fmb_t, fmb_list);
1188                 list_del (&fmb->fmb_list);
1189                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1190
1191                 return (fmb);
1192         }
1193
1194         /* deschedule until fmb free */
1195
1196         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1197
1198         list_add_tail (&conn->ksnc_rx_list,
1199                        &pool->fmp_blocked_conns);
1200
1201         spin_unlock_irqrestore (&pool->fmp_lock, flags);
1202         return (NULL);
1203 }
1204
1205 int
1206 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1207 {
1208         int payload_nob = conn->ksnc_rx_nob_left;
1209         int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1210         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1211         int niov;                               /* at least the header */
1212         int nob;
1213
1214         LASSERT (conn->ksnc_rx_scheduled);
1215         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1216         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1217         LASSERT (payload_nob >= 0);
1218         LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
1219         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1220
1221         /* Got a forwarding buffer; copy the header we just read into the
1222          * forwarding buffer.  If there's payload, start reading reading it
1223          * into the buffer, otherwise the forwarding buffer can be kicked
1224          * off immediately.
1225          *
1226          * NB fmb->fmb_iov spans the WHOLE packet.
1227          *    conn->ksnc_rx_iov spans just the payload.
1228          */
1229         fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
1230
1231         /* copy header */
1232         memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
1233
1234         /* Take a ref on the conn's peer to prevent module unload before
1235          * forwarding completes.  NB we ref peer and not conn since because
1236          * all refs on conn after it has been closed must remove themselves
1237          * in finite time */
1238         fmb->fmb_peer = conn->ksnc_peer;
1239         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1240
1241         if (payload_nob == 0) {         /* got complete packet already */
1242                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
1243                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1244                         dest_nid, packet_nob);
1245
1246                 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
1247
1248                 kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1249                               packet_nob, 1, fmb->fmb_iov,
1250                               ksocknal_fmb_callback, fmb);
1251
1252                 /* forward it now */
1253                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1254
1255                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1256                 return (1);
1257         }
1258
1259         niov = 1;
1260         if (packet_nob <= PAGE_SIZE) {  /* whole packet fits in first page */
1261                 fmb->fmb_iov[0].iov_len = packet_nob;
1262         } else {
1263                 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
1264                 nob = packet_nob - PAGE_SIZE;
1265
1266                 do {
1267                         LASSERT (niov < fmb->fmb_npages);
1268                         fmb->fmb_iov[niov].iov_base =
1269                                 page_address (fmb->fmb_pages[niov]);
1270                         fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
1271                         nob -= PAGE_SIZE;
1272                         niov++;
1273                 } while (nob > 0);
1274         }
1275
1276         kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1277                       packet_nob, niov, fmb->fmb_iov,
1278                       ksocknal_fmb_callback, fmb);
1279
1280         conn->ksnc_cookie = fmb;                /* stash fmb for later */
1281         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1282         conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout; /* start timeout */
1283         
1284         /* payload is desc's iov-ed buffer, but skipping the hdr */
1285         LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
1286                  sizeof (struct iovec));
1287
1288         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1289         conn->ksnc_rx_iov[0].iov_base =
1290                 (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
1291                          sizeof (ptl_hdr_t));
1292         conn->ksnc_rx_iov[0].iov_len =
1293                 fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
1294
1295         if (niov > 1)
1296                 memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
1297                        (niov - 1) * sizeof (struct iovec));
1298
1299         conn->ksnc_rx_niov = niov;
1300
1301         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1302                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1303         return (0);
1304 }
1305
1306 void
1307 ksocknal_fwd_parse (ksock_conn_t *conn)
1308 {
1309         ksock_peer_t *peer;
1310         ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1311         int           body_len = NTOH__u32 (PTL_HDR_LENGTH(&conn->ksnc_hdr));
1312
1313         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1314                 NTOH__u64 (conn->ksnc_hdr.src_nid),
1315                 dest_nid, conn->ksnc_rx_nob_left);
1316
1317         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1318         LASSERT (conn->ksnc_rx_scheduled);
1319
1320         if (body_len < 0) {                 /* length corrupt (overflow) */
1321                 CERROR("dropping packet from "LPX64" for "LPX64": packet "
1322                        "size %d illegal\n", NTOH__u64 (conn->ksnc_hdr.src_nid),
1323                        dest_nid, body_len);
1324
1325                 ksocknal_new_packet (conn, 0);  /* on to new packet */
1326                 ksocknal_close_conn_unlocked (conn); /* give up on conn */
1327                 return;
1328         }
1329
1330         if (ksocknal_data.ksnd_fmbs == NULL) {        /* not forwarding */
1331                 CERROR("dropping packet from "LPX64" for "LPX64": not "
1332                        "forwarding\n", conn->ksnc_hdr.src_nid,
1333                        conn->ksnc_hdr.dest_nid);
1334                 /* on to new packet (skip this one's body) */
1335                 ksocknal_new_packet (conn, body_len);
1336                 return;
1337         }
1338
1339         if (body_len > SOCKNAL_MAX_FWD_PAYLOAD) {      /* too big to forward */
1340                 CERROR ("dropping packet from "LPX64" for "LPX64
1341                         ": packet size %d too big\n", conn->ksnc_hdr.src_nid,
1342                         conn->ksnc_hdr.dest_nid, body_len);
1343                 /* on to new packet (skip this one's body) */
1344                 ksocknal_new_packet (conn, body_len);
1345                 return;
1346         }
1347
1348         /* should have gone direct */
1349         peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1350         if (peer != NULL) {
1351                 CERROR ("dropping packet from "LPX64" for "LPX64
1352                         ": target is a peer\n", conn->ksnc_hdr.src_nid,
1353                         conn->ksnc_hdr.dest_nid);
1354                 ksocknal_put_peer (peer);  /* drop ref from get above */
1355
1356                 /* on to next packet (skip this one's body) */
1357                 ksocknal_new_packet (conn, body_len);
1358                 return;
1359         }
1360
1361         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1362         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1363         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1364 }
1365
1366 int
1367 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1368 {
1369         static char ksocknal_slop_buffer[4096];
1370
1371         int   nob;
1372         int   niov;
1373         int   skipped;
1374
1375         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1376                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1377                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1378                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1379
1380                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1381                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1382                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1383                 conn->ksnc_rx_niov = 1;
1384
1385                 conn->ksnc_rx_kiov = NULL;
1386                 conn->ksnc_rx_nkiov = 0;
1387                 return (1);
1388         }
1389
1390         /* Set up to skip as much a possible now.  If there's more left
1391          * (ran out of iov entries) we'll get called again */
1392
1393         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1394         conn->ksnc_rx_nob_left = nob_to_skip;
1395         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1396         skipped = 0;
1397         niov = 0;
1398
1399         do {
1400                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1401
1402                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1403                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1404                 niov++;
1405                 skipped += nob;
1406                 nob_to_skip -=nob;
1407
1408         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1409                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1410
1411         conn->ksnc_rx_niov = niov;
1412         conn->ksnc_rx_kiov = NULL;
1413         conn->ksnc_rx_nkiov = 0;
1414         conn->ksnc_rx_nob_wanted = skipped;
1415         return (0);
1416 }
1417
1418 void
1419 ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
1420 {
1421         ksock_conn_t *conn;
1422         ksock_fmb_t  *fmb;
1423         int           rc;
1424
1425         /* NB: sched->ksnc_lock lock held */
1426
1427         LASSERT (!list_empty (&sched->kss_rx_conns));
1428         conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list);
1429         list_del (&conn->ksnc_rx_list);
1430
1431         spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
1432
1433         CDEBUG(D_NET, "sched %p conn %p\n", sched, conn);
1434         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1435         LASSERT (conn->ksnc_rx_scheduled);
1436         LASSERT (conn->ksnc_rx_ready);
1437
1438         /* doesn't need a forwarding buffer */
1439         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1440                 goto try_read;
1441
1442  get_fmb:
1443         fmb = ksocknal_get_idle_fmb (conn);
1444         if (fmb == NULL) {      /* conn descheduled waiting for idle fmb */
1445                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
1446                 return;
1447         }
1448
1449         if (ksocknal_init_fmb (conn, fmb)) /* packet forwarded ? */
1450                 goto out;               /* come back later for next packet */
1451
1452  try_read:
1453         /* NB: sched lock NOT held */
1454         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1455                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1456                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1457                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1458
1459         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1460
1461         conn->ksnc_rx_ready = 0;/* data ready may race with me and set ready */
1462         mb();                   /* => clear BEFORE trying to read */
1463
1464         rc = ksocknal_recvmsg(conn);
1465
1466         if (rc <= 0) {
1467                 if (ksocknal_close_conn_unlocked (conn)) {
1468                         /* I'm the first to close */
1469                         if (rc < 0)
1470                                 CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
1471                                         conn, rc, conn->ksnc_peer->ksnp_nid,
1472                                         conn->ksnc_ipaddr, conn->ksnc_port);
1473                         else
1474                                 CERROR ("[%p] EOF from "LPX64" ip %08x:%d\n",
1475                                         conn, conn->ksnc_peer->ksnp_nid,
1476                                         conn->ksnc_ipaddr, conn->ksnc_port);
1477                 }
1478                 goto out;
1479         }
1480
1481         if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
1482                 goto out;                       /* try again later */
1483
1484         /* got all I wanted, assume there's more - prevent data_ready locking */
1485         conn->ksnc_rx_ready = 1;
1486
1487         switch (conn->ksnc_rx_state) {
1488         case SOCKNAL_RX_HEADER:
1489                 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1490                     NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
1491                         /* This packet isn't for me */
1492                         ksocknal_fwd_parse (conn);
1493                         switch (conn->ksnc_rx_state) {
1494                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1495                                 goto out;       /* => come back later */
1496                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1497                                 goto try_read;  /* => go read it */
1498                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1499                                 goto get_fmb;   /* => go get a fwd msg buffer */
1500                         default:
1501                                 LBUG ();
1502                         }
1503                         /* Not Reached */
1504                 }
1505
1506                 /* sets wanted_len, iovs etc */
1507                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1508
1509                 /* start timeout (lib is waiting for finalize) */
1510                 conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout;
1511
1512                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1513                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1514                         goto try_read;          /* go read the payload */
1515                 }
1516                 /* Fall through (completed packet for me) */
1517
1518         case SOCKNAL_RX_BODY:
1519                 /* payload all received */
1520                 conn->ksnc_rx_deadline = 0;     /* cancel timeout */
1521                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
1522                 /* Fall through */
1523
1524         case SOCKNAL_RX_SLOP:
1525                 /* starting new packet? */
1526                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1527                         goto out;       /* come back later */
1528                 goto try_read;          /* try to finish reading slop now */
1529
1530         case SOCKNAL_RX_BODY_FWD:
1531                 /* payload all received */
1532                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1533                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1534                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
1535                         conn->ksnc_rx_nob_left);
1536
1537                 /* cancel timeout (only needed it while fmb allocated) */
1538                 conn->ksnc_rx_deadline = 0;
1539
1540                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1541                  * conn->ksnc_cookie */
1542                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1543                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1544
1545                 /* no slop in forwarded packets */
1546                 LASSERT (conn->ksnc_rx_nob_left == 0);
1547
1548                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1549                 goto out;                       /* (later) */
1550
1551         default:
1552                 break;
1553         }
1554
1555         /* Not Reached */
1556         LBUG ();
1557
1558  out:
1559         spin_lock_irqsave (&sched->kss_lock, *irq_flags);
1560
1561         /* no data there to read? */
1562         if (!conn->ksnc_rx_ready) {
1563                 /* let socket callback schedule again */
1564                 conn->ksnc_rx_scheduled = 0;
1565                 /* drop scheduler's ref */
1566                 ksocknal_put_conn (conn);   
1567         } else {
1568                 /* stay scheduled */
1569                 list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1570         }
1571 }
1572
1573 int
1574 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
1575                unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
1576 {
1577         ksock_conn_t *conn = (ksock_conn_t *)private;
1578
1579         LASSERT (mlen <= rlen);
1580         LASSERT (niov <= PTL_MD_MAX_IOV);
1581         
1582         conn->ksnc_cookie = msg;
1583         conn->ksnc_rx_nob_wanted = mlen;
1584         conn->ksnc_rx_nob_left   = rlen;
1585
1586         conn->ksnc_rx_nkiov = 0;
1587         conn->ksnc_rx_kiov = NULL;
1588         conn->ksnc_rx_niov = niov;
1589         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1590         memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));
1591
1592         LASSERT (mlen == 
1593                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1594                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1595
1596         return (rlen);
1597 }
1598
1599 int
1600 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
1601                      unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
1602 {
1603         ksock_conn_t *conn = (ksock_conn_t *)private;
1604
1605         LASSERT (mlen <= rlen);
1606         LASSERT (niov <= PTL_MD_MAX_IOV);
1607         
1608         conn->ksnc_cookie = msg;
1609         conn->ksnc_rx_nob_wanted = mlen;
1610         conn->ksnc_rx_nob_left   = rlen;
1611
1612         conn->ksnc_rx_niov = 0;
1613         conn->ksnc_rx_iov  = NULL;
1614         conn->ksnc_rx_nkiov = niov;
1615         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1616         memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));
1617
1618         LASSERT (mlen == 
1619                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1620                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1621
1622         return (rlen);
1623 }
1624
1625 int ksocknal_scheduler (void *arg)
1626 {
1627         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1628         unsigned long      flags;
1629         int                rc;
1630         int                nloops = 0;
1631         int                id = sched - ksocknal_data.ksnd_schedulers;
1632         char               name[16];
1633
1634         snprintf (name, sizeof (name),"ksocknald[%d]", id);
1635         kportal_daemonize (name);
1636         kportal_blockallsigs ();
1637
1638 #if (CONFIG_SMP && CPU_AFFINITY)
1639         if ((cpu_online_map & (1 << id)) != 0)
1640                 current->cpus_allowed = (1 << id);
1641         else
1642                 CERROR ("Can't set CPU affinity for %s\n", name);
1643 #endif /* CONFIG_SMP && CPU_AFFINITY */
1644         
1645         spin_lock_irqsave (&sched->kss_lock, flags);
1646
1647         while (!ksocknal_data.ksnd_shuttingdown) {
1648                 int did_something = 0;
1649
1650                 /* Ensure I progress everything semi-fairly */
1651
1652                 if (!list_empty (&sched->kss_rx_conns)) {
1653                         did_something = 1;
1654                         /* drops & regains kss_lock */
1655                         ksocknal_process_receive (sched, &flags);
1656                 }
1657
1658                 if (!list_empty (&sched->kss_tx_conns)) {
1659                         did_something = 1;
1660                         /* drops and regains kss_lock */
1661                         ksocknal_process_transmit (sched, &flags);
1662                 }
1663 #if SOCKNAL_ZC
1664                 if (!list_empty (&sched->kss_zctxdone_list)) {
1665                         ksock_tx_t *tx =
1666                                 list_entry(sched->kss_zctxdone_list.next,
1667                                            ksock_tx_t, tx_list);
1668                         did_something = 1;
1669
1670                         list_del (&tx->tx_list);
1671                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1672
1673                         ksocknal_tx_done (tx, 1);
1674
1675                         spin_lock_irqsave (&sched->kss_lock, flags);
1676                 }
1677 #endif
1678                 if (!did_something ||           /* nothing to do */
1679                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1680                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1681
1682                         nloops = 0;
1683
1684                         if (!did_something) {   /* wait for something to do */
1685 #if SOCKNAL_ZC
1686                                 rc = wait_event_interruptible (sched->kss_waitq,
1687                                                                ksocknal_data.ksnd_shuttingdown ||
1688                                                                !list_empty(&sched->kss_rx_conns) ||
1689                                                                !list_empty(&sched->kss_tx_conns) ||
1690                                                                !list_empty(&sched->kss_zctxdone_list));
1691 #else
1692                                 rc = wait_event_interruptible (sched->kss_waitq,
1693                                                                ksocknal_data.ksnd_shuttingdown ||
1694                                                                !list_empty(&sched->kss_rx_conns) ||
1695                                                                !list_empty(&sched->kss_tx_conns));
1696 #endif
1697                                 LASSERT (rc == 0);
1698                         } else
1699                                our_cond_resched();
1700
1701                         spin_lock_irqsave (&sched->kss_lock, flags);
1702                 }
1703         }
1704
1705         spin_unlock_irqrestore (&sched->kss_lock, flags);
1706         ksocknal_thread_fini ();
1707         return (0);
1708 }
1709
1710 void
1711 ksocknal_data_ready (struct sock *sk, int n)
1712 {
1713         unsigned long  flags;
1714         ksock_conn_t  *conn;
1715         ksock_sched_t *sched;
1716         ENTRY;
1717
1718         /* interleave correctly with closing sockets... */
1719         read_lock (&ksocknal_data.ksnd_global_lock);
1720
1721         conn = sk->sk_user_data;
1722         if (conn == NULL) {             /* raced with ksocknal_close_sock */
1723                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1724                 sk->sk_data_ready (sk, n);
1725         } else if (!conn->ksnc_rx_ready) {        /* new news */
1726                 /* Set ASAP in case of concurrent calls to me */
1727                 conn->ksnc_rx_ready = 1;
1728
1729                 sched = conn->ksnc_scheduler;
1730
1731                 spin_lock_irqsave (&sched->kss_lock, flags);
1732
1733                 /* Set again (process_receive may have cleared while I blocked for the lock) */
1734                 conn->ksnc_rx_ready = 1;
1735
1736                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1737                         list_add_tail(&conn->ksnc_rx_list,
1738                                       &sched->kss_rx_conns);
1739                         conn->ksnc_rx_scheduled = 1;
1740                         /* extra ref for scheduler */
1741                         atomic_inc (&conn->ksnc_refcount);
1742
1743                         if (waitqueue_active (&sched->kss_waitq))
1744                                 wake_up (&sched->kss_waitq);
1745                 }
1746
1747                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1748         }
1749
1750         read_unlock (&ksocknal_data.ksnd_global_lock);
1751
1752         EXIT;
1753 }
1754
1755 void
1756 ksocknal_write_space (struct sock *sk)
1757 {
1758         unsigned long  flags;
1759         ksock_conn_t  *conn;
1760         ksock_sched_t *sched;
1761
1762         /* interleave correctly with closing sockets... */
1763         read_lock (&ksocknal_data.ksnd_global_lock);
1764
1765         conn = sk->sk_user_data;
1766
1767         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1768                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1769                (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1770                                       " ready" : " blocked"),
1771                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1772                                       " scheduled" : " idle"),
1773                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1774                                       " empty" : " queued"));
1775
1776         if (conn == NULL) {             /* raced with ksocknal_close_sock */
1777                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1778                 sk->sk_write_space (sk);
1779         } else if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1780                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1781
1782                 if (!conn->ksnc_tx_ready) {      /* new news */
1783                         /* Set ASAP in case of concurrent calls to me */
1784                         conn->ksnc_tx_ready = 1;
1785
1786                         sched = conn->ksnc_scheduler;
1787
1788                         spin_lock_irqsave (&sched->kss_lock, flags);
1789
1790                         /* Set again (process_transmit may have
1791                            cleared while I blocked for the lock) */
1792                         conn->ksnc_tx_ready = 1;
1793
1794                         if (!conn->ksnc_tx_scheduled && // not being progressed
1795                             !list_empty(&conn->ksnc_tx_queue)){//packets to send
1796                                 list_add_tail (&conn->ksnc_tx_list,
1797                                                &sched->kss_tx_conns);
1798                                 conn->ksnc_tx_scheduled = 1;
1799                                 /* extra ref for scheduler */
1800                                 atomic_inc (&conn->ksnc_refcount);
1801
1802                                 if (waitqueue_active (&sched->kss_waitq))
1803                                         wake_up (&sched->kss_waitq);
1804                         }
1805
1806                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1807                 }
1808         }
1809
1810         read_unlock (&ksocknal_data.ksnd_global_lock);
1811 }
1812
1813 int
1814 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1815 {
1816         int           rc;
1817         mm_segment_t  oldmm = get_fs();
1818
1819         while (nob > 0) {
1820                 struct iovec  iov = {
1821                         .iov_base = buffer,
1822                         .iov_len  = nob
1823                 };
1824                 struct msghdr msg = {
1825                         .msg_name       = NULL,
1826                         .msg_namelen    = 0,
1827                         .msg_iov        = &iov,
1828                         .msg_iovlen     = 1,
1829                         .msg_control    = NULL,
1830                         .msg_controllen = 0,
1831                         .msg_flags      = 0
1832                 };
1833
1834                 set_fs (KERNEL_DS);
1835                 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1836                 set_fs (oldmm);
1837                 
1838                 if (rc < 0)
1839                         return (rc);
1840
1841                 if (rc == 0) {
1842                         CERROR ("Unexpected zero rc\n");
1843                         return (-ECONNABORTED);
1844                 }
1845
1846                 buffer = ((char *)buffer) + rc;
1847                 nob -= rc;
1848         }
1849         
1850         return (0);
1851 }
1852
1853 int
1854 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1855 {
1856         int           rc;
1857         mm_segment_t  oldmm = get_fs();
1858         
1859         while (nob > 0) {
1860                 struct iovec  iov = {
1861                         .iov_base = buffer,
1862                         .iov_len  = nob
1863                 };
1864                 struct msghdr msg = {
1865                         .msg_name       = NULL,
1866                         .msg_namelen    = 0,
1867                         .msg_iov        = &iov,
1868                         .msg_iovlen     = 1,
1869                         .msg_control    = NULL,
1870                         .msg_controllen = 0,
1871                         .msg_flags      = 0
1872                 };
1873
1874                 set_fs (KERNEL_DS);
1875                 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
1876                 set_fs (oldmm);
1877                 
1878                 if (rc < 0)
1879                         return (rc);
1880
1881                 if (rc == 0)
1882                         return (-ECONNABORTED);
1883
1884                 buffer = ((char *)buffer) + rc;
1885                 nob -= rc;
1886         }
1887         
1888         return (0);
1889 }
1890
1891 int
1892 ksocknal_exchange_nids (struct socket *sock, ptl_nid_t nid)
1893 {
1894         int                 rc;
1895         ptl_hdr_t           hdr;
1896         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
1897
1898         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
1899
1900         memset (&hdr, 0, sizeof (hdr));
1901         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
1902         hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
1903         hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
1904         
1905         hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
1906         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
1907         
1908         /* Assume sufficient socket buffering for this message */
1909         rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
1910         if (rc != 0) {
1911                 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, nid);
1912                 return (rc);
1913         }
1914
1915         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
1916         if (rc != 0) {
1917                 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, nid);
1918                 return (rc);
1919         }
1920         
1921         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
1922                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
1923                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, nid);
1924                 return (-EINVAL);
1925         }
1926
1927         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
1928             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
1929                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
1930                         " from "LPX64"\n",
1931                         __le16_to_cpu (hmv->version_major),
1932                         __le16_to_cpu (hmv->version_minor),
1933                         PORTALS_PROTO_VERSION_MAJOR,
1934                         PORTALS_PROTO_VERSION_MINOR,
1935                         nid);
1936                 return (-EINVAL);
1937         }
1938
1939         LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1940         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
1941          * so read the rest of it in now... */
1942
1943         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
1944         if (rc != 0) {
1945                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
1946                         rc, nid);
1947                 return (rc);
1948         }
1949
1950         /* ...and check we got what we expected */
1951         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
1952             PTL_HDR_LENGTH (&hdr) != __cpu_to_le32 (0)) {
1953                 CERROR ("Expecting a HELLO hdr with 0 payload,"
1954                         " but got type %d with %d payload from "LPX64"\n",
1955                         __le32_to_cpu (hdr.type),
1956                         __le32_to_cpu (PTL_HDR_LENGTH (&hdr)), nid);
1957                 return (-EINVAL);
1958         }
1959         
1960         if (__le64_to_cpu (hdr.src_nid) != nid) {
1961                 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
1962                         __le64_to_cpu (hdr.src_nid), nid);
1963                 return (-EINVAL);
1964         }
1965
1966         return (0);
1967 }
1968
1969 int
1970 ksocknal_set_linger (struct socket *sock) 
1971 {
1972         mm_segment_t    oldmm = get_fs ();
1973         int             rc;
1974         int             option;
1975         struct linger   linger;
1976
1977         /* Ensure this socket aborts active sends immediately when we close
1978          * it. */
1979         
1980         linger.l_onoff = 0;
1981         linger.l_linger = 0;
1982
1983         set_fs (KERNEL_DS);
1984         rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER, 
1985                               (char *)&linger, sizeof (linger));
1986         set_fs (oldmm);
1987         if (rc != 0) {
1988                 CERROR ("Can't set SO_LINGER: %d\n", rc);
1989                 return (rc);
1990         }
1991         
1992         option = -1;
1993         set_fs (KERNEL_DS);
1994         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
1995                                     (char *)&option, sizeof (option));
1996         set_fs (oldmm);
1997         if (rc != 0) {
1998                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
1999                 return (rc);
2000         }
2001         
2002         return (0);
2003 }
2004
2005 int
2006 ksocknal_connect_peer (ksock_route_t *route)
2007 {
2008         struct sockaddr_in  peer_addr;
2009         mm_segment_t        oldmm = get_fs();
2010         __u64               n;
2011         struct timeval      tv;
2012         int                 fd;
2013         struct socket      *sock;
2014         int                 rc;
2015
2016         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2017         if (rc != 0) {
2018                 CERROR ("Can't create autoconnect socket: %d\n", rc);
2019                 return (rc);
2020         }
2021
2022         /* Ugh; have to map_fd for compatibility with sockets passed in
2023          * from userspace.  And we actually need the refcounting that
2024          * this gives you :) */
2025
2026         fd = sock_map_fd (sock);
2027         if (fd < 0) {
2028                 sock_release (sock);
2029                 CERROR ("sock_map_fd error %d\n", fd);
2030                 return (fd);
2031         }
2032
2033         /* NB the fd now owns the ref on sock->file */
2034         LASSERT (sock->file != NULL);
2035         LASSERT (file_count(sock->file) == 1);
2036         
2037         /* Set the socket timeouts, so our connection attempt completes in
2038          * finite time */
2039         tv.tv_sec = ksocknal_io_timeout / HZ;
2040         n = ksocknal_io_timeout % HZ;
2041         n = n * 1000000 + HZ - 1;
2042         do_div (n, HZ);
2043         tv.tv_usec = n;
2044
2045         set_fs (KERNEL_DS);
2046         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2047                               (char *)&tv, sizeof (tv));
2048         set_fs (oldmm);
2049         if (rc != 0) {
2050                 CERROR ("Can't set send timeout %d (in HZ): %d\n", 
2051                         ksocknal_io_timeout, rc);
2052                 goto out;
2053         }
2054         
2055         set_fs (KERNEL_DS);
2056         rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2057                               (char *)&tv, sizeof (tv));
2058         set_fs (oldmm);
2059         if (rc != 0) {
2060                 CERROR ("Can't set receive timeout %d (in HZ): %d\n",
2061                         ksocknal_io_timeout, rc);
2062                 goto out;
2063         }
2064
2065         if (route->ksnr_nonagel) {
2066                 int  option = 1;
2067                 
2068                 set_fs (KERNEL_DS);
2069                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2070                                             (char *)&option, sizeof (option));
2071                 set_fs (oldmm);
2072                 if (rc != 0) {
2073                         CERROR ("Can't disable nagel: %d\n", rc);
2074                         goto out;
2075                 }
2076         }
2077         
2078         if (route->ksnr_buffer_size != 0) {
2079                 int option = route->ksnr_buffer_size;
2080                 
2081                 set_fs (KERNEL_DS);
2082                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2083                                       (char *)&option, sizeof (option));
2084                 set_fs (oldmm);
2085                 if (rc != 0) {
2086                         CERROR ("Can't set send buffer %d: %d\n",
2087                                 route->ksnr_buffer_size, rc);
2088                         goto out;
2089                 }
2090
2091                 set_fs (KERNEL_DS);
2092                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2093                                       (char *)&option, sizeof (option));
2094                 set_fs (oldmm);
2095                 if (rc != 0) {
2096                         CERROR ("Can't set receive buffer %d: %d\n",
2097                                 route->ksnr_buffer_size, rc);
2098                         goto out;
2099                 }
2100         }
2101         
2102         memset (&peer_addr, 0, sizeof (peer_addr));
2103         peer_addr.sin_family = AF_INET;
2104         peer_addr.sin_port = htons (route->ksnr_port);
2105         peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2106         
2107         rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
2108                                  sizeof (peer_addr), sock->file->f_flags);
2109         if (rc != 0) {
2110                 CERROR ("Error %d connecting to "LPX64"\n", rc,
2111                         route->ksnr_peer->ksnp_nid);
2112                 goto out;
2113         }
2114         
2115         if (route->ksnr_xchange_nids) {
2116                 rc = ksocknal_exchange_nids (sock, route->ksnr_peer->ksnp_nid);
2117                 if (rc != 0)
2118                         goto out;
2119         }
2120
2121         rc = ksocknal_create_conn (route->ksnr_peer->ksnp_nid,
2122                                    route, sock, route->ksnr_irq_affinity);
2123         if (rc == 0) {
2124                 /* Take an extra ref on sock->file to compensate for the
2125                  * upcoming close which will lose fd's ref on it. */
2126                 get_file (sock->file);
2127         }
2128
2129  out:
2130         sys_close (fd);
2131         return (rc);
2132 }
2133
2134 void
2135 ksocknal_autoconnect (ksock_route_t *route)
2136 {
2137         LIST_HEAD        (zombies);
2138         ksock_tx_t       *tx;
2139         ksock_peer_t     *peer;
2140         unsigned long     flags;
2141         int               rc;
2142         
2143         rc = ksocknal_connect_peer (route);
2144         if (rc == 0) {
2145                 /* successfully autoconnected: create_conn did the
2146                  * route/conn binding and scheduled any blocked packets, 
2147                  * so there's nothing left to do now. */
2148                 return;
2149         }
2150
2151         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2152
2153         peer = route->ksnr_peer;
2154         route->ksnr_connecting = 0;
2155
2156         LASSERT (route->ksnr_retry_interval != 0);
2157         route->ksnr_timeout = jiffies_64 + route->ksnr_retry_interval;
2158         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2159                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
2160
2161         if (!list_empty (&peer->ksnp_tx_queue) &&
2162             ksocknal_find_connecting_route_locked (peer) == NULL) {
2163                 LASSERT (list_empty (&peer->ksnp_conns));
2164
2165                 /* None of the connections that the blocked packets are
2166                  * waiting for have been successful.  Complete them now... */
2167                 do {
2168                         tx = list_entry (peer->ksnp_tx_queue.next,
2169                                          ksock_tx_t, tx_list);
2170                         list_del (&tx->tx_list);
2171                         list_add_tail (&tx->tx_list, &zombies);
2172                 } while (!list_empty (&peer->ksnp_tx_queue));
2173         }
2174
2175         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2176
2177         while (!list_empty (&zombies)) {
2178                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2179                 
2180                 CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
2181                         NTOH__u32 (tx->tx_hdr->type),
2182                         NTOH__u32 (PTL_HDR_LENGTH(tx->tx_hdr)),
2183                         NTOH__u64 (tx->tx_hdr->src_nid),
2184                         NTOH__u64 (tx->tx_hdr->dest_nid));
2185
2186                 list_del (&tx->tx_list);
2187                 /* complete now */
2188                 ksocknal_tx_done (tx, 0);
2189         }
2190 }
2191
2192 int
2193 ksocknal_autoconnectd (void *arg)
2194 {
2195         long               id = (long)arg;
2196         char               name[16];
2197         unsigned long      flags;
2198         ksock_route_t     *route;
2199         int                rc;
2200
2201         snprintf (name, sizeof (name), "ksocknal_ad[%ld]", id);
2202         kportal_daemonize (name);
2203         kportal_blockallsigs ();
2204
2205         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2206
2207         while (!ksocknal_data.ksnd_shuttingdown) {
2208
2209                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2210                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2211                                             ksock_route_t, ksnr_connect_list);
2212                         
2213                         list_del (&route->ksnr_connect_list);
2214                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2215
2216                         ksocknal_autoconnect (route);
2217                         ksocknal_put_route (route);
2218
2219                         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2220                         continue;
2221                 }
2222                 
2223                 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2224
2225                 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2226                                                ksocknal_data.ksnd_shuttingdown ||
2227                                                !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2228
2229                 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2230         }
2231
2232         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2233
2234         ksocknal_thread_fini ();
2235         return (0);
2236 }
2237
2238 ksock_conn_t *
2239 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
2240 {
2241         /* We're called with a shared lock on ksnd_global_lock */
2242         unsigned long      flags;
2243         ksock_conn_t      *conn;
2244         struct list_head  *ctmp;
2245         ksock_tx_t        *tx;
2246         struct list_head  *ttmp;
2247         ksock_sched_t     *sched;
2248
2249         list_for_each (ctmp, &peer->ksnp_conns) {
2250                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2251                 sched = conn->ksnc_scheduler;
2252                 
2253                 if (conn->ksnc_rx_deadline != 0 &&
2254                     conn->ksnc_rx_deadline <= jiffies_64)
2255                         goto timed_out;
2256
2257                 spin_lock_irqsave (&sched->kss_lock, flags);
2258                 
2259                 list_for_each (ttmp, &conn->ksnc_tx_queue) {
2260                         tx = list_entry (ttmp, ksock_tx_t, tx_list);
2261                         LASSERT (tx->tx_deadline != 0);
2262                         
2263                         if (tx->tx_deadline <= jiffies_64)
2264                                 goto timed_out_locked;
2265                 }
2266 #if SOCKNAL_ZC
2267                 list_for_each (ttmp, &conn->ksnc_tx_pending) {
2268                         tx = list_entry (ttmp, ksock_tx_t, tx_list);
2269                         LASSERT (tx->tx_deadline != 0);
2270
2271                         if (tx->tx_deadline <= jiffies_64)
2272                                 goto timed_out_locked;
2273                 }
2274 #endif                
2275                 spin_unlock_irqrestore (&sched->kss_lock, flags);
2276                 continue;
2277
2278         timed_out_locked:
2279                 spin_unlock_irqrestore (&sched->kss_lock, flags);
2280         timed_out:
2281                 atomic_inc (&conn->ksnc_refcount);
2282                 return (conn);
2283         }
2284
2285         return (NULL);
2286 }
2287
2288 void
2289 ksocknal_check_peer_timeouts (struct list_head *peers)
2290 {
2291         struct list_head *ptmp;
2292         ksock_peer_t     *peer;
2293         ksock_conn_t     *conn;
2294
2295  again:
2296         /* NB. We expect to have a look at all the peers and not find any
2297          * connections to time out, so we just use a shared lock while we
2298          * take a look... */
2299         read_lock (&ksocknal_data.ksnd_global_lock);
2300
2301         list_for_each (ptmp, peers) {
2302                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2303                 conn = ksocknal_find_timed_out_conn (peer);
2304                 
2305                 if (conn != NULL) {
2306                         read_unlock (&ksocknal_data.ksnd_global_lock);
2307
2308                         if (ksocknal_close_conn_unlocked (conn)) {
2309                                 /* I actually closed... */
2310                                 CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
2311                                         peer->ksnp_nid, conn->ksnc_ipaddr,
2312                                         conn->ksnc_port);
2313                         }
2314                 
2315                         /* NB we won't find this one again, but we can't
2316                          * just proceed with the next peer, since we dropped
2317                          * ksnd_global_lock and it might be dead already! */
2318                         ksocknal_put_conn (conn);
2319                         goto again;
2320                 }
2321         }
2322
2323         read_unlock (&ksocknal_data.ksnd_global_lock);
2324 }
2325
2326 int
2327 ksocknal_reaper (void *arg)
2328 {
2329         wait_queue_t       wait;
2330         unsigned long      flags;
2331         ksock_conn_t      *conn;
2332         int                timeout;
2333         int                peer_index = 0;
2334         __u64              deadline = jiffies_64;
2335         
2336         kportal_daemonize ("ksocknal_reaper");
2337         kportal_blockallsigs ();
2338
2339         init_waitqueue_entry (&wait, current);
2340
2341         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2342
2343         while (!ksocknal_data.ksnd_shuttingdown) {
2344
2345                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2346                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2347                                            ksock_conn_t, ksnc_list);
2348                         list_del (&conn->ksnc_list);
2349                         
2350                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2351
2352                         ksocknal_terminate_conn (conn);
2353                         ksocknal_put_conn (conn);
2354
2355                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2356                         continue;
2357                 }
2358
2359                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2360                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2361                                            ksock_conn_t, ksnc_list);
2362                         list_del (&conn->ksnc_list);
2363                         
2364                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2365
2366                         ksocknal_destroy_conn (conn);
2367
2368                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2369                         continue;
2370                 }
2371                 
2372                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2373
2374                 while ((timeout = deadline - jiffies_64) <= 0) {
2375                         /* Time to check for timeouts on a few more peers */
2376                         ksocknal_check_peer_timeouts (&ksocknal_data.ksnd_peers[peer_index]);
2377
2378                         peer_index = (peer_index + 1) % SOCKNAL_PEER_HASH_SIZE;
2379                         deadline += HZ;
2380                 }
2381
2382                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2383                 set_current_state (TASK_INTERRUPTIBLE);
2384
2385                 if (!ksocknal_data.ksnd_shuttingdown &&
2386                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2387                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2388                         schedule_timeout (timeout);
2389
2390                 set_current_state (TASK_RUNNING);
2391                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2392
2393                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2394         }
2395
2396         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2397
2398         ksocknal_thread_fini ();
2399         return (0);
2400 }
2401
2402 nal_cb_t ksocknal_lib = {
2403         nal_data:       &ksocknal_data,                /* NAL private data */
2404         cb_send:         ksocknal_send,
2405         cb_send_pages:   ksocknal_send_pages,
2406         cb_recv:         ksocknal_recv,
2407         cb_recv_pages:   ksocknal_recv_pages,
2408         cb_read:         ksocknal_read,
2409         cb_write:        ksocknal_write,
2410         cb_callback:     ksocknal_callback,
2411         cb_malloc:       ksocknal_malloc,
2412         cb_free:         ksocknal_free,
2413         cb_printf:       ksocknal_printf,
2414         cb_cli:          ksocknal_cli,
2415         cb_sti:          ksocknal_sti,
2416         cb_dist:         ksocknal_dist
2417 };