Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lnet / klnds / toelnd / toenal_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *   Author: Kedar Sovani <kedar@calsoftinc.com>
10  *   Author: Amey Inamdar <amey@calsoftinc.com>
11  *   
12  *   This file is part of Portals, http://www.sf.net/projects/lustre/
13  *
14  *   Portals is free software; you can redistribute it and/or
15  *   modify it under the terms of version 2 of the GNU General Public
16  *   License as published by the Free Software Foundation.
17  *
18  *   Portals is distributed in the hope that it will be useful,
19  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
20  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  *   GNU General Public License for more details.
22  *
23  *   You should have received a copy of the GNU General Public License
24  *   along with Portals; if not, write to the Free Software
25  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26  *
27  */
28
29 #include <linux/poll.h>
30 #include "toenal.h"
31
32 atomic_t   ktoenal_packets_received;
33 long       ktoenal_packets_launched;
34 long       ktoenal_packets_transmitted;
35
36 /*
37  *  LIB functions follow
38  *
39  */
40 int
41 ktoenal_read(nal_cb_t *nal, void *private, void *dst_addr,
42               user_ptr src_addr, size_t len)
43 {
44         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
45                nal->ni.nid, (long)len, src_addr, dst_addr);
46
47         memcpy( dst_addr, src_addr, len );
48         return 0;
49 }
50
51 int
52 ktoenal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
53                void *src_addr, size_t len)
54 {
55         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
56                nal->ni.nid, (long)len, src_addr, dst_addr);
57
58         memcpy( dst_addr, src_addr, len );
59         return 0;
60 }
61
62 int 
63 ktoenal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
64                          ptl_event_t *ev)
65 {
66         CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
67                nal->ni.nid, eq, ev);
68
69         if (eq->event_callback != NULL) 
70                 eq->event_callback(ev);
71
72         return 0;
73 }
74
75 void *
76 ktoenal_malloc(nal_cb_t *nal, size_t len)
77 {
78         void *buf;
79
80         PORTAL_ALLOC(buf, len);
81
82         if (buf != NULL)
83                 memset(buf, 0, len);
84
85         return (buf);
86 }
87
88 void
89 ktoenal_free(nal_cb_t *nal, void *buf, size_t len)
90 {
91         PORTAL_FREE(buf, len);
92 }
93
94 void
95 ktoenal_printf(nal_cb_t *nal, const char *fmt, ...)
96 {
97         va_list ap;
98         char msg[256];
99
100         va_start (ap, fmt);
101         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
102         va_end (ap);
103
104         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
105
106         CDEBUG (D_NET, "%s", msg);
107 }
108
109 void
110 ktoenal_cli(nal_cb_t *nal, unsigned long *flags)
111 {
112         ksock_nal_data_t *data = nal->nal_data;
113
114         spin_lock(&data->ksnd_nal_cb_lock);
115 }
116
117 void
118 ktoenal_sti(nal_cb_t *nal, unsigned long *flags)
119 {
120         ksock_nal_data_t *data;
121         data = nal->nal_data;
122
123         spin_unlock(&data->ksnd_nal_cb_lock);
124 }
125
126 int
127 ktoenal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
128 {
129         /* I would guess that if ktoenal_get_conn(nid) == NULL,
130            and we're not routing, then 'nid' is very distant :) */
131         if ( nal->ni.nid == nid ) {
132                 *dist = 0;
133         } else {
134                 *dist = 1;
135         }
136
137         return 0;
138 }
139
140 ksock_ltx_t *
141 ktoenal_get_ltx (int may_block)
142 {
143         long         flags;
144         ksock_ltx_t *ltx = NULL;
145         
146         for (;;)
147         {
148                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
149         
150                 if (!list_empty (&ktoenal_data.ksnd_idle_ltx_list))
151                 {
152                         ltx = list_entry (ktoenal_data.ksnd_idle_ltx_list.next, ksock_ltx_t, ltx_tx.tx_list);
153                         list_del (&ltx->ltx_tx.tx_list);
154                         break;
155                 }
156
157                 if (!may_block)
158                 {
159                         if (!list_empty (&ktoenal_data.ksnd_idle_nblk_ltx_list))
160                         {
161                                 ltx = list_entry (ktoenal_data.ksnd_idle_nblk_ltx_list.next, 
162                                                   ksock_ltx_t, ltx_tx.tx_list);
163                                 list_del (&ltx->ltx_tx.tx_list);
164                         }
165                         break;
166                 }
167                 
168                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
169                 
170                 wait_event (ktoenal_data.ksnd_idle_ltx_waitq,
171                             !list_empty (&ktoenal_data.ksnd_idle_ltx_list));
172         }
173
174         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
175
176         return (ltx);
177 }
178
179 int
180 ktoenal_sendmsg (struct file *sock, struct iovec *iov, int niov, int nob, int flags)
181 {
182         /* NB This procedure "consumes" iov (actually we do, tcp_sendmsg doesn't)
183          */
184         mm_segment_t oldmm;
185         int           rc;
186
187         LASSERT (niov > 0);
188         LASSERT (nob > 0);
189         
190         oldmm = get_fs();
191         set_fs (KERNEL_DS);
192
193 #ifdef PORTAL_DEBUG
194         {
195                 int total_nob;
196                 int i;
197                 
198                 for (i = total_nob = 0; i < niov; i++)
199                         total_nob += iov[i].iov_len;
200                 
201                 LASSERT (nob == total_nob);
202         }
203 #endif        
204         LASSERT (!in_interrupt());
205        
206         rc = sock->f_op->writev(sock, iov, niov, NULL);
207
208         set_fs (oldmm);
209
210         if (rc > 0)                             /* sent something? */
211         {
212                 nob = rc;                       /* consume iov */
213                 for (;;)
214                 {
215                         LASSERT (niov > 0);
216                         
217                         if (iov->iov_len >= nob)
218                         {
219                                 iov->iov_len -= nob;
220                                 iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
221                                 break;
222                         }
223                         nob -= iov->iov_len;
224                         iov->iov_len = 0;
225                         iov++;
226                         niov--;
227                 }
228         }
229
230         return (rc);
231 }
232
233 int
234 ktoenal_recvmsg(struct file *sock, struct iovec *iov, int niov, int toread)
235 {
236         /* NB This procedure "consumes" iov (actually tcp_recvmsg does)
237          */
238         mm_segment_t oldmm;
239         int ret, i, len = 0, origlen = 0;
240         
241         PROF_START(our_recvmsg);
242         for(i = 0; i < niov; i++) {
243                 len += iov[i].iov_len;
244                 if(len >= toread)
245                         break;
246         }
247
248         if(len >= toread) {
249                 origlen = iov[i].iov_len;
250                 iov[i].iov_len -= (len - toread);
251         }
252         else {  /* i == niov */
253                 i = niov - 1;
254         }
255
256         oldmm = get_fs();
257         set_fs(KERNEL_DS);
258
259         ret = sock->f_op->readv(sock, iov, i + 1, NULL);
260         
261         set_fs(oldmm);
262
263         if(origlen)
264                 iov[i].iov_len = origlen;
265
266         PROF_FINISH(our_recvmsg);
267         return ret;
268 }
269
270 void
271 ktoenal_process_transmit (ksock_conn_t *conn, long *irq_flags)
272 {
273         ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list);
274         int         rc;
275         
276         LASSERT (conn->ksnc_tx_scheduled);
277         LASSERT (conn->ksnc_tx_ready);
278         LASSERT (!list_empty (&conn->ksnc_tx_queue));
279
280         /* assume transmit will complete now, so dequeue while I've got the lock */
281         list_del (&tx->tx_list);
282
283         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
284
285         LASSERT (tx->tx_nob > 0);
286
287         conn->ksnc_tx_ready = 0;                /* write_space may race with me and set ready */
288         mb();                                   /* => clear BEFORE trying to write */
289
290         rc = ktoenal_sendmsg (conn->ksnc_file,
291                                tx->tx_iov, tx->tx_niov, tx->tx_nob,
292                                list_empty (&conn->ksnc_tx_queue) ? 
293                                MSG_DONTWAIT : (MSG_DONTWAIT | MSG_MORE));
294
295         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_nob, rc);
296
297         if (rc < 0)                             /* error */
298         {
299                 if (rc == -EAGAIN)              /* socket full => */
300                         rc = 0;                 /* nothing sent */
301                 else
302                 {
303 #warning FIXME: handle socket errors properly
304                         CERROR ("Error socknal send(%d) %p: %d\n", tx->tx_nob, conn, rc);
305                         rc = tx->tx_nob;        /* kid on for now whole packet went */
306                 }
307         }
308
309         if (rc == tx->tx_nob)                   /* everything went */
310         {
311                 conn->ksnc_tx_ready = 1;        /* assume more can go (ASAP) */
312                 ktoenal_put_conn (conn);       /* release packet's ref */
313
314                 if (tx->tx_isfwd)               /* was a forwarded packet? */
315                 {
316                         kpr_fwd_done (&ktoenal_data.ksnd_router,
317                                       KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
318
319                         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
320                 }
321                 else                            /* local send */
322                 {
323                         ksock_ltx_t *ltx = KSOCK_TX_2_KSOCK_LTX (tx);
324
325                         lib_finalize (&ktoenal_lib, ltx->ltx_private, ltx->ltx_cookie);
326
327                         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
328                         
329                         list_add (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
330
331                         /* normal tx desc => wakeup anyone blocking for one */
332                         if (ltx->ltx_idle == &ktoenal_data.ksnd_idle_ltx_list &&
333                             waitqueue_active (&ktoenal_data.ksnd_idle_ltx_waitq))
334                                 wake_up (&ktoenal_data.ksnd_idle_ltx_waitq);
335                 }
336                 ktoenal_packets_transmitted++;
337         }
338         else
339         {
340                 tx->tx_nob -= rc;
341
342                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
343
344                 /* back onto HEAD of tx_queue */
345                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
346         }
347
348         if (!conn->ksnc_tx_ready ||             /* no space to write now */
349             list_empty (&conn->ksnc_tx_queue))  /* nothing to write */
350         {
351                 conn->ksnc_tx_scheduled = 0;    /* not being scheduled */
352                 ktoenal_put_conn (conn);       /* release scheduler's ref */
353         }
354         else                                    /* let scheduler call me again */
355                 list_add_tail (&conn->ksnc_tx_list, &ktoenal_data.ksnd_tx_conns);
356 }
357
358 void
359 ktoenal_launch_packet (ksock_conn_t *conn, ksock_tx_t *tx)
360 {
361         long          flags;
362         int           nob = tx->tx_nob;
363         struct iovec *iov = tx->tx_iov;
364         int           niov = 1;
365         
366         LASSERT (nob >= sizeof (ptl_hdr_t));
367
368         /* Truncate iov to exactly match total packet length
369          * since socket sendmsg pays no attention to requested length.
370          */
371         for (;;)
372         {
373                 LASSERT (niov <= tx->tx_niov);
374                 LASSERT (iov->iov_len >= 0);
375                 
376                 if (iov->iov_len >= nob)
377                 {
378                         iov->iov_len = nob;
379                         break;
380                 }
381                 nob -= iov->iov_len;
382                 iov++;
383                 niov++;
384         }
385         tx->tx_niov = niov;
386         
387         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
388         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
389
390         if (conn->ksnc_tx_ready &&              /* able to send */
391             !conn->ksnc_tx_scheduled)           /* not scheduled to send */
392         {
393                 list_add_tail (&conn->ksnc_tx_list, &ktoenal_data.ksnd_tx_conns);
394                 conn->ksnc_tx_scheduled = 1;
395                 atomic_inc (&conn->ksnc_refcount); /* extra ref for scheduler */
396                 if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
397                         wake_up (&ktoenal_data.ksnd_sched_waitq);
398         }
399
400         ktoenal_packets_launched++;
401         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
402 }
403
404 int
405 ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie,
406               ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
407               unsigned int payload_niov, struct iovec *payload_iov, size_t payload_len)
408 {
409         ptl_nid_t     gatewaynid;
410         ksock_conn_t *conn;
411         ksock_ltx_t  *ltx;
412         int           rc;
413         int           i;
414
415         /* By this point, as it happens, we have absolutely no idea what
416          * 'private' is.  It might be ksock_nal_data or it might be ksock_conn.
417          * Ha ha, isn't that a funny joke?
418          *
419          * FIXME: this is not the right way to fix this; the right way is to
420          * always pass in the same kind of structure.  This is hard right now.
421          * To revisit this issue, set a breakpoint in here and watch for when
422          * it's called from lib_finalize.  I think this occurs when we send a
423          * packet as a side-effect of another packet, such as when an ACK has
424          * been requested. -phil */
425
426         CDEBUG(D_NET, "sending %d bytes from [%d](%p,%d)... to nid: "
427                LPX64" pid %d\n", (int)payload_len, payload_niov,
428                payload_niov > 0 ? payload_iov[0].iov_base : NULL,
429                (int)(payload_niov > 0 ? payload_iov[0].iov_len : 0), nid, pid);
430
431         if ((conn = ktoenal_get_conn (nid)) == NULL)
432         {
433                 /* It's not a peer; try to find a gateway */
434                 rc = kpr_lookup (&ktoenal_data.ksnd_router, nid, &gatewaynid);
435                 if (rc != 0)
436                 {
437                         CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
438                         return (-1);
439                 }
440
441                 if ((conn = ktoenal_get_conn (gatewaynid)) == NULL)
442                 {
443                         CERROR ("Can't route to "LPX64": gateway "LPX64" is not a peer\n", 
444                                 nid, gatewaynid);
445                         return (-1);
446                 }
447         }
448
449         /* This transmit has now got a ref on conn */
450
451         /* I may not block for a transmit descriptor if I might block the
452          * receiver, or an interrupt handler. */
453         ltx = ktoenal_get_ltx (!(type == PTL_MSG_ACK ||
454                                  type == PTL_MSG_REPLY ||
455                                  in_interrupt ()));
456         if (ltx == NULL)
457         {
458                 CERROR ("Can't allocate tx desc\n");
459                 ktoenal_put_conn (conn);
460                 return (-1);
461         }
462         
463         /* Init common (to sends and forwards) packet part */
464         ltx->ltx_tx.tx_isfwd = 0;
465         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
466         ltx->ltx_tx.tx_niov = 1 + payload_niov;
467         ltx->ltx_tx.tx_iov = ltx->ltx_iov;
468
469         /* Init local send packet (storage for hdr, finalize() args, iov) */
470         ltx->ltx_hdr = *hdr;
471         ltx->ltx_private = private;
472         ltx->ltx_cookie = cookie;
473
474         ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
475         ltx->ltx_iov[0].iov_len = sizeof (ltx->ltx_hdr);
476
477         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
478
479         for (i = 0; i < payload_niov; i++)
480         {
481                 ltx->ltx_iov[1 + i].iov_base = payload_iov[i].iov_base;
482                 ltx->ltx_iov[1 + i].iov_len  = payload_iov[i].iov_len;
483         }
484
485         ktoenal_launch_packet (conn, &ltx->ltx_tx);
486         return (0);
487 }
488
489 void
490 ktoenal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
491 {
492         ksock_conn_t *conn;
493         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
494         ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
495
496         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd, 
497                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
498
499         if (nid == ktoenal_lib.ni.nid)         /* I'm the gateway; must be the last hop */
500                 nid = fwd->kprfd_target_nid;
501         
502         conn = ktoenal_get_conn (nid);
503         if (conn == NULL)
504         {
505                 CERROR ("[%p] fwd to "LPX64" isn't a peer\n", fwd, nid);
506                 kpr_fwd_done (&ktoenal_data.ksnd_router, fwd, -EHOSTUNREACH);
507                 return;
508         }
509
510         /* This forward has now got a ref on conn */
511
512         tx->tx_isfwd = 1;                       /* This is a forwarding packet */
513         tx->tx_nob   = fwd->kprfd_nob;
514         tx->tx_niov  = fwd->kprfd_niov;
515         tx->tx_iov   = fwd->kprfd_iov;
516
517         ktoenal_launch_packet (conn, tx);
518 }
519
520 int
521 ktoenal_thread_start (int (*fn)(void *arg), void *arg)
522 {
523         long    pid = kernel_thread (fn, arg, 0);
524
525         if (pid < 0)
526                 return ((int)pid);
527
528         atomic_inc (&ktoenal_data.ksnd_nthreads);
529         return (0);
530 }
531
532 void
533 ktoenal_thread_fini (void)
534 {
535         atomic_dec (&ktoenal_data.ksnd_nthreads);
536 }
537
538 void
539 ktoenal_fmb_callback (void *arg, int error)
540 {
541         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
542         ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
543         ksock_conn_t      *conn;
544         long               flags;
545
546         CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": %d\n", 
547                 hdr->src_nid, hdr->dest_nid, error);
548
549         if (error != 0)
550                 CERROR ("Failed to route packet from "LPX64" to "LPX64": %d\n", 
551                         hdr->src_nid, hdr->dest_nid, error);
552
553         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
554         
555         list_add (&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
556
557         if (!list_empty (&fmb->fmb_pool->fmp_blocked_conns))
558         {
559                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next, ksock_conn_t, ksnc_rx_list);
560                 list_del (&conn->ksnc_rx_list);
561
562                 CDEBUG (D_NET, "Scheduling conn %p\n", conn);
563                 LASSERT (conn->ksnc_rx_scheduled);
564                 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
565
566                 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
567                 list_add_tail (&conn->ksnc_rx_list, &ktoenal_data.ksnd_rx_conns);
568
569                 if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
570                         wake_up (&ktoenal_data.ksnd_sched_waitq);
571         }
572
573         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
574 }
575
576 ksock_fmb_t *
577 ktoenal_get_idle_fmb (ksock_conn_t *conn)
578 {
579         /* NB called with sched lock held */
580         int               payload_nob = conn->ksnc_rx_nob_left;
581         int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
582         ksock_fmb_pool_t *pool;
583         ksock_fmb_t      *fmb;
584         
585         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
586
587         if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
588                 pool = &ktoenal_data.ksnd_small_fmp;
589         else
590                 pool = &ktoenal_data.ksnd_large_fmp;
591         
592         if (!list_empty (&pool->fmp_idle_fmbs))
593         {
594                 fmb = list_entry (pool->fmp_idle_fmbs.next, ksock_fmb_t, fmb_list);
595                 list_del (&fmb->fmb_list);
596                 return (fmb);
597         }
598
599         /* deschedule until fmb free */
600
601         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
602
603         list_add_tail (&conn->ksnc_rx_list,
604                        &pool->fmp_blocked_conns);
605         return (NULL);
606 }
607
608
609 int
610 ktoenal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
611 {
612         int payload_nob = conn->ksnc_rx_nob_left;
613         int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
614         int niov;                               /* at least the header */
615         int nob;
616         
617         LASSERT (conn->ksnc_rx_scheduled);
618         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
619         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
620         LASSERT (payload_nob >= 0);
621         LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
622         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
623         
624         /* Got a forwarding buffer; copy the header we just read into the
625          * forwarding buffer.  If there's payload start reading reading it
626          * into the buffer, otherwise the forwarding buffer can be kicked
627          * off immediately.
628          *
629          * NB fmb->fmb_iov spans the WHOLE packet.
630          *    conn->ksnc_rx_iov spans just the payload.
631          */
632
633         fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
634                 
635         memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t)); /* copy header */
636
637         if (payload_nob == 0)                   /* got complete packet already */
638         {
639                 atomic_inc (&ktoenal_packets_received);
640
641                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n", conn,
642                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, packet_nob);
643
644                 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
645
646                 kpr_fwd_init (&fmb->fmb_fwd, conn->ksnc_hdr.dest_nid, 
647                               packet_nob, 1, fmb->fmb_iov, 
648                               ktoenal_fmb_callback, fmb);
649
650                 kpr_fwd_start (&ktoenal_data.ksnd_router, &fmb->fmb_fwd); /* forward it now */
651
652                 ktoenal_new_packet (conn, 0);  /* on to next packet */
653                 return (1);
654         }
655
656         niov = 1;
657         if (packet_nob <= PAGE_SIZE)            /* whole packet fits in first page */
658                 fmb->fmb_iov[0].iov_len = packet_nob;
659         else
660         {
661                 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
662                 nob = packet_nob - PAGE_SIZE;
663                 
664                 do
665                 {
666                         LASSERT (niov < fmb->fmb_npages);
667                         fmb->fmb_iov[niov].iov_base = page_address (fmb->fmb_pages[niov]);
668                         fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
669                         nob -= PAGE_SIZE;
670                         niov++;
671                 } while (nob > 0);
672         }
673
674         kpr_fwd_init (&fmb->fmb_fwd, conn->ksnc_hdr.dest_nid, 
675                       packet_nob, niov, fmb->fmb_iov, 
676                       ktoenal_fmb_callback, fmb);
677
678         /* stash router's descriptor ready for call to kpr_fwd_start */        
679         conn->ksnc_cookie = &fmb->fmb_fwd;
680
681         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
682
683         /* payload is desc's iov-ed buffer, but skipping the hdr */
684         LASSERT (niov <= sizeof (conn->ksnc_rx_iov) / sizeof (conn->ksnc_rx_iov[0]));
685
686         conn->ksnc_rx_iov[0].iov_base = (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) + sizeof (ptl_hdr_t));
687         conn->ksnc_rx_iov[0].iov_len = fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
688
689         if (niov > 1)
690                 memcpy (&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1], (niov - 1) * sizeof (struct iovec));
691
692         conn->ksnc_rx_niov = niov;
693
694         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
695                 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, payload_nob);
696         return (0);
697 }
698
699 void
700 ktoenal_fwd_parse (ksock_conn_t *conn)
701 {
702         ksock_conn_t *conn2;
703         int           body_len;
704
705         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
706                 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, conn->ksnc_rx_nob_left);
707
708         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
709         LASSERT (conn->ksnc_rx_scheduled);
710
711         switch (conn->ksnc_hdr.type)
712         {
713         case PTL_MSG_GET:
714         case PTL_MSG_ACK:
715                 body_len = 0;
716                 break;
717         case PTL_MSG_PUT:
718                 body_len = conn->ksnc_hdr.msg.put.length;
719                 break;
720         case PTL_MSG_REPLY:
721                 body_len = conn->ksnc_hdr.msg.reply.length;
722                 break;
723         default:
724                 /* Unrecognised packet type */
725                 CERROR ("Unrecognised packet type %d from "LPX64" for "LPX64"\n",
726                         conn->ksnc_hdr.type, conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid);
727                 /* Ignore this header and go back to reading a new packet. */
728                 ktoenal_new_packet (conn, 0);
729                 return;
730         }
731
732         if (body_len < 0)                               /* length corrupt */
733         {
734                 CERROR ("dropping packet from "LPX64" for "LPX64": packet size %d illegal\n",
735                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, body_len);
736                 ktoenal_new_packet (conn, 0);          /* on to new packet */
737                 return;
738         }
739
740         if (body_len > SOCKNAL_MAX_FWD_PAYLOAD)         /* too big to forward */
741         {
742                 CERROR ("dropping packet from "LPX64" for "LPX64": packet size %d too big\n",
743                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, body_len);
744                 ktoenal_new_packet (conn, body_len);    /* on to new packet (skip this one's body) */
745                 return;
746         }
747
748         conn2 = ktoenal_get_conn (conn->ksnc_hdr.dest_nid); /* should have gone direct */
749         if (conn2 != NULL)
750         {
751                 CERROR ("dropping packet from "LPX64" for "LPX64": target is a peer\n",
752                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid);
753                 ktoenal_put_conn (conn2);          /* drop ref from get above */
754
755                 ktoenal_new_packet (conn, body_len);  /* on to next packet (skip this one's body) */
756                 return;
757         }
758
759         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
760         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
761         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
762 }
763
764 int
765 ktoenal_new_packet (ksock_conn_t *conn, int nob_to_skip)
766 {
767         static char ktoenal_slop_buffer[4096];
768
769         int   nob;
770         int   niov;
771         int   skipped;
772
773         if (nob_to_skip == 0)                   /* right at next packet boundary now */
774         {
775                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
776                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
777                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
778
779                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
780                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
781                 conn->ksnc_rx_niov = 1;
782                 return (1);
783         }
784
785         /* set up to skip as much a possible now */
786         /* if there's more left (ran out of iov entries) we'll get called again */
787
788         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
789         conn->ksnc_rx_nob_left = nob_to_skip;
790         skipped = 0;
791         niov = 0;
792
793         do
794         {
795                 nob = MIN (nob_to_skip, sizeof (ktoenal_slop_buffer));
796
797                 conn->ksnc_rx_iov[niov].iov_base = ktoenal_slop_buffer;
798                 conn->ksnc_rx_iov[niov].iov_len  = nob;
799                 niov++;
800                 skipped += nob;
801                 nob_to_skip -=nob;
802
803         } while (nob_to_skip != 0 &&            /* mustn't overflow conn's rx iov */
804                  niov < sizeof (conn->ksnc_rx_iov)/sizeof (conn->ksnc_rx_iov[0]));
805
806         conn->ksnc_rx_niov = niov;
807         conn->ksnc_rx_nob_wanted = skipped;
808         return (0);
809 }
810
811 void
812 ktoenal_process_receive (ksock_conn_t *conn, long *irq_flags)
813 {
814         ksock_fmb_t *fmb;
815         int          len;
816         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
817         LASSERT (conn->ksnc_rx_scheduled);
818         LASSERT (conn->ksnc_rx_ready);
819
820         /* NB: sched lock held */
821         CDEBUG(D_NET, "conn %p\n", conn);
822
823         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)     /* doesn't need a forwarding buffer */
824         {
825                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
826                 goto try_read;
827         }
828
829  get_fmb:
830         /* NB: sched lock held */
831         fmb = ktoenal_get_idle_fmb (conn);
832         if (fmb == NULL)                        /* conn descheduled waiting for idle fmb */
833                 return;
834
835         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
836         
837         if (ktoenal_init_fmb (conn, fmb)) /* packet forwarded ? */
838                 goto out;               /* come back later for next packet */
839
840  try_read:
841         /* NB: sched lock NOT held */
842         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
843                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
844                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
845                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
846
847         LASSERT (conn->ksnc_rx_niov > 0);
848         LASSERT (conn->ksnc_rx_nob_wanted > 0);
849
850         conn->ksnc_rx_ready = 0;                /* data ready may race with me and set ready */
851         mb();                                   /* => clear BEFORE trying to read */
852
853         /* NB ktoenal_recvmsg "consumes" the iov passed to it */
854         len = ktoenal_recvmsg(conn->ksnc_file,
855                                conn->ksnc_rx_iov, conn->ksnc_rx_niov,
856                                conn->ksnc_rx_nob_wanted);
857         CDEBUG (D_NET, "%p read(%d) %d\n", conn, conn->ksnc_rx_nob_wanted, len);
858
859         if (len <= 0)                           /* nothing ready (EAGAIN) or EOF or error */
860         {
861                 if (len != -EAGAIN &&           /* ! nothing to read now */
862                     len != 0)                   /* ! nothing to read ever */
863                 {
864 #warning FIXME: handle socket errors properly
865                         CERROR ("Error socknal read(%d) %p: %d\n",
866                                 conn->ksnc_rx_nob_wanted, conn, len);
867                 }
868                 goto out;                       /* come back when there's data ready */
869         }
870
871         LASSERT (len <= conn->ksnc_rx_nob_wanted);
872         conn->ksnc_rx_nob_wanted -= len;
873         conn->ksnc_rx_nob_left -= len;
874
875         if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
876                 goto out;                       /* try again later */
877
878         conn->ksnc_rx_ready = 1;                /* assume there's more to be had */
879
880         switch (conn->ksnc_rx_state)
881         {
882         case SOCKNAL_RX_HEADER:
883                 if (conn->ksnc_hdr.dest_nid != ktoenal_lib.ni.nid) /* It's not for me */
884                 {
885                         ktoenal_fwd_parse (conn);
886                         switch (conn->ksnc_rx_state)
887                         {
888                         case SOCKNAL_RX_HEADER: /* skipped this packet (zero payload) */
889                                 goto out;       /* => come back later */
890                         case SOCKNAL_RX_SLOP:   /* skipping this packet's body */
891                                 goto try_read;  /* => go read it */
892                         case SOCKNAL_RX_GET_FMB: /* forwarding */
893                                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
894                                 goto get_fmb;   /* => go get a fwd msg buffer */
895                         default:
896                         }
897                         /* Not Reached */
898                         LBUG ();
899                 }
900
901                 PROF_START(lib_parse);
902                 lib_parse(&ktoenal_lib, &conn->ksnc_hdr, conn); /* sets wanted_len, iovs etc */
903                 PROF_FINISH(lib_parse);
904
905                 if (conn->ksnc_rx_nob_wanted != 0) /* need to get some payload? */
906                 {
907                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
908                         goto try_read;          /* go read the payload */
909                 }
910                 /* Fall through (completed packet for me) */
911
912         case SOCKNAL_RX_BODY:
913                 atomic_inc (&ktoenal_packets_received);
914                 lib_finalize(&ktoenal_lib, NULL, conn->ksnc_cookie); /* packet is done now */
915                 /* Fall through */
916
917         case SOCKNAL_RX_SLOP:
918                 if (ktoenal_new_packet (conn, conn->ksnc_rx_nob_left)) /* starting new packet? */
919                         goto out;               /* come back later */
920                 goto try_read;                  /* try to finish reading slop now */
921
922         case SOCKNAL_RX_BODY_FWD:
923                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n", conn,
924                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, conn->ksnc_rx_nob_left);
925
926                 atomic_inc (&ktoenal_packets_received);
927
928                 /* ktoenal_init_fmb() stashed router descriptor in conn->ksnc_cookie */
929                 kpr_fwd_start (&ktoenal_data.ksnd_router, (kpr_fwd_desc_t *)conn->ksnc_cookie);
930
931                 LASSERT (conn->ksnc_rx_nob_left == 0); /* no slop in forwarded packets */
932
933                 ktoenal_new_packet (conn, 0);  /* on to next packet */
934                 goto out;                       /* (later) */
935
936         default:
937         }
938
939         /* Not Reached */
940         LBUG ();
941
942  out:
943         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
944
945         if (!conn->ksnc_rx_ready)               /* no data there to read? */
946         {
947                 conn->ksnc_rx_scheduled = 0;    /* let socket callback schedule again */
948                 ktoenal_put_conn (conn);       /* release scheduler's ref */
949         }
950         else                                    /* let scheduler call me again */
951                 list_add_tail (&conn->ksnc_rx_list, &ktoenal_data.ksnd_rx_conns);
952 }
953
954 int
955 ktoenal_recv(nal_cb_t *nal, void *private, lib_msg_t *msg,
956              unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
957 {
958         ksock_conn_t *conn = (ksock_conn_t *)private;
959         int           i;
960
961         conn->ksnc_cookie = msg;
962
963         LASSERT (niov <= PTL_MD_MAX_IOV);
964         for (i = 0; i < niov; i++)
965         {
966                 conn->ksnc_rx_iov[i].iov_len = iov[i].iov_len;
967                 conn->ksnc_rx_iov[i].iov_base = iov[i].iov_base;
968         }
969
970         conn->ksnc_rx_niov       = niov;
971         conn->ksnc_rx_nob_wanted = mlen;
972         conn->ksnc_rx_nob_left   = rlen;
973
974         return (rlen);
975 }
976
977 int
978 ktoenal_scheduler (void *arg)
979 {
980         unsigned long      flags;
981         ksock_conn_t      *conn;
982         int                rc;
983         int                nloops = 0;
984
985         kportal_daemonize ("ktoenal_sched");
986         kportal_blockallsigs ();
987         
988         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
989
990         while (!ktoenal_data.ksnd_shuttingdown)
991         {
992                 int did_something = 0;
993
994                 /* Ensure I progress everything semi-fairly */
995
996                 if (!list_empty (&ktoenal_data.ksnd_rx_conns))
997                 {
998                         did_something = 1;
999                         conn = list_entry (ktoenal_data.ksnd_rx_conns.next,
1000                                            ksock_conn_t, ksnc_rx_list);
1001                         list_del (&conn->ksnc_rx_list);
1002
1003                         ktoenal_process_receive (conn, &flags); /* drops & regains ksnd_sched_lock */
1004                 }
1005
1006                 if (!list_empty (&ktoenal_data.ksnd_tx_conns))
1007                 {
1008                         did_something = 1;
1009                         conn = list_entry (ktoenal_data.ksnd_tx_conns.next,
1010                                            ksock_conn_t, ksnc_tx_list);
1011
1012                         list_del (&conn->ksnc_tx_list);
1013                         ktoenal_process_transmit (conn, &flags); /* drops and regains ksnd_sched_lock */
1014                 }
1015
1016                 if (!did_something ||           /* nothing to do */
1017                     ++nloops == SOCKNAL_RESCHED) /* hogging CPU? */
1018                 {
1019                         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1020
1021                         nloops = 0;
1022
1023                         if (!did_something) {   /* wait for something to do */
1024                                 rc = wait_event_interruptible (ktoenal_data.ksnd_sched_waitq,
1025                                                                ktoenal_data.ksnd_shuttingdown ||
1026                                                                !list_empty (&ktoenal_data.ksnd_rx_conns) ||
1027                                                                !list_empty (&ktoenal_data.ksnd_tx_conns));
1028                                 LASSERT (rc == 0);
1029                         } else 
1030                                 our_cond_resched();
1031
1032                         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1033                 }
1034         }
1035
1036         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1037         ktoenal_thread_fini ();
1038         return (0);
1039 }
1040
1041
1042 int
1043 ktoenal_reaper (void *arg)
1044 {
1045         unsigned long      flags;
1046         ksock_conn_t      *conn;
1047         int                rc;
1048         
1049         kportal_daemonize ("ktoenal_reaper");
1050         kportal_blockallsigs ();
1051
1052         while (!ktoenal_data.ksnd_shuttingdown)
1053         {
1054                 spin_lock_irqsave (&ktoenal_data.ksnd_reaper_lock, flags);
1055
1056                 if (list_empty (&ktoenal_data.ksnd_reaper_list))
1057                         conn = NULL;
1058                 else
1059                 {
1060                         conn = list_entry (ktoenal_data.ksnd_reaper_list.next,
1061                                            ksock_conn_t, ksnc_list);
1062                         list_del (&conn->ksnc_list);
1063                 }
1064
1065                 spin_unlock_irqrestore (&ktoenal_data.ksnd_reaper_lock, flags);
1066
1067                 if (conn != NULL)
1068                         ktoenal_close_conn (conn);
1069                 else {
1070                         rc = wait_event_interruptible (ktoenal_data.ksnd_reaper_waitq,
1071                                                        ktoenal_data.ksnd_shuttingdown ||
1072                                                        !list_empty(&ktoenal_data.ksnd_reaper_list));
1073                         LASSERT (rc == 0);
1074                 }
1075         }
1076
1077         ktoenal_thread_fini ();
1078         return (0);
1079 }
1080
1081 #define POLLREAD        (POLLIN | POLLRDNORM | POLLRDBAND | POLLPRI)
1082 #define POLLWRITE       (POLLOUT | POLLWRNORM | POLLWRBAND)
1083
1084 int
1085 ktoenal_pollthread(void *arg)
1086 {
1087         unsigned int mask;
1088         struct list_head *tmp;
1089         ksock_conn_t *conn;
1090         
1091         /* Save the task struct for waking it up */
1092         ktoenal_data.ksnd_pollthread_tsk = current; 
1093         
1094         kportal_daemonize ("ktoenal_pollthread");
1095         kportal_blockallsigs ();
1096         
1097         poll_initwait(&ktoenal_data.ksnd_pwait);
1098         
1099         while(!ktoenal_data.ksnd_shuttingdown) {
1100                 
1101                 set_current_state(TASK_INTERRUPTIBLE);
1102                 
1103                 read_lock (&ktoenal_data.ksnd_socklist_lock);
1104                 list_for_each(tmp, &ktoenal_data.ksnd_socklist) {
1105                         
1106                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1107                         atomic_inc(&conn->ksnc_refcount);
1108                         read_unlock (&ktoenal_data.ksnd_socklist_lock);
1109                         
1110                         mask = conn->ksnc_file->f_op->poll(conn->ksnc_file,
1111                                   ktoenal_data.ksnd_slistchange ? 
1112                                   &ktoenal_data.ksnd_pwait : NULL);
1113                          
1114                         if(mask & POLLREAD) {
1115                                 ktoenal_data_ready(conn);
1116                                                         
1117                         } 
1118                         if (mask & POLLWRITE) {
1119                                 ktoenal_write_space(conn);  
1120                               
1121                         }
1122                         if (mask & (POLLERR | POLLHUP)) {
1123                                          /* Do error processing */          
1124                         }      
1125                         
1126                         read_lock (&ktoenal_data.ksnd_socklist_lock);
1127                         if(atomic_dec_and_test(&conn->ksnc_refcount))
1128                                 _ktoenal_put_conn(conn);
1129                 }
1130                 ktoenal_data.ksnd_slistchange = 0;
1131                 read_unlock (&ktoenal_data.ksnd_socklist_lock);
1132                 
1133                 schedule_timeout(MAX_SCHEDULE_TIMEOUT);
1134                 if(ktoenal_data.ksnd_slistchange) {
1135                         poll_freewait(&ktoenal_data.ksnd_pwait); 
1136                         poll_initwait(&ktoenal_data.ksnd_pwait);
1137                 }
1138          }
1139         poll_freewait(&ktoenal_data.ksnd_pwait);
1140         ktoenal_thread_fini();
1141         return (0);
1142 }
1143
1144 void
1145 ktoenal_data_ready (ksock_conn_t *conn)
1146 {
1147         unsigned long  flags;
1148         ENTRY;
1149
1150         if (!test_and_set_bit (0, &conn->ksnc_rx_ready)) { 
1151                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1152
1153                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1154                         list_add_tail (&conn->ksnc_rx_list, 
1155                                         &ktoenal_data.ksnd_rx_conns);
1156                         conn->ksnc_rx_scheduled = 1;
1157                         /* extra ref for scheduler */
1158                         atomic_inc (&conn->ksnc_refcount);
1159
1160                         /* This is done to avoid the effects of a sequence
1161                          * of events in which the rx_ready is lost
1162                          */
1163                         conn->ksnc_rx_ready=1;
1164                           
1165                         if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
1166                                 wake_up (&ktoenal_data.ksnd_sched_waitq);
1167                 }
1168
1169                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1170         }
1171
1172         EXIT;
1173 }
1174
1175 void
1176 ktoenal_write_space (ksock_conn_t *conn)
1177 {
1178         unsigned long  flags;
1179
1180         CDEBUG (D_NET, "conn %p%s%s%s\n",
1181                          conn,
1182                         (conn == NULL) ? "" : (test_bit (0, &conn->ksnc_tx_ready) ? " ready" : " blocked"),
1183                         (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ? " scheduled" : " idle"),
1184                         (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ? " empty" : " queued"));
1185
1186
1187         if (!test_and_set_bit (0, &conn->ksnc_tx_ready)) {
1188                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1189
1190                 if (!list_empty (&conn->ksnc_tx_queue) && /* packets to send */
1191                                 !conn->ksnc_tx_scheduled) { /* not being progressed */
1192
1193                         list_add_tail (&conn->ksnc_tx_list, 
1194                                         &ktoenal_data.ksnd_tx_conns);
1195                         conn->ksnc_tx_scheduled = 1;
1196                         /* extra ref for scheduler */
1197                         atomic_inc (&conn->ksnc_refcount);
1198
1199                         if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
1200                                 wake_up (&ktoenal_data.ksnd_sched_waitq);
1201                 }
1202                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1203         }
1204 }
1205
1206 nal_cb_t ktoenal_lib = {
1207         nal_data:       &ktoenal_data,                /* NAL private data */
1208         cb_send:         ktoenal_send,
1209         cb_recv:         ktoenal_recv,
1210         cb_read:         ktoenal_read,
1211         cb_write:        ktoenal_write,
1212         cb_callback:     ktoenal_callback,
1213         cb_malloc:       ktoenal_malloc,
1214         cb_free:         ktoenal_free,
1215         cb_printf:       ktoenal_printf,
1216         cb_cli:          ktoenal_cli,
1217         cb_sti:          ktoenal_sti,
1218         cb_dist:         ktoenal_dist
1219 };