Whamcloud - gitweb
* Fixed bug 2119
[fs/lustre-release.git] / lustre / portals / knals / qswnal / qswnal_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 atomic_t kqswnal_packets_launched;
30 atomic_t kqswnal_packets_transmitted;
31 atomic_t kqswnal_packets_received;
32
33
34 /*
35  *  LIB functions follow
36  *
37  */
38 static int
39 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
40              size_t len)
41 {
42         CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
43                 nal->ni.nid, len, src_addr, dst_addr );
44         memcpy( dst_addr, src_addr, len );
45
46         return (0);
47 }
48
49 static int
50 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
51               size_t len)
52 {
53         CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
54                 nal->ni.nid, len, src_addr, dst_addr );
55         memcpy( dst_addr, src_addr, len );
56
57         return (0);
58 }
59
60 static void *
61 kqswnal_malloc(nal_cb_t *nal, size_t len)
62 {
63         void *buf;
64
65         PORTAL_ALLOC(buf, len);
66         return (buf);
67 }
68
69 static void
70 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
71 {
72         PORTAL_FREE(buf, len);
73 }
74
75 static void
76 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
77 {
78         va_list ap;
79         char msg[256];
80
81         va_start (ap, fmt);
82         vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
83         va_end (ap);
84
85         msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
86
87         CDEBUG (D_NET, "%s", msg);
88 }
89
90
91 static void
92 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
93 {
94         kqswnal_data_t *data= nal->nal_data;
95
96         spin_lock_irqsave(&data->kqn_statelock, *flags);
97 }
98
99
100 static void
101 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
102 {
103         kqswnal_data_t *data= nal->nal_data;
104
105         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
106 }
107
108
109 static int
110 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
111 {
112         if (nid == nal->ni.nid)
113                 *dist = 0;                      /* it's me */
114         else if (kqswnal_nid2elanid (nid) >= 0)
115                 *dist = 1;                      /* it's my peer */
116         else
117                 *dist = 2;                      /* via router */
118         return (0);
119 }
120
121 void
122 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
123 {
124         struct timeval     now;
125         time_t             then;
126
127         do_gettimeofday (&now);
128         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
129
130         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
131 }
132
133 void
134 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
135 {
136         if (ktx->ktx_nmappedpages == 0)
137                 return;
138
139         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
140                 ktx, ktx->ktx_niov, ktx->ktx_basepage, ktx->ktx_nmappedpages);
141
142         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
143         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
144                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
145
146         elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
147                           kqswnal_data.kqn_eptxdmahandle,
148                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
149         ktx->ktx_nmappedpages = 0;
150 }
151
152 int
153 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
154 {
155         int       nfrags    = ktx->ktx_niov;
156         const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
157         int       nmapped   = ktx->ktx_nmappedpages;
158         int       maxmapped = ktx->ktx_npages;
159         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
160         char     *ptr;
161         
162         LASSERT (nmapped <= maxmapped);
163         LASSERT (nfrags <= maxfrags);
164         LASSERT (niov > 0);
165         LASSERT (nob > 0);
166         
167         do {
168                 int  fraglen = kiov->kiov_len;
169
170                 /* nob exactly spans the iovs */
171                 LASSERT (fraglen <= nob);
172                 /* each frag fits in a page */
173                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
174
175                 nmapped++;
176                 if (nmapped > maxmapped) {
177                         CERROR("Can't map message in %d pages (max %d)\n",
178                                nmapped, maxmapped);
179                         return (-EMSGSIZE);
180                 }
181
182                 if (nfrags == maxfrags) {
183                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
184                                maxfrags);
185                         return (-EMSGSIZE);
186                 }
187
188                 /* XXX this is really crap, but we'll have to kmap until
189                  * EKC has a page (rather than vaddr) mapping interface */
190
191                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
192
193                 CDEBUG(D_NET,
194                        "%p[%d] loading %p for %d, page %d, %d total\n",
195                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
196
197                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
198                                        kqswnal_data.kqn_eptxdmahandle,
199                                        ptr, fraglen,
200                                        basepage, &ktx->ktx_iov[nfrags].Base);
201
202                 kunmap (kiov->kiov_page);
203                 
204                 /* keep in loop for failure case */
205                 ktx->ktx_nmappedpages = nmapped;
206
207                 if (nfrags > 0 &&                /* previous frag mapped */
208                     ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
209                     (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
210                         /* just extend previous */
211                         ktx->ktx_iov[nfrags - 1].Len += fraglen;
212                 else {
213                         ktx->ktx_iov[nfrags].Len = fraglen;
214                         nfrags++;                /* new frag */
215                 }
216
217                 basepage++;
218                 kiov++;
219                 niov--;
220                 nob -= fraglen;
221
222                 /* iov must not run out before end of data */
223                 LASSERT (nob == 0 || niov > 0);
224
225         } while (nob > 0);
226
227         ktx->ktx_niov = nfrags;
228         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
229                 ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
230
231         return (0);
232 }
233
234 int
235 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
236 {
237         int       nfrags    = ktx->ktx_niov;
238         const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
239         int       nmapped   = ktx->ktx_nmappedpages;
240         int       maxmapped = ktx->ktx_npages;
241         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
242
243         LASSERT (nmapped <= maxmapped);
244         LASSERT (nfrags <= maxfrags);
245         LASSERT (niov > 0);
246         LASSERT (nob > 0);
247
248         do {
249                 int  fraglen = iov->iov_len;
250                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
251
252                 /* nob exactly spans the iovs */
253                 LASSERT (fraglen <= nob);
254                 
255                 nmapped += npages;
256                 if (nmapped > maxmapped) {
257                         CERROR("Can't map message in %d pages (max %d)\n",
258                                nmapped, maxmapped);
259                         return (-EMSGSIZE);
260                 }
261
262                 if (nfrags == maxfrags) {
263                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
264                                maxfrags);
265                         return (-EMSGSIZE);
266                 }
267
268                 CDEBUG(D_NET,
269                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
270                         ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
271                         nmapped);
272
273                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
274                                        kqswnal_data.kqn_eptxdmahandle,
275                                        iov->iov_base, fraglen,
276                                        basepage, &ktx->ktx_iov[nfrags].Base);
277                 /* keep in loop for failure case */
278                 ktx->ktx_nmappedpages = nmapped;
279
280                 if (nfrags > 0 &&                /* previous frag mapped */
281                     ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
282                     (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
283                         /* just extend previous */
284                         ktx->ktx_iov[nfrags - 1].Len += fraglen;
285                 else {
286                         ktx->ktx_iov[nfrags].Len = fraglen;
287                         nfrags++;                /* new frag */
288                 }
289
290                 basepage += npages;
291                 iov++;
292                 niov--;
293                 nob -= fraglen;
294
295                 /* iov must not run out before end of data */
296                 LASSERT (nob == 0 || niov > 0);
297
298         } while (nob > 0);
299
300         ktx->ktx_niov = nfrags;
301         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
302                 ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
303
304         return (0);
305 }
306
307 void
308 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
309 {
310         kpr_fwd_desc_t   *fwd = NULL;
311         unsigned long     flags;
312
313         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
314
315         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
316
317         list_del (&ktx->ktx_list);              /* take off active list */
318
319         if (ktx->ktx_isnblk) {
320                 /* reserved for non-blocking tx */
321                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
322                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
323                 return;
324         }
325
326         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
327
328         /* anything blocking for a tx descriptor? */
329         if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
330         {
331                 CDEBUG(D_NET,"wakeup fwd\n");
332
333                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
334                                   kpr_fwd_desc_t, kprfd_list);
335                 list_del (&fwd->kprfd_list);
336         }
337
338         wake_up (&kqswnal_data.kqn_idletxd_waitq);
339
340         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
341
342         if (fwd == NULL)
343                 return;
344
345         /* schedule packet for forwarding again */
346         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
347
348         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
349         wake_up (&kqswnal_data.kqn_sched_waitq);
350
351         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
352 }
353
354 kqswnal_tx_t *
355 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
356 {
357         unsigned long  flags;
358         kqswnal_tx_t  *ktx = NULL;
359
360         for (;;) {
361                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
362
363                 /* "normal" descriptor is free */
364                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
365                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
366                                           kqswnal_tx_t, ktx_list);
367                         break;
368                 }
369
370                 /* "normal" descriptor pool is empty */
371
372                 if (fwd != NULL) { /* forwarded packet => queue for idle txd */
373                         CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
374                         list_add_tail (&fwd->kprfd_list,
375                                        &kqswnal_data.kqn_idletxd_fwdq);
376                         break;
377                 }
378
379                 /* doing a local transmit */
380                 if (!may_block) {
381                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
382                                 CERROR ("intr tx desc pool exhausted\n");
383                                 break;
384                         }
385
386                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
387                                           kqswnal_tx_t, ktx_list);
388                         break;
389                 }
390
391                 /* block for idle tx */
392
393                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
394
395                 CDEBUG (D_NET, "blocking for tx desc\n");
396                 wait_event (kqswnal_data.kqn_idletxd_waitq,
397                             !list_empty (&kqswnal_data.kqn_idletxds));
398         }
399
400         if (ktx != NULL) {
401                 list_del (&ktx->ktx_list);
402                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
403                 ktx->ktx_launcher = current->pid;
404         }
405
406         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
407
408         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
409         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
410         return (ktx);
411 }
412
413 void
414 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
415 {
416         if (ktx->ktx_forwarding)                /* router asked me to forward this packet */
417                 kpr_fwd_done (&kqswnal_data.kqn_router,
418                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
419         else                                    /* packet sourced locally */
420                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
421                               (lib_msg_t *)ktx->ktx_args[1]);
422
423         kqswnal_put_idle_tx (ktx);
424 }
425
426 static void
427 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
428 {
429         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
430         
431         LASSERT (txd != NULL);
432         LASSERT (ktx != NULL);
433
434         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
435
436         if (status == EP_SUCCESS)
437                 atomic_inc (&kqswnal_packets_transmitted);
438
439         if (status != EP_SUCCESS)
440         {
441                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
442                         ktx->ktx_nid, status);
443
444                 kqswnal_notify_peer_down(ktx);
445                 status = -EIO;
446         }
447
448         kqswnal_tx_done (ktx, status);
449 }
450
451 int
452 kqswnal_launch (kqswnal_tx_t *ktx)
453 {
454         /* Don't block for transmit descriptor if we're in interrupt context */
455         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
456         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
457         long  flags;
458         int   rc;
459
460         ktx->ktx_launchtime = jiffies;
461
462         LASSERT (dest >= 0);                    /* must be a peer */
463         rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
464                                ktx->ktx_port, attr, kqswnal_txhandler,
465                                ktx, ktx->ktx_iov, ktx->ktx_niov);
466         switch (rc) {
467         case 0: /* success */
468                 atomic_inc (&kqswnal_packets_launched);
469                 return (0);
470
471         case ENOMEM: /* can't allocate ep txd => queue for later */
472                 LASSERT (in_interrupt());
473
474                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
475
476                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
477                 wake_up (&kqswnal_data.kqn_sched_waitq);
478
479                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
480                 return (0);
481
482         default: /* fatal error */
483                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
484                 kqswnal_notify_peer_down(ktx);
485                 return (rc);
486         }
487 }
488
489 static char *
490 hdr_type_string (ptl_hdr_t *hdr)
491 {
492         switch (hdr->type) {
493         case PTL_MSG_ACK:
494                 return ("ACK");
495         case PTL_MSG_PUT:
496                 return ("PUT");
497         case PTL_MSG_GET:
498                 return ("GET");
499         case PTL_MSG_REPLY:
500                 return ("REPLY");
501         default:
502                 return ("<UNKNOWN>");
503         }
504 }
505
506 static void
507 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
508 {
509         char *type_str = hdr_type_string (hdr);
510
511         CERROR("P3 Header at %p of type %s\n", hdr, type_str);
512         CERROR("    From nid/pid "LPU64"/%u", NTOH__u64(hdr->src_nid),
513                NTOH__u32(hdr->src_pid));
514         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
515                NTOH__u32(hdr->dest_pid));
516
517         switch (NTOH__u32(hdr->type)) {
518         case PTL_MSG_PUT:
519                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
520                        "match bits "LPX64"\n",
521                        NTOH__u32 (hdr->msg.put.ptl_index),
522                        hdr->msg.put.ack_wmd.wh_interface_cookie,
523                        hdr->msg.put.ack_wmd.wh_object_cookie,
524                        NTOH__u64 (hdr->msg.put.match_bits));
525                 CERROR("    Length %d, offset %d, hdr data "LPX64"\n",
526                        NTOH__u32(PTL_HDR_LENGTH(hdr)),
527                        NTOH__u32(hdr->msg.put.offset),
528                        hdr->msg.put.hdr_data);
529                 break;
530
531         case PTL_MSG_GET:
532                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
533                        "match bits "LPX64"\n",
534                        NTOH__u32 (hdr->msg.get.ptl_index),
535                        hdr->msg.get.return_wmd.wh_interface_cookie,
536                        hdr->msg.get.return_wmd.wh_object_cookie,
537                        hdr->msg.get.match_bits);
538                 CERROR("    Length %d, src offset %d\n",
539                        NTOH__u32 (hdr->msg.get.sink_length),
540                        NTOH__u32 (hdr->msg.get.src_offset));
541                 break;
542
543         case PTL_MSG_ACK:
544                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
545                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
546                        hdr->msg.ack.dst_wmd.wh_object_cookie,
547                        NTOH__u32 (hdr->msg.ack.mlength));
548                 break;
549
550         case PTL_MSG_REPLY:
551                 CERROR("    dst md "LPX64"."LPX64", length %d\n",
552                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
553                        hdr->msg.reply.dst_wmd.wh_object_cookie,
554                        NTOH__u32 (PTL_HDR_LENGTH(hdr)));
555         }
556
557 }                               /* end of print_hdr() */
558
559 static int
560 kqswnal_sendmsg (nal_cb_t     *nal,
561                  void         *private,
562                  lib_msg_t    *cookie,
563                  ptl_hdr_t    *hdr,
564                  int           type,
565                  ptl_nid_t     nid,
566                  ptl_pid_t     pid,
567                  unsigned int  payload_niov,
568                  struct iovec *payload_iov,
569                  ptl_kiov_t   *payload_kiov,
570                  size_t        payload_nob)
571 {
572         kqswnal_tx_t      *ktx;
573         int                rc;
574         ptl_nid_t          gatewaynid;
575 #if KQSW_CHECKSUM
576         int                i;
577         kqsw_csum_t        csum;
578         int                sumnob;
579 #endif
580         
581         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
582                " pid %u\n", payload_nob, payload_niov, nid, pid);
583
584         LASSERT (payload_nob == 0 || payload_niov > 0);
585         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
586
587         /* It must be OK to kmap() if required */
588         LASSERT (payload_kiov == NULL || !in_interrupt ());
589         /* payload is either all vaddrs or all pages */
590         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
591         
592         if (payload_nob > KQSW_MAXPAYLOAD) {
593                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
594                         payload_nob, KQSW_MAXPAYLOAD);
595                 return (PTL_FAIL);
596         }
597
598         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
599                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
600                                  sizeof (ptl_hdr_t) + payload_nob, &gatewaynid);
601                 if (rc != 0) {
602                         CERROR("Can't route to "LPX64": router error %d\n",
603                                nid, rc);
604                         return (PTL_FAIL);
605                 }
606                 if (kqswnal_nid2elanid (gatewaynid) < 0) {
607                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
608                                gatewaynid, nid);
609                         return (PTL_FAIL);
610                 }
611                 nid = gatewaynid;
612         }
613
614         /* I may not block for a transmit descriptor if I might block the
615          * receiver, or an interrupt handler. */
616         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
617                                           type == PTL_MSG_REPLY ||
618                                           in_interrupt()));
619         if (ktx == NULL) {
620                 kqswnal_cerror_hdr (hdr);
621                 return (PTL_NOSPACE);
622         }
623
624         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
625         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
626
627 #if KQSW_CHECKSUM
628         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
629         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
630         for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
631                 if (payload_kiov != NULL) {
632                         ptl_kiov_t *kiov = &payload_kiov[i];
633                         char       *addr = ((char *)kmap (kiov->kiov_page)) +
634                                            kiov->kiov_offset;
635                         
636                         csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
637                         sumnob -= kiov->kiov_len;
638                 } else {
639                         struct iovec *iov = &payload_iov[i];
640
641                         csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
642                         sumnob -= iov->iov_len;
643                 }
644         }
645         memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
646 #endif
647
648         /* Set up first frag from pre-mapped buffer (it's at least the
649          * portals header) */
650         ktx->ktx_iov[0].Base = ktx->ktx_ebuffer;
651         ktx->ktx_iov[0].Len = KQSW_HDR_SIZE;
652         ktx->ktx_niov = 1;
653
654         if (payload_nob > 0) { /* got some payload (something more to do) */
655                 /* make a single contiguous message? */
656                 if (payload_nob <= KQSW_TX_MAXCONTIG) {
657                         /* copy payload to ktx_buffer, immediately after hdr */
658                         if (payload_kiov != NULL)
659                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
660                                                    payload_niov, payload_kiov, payload_nob);
661                         else
662                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
663                                                   payload_niov, payload_iov, payload_nob);
664                         /* first frag includes payload */
665                         ktx->ktx_iov[0].Len += payload_nob;
666                 } else {
667                         if (payload_kiov != NULL)
668                                 rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
669                                                           payload_niov, payload_kiov);
670                         else
671                                 rc = kqswnal_map_tx_iov (ktx, payload_nob,
672                                                          payload_niov, payload_iov);
673                         if (rc != 0) {
674                                 kqswnal_put_idle_tx (ktx);
675                                 return (PTL_FAIL);
676                         }
677                 } 
678         }
679
680         ktx->ktx_port       = (payload_nob <= KQSW_SMALLPAYLOAD) ?
681                               EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
682         ktx->ktx_nid        = nid;
683         ktx->ktx_forwarding = 0;   /* => lib_finalize() on completion */
684         ktx->ktx_args[0]    = private;
685         ktx->ktx_args[1]    = cookie;
686
687         rc = kqswnal_launch (ktx);
688         if (rc != 0) {                    /* failed? */
689                 CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc);
690                 kqswnal_put_idle_tx (ktx);
691                 return (PTL_FAIL);
692         }
693
694         CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
695         return (PTL_OK);
696 }
697
698 static int
699 kqswnal_send (nal_cb_t     *nal,
700               void         *private,
701               lib_msg_t    *cookie,
702               ptl_hdr_t    *hdr,
703               int           type,
704               ptl_nid_t     nid,
705               ptl_pid_t     pid,
706               unsigned int  payload_niov,
707               struct iovec *payload_iov,
708               size_t        payload_nob)
709 {
710         return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
711                                  payload_niov, payload_iov, NULL, payload_nob));
712 }
713
714 static int
715 kqswnal_send_pages (nal_cb_t     *nal,
716                     void         *private,
717                     lib_msg_t    *cookie,
718                     ptl_hdr_t    *hdr,
719                     int           type,
720                     ptl_nid_t     nid,
721                     ptl_pid_t     pid,
722                     unsigned int  payload_niov,
723                     ptl_kiov_t   *payload_kiov,
724                     size_t        payload_nob)
725 {
726         return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
727                                  payload_niov, NULL, payload_kiov, payload_nob));
728 }
729
730 int kqswnal_fwd_copy_contig = 0;
731
732 void
733 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
734 {
735         int             rc;
736         kqswnal_tx_t   *ktx;
737         struct iovec   *iov = fwd->kprfd_iov;
738         int             niov = fwd->kprfd_niov;
739         int             nob = fwd->kprfd_nob;
740         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
741
742 #if KQSW_CHECKSUM
743         CERROR ("checksums for forwarded packets not implemented\n");
744         LBUG ();
745 #endif
746         /* The router wants this NAL to forward a packet */
747         CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
748                 fwd, nid, niov, nob);
749
750         LASSERT (niov > 0);
751         
752         ktx = kqswnal_get_idle_tx (fwd, FALSE);
753         if (ktx == NULL)        /* can't get txd right now */
754                 return;         /* fwd will be scheduled when tx desc freed */
755
756         if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
757                 nid = fwd->kprfd_target_nid;    /* target is final dest */
758
759         if (kqswnal_nid2elanid (nid) < 0) {
760                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
761                 rc = -EHOSTUNREACH;
762                 goto failed;
763         }
764
765         if (nob > KQSW_NRXMSGBYTES_LARGE) {
766                 CERROR ("Can't forward [%p] to "LPX64
767                         ": size %d bigger than max packet size %ld\n",
768                         fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
769                 rc = -EMSGSIZE;
770                 goto failed;
771         }
772
773         if ((kqswnal_fwd_copy_contig || niov > 1) &&
774             nob <= KQSW_TX_BUFFER_SIZE) 
775         {
776                 /* send from ktx's pre-allocated/mapped contiguous buffer? */
777                 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
778                 ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
779                 ktx->ktx_iov[0].Len = nob;
780                 ktx->ktx_niov = 1;
781
782                 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
783         }
784         else
785         {
786                 /* zero copy */
787                 ktx->ktx_niov = 0;        /* no frags mapped yet */
788                 rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
789                 if (rc != 0)
790                         goto failed;
791
792                 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
793         }
794
795         ktx->ktx_port       = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
796                               EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
797         ktx->ktx_nid        = nid;
798         ktx->ktx_forwarding = 1;
799         ktx->ktx_args[0]    = fwd;
800
801         rc = kqswnal_launch (ktx);
802         if (rc == 0)
803                 return;
804
805  failed:
806         LASSERT (rc != 0);
807         CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
808
809         kqswnal_put_idle_tx (ktx);
810         /* complete now (with failure) */
811         kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
812 }
813
814 void
815 kqswnal_fwd_callback (void *arg, int error)
816 {
817         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
818
819         /* The router has finished forwarding this packet */
820
821         if (error != 0)
822         {
823                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
824
825                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
826                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
827         }
828
829         kqswnal_requeue_rx (krx);
830 }
831
832 void
833 kqswnal_rx (kqswnal_rx_t *krx)
834 {
835         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
836         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
837         int             nob;
838         int             niov;
839
840         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
841                 /* NB krx requeued when lib_parse() calls back kqswnal_recv */
842                 lib_parse (&kqswnal_lib, hdr, krx);
843                 return;
844         }
845
846 #if KQSW_CHECKSUM
847         CERROR ("checksums for forwarded packets not implemented\n");
848         LBUG ();
849 #endif
850         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
851         {
852                 CERROR("dropping packet from "LPX64" for "LPX64
853                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
854                 kqswnal_requeue_rx (krx);
855                 return;
856         }
857
858         /* NB forwarding may destroy iov; rebuild every time */
859         for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
860         {
861                 LASSERT (niov < krx->krx_npages);
862                 krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
863                 krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
864         }
865
866         kpr_fwd_init (&krx->krx_fwd, dest_nid,
867                       krx->krx_nob, niov, krx->krx_iov,
868                       kqswnal_fwd_callback, krx);
869
870         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
871 }
872
873 /* Receive Interrupt Handler: posts to schedulers */
874 void 
875 kqswnal_rxhandler(EP_RXD *rxd)
876 {
877         long          flags;
878         int           nob    = ep_rxd_len (rxd);
879         int           status = ep_rxd_status (rxd);
880         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
881
882         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
883                rxd, krx, nob, status);
884
885         LASSERT (krx != NULL);
886
887         krx->krx_rxd = rxd;
888         krx->krx_nob = nob;
889
890         /* must receive a whole header to be able to parse */
891         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
892         {
893                 /* receives complete with failure when receiver is removed */
894                 if (kqswnal_data.kqn_shuttingdown)
895                         return;
896
897                 CERROR("receive status failed with status %d nob %d\n",
898                        ep_rxd_status(rxd), nob);
899                 kqswnal_requeue_rx (krx);
900                 return;
901         }
902
903         atomic_inc (&kqswnal_packets_received);
904
905         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
906
907         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
908         wake_up (&kqswnal_data.kqn_sched_waitq);
909
910         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
911 }
912
913 #if KQSW_CHECKSUM
914 void
915 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
916 {
917         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
918
919         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
920                 ", dpid %d, spid %d, type %d\n",
921                 ishdr ? "Header" : "Payload", krx,
922                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
923                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
924                 NTOH__u32(hdr->type));
925
926         switch (NTOH__u32 (hdr->type))
927         {
928         case PTL_MSG_ACK:
929                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
930                        " len %u\n",
931                        NTOH__u32(hdr->msg.ack.mlength),
932                        hdr->msg.ack.dst_wmd.handle_cookie,
933                        hdr->msg.ack.dst_wmd.handle_idx,
934                        NTOH__u64(hdr->msg.ack.match_bits),
935                        NTOH__u32(hdr->msg.ack.length));
936                 break;
937         case PTL_MSG_PUT:
938                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
939                        " len %u off %u data "LPX64"\n",
940                        NTOH__u32(hdr->msg.put.ptl_index),
941                        hdr->msg.put.ack_wmd.handle_cookie,
942                        hdr->msg.put.ack_wmd.handle_idx,
943                        NTOH__u64(hdr->msg.put.match_bits),
944                        NTOH__u32(hdr->msg.put.length),
945                        NTOH__u32(hdr->msg.put.offset),
946                        hdr->msg.put.hdr_data);
947                 break;
948         case PTL_MSG_GET:
949                 CERROR ("GET: <>\n");
950                 break;
951         case PTL_MSG_REPLY:
952                 CERROR ("REPLY: <>\n");
953                 break;
954         default:
955                 CERROR ("TYPE?: <>\n");
956         }
957 }
958 #endif
959
960 static int
961 kqswnal_recvmsg (nal_cb_t     *nal,
962                  void         *private,
963                  lib_msg_t    *cookie,
964                  unsigned int  niov,
965                  struct iovec *iov,
966                  ptl_kiov_t   *kiov,
967                  size_t        mlen,
968                  size_t        rlen)
969 {
970         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
971         int           page;
972         char         *page_ptr;
973         int           page_nob;
974         char         *iov_ptr;
975         int           iov_nob;
976         int           frag;
977 #if KQSW_CHECKSUM
978         kqsw_csum_t   senders_csum;
979         kqsw_csum_t   payload_csum = 0;
980         kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
981                                            sizeof(ptl_hdr_t));
982         size_t        csum_len = mlen;
983         int           csum_frags = 0;
984         int           csum_nob = 0;
985         static atomic_t csum_counter;
986         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
987
988         atomic_inc (&csum_counter);
989
990         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
991                                 sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
992         if (senders_csum != hdr_csum)
993                 kqswnal_csum_error (krx, 1);
994 #endif
995         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
996
997         /* What was actually received must be >= payload.
998          * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
999         LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
1000         LASSERT (mlen <= rlen);
1001
1002         /* It must be OK to kmap() if required */
1003         LASSERT (kiov == NULL || !in_interrupt ());
1004         /* Either all pages or all vaddrs */
1005         LASSERT (!(kiov != NULL && iov != NULL));
1006         
1007         if (mlen != 0)
1008         {
1009                 page     = 0;
1010                 page_ptr = ((char *) page_address(krx->krx_pages[0])) +
1011                         KQSW_HDR_SIZE;
1012                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1013
1014                 LASSERT (niov > 0);
1015                 if (kiov != NULL) {
1016                         iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1017                         iov_nob = kiov->kiov_len;
1018                 } else {
1019                         iov_ptr = iov->iov_base;
1020                         iov_nob = iov->iov_len;
1021                 }
1022
1023                 for (;;)
1024                 {
1025                         /* We expect the iov to exactly match mlen */
1026                         LASSERT (iov_nob <= mlen);
1027                         
1028                         frag = MIN (page_nob, iov_nob);
1029                         memcpy (iov_ptr, page_ptr, frag);
1030 #if KQSW_CHECKSUM
1031                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1032                         csum_nob += frag;
1033                         csum_frags++;
1034 #endif
1035                         mlen -= frag;
1036                         if (mlen == 0)
1037                                 break;
1038
1039                         page_nob -= frag;
1040                         if (page_nob != 0)
1041                                 page_ptr += frag;
1042                         else
1043                         {
1044                                 page++;
1045                                 LASSERT (page < krx->krx_npages);
1046                                 page_ptr = page_address(krx->krx_pages[page]);
1047                                 page_nob = PAGE_SIZE;
1048                         }
1049
1050                         iov_nob -= frag;
1051                         if (iov_nob != 0)
1052                                 iov_ptr += frag;
1053                         else if (kiov != NULL) {
1054                                 kunmap (kiov->kiov_page);
1055                                 kiov++;
1056                                 niov--;
1057                                 LASSERT (niov > 0);
1058                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1059                                 iov_nob = kiov->kiov_len;
1060                         } else {
1061                                 iov++;
1062                                 niov--;
1063                                 LASSERT (niov > 0);
1064                                 iov_ptr = iov->iov_base;
1065                                 iov_nob = iov->iov_len;
1066                         }
1067                 }
1068
1069                 if (kiov != NULL)
1070                         kunmap (kiov->kiov_page);
1071         }
1072
1073 #if KQSW_CHECKSUM
1074         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1075                 sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
1076
1077         if (csum_len != rlen)
1078                 CERROR("Unable to checksum data in user's buffer\n");
1079         else if (senders_csum != payload_csum)
1080                 kqswnal_csum_error (krx, 0);
1081
1082         if (csum_verbose)
1083                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1084                        "csum_nob %d\n",
1085                         hdr_csum, payload_csum, csum_frags, csum_nob);
1086 #endif
1087         lib_finalize(nal, private, cookie);
1088
1089         kqswnal_requeue_rx (krx);
1090
1091         return (rlen);
1092 }
1093
1094 static int
1095 kqswnal_recv(nal_cb_t     *nal,
1096              void         *private,
1097              lib_msg_t    *cookie,
1098              unsigned int  niov,
1099              struct iovec *iov,
1100              size_t        mlen,
1101              size_t        rlen)
1102 {
1103         return (kqswnal_recvmsg (nal, private, cookie, niov, iov, NULL, mlen, rlen));
1104 }
1105
1106 static int
1107 kqswnal_recv_pages (nal_cb_t     *nal,
1108                     void         *private,
1109                     lib_msg_t    *cookie,
1110                     unsigned int  niov,
1111                     ptl_kiov_t   *kiov,
1112                     size_t        mlen,
1113                     size_t        rlen)
1114 {
1115         return (kqswnal_recvmsg (nal, private, cookie, niov, NULL, kiov, mlen, rlen));
1116 }
1117
1118 int
1119 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1120 {
1121         long    pid = kernel_thread (fn, arg, 0);
1122
1123         if (pid < 0)
1124                 return ((int)pid);
1125
1126         atomic_inc (&kqswnal_data.kqn_nthreads);
1127         return (0);
1128 }
1129
1130 void
1131 kqswnal_thread_fini (void)
1132 {
1133         atomic_dec (&kqswnal_data.kqn_nthreads);
1134 }
1135
1136 int
1137 kqswnal_scheduler (void *arg)
1138 {
1139         kqswnal_rx_t    *krx;
1140         kqswnal_tx_t    *ktx;
1141         kpr_fwd_desc_t  *fwd;
1142         long             flags;
1143         int              rc;
1144         int              counter = 0;
1145         int              did_something;
1146
1147         kportal_daemonize ("kqswnal_sched");
1148         kportal_blockallsigs ();
1149         
1150         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1151
1152         while (!kqswnal_data.kqn_shuttingdown)
1153         {
1154                 did_something = FALSE;
1155
1156                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1157                 {
1158                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1159                                          kqswnal_rx_t, krx_list);
1160                         list_del (&krx->krx_list);
1161                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1162                                                flags);
1163
1164                         kqswnal_rx (krx);
1165
1166                         did_something = TRUE;
1167                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1168                 }
1169
1170                 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1171                 {
1172                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1173                                          kqswnal_tx_t, ktx_list);
1174                         list_del_init (&ktx->ktx_delayed_list);
1175                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1176                                                flags);
1177
1178                         rc = kqswnal_launch (ktx);
1179                         if (rc != 0)          /* failed: ktx_nid down? */
1180                         {
1181                                 CERROR("Failed delayed transmit to "LPX64
1182                                        ": %d\n", ktx->ktx_nid, rc);
1183                                 kqswnal_tx_done (ktx, rc);
1184                         }
1185
1186                         did_something = TRUE;
1187                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1188                 }
1189
1190                 if (!list_empty (&kqswnal_data.kqn_delayedfwds))
1191                 {
1192                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1193                         list_del (&fwd->kprfd_list);
1194                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1195
1196                         kqswnal_fwd_packet (NULL, fwd);
1197
1198                         did_something = TRUE;
1199                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1200                 }
1201
1202                     /* nothing to do or hogging CPU */
1203                 if (!did_something || counter++ == KQSW_RESCHED) {
1204                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1205                                                flags);
1206
1207                         counter = 0;
1208
1209                         if (!did_something) {
1210                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1211                                                                kqswnal_data.kqn_shuttingdown ||
1212                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1213                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1214                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1215                                 LASSERT (rc == 0);
1216                         } else if (current->need_resched)
1217                                 schedule ();
1218
1219                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1220                 }
1221         }
1222
1223         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1224
1225         kqswnal_thread_fini ();
1226         return (0);
1227 }
1228
1229 nal_cb_t kqswnal_lib =
1230 {
1231         nal_data:       &kqswnal_data,         /* NAL private data */
1232         cb_send:        kqswnal_send,
1233         cb_send_pages:  kqswnal_send_pages,
1234         cb_recv:        kqswnal_recv,
1235         cb_recv_pages:  kqswnal_recv_pages,
1236         cb_read:        kqswnal_read,
1237         cb_write:       kqswnal_write,
1238         cb_malloc:      kqswnal_malloc,
1239         cb_free:        kqswnal_free,
1240         cb_printf:      kqswnal_printf,
1241         cb_cli:         kqswnal_cli,
1242         cb_sti:         kqswnal_sti,
1243         cb_dist:        kqswnal_dist
1244 };