Whamcloud - gitweb
* lctl set_route <nid> <up/down> enables or disables particular portals
[fs/lustre-release.git] / lnet / klnds / toelnd / toenal_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *   Author: Kedar Sovani <kedar@calsoftinc.com>
10  *   Author: Amey Inamdar <amey@calsoftinc.com>
11  *   
12  *   This file is part of Portals, http://www.sf.net/projects/lustre/
13  *
14  *   Portals is free software; you can redistribute it and/or
15  *   modify it under the terms of version 2 of the GNU General Public
16  *   License as published by the Free Software Foundation.
17  *
18  *   Portals is distributed in the hope that it will be useful,
19  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
20  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  *   GNU General Public License for more details.
22  *
23  *   You should have received a copy of the GNU General Public License
24  *   along with Portals; if not, write to the Free Software
25  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26  *
27  */
28
29 #include <linux/poll.h>
30 #include "toenal.h"
31
32 atomic_t   ktoenal_packets_received;
33 long       ktoenal_packets_launched;
34 long       ktoenal_packets_transmitted;
35
36 /*
37  *  LIB functions follow
38  *
39  */
40 int
41 ktoenal_read(nal_cb_t *nal, void *private, void *dst_addr,
42               user_ptr src_addr, size_t len)
43 {
44         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
45                nal->ni.nid, (long)len, src_addr, dst_addr);
46
47         memcpy( dst_addr, src_addr, len );
48         return 0;
49 }
50
51 int
52 ktoenal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
53                void *src_addr, size_t len)
54 {
55         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
56                nal->ni.nid, (long)len, src_addr, dst_addr);
57
58         memcpy( dst_addr, src_addr, len );
59         return 0;
60 }
61
62 int 
63 ktoenal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
64                          ptl_event_t *ev)
65 {
66         CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
67                nal->ni.nid, eq, ev);
68
69         if (eq->event_callback != NULL) 
70                 eq->event_callback(ev);
71
72         return 0;
73 }
74
75 void *
76 ktoenal_malloc(nal_cb_t *nal, size_t len)
77 {
78         void *buf;
79
80         PORTAL_ALLOC(buf, len);
81
82         if (buf != NULL)
83                 memset(buf, 0, len);
84
85         return (buf);
86 }
87
88 void
89 ktoenal_free(nal_cb_t *nal, void *buf, size_t len)
90 {
91         PORTAL_FREE(buf, len);
92 }
93
94 void
95 ktoenal_printf(nal_cb_t *nal, const char *fmt, ...)
96 {
97         va_list ap;
98         char msg[256];
99
100         va_start (ap, fmt);
101         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
102         va_end (ap);
103
104         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
105
106         CDEBUG (D_NET, "%s", msg);
107 }
108
109 void
110 ktoenal_cli(nal_cb_t *nal, unsigned long *flags)
111 {
112         ksock_nal_data_t *data = nal->nal_data;
113
114         spin_lock(&data->ksnd_nal_cb_lock);
115 }
116
117 void
118 ktoenal_sti(nal_cb_t *nal, unsigned long *flags)
119 {
120         ksock_nal_data_t *data;
121         data = nal->nal_data;
122
123         spin_unlock(&data->ksnd_nal_cb_lock);
124 }
125
126 int
127 ktoenal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
128 {
129         /* I would guess that if ktoenal_get_conn(nid) == NULL,
130            and we're not routing, then 'nid' is very distant :) */
131         if ( nal->ni.nid == nid ) {
132                 *dist = 0;
133         } else {
134                 *dist = 1;
135         }
136
137         return 0;
138 }
139
140 ksock_ltx_t *
141 ktoenal_get_ltx (int may_block)
142 {
143         long         flags;
144         ksock_ltx_t *ltx = NULL;
145         
146         for (;;)
147         {
148                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
149         
150                 if (!list_empty (&ktoenal_data.ksnd_idle_ltx_list))
151                 {
152                         ltx = list_entry (ktoenal_data.ksnd_idle_ltx_list.next, ksock_ltx_t, ltx_tx.tx_list);
153                         list_del (&ltx->ltx_tx.tx_list);
154                         break;
155                 }
156
157                 if (!may_block)
158                 {
159                         if (!list_empty (&ktoenal_data.ksnd_idle_nblk_ltx_list))
160                         {
161                                 ltx = list_entry (ktoenal_data.ksnd_idle_nblk_ltx_list.next, 
162                                                   ksock_ltx_t, ltx_tx.tx_list);
163                                 list_del (&ltx->ltx_tx.tx_list);
164                         }
165                         break;
166                 }
167                 
168                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
169                 
170                 wait_event (ktoenal_data.ksnd_idle_ltx_waitq,
171                             !list_empty (&ktoenal_data.ksnd_idle_ltx_list));
172         }
173
174         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
175
176         return (ltx);
177 }
178
179 int
180 ktoenal_sendmsg (struct file *sock, struct iovec *iov, int niov, int nob, int flags)
181 {
182         /* NB This procedure "consumes" iov (actually we do, tcp_sendmsg doesn't)
183          */
184         mm_segment_t oldmm;
185         int           rc;
186
187         LASSERT (niov > 0);
188         LASSERT (nob > 0);
189         
190         oldmm = get_fs();
191         set_fs (KERNEL_DS);
192
193 #ifdef PORTAL_DEBUG
194         {
195                 int total_nob;
196                 int i;
197                 
198                 for (i = total_nob = 0; i < niov; i++)
199                         total_nob += iov[i].iov_len;
200                 
201                 LASSERT (nob == total_nob);
202         }
203 #endif        
204         LASSERT (!in_interrupt());
205        
206         rc = sock->f_op->writev(sock, iov, niov, NULL);
207
208         set_fs (oldmm);
209
210         if (rc > 0)                             /* sent something? */
211         {
212                 nob = rc;                       /* consume iov */
213                 for (;;)
214                 {
215                         LASSERT (niov > 0);
216                         
217                         if (iov->iov_len >= nob)
218                         {
219                                 iov->iov_len -= nob;
220                                 iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
221                                 break;
222                         }
223                         nob -= iov->iov_len;
224                         iov->iov_len = 0;
225                         iov++;
226                         niov--;
227                 }
228         }
229
230         return (rc);
231 }
232
233 int
234 ktoenal_recvmsg(struct file *sock, struct iovec *iov, int niov, int toread)
235 {
236         /* NB This procedure "consumes" iov (actually tcp_recvmsg does)
237          */
238         mm_segment_t oldmm;
239         int ret, i, len = 0, origlen = 0;
240         
241         PROF_START(our_recvmsg);
242         for(i = 0; i < niov; i++) {
243                 len += iov[i].iov_len;
244                 if(len >= toread)
245                         break;
246         }
247
248         if(len >= toread) {
249                 origlen = iov[i].iov_len;
250                 iov[i].iov_len -= (len - toread);
251         }
252         else {  /* i == niov */
253                 i = niov - 1;
254         }
255
256         oldmm = get_fs();
257         set_fs(KERNEL_DS);
258
259         ret = sock->f_op->readv(sock, iov, i + 1, NULL);
260         
261         set_fs(oldmm);
262
263         if(origlen)
264                 iov[i].iov_len = origlen;
265
266         PROF_FINISH(our_recvmsg);
267         return ret;
268 }
269
270 void
271 ktoenal_process_transmit (ksock_conn_t *conn, long *irq_flags)
272 {
273         ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list);
274         int         rc;
275         
276         LASSERT (conn->ksnc_tx_scheduled);
277         LASSERT (conn->ksnc_tx_ready);
278         LASSERT (!list_empty (&conn->ksnc_tx_queue));
279
280         /* assume transmit will complete now, so dequeue while I've got the lock */
281         list_del (&tx->tx_list);
282
283         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
284
285         LASSERT (tx->tx_nob > 0);
286
287         conn->ksnc_tx_ready = 0;                /* write_space may race with me and set ready */
288         mb();                                   /* => clear BEFORE trying to write */
289
290         rc = ktoenal_sendmsg (conn->ksnc_file,
291                                tx->tx_iov, tx->tx_niov, tx->tx_nob,
292                                list_empty (&conn->ksnc_tx_queue) ? 
293                                MSG_DONTWAIT : (MSG_DONTWAIT | MSG_MORE));
294
295         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_nob, rc);
296
297         if (rc < 0)                             /* error */
298         {
299                 if (rc == -EAGAIN)              /* socket full => */
300                         rc = 0;                 /* nothing sent */
301                 else
302                 {
303 #warning FIXME: handle socket errors properly
304                         CERROR ("Error socknal send(%d) %p: %d\n", tx->tx_nob, conn, rc);
305                         rc = tx->tx_nob;        /* kid on for now whole packet went */
306                 }
307         }
308
309         if (rc == tx->tx_nob)                   /* everything went */
310         {
311                 conn->ksnc_tx_ready = 1;        /* assume more can go (ASAP) */
312                 ktoenal_put_conn (conn);       /* release packet's ref */
313
314                 if (tx->tx_isfwd)               /* was a forwarded packet? */
315                 {
316                         kpr_fwd_done (&ktoenal_data.ksnd_router,
317                                       KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
318
319                         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
320                 }
321                 else                            /* local send */
322                 {
323                         ksock_ltx_t *ltx = KSOCK_TX_2_KSOCK_LTX (tx);
324
325                         lib_finalize (&ktoenal_lib, ltx->ltx_private, ltx->ltx_cookie);
326
327                         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
328                         
329                         list_add (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
330
331                         /* normal tx desc => wakeup anyone blocking for one */
332                         if (ltx->ltx_idle == &ktoenal_data.ksnd_idle_ltx_list &&
333                             waitqueue_active (&ktoenal_data.ksnd_idle_ltx_waitq))
334                                 wake_up (&ktoenal_data.ksnd_idle_ltx_waitq);
335                 }
336                 ktoenal_packets_transmitted++;
337         }
338         else
339         {
340                 tx->tx_nob -= rc;
341
342                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
343
344                 /* back onto HEAD of tx_queue */
345                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
346         }
347
348         if (!conn->ksnc_tx_ready ||             /* no space to write now */
349             list_empty (&conn->ksnc_tx_queue))  /* nothing to write */
350         {
351                 conn->ksnc_tx_scheduled = 0;    /* not being scheduled */
352                 ktoenal_put_conn (conn);       /* release scheduler's ref */
353         }
354         else                                    /* let scheduler call me again */
355                 list_add_tail (&conn->ksnc_tx_list, &ktoenal_data.ksnd_tx_conns);
356 }
357
358 void
359 ktoenal_launch_packet (ksock_conn_t *conn, ksock_tx_t *tx)
360 {
361         long          flags;
362         int           nob = tx->tx_nob;
363         struct iovec *iov = tx->tx_iov;
364         int           niov = 1;
365         
366         LASSERT (nob >= sizeof (ptl_hdr_t));
367
368         /* Truncate iov to exactly match total packet length
369          * since socket sendmsg pays no attention to requested length.
370          */
371         for (;;)
372         {
373                 LASSERT (niov <= tx->tx_niov);
374                 LASSERT (iov->iov_len >= 0);
375                 
376                 if (iov->iov_len >= nob)
377                 {
378                         iov->iov_len = nob;
379                         break;
380                 }
381                 nob -= iov->iov_len;
382                 iov++;
383                 niov++;
384         }
385         tx->tx_niov = niov;
386         
387         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
388         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
389
390         if (conn->ksnc_tx_ready &&              /* able to send */
391             !conn->ksnc_tx_scheduled)           /* not scheduled to send */
392         {
393                 list_add_tail (&conn->ksnc_tx_list, &ktoenal_data.ksnd_tx_conns);
394                 conn->ksnc_tx_scheduled = 1;
395                 atomic_inc (&conn->ksnc_refcount); /* extra ref for scheduler */
396                 if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
397                         wake_up (&ktoenal_data.ksnd_sched_waitq);
398         }
399
400         ktoenal_packets_launched++;
401         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
402 }
403
404 int
405 ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie,
406               ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
407               unsigned int payload_niov, struct iovec *payload_iov, size_t payload_len)
408 {
409         ptl_nid_t     gatewaynid;
410         ksock_conn_t *conn;
411         ksock_ltx_t  *ltx;
412         int           rc;
413         int           i;
414
415         /* By this point, as it happens, we have absolutely no idea what
416          * 'private' is.  It might be ksock_nal_data or it might be ksock_conn.
417          * Ha ha, isn't that a funny joke?
418          *
419          * FIXME: this is not the right way to fix this; the right way is to
420          * always pass in the same kind of structure.  This is hard right now.
421          * To revisit this issue, set a breakpoint in here and watch for when
422          * it's called from lib_finalize.  I think this occurs when we send a
423          * packet as a side-effect of another packet, such as when an ACK has
424          * been requested. -phil */
425
426         CDEBUG(D_NET, "sending %d bytes from [%d](%p,%d)... to nid: "
427                LPX64" pid %d\n", (int)payload_len, payload_niov,
428                payload_niov > 0 ? payload_iov[0].iov_base : NULL,
429                (int)(payload_niov > 0 ? payload_iov[0].iov_len : 0), nid, pid);
430
431         if ((conn = ktoenal_get_conn (nid)) == NULL)
432         {
433                 /* It's not a peer; try to find a gateway */
434                 rc = kpr_lookup (&ktoenal_data.ksnd_router, nid, payload_niov,
435                                  &gatewaynid);
436                 if (rc != 0)
437                 {
438                         CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
439                         return (-1);
440                 }
441
442                 if ((conn = ktoenal_get_conn (gatewaynid)) == NULL)
443                 {
444                         CERROR ("Can't route to "LPX64": gateway "LPX64" is not a peer\n", 
445                                 nid, gatewaynid);
446                         return (-1);
447                 }
448         }
449
450         /* This transmit has now got a ref on conn */
451
452         /* I may not block for a transmit descriptor if I might block the
453          * receiver, or an interrupt handler. */
454         ltx = ktoenal_get_ltx (!(type == PTL_MSG_ACK ||
455                                  type == PTL_MSG_REPLY ||
456                                  in_interrupt ()));
457         if (ltx == NULL)
458         {
459                 CERROR ("Can't allocate tx desc\n");
460                 ktoenal_put_conn (conn);
461                 return (-1);
462         }
463         
464         /* Init common (to sends and forwards) packet part */
465         ltx->ltx_tx.tx_isfwd = 0;
466         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
467         ltx->ltx_tx.tx_niov = 1 + payload_niov;
468         ltx->ltx_tx.tx_iov = ltx->ltx_iov;
469
470         /* Init local send packet (storage for hdr, finalize() args, iov) */
471         ltx->ltx_hdr = *hdr;
472         ltx->ltx_private = private;
473         ltx->ltx_cookie = cookie;
474
475         ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
476         ltx->ltx_iov[0].iov_len = sizeof (ltx->ltx_hdr);
477
478         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
479
480         for (i = 0; i < payload_niov; i++)
481         {
482                 ltx->ltx_iov[1 + i].iov_base = payload_iov[i].iov_base;
483                 ltx->ltx_iov[1 + i].iov_len  = payload_iov[i].iov_len;
484         }
485
486         ktoenal_launch_packet (conn, &ltx->ltx_tx);
487         return (0);
488 }
489
490 void
491 ktoenal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
492 {
493         ksock_conn_t *conn;
494         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
495         ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
496
497         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd, 
498                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
499
500         if (nid == ktoenal_lib.ni.nid)         /* I'm the gateway; must be the last hop */
501                 nid = fwd->kprfd_target_nid;
502         
503         conn = ktoenal_get_conn (nid);
504         if (conn == NULL)
505         {
506                 CERROR ("[%p] fwd to "LPX64" isn't a peer\n", fwd, nid);
507                 kpr_fwd_done (&ktoenal_data.ksnd_router, fwd, -EHOSTUNREACH);
508                 return;
509         }
510
511         /* This forward has now got a ref on conn */
512
513         tx->tx_isfwd = 1;                       /* This is a forwarding packet */
514         tx->tx_nob   = fwd->kprfd_nob;
515         tx->tx_niov  = fwd->kprfd_niov;
516         tx->tx_iov   = fwd->kprfd_iov;
517
518         ktoenal_launch_packet (conn, tx);
519 }
520
521 int
522 ktoenal_thread_start (int (*fn)(void *arg), void *arg)
523 {
524         long    pid = kernel_thread (fn, arg, 0);
525
526         if (pid < 0)
527                 return ((int)pid);
528
529         atomic_inc (&ktoenal_data.ksnd_nthreads);
530         return (0);
531 }
532
533 void
534 ktoenal_thread_fini (void)
535 {
536         atomic_dec (&ktoenal_data.ksnd_nthreads);
537 }
538
539 void
540 ktoenal_fmb_callback (void *arg, int error)
541 {
542         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
543         ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
544         ksock_conn_t      *conn;
545         long               flags;
546
547         CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": %d\n", 
548                 hdr->src_nid, hdr->dest_nid, error);
549
550         if (error != 0)
551                 CERROR ("Failed to route packet from "LPX64" to "LPX64": %d\n", 
552                         hdr->src_nid, hdr->dest_nid, error);
553
554         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
555         
556         list_add (&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
557
558         if (!list_empty (&fmb->fmb_pool->fmp_blocked_conns))
559         {
560                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next, ksock_conn_t, ksnc_rx_list);
561                 list_del (&conn->ksnc_rx_list);
562
563                 CDEBUG (D_NET, "Scheduling conn %p\n", conn);
564                 LASSERT (conn->ksnc_rx_scheduled);
565                 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
566
567                 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
568                 list_add_tail (&conn->ksnc_rx_list, &ktoenal_data.ksnd_rx_conns);
569
570                 if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
571                         wake_up (&ktoenal_data.ksnd_sched_waitq);
572         }
573
574         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
575 }
576
577 ksock_fmb_t *
578 ktoenal_get_idle_fmb (ksock_conn_t *conn)
579 {
580         /* NB called with sched lock held */
581         int               payload_nob = conn->ksnc_rx_nob_left;
582         int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
583         ksock_fmb_pool_t *pool;
584         ksock_fmb_t      *fmb;
585         
586         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
587
588         if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
589                 pool = &ktoenal_data.ksnd_small_fmp;
590         else
591                 pool = &ktoenal_data.ksnd_large_fmp;
592         
593         if (!list_empty (&pool->fmp_idle_fmbs))
594         {
595                 fmb = list_entry (pool->fmp_idle_fmbs.next, ksock_fmb_t, fmb_list);
596                 list_del (&fmb->fmb_list);
597                 return (fmb);
598         }
599
600         /* deschedule until fmb free */
601
602         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
603
604         list_add_tail (&conn->ksnc_rx_list,
605                        &pool->fmp_blocked_conns);
606         return (NULL);
607 }
608
609
610 int
611 ktoenal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
612 {
613         int payload_nob = conn->ksnc_rx_nob_left;
614         int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
615         int niov;                               /* at least the header */
616         int nob;
617         
618         LASSERT (conn->ksnc_rx_scheduled);
619         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
620         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
621         LASSERT (payload_nob >= 0);
622         LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
623         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
624         
625         /* Got a forwarding buffer; copy the header we just read into the
626          * forwarding buffer.  If there's payload start reading reading it
627          * into the buffer, otherwise the forwarding buffer can be kicked
628          * off immediately.
629          *
630          * NB fmb->fmb_iov spans the WHOLE packet.
631          *    conn->ksnc_rx_iov spans just the payload.
632          */
633
634         fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
635                 
636         memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t)); /* copy header */
637
638         if (payload_nob == 0)                   /* got complete packet already */
639         {
640                 atomic_inc (&ktoenal_packets_received);
641
642                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n", conn,
643                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, packet_nob);
644
645                 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
646
647                 kpr_fwd_init (&fmb->fmb_fwd, conn->ksnc_hdr.dest_nid, 
648                               packet_nob, 1, fmb->fmb_iov, 
649                               ktoenal_fmb_callback, fmb);
650
651                 kpr_fwd_start (&ktoenal_data.ksnd_router, &fmb->fmb_fwd); /* forward it now */
652
653                 ktoenal_new_packet (conn, 0);  /* on to next packet */
654                 return (1);
655         }
656
657         niov = 1;
658         if (packet_nob <= PAGE_SIZE)            /* whole packet fits in first page */
659                 fmb->fmb_iov[0].iov_len = packet_nob;
660         else
661         {
662                 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
663                 nob = packet_nob - PAGE_SIZE;
664                 
665                 do
666                 {
667                         LASSERT (niov < fmb->fmb_npages);
668                         fmb->fmb_iov[niov].iov_base = page_address (fmb->fmb_pages[niov]);
669                         fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
670                         nob -= PAGE_SIZE;
671                         niov++;
672                 } while (nob > 0);
673         }
674
675         kpr_fwd_init (&fmb->fmb_fwd, conn->ksnc_hdr.dest_nid, 
676                       packet_nob, niov, fmb->fmb_iov, 
677                       ktoenal_fmb_callback, fmb);
678
679         /* stash router's descriptor ready for call to kpr_fwd_start */        
680         conn->ksnc_cookie = &fmb->fmb_fwd;
681
682         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
683
684         /* payload is desc's iov-ed buffer, but skipping the hdr */
685         LASSERT (niov <= sizeof (conn->ksnc_rx_iov) / sizeof (conn->ksnc_rx_iov[0]));
686
687         conn->ksnc_rx_iov[0].iov_base = (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) + sizeof (ptl_hdr_t));
688         conn->ksnc_rx_iov[0].iov_len = fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
689
690         if (niov > 1)
691                 memcpy (&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1], (niov - 1) * sizeof (struct iovec));
692
693         conn->ksnc_rx_niov = niov;
694
695         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
696                 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, payload_nob);
697         return (0);
698 }
699
700 void
701 ktoenal_fwd_parse (ksock_conn_t *conn)
702 {
703         ksock_conn_t *conn2;
704         int           body_len;
705
706         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
707                 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, conn->ksnc_rx_nob_left);
708
709         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
710         LASSERT (conn->ksnc_rx_scheduled);
711
712         switch (conn->ksnc_hdr.type)
713         {
714         case PTL_MSG_GET:
715         case PTL_MSG_ACK:
716                 body_len = 0;
717                 break;
718         case PTL_MSG_PUT:
719                 body_len = conn->ksnc_hdr.msg.put.length;
720                 break;
721         case PTL_MSG_REPLY:
722                 body_len = conn->ksnc_hdr.msg.reply.length;
723                 break;
724         default:
725                 /* Unrecognised packet type */
726                 CERROR ("Unrecognised packet type %d from "LPX64" for "LPX64"\n",
727                         conn->ksnc_hdr.type, conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid);
728                 /* Ignore this header and go back to reading a new packet. */
729                 ktoenal_new_packet (conn, 0);
730                 return;
731         }
732
733         if (body_len < 0)                               /* length corrupt */
734         {
735                 CERROR ("dropping packet from "LPX64" for "LPX64": packet size %d illegal\n",
736                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, body_len);
737                 ktoenal_new_packet (conn, 0);          /* on to new packet */
738                 return;
739         }
740
741         if (body_len > SOCKNAL_MAX_FWD_PAYLOAD)         /* too big to forward */
742         {
743                 CERROR ("dropping packet from "LPX64" for "LPX64": packet size %d too big\n",
744                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, body_len);
745                 ktoenal_new_packet (conn, body_len);    /* on to new packet (skip this one's body) */
746                 return;
747         }
748
749         conn2 = ktoenal_get_conn (conn->ksnc_hdr.dest_nid); /* should have gone direct */
750         if (conn2 != NULL)
751         {
752                 CERROR ("dropping packet from "LPX64" for "LPX64": target is a peer\n",
753                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid);
754                 ktoenal_put_conn (conn2);          /* drop ref from get above */
755
756                 ktoenal_new_packet (conn, body_len);  /* on to next packet (skip this one's body) */
757                 return;
758         }
759
760         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
761         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
762         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
763 }
764
765 int
766 ktoenal_new_packet (ksock_conn_t *conn, int nob_to_skip)
767 {
768         static char ktoenal_slop_buffer[4096];
769
770         int   nob;
771         int   niov;
772         int   skipped;
773
774         if (nob_to_skip == 0)                   /* right at next packet boundary now */
775         {
776                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
777                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
778                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
779
780                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
781                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
782                 conn->ksnc_rx_niov = 1;
783                 return (1);
784         }
785
786         /* set up to skip as much a possible now */
787         /* if there's more left (ran out of iov entries) we'll get called again */
788
789         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
790         conn->ksnc_rx_nob_left = nob_to_skip;
791         skipped = 0;
792         niov = 0;
793
794         do
795         {
796                 nob = MIN (nob_to_skip, sizeof (ktoenal_slop_buffer));
797
798                 conn->ksnc_rx_iov[niov].iov_base = ktoenal_slop_buffer;
799                 conn->ksnc_rx_iov[niov].iov_len  = nob;
800                 niov++;
801                 skipped += nob;
802                 nob_to_skip -=nob;
803
804         } while (nob_to_skip != 0 &&            /* mustn't overflow conn's rx iov */
805                  niov < sizeof (conn->ksnc_rx_iov)/sizeof (conn->ksnc_rx_iov[0]));
806
807         conn->ksnc_rx_niov = niov;
808         conn->ksnc_rx_nob_wanted = skipped;
809         return (0);
810 }
811
812 void
813 ktoenal_process_receive (ksock_conn_t *conn, long *irq_flags)
814 {
815         ksock_fmb_t *fmb;
816         int          len;
817         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
818         LASSERT (conn->ksnc_rx_scheduled);
819         LASSERT (conn->ksnc_rx_ready);
820
821         /* NB: sched lock held */
822         CDEBUG(D_NET, "conn %p\n", conn);
823
824         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)     /* doesn't need a forwarding buffer */
825         {
826                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
827                 goto try_read;
828         }
829
830  get_fmb:
831         /* NB: sched lock held */
832         fmb = ktoenal_get_idle_fmb (conn);
833         if (fmb == NULL)                        /* conn descheduled waiting for idle fmb */
834                 return;
835
836         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
837         
838         if (ktoenal_init_fmb (conn, fmb)) /* packet forwarded ? */
839                 goto out;               /* come back later for next packet */
840
841  try_read:
842         /* NB: sched lock NOT held */
843         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
844                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
845                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
846                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
847
848         LASSERT (conn->ksnc_rx_niov > 0);
849         LASSERT (conn->ksnc_rx_nob_wanted > 0);
850
851         conn->ksnc_rx_ready = 0;                /* data ready may race with me and set ready */
852         mb();                                   /* => clear BEFORE trying to read */
853
854         /* NB ktoenal_recvmsg "consumes" the iov passed to it */
855         len = ktoenal_recvmsg(conn->ksnc_file,
856                                conn->ksnc_rx_iov, conn->ksnc_rx_niov,
857                                conn->ksnc_rx_nob_wanted);
858         CDEBUG (D_NET, "%p read(%d) %d\n", conn, conn->ksnc_rx_nob_wanted, len);
859
860         if (len <= 0)                           /* nothing ready (EAGAIN) or EOF or error */
861         {
862                 if (len != -EAGAIN &&           /* ! nothing to read now */
863                     len != 0)                   /* ! nothing to read ever */
864                 {
865 #warning FIXME: handle socket errors properly
866                         CERROR ("Error socknal read(%d) %p: %d\n",
867                                 conn->ksnc_rx_nob_wanted, conn, len);
868                 }
869                 goto out;                       /* come back when there's data ready */
870         }
871
872         LASSERT (len <= conn->ksnc_rx_nob_wanted);
873         conn->ksnc_rx_nob_wanted -= len;
874         conn->ksnc_rx_nob_left -= len;
875
876         if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
877                 goto out;                       /* try again later */
878
879         conn->ksnc_rx_ready = 1;                /* assume there's more to be had */
880
881         switch (conn->ksnc_rx_state)
882         {
883         case SOCKNAL_RX_HEADER:
884                 if (conn->ksnc_hdr.dest_nid != ktoenal_lib.ni.nid) /* It's not for me */
885                 {
886                         ktoenal_fwd_parse (conn);
887                         switch (conn->ksnc_rx_state)
888                         {
889                         case SOCKNAL_RX_HEADER: /* skipped this packet (zero payload) */
890                                 goto out;       /* => come back later */
891                         case SOCKNAL_RX_SLOP:   /* skipping this packet's body */
892                                 goto try_read;  /* => go read it */
893                         case SOCKNAL_RX_GET_FMB: /* forwarding */
894                                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
895                                 goto get_fmb;   /* => go get a fwd msg buffer */
896                         default:
897                                 break;
898                         }
899                         /* Not Reached */
900                         LBUG ();
901                 }
902
903                 PROF_START(lib_parse);
904                 lib_parse(&ktoenal_lib, &conn->ksnc_hdr, conn); /* sets wanted_len, iovs etc */
905                 PROF_FINISH(lib_parse);
906
907                 if (conn->ksnc_rx_nob_wanted != 0) /* need to get some payload? */
908                 {
909                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
910                         goto try_read;          /* go read the payload */
911                 }
912                 /* Fall through (completed packet for me) */
913
914         case SOCKNAL_RX_BODY:
915                 atomic_inc (&ktoenal_packets_received);
916                 lib_finalize(&ktoenal_lib, NULL, conn->ksnc_cookie); /* packet is done now */
917                 /* Fall through */
918
919         case SOCKNAL_RX_SLOP:
920                 if (ktoenal_new_packet (conn, conn->ksnc_rx_nob_left)) /* starting new packet? */
921                         goto out;               /* come back later */
922                 goto try_read;                  /* try to finish reading slop now */
923
924         case SOCKNAL_RX_BODY_FWD:
925                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n", conn,
926                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, conn->ksnc_rx_nob_left);
927
928                 atomic_inc (&ktoenal_packets_received);
929
930                 /* ktoenal_init_fmb() stashed router descriptor in conn->ksnc_cookie */
931                 kpr_fwd_start (&ktoenal_data.ksnd_router, (kpr_fwd_desc_t *)conn->ksnc_cookie);
932
933                 LASSERT (conn->ksnc_rx_nob_left == 0); /* no slop in forwarded packets */
934
935                 ktoenal_new_packet (conn, 0);  /* on to next packet */
936                 goto out;                       /* (later) */
937
938         default:
939                 break;
940         }
941
942         /* Not Reached */
943         LBUG ();
944
945  out:
946         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
947
948         if (!conn->ksnc_rx_ready)               /* no data there to read? */
949         {
950                 conn->ksnc_rx_scheduled = 0;    /* let socket callback schedule again */
951                 ktoenal_put_conn (conn);       /* release scheduler's ref */
952         }
953         else                                    /* let scheduler call me again */
954                 list_add_tail (&conn->ksnc_rx_list, &ktoenal_data.ksnd_rx_conns);
955 }
956
957 int
958 ktoenal_recv(nal_cb_t *nal, void *private, lib_msg_t *msg,
959              unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
960 {
961         ksock_conn_t *conn = (ksock_conn_t *)private;
962         int           i;
963
964         conn->ksnc_cookie = msg;
965
966         LASSERT (niov <= PTL_MD_MAX_IOV);
967         for (i = 0; i < niov; i++)
968         {
969                 conn->ksnc_rx_iov[i].iov_len = iov[i].iov_len;
970                 conn->ksnc_rx_iov[i].iov_base = iov[i].iov_base;
971         }
972
973         conn->ksnc_rx_niov       = niov;
974         conn->ksnc_rx_nob_wanted = mlen;
975         conn->ksnc_rx_nob_left   = rlen;
976
977         return (rlen);
978 }
979
980 int
981 ktoenal_scheduler (void *arg)
982 {
983         unsigned long      flags;
984         ksock_conn_t      *conn;
985         int                rc;
986         int                nloops = 0;
987
988         kportal_daemonize ("ktoenal_sched");
989         kportal_blockallsigs ();
990         
991         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
992
993         while (!ktoenal_data.ksnd_shuttingdown)
994         {
995                 int did_something = 0;
996
997                 /* Ensure I progress everything semi-fairly */
998
999                 if (!list_empty (&ktoenal_data.ksnd_rx_conns))
1000                 {
1001                         did_something = 1;
1002                         conn = list_entry (ktoenal_data.ksnd_rx_conns.next,
1003                                            ksock_conn_t, ksnc_rx_list);
1004                         list_del (&conn->ksnc_rx_list);
1005
1006                         ktoenal_process_receive (conn, &flags); /* drops & regains ksnd_sched_lock */
1007                 }
1008
1009                 if (!list_empty (&ktoenal_data.ksnd_tx_conns))
1010                 {
1011                         did_something = 1;
1012                         conn = list_entry (ktoenal_data.ksnd_tx_conns.next,
1013                                            ksock_conn_t, ksnc_tx_list);
1014
1015                         list_del (&conn->ksnc_tx_list);
1016                         ktoenal_process_transmit (conn, &flags); /* drops and regains ksnd_sched_lock */
1017                 }
1018
1019                 if (!did_something ||           /* nothing to do */
1020                     ++nloops == SOCKNAL_RESCHED) /* hogging CPU? */
1021                 {
1022                         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1023
1024                         nloops = 0;
1025
1026                         if (!did_something) {   /* wait for something to do */
1027                                 rc = wait_event_interruptible (ktoenal_data.ksnd_sched_waitq,
1028                                                                ktoenal_data.ksnd_shuttingdown ||
1029                                                                !list_empty (&ktoenal_data.ksnd_rx_conns) ||
1030                                                                !list_empty (&ktoenal_data.ksnd_tx_conns));
1031                                 LASSERT (rc == 0);
1032                         } else 
1033                                 our_cond_resched();
1034
1035                         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1036                 }
1037         }
1038
1039         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1040         ktoenal_thread_fini ();
1041         return (0);
1042 }
1043
1044
1045 int
1046 ktoenal_reaper (void *arg)
1047 {
1048         unsigned long      flags;
1049         ksock_conn_t      *conn;
1050         int                rc;
1051         
1052         kportal_daemonize ("ktoenal_reaper");
1053         kportal_blockallsigs ();
1054
1055         while (!ktoenal_data.ksnd_shuttingdown)
1056         {
1057                 spin_lock_irqsave (&ktoenal_data.ksnd_reaper_lock, flags);
1058
1059                 if (list_empty (&ktoenal_data.ksnd_reaper_list))
1060                         conn = NULL;
1061                 else
1062                 {
1063                         conn = list_entry (ktoenal_data.ksnd_reaper_list.next,
1064                                            ksock_conn_t, ksnc_list);
1065                         list_del (&conn->ksnc_list);
1066                 }
1067
1068                 spin_unlock_irqrestore (&ktoenal_data.ksnd_reaper_lock, flags);
1069
1070                 if (conn != NULL)
1071                         ktoenal_close_conn (conn);
1072                 else {
1073                         rc = wait_event_interruptible (ktoenal_data.ksnd_reaper_waitq,
1074                                                        ktoenal_data.ksnd_shuttingdown ||
1075                                                        !list_empty(&ktoenal_data.ksnd_reaper_list));
1076                         LASSERT (rc == 0);
1077                 }
1078         }
1079
1080         ktoenal_thread_fini ();
1081         return (0);
1082 }
1083
1084 #define POLLREAD        (POLLIN | POLLRDNORM | POLLRDBAND | POLLPRI)
1085 #define POLLWRITE       (POLLOUT | POLLWRNORM | POLLWRBAND)
1086
1087 int
1088 ktoenal_pollthread(void *arg)
1089 {
1090         unsigned int mask;
1091         struct list_head *tmp;
1092         ksock_conn_t *conn;
1093         
1094         /* Save the task struct for waking it up */
1095         ktoenal_data.ksnd_pollthread_tsk = current; 
1096         
1097         kportal_daemonize ("ktoenal_pollthread");
1098         kportal_blockallsigs ();
1099         
1100         poll_initwait(&ktoenal_data.ksnd_pwait);
1101         
1102         while(!ktoenal_data.ksnd_shuttingdown) {
1103                 
1104                 set_current_state(TASK_INTERRUPTIBLE);
1105                 
1106                 read_lock (&ktoenal_data.ksnd_socklist_lock);
1107                 list_for_each(tmp, &ktoenal_data.ksnd_socklist) {
1108                         
1109                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1110                         atomic_inc(&conn->ksnc_refcount);
1111                         read_unlock (&ktoenal_data.ksnd_socklist_lock);
1112                         
1113                         mask = conn->ksnc_file->f_op->poll(conn->ksnc_file,
1114                                   ktoenal_data.ksnd_slistchange ? 
1115                                   &ktoenal_data.ksnd_pwait : NULL);
1116                          
1117                         if(mask & POLLREAD) {
1118                                 ktoenal_data_ready(conn);
1119                                                         
1120                         } 
1121                         if (mask & POLLWRITE) {
1122                                 ktoenal_write_space(conn);  
1123                               
1124                         }
1125                         if (mask & (POLLERR | POLLHUP)) {
1126                                          /* Do error processing */          
1127                         }      
1128                         
1129                         read_lock (&ktoenal_data.ksnd_socklist_lock);
1130                         if(atomic_dec_and_test(&conn->ksnc_refcount))
1131                                 _ktoenal_put_conn(conn);
1132                 }
1133                 ktoenal_data.ksnd_slistchange = 0;
1134                 read_unlock (&ktoenal_data.ksnd_socklist_lock);
1135                 
1136                 schedule_timeout(MAX_SCHEDULE_TIMEOUT);
1137                 if(ktoenal_data.ksnd_slistchange) {
1138                         poll_freewait(&ktoenal_data.ksnd_pwait); 
1139                         poll_initwait(&ktoenal_data.ksnd_pwait);
1140                 }
1141          }
1142         poll_freewait(&ktoenal_data.ksnd_pwait);
1143         ktoenal_thread_fini();
1144         return (0);
1145 }
1146
1147 void
1148 ktoenal_data_ready (ksock_conn_t *conn)
1149 {
1150         unsigned long  flags;
1151         ENTRY;
1152
1153         if (!test_and_set_bit (0, &conn->ksnc_rx_ready)) { 
1154                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1155
1156                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1157                         list_add_tail (&conn->ksnc_rx_list, 
1158                                         &ktoenal_data.ksnd_rx_conns);
1159                         conn->ksnc_rx_scheduled = 1;
1160                         /* extra ref for scheduler */
1161                         atomic_inc (&conn->ksnc_refcount);
1162
1163                         /* This is done to avoid the effects of a sequence
1164                          * of events in which the rx_ready is lost
1165                          */
1166                         conn->ksnc_rx_ready=1;
1167                           
1168                         if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
1169                                 wake_up (&ktoenal_data.ksnd_sched_waitq);
1170                 }
1171
1172                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1173         }
1174
1175         EXIT;
1176 }
1177
1178 void
1179 ktoenal_write_space (ksock_conn_t *conn)
1180 {
1181         unsigned long  flags;
1182
1183         CDEBUG (D_NET, "conn %p%s%s%s\n",
1184                          conn,
1185                         (conn == NULL) ? "" : (test_bit (0, &conn->ksnc_tx_ready) ? " ready" : " blocked"),
1186                         (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ? " scheduled" : " idle"),
1187                         (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ? " empty" : " queued"));
1188
1189
1190         if (!test_and_set_bit (0, &conn->ksnc_tx_ready)) {
1191                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1192
1193                 if (!list_empty (&conn->ksnc_tx_queue) && /* packets to send */
1194                                 !conn->ksnc_tx_scheduled) { /* not being progressed */
1195
1196                         list_add_tail (&conn->ksnc_tx_list, 
1197                                         &ktoenal_data.ksnd_tx_conns);
1198                         conn->ksnc_tx_scheduled = 1;
1199                         /* extra ref for scheduler */
1200                         atomic_inc (&conn->ksnc_refcount);
1201
1202                         if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
1203                                 wake_up (&ktoenal_data.ksnd_sched_waitq);
1204                 }
1205                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1206         }
1207 }
1208
1209 nal_cb_t ktoenal_lib = {
1210         nal_data:       &ktoenal_data,                /* NAL private data */
1211         cb_send:         ktoenal_send,
1212         cb_recv:         ktoenal_recv,
1213         cb_read:         ktoenal_read,
1214         cb_write:        ktoenal_write,
1215         cb_callback:     ktoenal_callback,
1216         cb_malloc:       ktoenal_malloc,
1217         cb_free:         ktoenal_free,
1218         cb_printf:       ktoenal_printf,
1219         cb_cli:          ktoenal_cli,
1220         cb_sti:          ktoenal_sti,
1221         cb_dist:         ktoenal_dist
1222 };