Whamcloud - gitweb
* lctl set_route <nid> <up/down> enables or disables particular portals
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 atomic_t kqswnal_packets_launched;
30 atomic_t kqswnal_packets_transmitted;
31 atomic_t kqswnal_packets_received;
32
33
34 /*
35  *  LIB functions follow
36  *
37  */
38 static int
39 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
40              size_t len)
41 {
42         CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
43                 nal->ni.nid, len, src_addr, dst_addr );
44         memcpy( dst_addr, src_addr, len );
45
46         return (0);
47 }
48
49 static int
50 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
51               size_t len)
52 {
53         CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
54                 nal->ni.nid, len, src_addr, dst_addr );
55         memcpy( dst_addr, src_addr, len );
56
57         return (0);
58 }
59
60 static void *
61 kqswnal_malloc(nal_cb_t *nal, size_t len)
62 {
63         void *buf;
64
65         PORTAL_ALLOC(buf, len);
66         return (buf);
67 }
68
69 static void
70 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
71 {
72         PORTAL_FREE(buf, len);
73 }
74
75 static void
76 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
77 {
78         va_list ap;
79         char msg[256];
80
81         va_start (ap, fmt);
82         vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
83         va_end (ap);
84
85         msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
86
87         CDEBUG (D_NET, "%s", msg);
88 }
89
90
91 static void
92 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
93 {
94         kqswnal_data_t *data= nal->nal_data;
95
96         spin_lock_irqsave(&data->kqn_statelock, *flags);
97 }
98
99
100 static void
101 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
102 {
103         kqswnal_data_t *data= nal->nal_data;
104
105         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
106 }
107
108
109 static int
110 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
111 {
112         if (nid == nal->ni.nid)
113                 *dist = 0;                      /* it's me */
114         else if (kqswnal_nid2elanid (nid) >= 0)
115                 *dist = 1;                      /* it's my peer */
116         else
117                 *dist = 2;                      /* via router */
118         return (0);
119 }
120
121 void
122 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
123 {
124         if (ktx->ktx_nmappedpages == 0)
125                 return;
126
127         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
128                 ktx, ktx->ktx_niov, ktx->ktx_basepage, ktx->ktx_nmappedpages);
129
130         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
131         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
132                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
133
134         elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
135                           kqswnal_data.kqn_eptxdmahandle,
136                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
137         ktx->ktx_nmappedpages = 0;
138 }
139
140 int
141 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
142 {
143         int       nfrags    = ktx->ktx_niov;
144         const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
145         int       nmapped   = ktx->ktx_nmappedpages;
146         int       maxmapped = ktx->ktx_npages;
147         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
148         char     *ptr;
149         
150         LASSERT (nmapped <= maxmapped);
151         LASSERT (nfrags <= maxfrags);
152         LASSERT (niov > 0);
153         LASSERT (nob > 0);
154         
155         do {
156                 int  fraglen = kiov->kiov_len;
157
158                 /* nob exactly spans the iovs */
159                 LASSERT (fraglen <= nob);
160                 /* each frag fits in a page */
161                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
162
163                 nmapped++;
164                 if (nmapped > maxmapped) {
165                         CERROR("Can't map message in %d pages (max %d)\n",
166                                nmapped, maxmapped);
167                         return (-EMSGSIZE);
168                 }
169
170                 if (nfrags == maxfrags) {
171                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
172                                maxfrags);
173                         return (-EMSGSIZE);
174                 }
175
176                 /* XXX this is really crap, but we'll have to kmap until
177                  * EKC has a page (rather than vaddr) mapping interface */
178
179                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
180
181                 CDEBUG(D_NET,
182                        "%p[%d] loading %p for %d, page %d, %d total\n",
183                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
184
185                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
186                                        kqswnal_data.kqn_eptxdmahandle,
187                                        ptr, fraglen,
188                                        basepage, &ktx->ktx_iov[nfrags].Base);
189
190                 kunmap (kiov->kiov_page);
191                 
192                 /* keep in loop for failure case */
193                 ktx->ktx_nmappedpages = nmapped;
194
195                 if (nfrags > 0 &&                /* previous frag mapped */
196                     ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
197                     (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
198                         /* just extend previous */
199                         ktx->ktx_iov[nfrags - 1].Len += fraglen;
200                 else {
201                         ktx->ktx_iov[nfrags].Len = fraglen;
202                         nfrags++;                /* new frag */
203                 }
204
205                 basepage++;
206                 kiov++;
207                 niov--;
208                 nob -= fraglen;
209
210                 /* iov must not run out before end of data */
211                 LASSERT (nob == 0 || niov > 0);
212
213         } while (nob > 0);
214
215         ktx->ktx_niov = nfrags;
216         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
217                 ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
218
219         return (0);
220 }
221
222 int
223 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
224 {
225         int       nfrags    = ktx->ktx_niov;
226         const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
227         int       nmapped   = ktx->ktx_nmappedpages;
228         int       maxmapped = ktx->ktx_npages;
229         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
230
231         LASSERT (nmapped <= maxmapped);
232         LASSERT (nfrags <= maxfrags);
233         LASSERT (niov > 0);
234         LASSERT (nob > 0);
235
236         do {
237                 int  fraglen = iov->iov_len;
238                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
239
240                 /* nob exactly spans the iovs */
241                 LASSERT (fraglen <= nob);
242                 
243                 nmapped += npages;
244                 if (nmapped > maxmapped) {
245                         CERROR("Can't map message in %d pages (max %d)\n",
246                                nmapped, maxmapped);
247                         return (-EMSGSIZE);
248                 }
249
250                 if (nfrags == maxfrags) {
251                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
252                                maxfrags);
253                         return (-EMSGSIZE);
254                 }
255
256                 CDEBUG(D_NET,
257                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
258                         ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
259                         nmapped);
260
261                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
262                                        kqswnal_data.kqn_eptxdmahandle,
263                                        iov->iov_base, fraglen,
264                                        basepage, &ktx->ktx_iov[nfrags].Base);
265                 /* keep in loop for failure case */
266                 ktx->ktx_nmappedpages = nmapped;
267
268                 if (nfrags > 0 &&                /* previous frag mapped */
269                     ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
270                     (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
271                         /* just extend previous */
272                         ktx->ktx_iov[nfrags - 1].Len += fraglen;
273                 else {
274                         ktx->ktx_iov[nfrags].Len = fraglen;
275                         nfrags++;                /* new frag */
276                 }
277
278                 basepage += npages;
279                 iov++;
280                 niov--;
281                 nob -= fraglen;
282
283                 /* iov must not run out before end of data */
284                 LASSERT (nob == 0 || niov > 0);
285
286         } while (nob > 0);
287
288         ktx->ktx_niov = nfrags;
289         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
290                 ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
291
292         return (0);
293 }
294
295 void
296 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
297 {
298         kpr_fwd_desc_t   *fwd = NULL;
299         unsigned long     flags;
300
301         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
302
303         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
304
305         list_del (&ktx->ktx_list);              /* take off active list */
306
307         if (ktx->ktx_isnblk) {
308                 /* reserved for non-blocking tx */
309                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
310                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
311                 return;
312         }
313
314         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
315
316         /* anything blocking for a tx descriptor? */
317         if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
318         {
319                 CDEBUG(D_NET,"wakeup fwd\n");
320
321                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
322                                   kpr_fwd_desc_t, kprfd_list);
323                 list_del (&fwd->kprfd_list);
324         }
325
326         if (waitqueue_active (&kqswnal_data.kqn_idletxd_waitq))  /* process? */
327         {
328                 /* local sender waiting for tx desc */
329                 CDEBUG(D_NET,"wakeup process\n");
330                 wake_up (&kqswnal_data.kqn_idletxd_waitq);
331         }
332
333         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
334
335         if (fwd == NULL)
336                 return;
337
338         /* schedule packet for forwarding again */
339         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
340
341         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
342         if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
343                 wake_up (&kqswnal_data.kqn_sched_waitq);
344
345         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
346 }
347
348 kqswnal_tx_t *
349 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
350 {
351         unsigned long  flags;
352         kqswnal_tx_t  *ktx = NULL;
353
354         for (;;) {
355                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
356
357                 /* "normal" descriptor is free */
358                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
359                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
360                                           kqswnal_tx_t, ktx_list);
361                         break;
362                 }
363
364                 /* "normal" descriptor pool is empty */
365
366                 if (fwd != NULL) { /* forwarded packet => queue for idle txd */
367                         CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
368                         list_add_tail (&fwd->kprfd_list,
369                                        &kqswnal_data.kqn_idletxd_fwdq);
370                         break;
371                 }
372
373                 /* doing a local transmit */
374                 if (!may_block) {
375                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
376                                 CERROR ("intr tx desc pool exhausted\n");
377                                 break;
378                         }
379
380                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
381                                           kqswnal_tx_t, ktx_list);
382                         break;
383                 }
384
385                 /* block for idle tx */
386
387                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
388
389                 CDEBUG (D_NET, "blocking for tx desc\n");
390                 wait_event (kqswnal_data.kqn_idletxd_waitq,
391                             !list_empty (&kqswnal_data.kqn_idletxds));
392         }
393
394         if (ktx != NULL) {
395                 list_del (&ktx->ktx_list);
396                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
397                 ktx->ktx_launcher = current->pid;
398         }
399
400         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
401
402         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
403         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
404         return (ktx);
405 }
406
407 void
408 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
409 {
410         if (ktx->ktx_forwarding)                /* router asked me to forward this packet */
411                 kpr_fwd_done (&kqswnal_data.kqn_router,
412                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
413         else                                    /* packet sourced locally */
414                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
415                               (lib_msg_t *)ktx->ktx_args[1]);
416
417         kqswnal_put_idle_tx (ktx);
418 }
419
420 static void
421 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
422 {
423         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
424         struct timeval     now;
425         time_t             then;
426         
427         LASSERT (txd != NULL);
428         LASSERT (ktx != NULL);
429
430         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
431
432         if (status == EP_SUCCESS)
433                 atomic_inc (&kqswnal_packets_transmitted);
434
435         if (status != EP_SUCCESS)
436         {
437                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
438                         ktx->ktx_nid, status);
439
440                 do_gettimeofday (&now);
441                 then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
442         
443                 kpr_notify (&kqswnal_data.kqn_router, 
444                             ktx->ktx_nid, 0, then);
445
446                 status = -EIO;
447         }
448
449         kqswnal_tx_done (ktx, status);
450 }
451
452 int
453 kqswnal_launch (kqswnal_tx_t *ktx)
454 {
455         /* Don't block for transmit descriptor if we're in interrupt context */
456         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
457         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
458         long  flags;
459         int   rc;
460
461         ktx->ktx_launchtime = jiffies;
462
463         LASSERT (dest >= 0);                    /* must be a peer */
464         rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
465                                ktx->ktx_port, attr, kqswnal_txhandler,
466                                ktx, ktx->ktx_iov, ktx->ktx_niov);
467         switch (rc) {
468         case 0: /* success */
469                 atomic_inc (&kqswnal_packets_launched);
470                 return (0);
471
472         case ENOMEM: /* can't allocate ep txd => queue for later */
473                 LASSERT (in_interrupt());
474
475                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
476
477                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
478                 if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
479                         wake_up (&kqswnal_data.kqn_sched_waitq);
480
481                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
482                 return (0);
483
484         default: /* fatal error */
485                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
486
487                 /* Tell router I think a node is down */
488                 kpr_notify (&kqswnal_data.kqn_router, ktx->ktx_nid,
489                             0, ktx->ktx_launchtime);
490                 return (rc);
491         }
492 }
493
494 static char *
495 hdr_type_string (ptl_hdr_t *hdr)
496 {
497         switch (hdr->type) {
498         case PTL_MSG_ACK:
499                 return ("ACK");
500         case PTL_MSG_PUT:
501                 return ("PUT");
502         case PTL_MSG_GET:
503                 return ("GET");
504         case PTL_MSG_REPLY:
505                 return ("REPLY");
506         default:
507                 return ("<UNKNOWN>");
508         }
509 }
510
511 static void
512 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
513 {
514         char *type_str = hdr_type_string (hdr);
515
516         CERROR("P3 Header at %p of type %s\n", hdr, type_str);
517         CERROR("    From nid/pid "LPU64"/%u", NTOH__u64(hdr->src_nid),
518                NTOH__u32(hdr->src_pid));
519         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
520                NTOH__u32(hdr->dest_pid));
521
522         switch (NTOH__u32(hdr->type)) {
523         case PTL_MSG_PUT:
524                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
525                        "match bits "LPX64"\n",
526                        NTOH__u32 (hdr->msg.put.ptl_index),
527                        hdr->msg.put.ack_wmd.wh_interface_cookie,
528                        hdr->msg.put.ack_wmd.wh_object_cookie,
529                        NTOH__u64 (hdr->msg.put.match_bits));
530                 CERROR("    Length %d, offset %d, hdr data "LPX64"\n",
531                        NTOH__u32(PTL_HDR_LENGTH(hdr)),
532                        NTOH__u32(hdr->msg.put.offset),
533                        hdr->msg.put.hdr_data);
534                 break;
535
536         case PTL_MSG_GET:
537                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
538                        "match bits "LPX64"\n",
539                        NTOH__u32 (hdr->msg.get.ptl_index),
540                        hdr->msg.get.return_wmd.wh_interface_cookie,
541                        hdr->msg.get.return_wmd.wh_object_cookie,
542                        hdr->msg.get.match_bits);
543                 CERROR("    Length %d, src offset %d\n",
544                        NTOH__u32 (hdr->msg.get.sink_length),
545                        NTOH__u32 (hdr->msg.get.src_offset));
546                 break;
547
548         case PTL_MSG_ACK:
549                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
550                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
551                        hdr->msg.ack.dst_wmd.wh_object_cookie,
552                        NTOH__u32 (hdr->msg.ack.mlength));
553                 break;
554
555         case PTL_MSG_REPLY:
556                 CERROR("    dst md "LPX64"."LPX64", length %d\n",
557                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
558                        hdr->msg.reply.dst_wmd.wh_object_cookie,
559                        NTOH__u32 (PTL_HDR_LENGTH(hdr)));
560         }
561
562 }                               /* end of print_hdr() */
563
564 static int
565 kqswnal_sendmsg (nal_cb_t     *nal,
566                  void         *private,
567                  lib_msg_t    *cookie,
568                  ptl_hdr_t    *hdr,
569                  int           type,
570                  ptl_nid_t     nid,
571                  ptl_pid_t     pid,
572                  unsigned int  payload_niov,
573                  struct iovec *payload_iov,
574                  ptl_kiov_t   *payload_kiov,
575                  size_t        payload_nob)
576 {
577         kqswnal_tx_t      *ktx;
578         int                rc;
579         ptl_nid_t          gatewaynid;
580 #if KQSW_CHECKSUM
581         int                i;
582         kqsw_csum_t        csum;
583         int                sumnob;
584 #endif
585         
586         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
587                " pid %u\n", payload_nob, payload_niov, nid, pid);
588
589         LASSERT (payload_nob == 0 || payload_niov > 0);
590         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
591
592         /* It must be OK to kmap() if required */
593         LASSERT (payload_kiov == NULL || !in_interrupt ());
594         /* payload is either all vaddrs or all pages */
595         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
596         
597         if (payload_nob > KQSW_MAXPAYLOAD) {
598                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
599                         payload_nob, KQSW_MAXPAYLOAD);
600                 return (PTL_FAIL);
601         }
602
603         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
604                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
605                                  sizeof (ptl_hdr_t) + payload_nob, &gatewaynid);
606                 if (rc != 0) {
607                         CERROR("Can't route to "LPX64": router error %d\n",
608                                nid, rc);
609                         return (PTL_FAIL);
610                 }
611                 if (kqswnal_nid2elanid (gatewaynid) < 0) {
612                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
613                                gatewaynid, nid);
614                         return (PTL_FAIL);
615                 }
616                 nid = gatewaynid;
617         }
618
619         /* I may not block for a transmit descriptor if I might block the
620          * receiver, or an interrupt handler. */
621         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
622                                           type == PTL_MSG_REPLY ||
623                                           in_interrupt()));
624         if (ktx == NULL) {
625                 kqswnal_cerror_hdr (hdr);
626                 return (PTL_NOSPACE);
627         }
628
629         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
630         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
631
632 #if KQSW_CHECKSUM
633         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
634         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
635         for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
636                 if (payload_kiov != NULL) {
637                         ptl_kiov_t *kiov = &payload_kiov[i];
638                         char       *addr = ((char *)kmap (kiov->kiov_page)) +
639                                            kiov->kiov_offset;
640                         
641                         csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
642                         sumnob -= kiov->kiov_len;
643                 } else {
644                         struct iovec *iov = &payload_iov[i];
645
646                         csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
647                         sumnob -= iov->iov_len;
648                 }
649         }
650         memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
651 #endif
652
653         /* Set up first frag from pre-mapped buffer (it's at least the
654          * portals header) */
655         ktx->ktx_iov[0].Base = ktx->ktx_ebuffer;
656         ktx->ktx_iov[0].Len = KQSW_HDR_SIZE;
657         ktx->ktx_niov = 1;
658
659         if (payload_nob > 0) { /* got some payload (something more to do) */
660                 /* make a single contiguous message? */
661                 if (payload_nob <= KQSW_TX_MAXCONTIG) {
662                         /* copy payload to ktx_buffer, immediately after hdr */
663                         if (payload_kiov != NULL)
664                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
665                                                    payload_niov, payload_kiov, payload_nob);
666                         else
667                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
668                                                   payload_niov, payload_iov, payload_nob);
669                         /* first frag includes payload */
670                         ktx->ktx_iov[0].Len += payload_nob;
671                 } else {
672                         if (payload_kiov != NULL)
673                                 rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
674                                                           payload_niov, payload_kiov);
675                         else
676                                 rc = kqswnal_map_tx_iov (ktx, payload_nob,
677                                                          payload_niov, payload_iov);
678                         if (rc != 0) {
679                                 kqswnal_put_idle_tx (ktx);
680                                 return (PTL_FAIL);
681                         }
682                 } 
683         }
684
685         ktx->ktx_port       = (payload_nob <= KQSW_SMALLPAYLOAD) ?
686                               EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
687         ktx->ktx_nid        = nid;
688         ktx->ktx_forwarding = 0;   /* => lib_finalize() on completion */
689         ktx->ktx_args[0]    = private;
690         ktx->ktx_args[1]    = cookie;
691
692         rc = kqswnal_launch (ktx);
693         if (rc != 0) {                    /* failed? */
694                 CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc);
695                 kqswnal_put_idle_tx (ktx);
696                 return (PTL_FAIL);
697         }
698
699         CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
700         return (PTL_OK);
701 }
702
703 static int
704 kqswnal_send (nal_cb_t     *nal,
705               void         *private,
706               lib_msg_t    *cookie,
707               ptl_hdr_t    *hdr,
708               int           type,
709               ptl_nid_t     nid,
710               ptl_pid_t     pid,
711               unsigned int  payload_niov,
712               struct iovec *payload_iov,
713               size_t        payload_nob)
714 {
715         return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
716                                  payload_niov, payload_iov, NULL, payload_nob));
717 }
718
719 static int
720 kqswnal_send_pages (nal_cb_t     *nal,
721                     void         *private,
722                     lib_msg_t    *cookie,
723                     ptl_hdr_t    *hdr,
724                     int           type,
725                     ptl_nid_t     nid,
726                     ptl_pid_t     pid,
727                     unsigned int  payload_niov,
728                     ptl_kiov_t   *payload_kiov,
729                     size_t        payload_nob)
730 {
731         return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
732                                  payload_niov, NULL, payload_kiov, payload_nob));
733 }
734
735 int kqswnal_fwd_copy_contig = 0;
736
737 void
738 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
739 {
740         int             rc;
741         kqswnal_tx_t   *ktx;
742         struct iovec   *iov = fwd->kprfd_iov;
743         int             niov = fwd->kprfd_niov;
744         int             nob = fwd->kprfd_nob;
745         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
746
747 #if KQSW_CHECKSUM
748         CERROR ("checksums for forwarded packets not implemented\n");
749         LBUG ();
750 #endif
751         /* The router wants this NAL to forward a packet */
752         CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
753                 fwd, nid, niov, nob);
754
755         LASSERT (niov > 0);
756         
757         ktx = kqswnal_get_idle_tx (fwd, FALSE);
758         if (ktx == NULL)        /* can't get txd right now */
759                 return;         /* fwd will be scheduled when tx desc freed */
760
761         if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
762                 nid = fwd->kprfd_target_nid;    /* target is final dest */
763
764         if (kqswnal_nid2elanid (nid) < 0) {
765                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
766                 rc = -EHOSTUNREACH;
767                 goto failed;
768         }
769
770         if (nob > KQSW_NRXMSGBYTES_LARGE) {
771                 CERROR ("Can't forward [%p] to "LPX64
772                         ": size %d bigger than max packet size %ld\n",
773                         fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
774                 rc = -EMSGSIZE;
775                 goto failed;
776         }
777
778         if ((kqswnal_fwd_copy_contig || niov > 1) &&
779             nob <= KQSW_TX_BUFFER_SIZE) 
780         {
781                 /* send from ktx's pre-allocated/mapped contiguous buffer? */
782                 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
783                 ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
784                 ktx->ktx_iov[0].Len = nob;
785                 ktx->ktx_niov = 1;
786
787                 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
788         }
789         else
790         {
791                 /* zero copy */
792                 ktx->ktx_niov = 0;        /* no frags mapped yet */
793                 rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
794                 if (rc != 0)
795                         goto failed;
796
797                 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
798         }
799
800         ktx->ktx_port       = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
801                               EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
802         ktx->ktx_nid        = nid;
803         ktx->ktx_forwarding = 1;
804         ktx->ktx_args[0]    = fwd;
805
806         rc = kqswnal_launch (ktx);
807         if (rc == 0)
808                 return;
809
810  failed:
811         LASSERT (rc != 0);
812         CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
813
814         kqswnal_put_idle_tx (ktx);
815         /* complete now (with failure) */
816         kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
817 }
818
819 void
820 kqswnal_fwd_callback (void *arg, int error)
821 {
822         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
823
824         /* The router has finished forwarding this packet */
825
826         if (error != 0)
827         {
828                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
829
830                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
831                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
832         }
833
834         kqswnal_requeue_rx (krx);
835 }
836
837 void
838 kqswnal_rx (kqswnal_rx_t *krx)
839 {
840         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
841         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
842         int             nob;
843         int             niov;
844
845         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
846                 /* NB krx requeued when lib_parse() calls back kqswnal_recv */
847                 lib_parse (&kqswnal_lib, hdr, krx);
848                 return;
849         }
850
851 #if KQSW_CHECKSUM
852         CERROR ("checksums for forwarded packets not implemented\n");
853         LBUG ();
854 #endif
855         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
856         {
857                 CERROR("dropping packet from "LPX64" for "LPX64
858                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
859                 kqswnal_requeue_rx (krx);
860                 return;
861         }
862
863         /* NB forwarding may destroy iov; rebuild every time */
864         for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
865         {
866                 LASSERT (niov < krx->krx_npages);
867                 krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
868                 krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
869         }
870
871         kpr_fwd_init (&krx->krx_fwd, dest_nid,
872                       krx->krx_nob, niov, krx->krx_iov,
873                       kqswnal_fwd_callback, krx);
874
875         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
876 }
877
878 /* Receive Interrupt Handler: posts to schedulers */
879 void 
880 kqswnal_rxhandler(EP_RXD *rxd)
881 {
882         long          flags;
883         int           nob    = ep_rxd_len (rxd);
884         int           status = ep_rxd_status (rxd);
885         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
886
887         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
888                rxd, krx, nob, status);
889
890         LASSERT (krx != NULL);
891
892         krx->krx_rxd = rxd;
893         krx->krx_nob = nob;
894
895         /* must receive a whole header to be able to parse */
896         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
897         {
898                 /* receives complete with failure when receiver is removed */
899                 if (kqswnal_data.kqn_shuttingdown)
900                         return;
901
902                 CERROR("receive status failed with status %d nob %d\n",
903                        ep_rxd_status(rxd), nob);
904                 kqswnal_requeue_rx (krx);
905                 return;
906         }
907
908         atomic_inc (&kqswnal_packets_received);
909
910         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
911
912         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
913         if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
914                 wake_up (&kqswnal_data.kqn_sched_waitq);
915
916         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
917 }
918
919 #if KQSW_CHECKSUM
920 void
921 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
922 {
923         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
924
925         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
926                 ", dpid %d, spid %d, type %d\n",
927                 ishdr ? "Header" : "Payload", krx,
928                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
929                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
930                 NTOH__u32(hdr->type));
931
932         switch (NTOH__u32 (hdr->type))
933         {
934         case PTL_MSG_ACK:
935                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
936                        " len %u\n",
937                        NTOH__u32(hdr->msg.ack.mlength),
938                        hdr->msg.ack.dst_wmd.handle_cookie,
939                        hdr->msg.ack.dst_wmd.handle_idx,
940                        NTOH__u64(hdr->msg.ack.match_bits),
941                        NTOH__u32(hdr->msg.ack.length));
942                 break;
943         case PTL_MSG_PUT:
944                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
945                        " len %u off %u data "LPX64"\n",
946                        NTOH__u32(hdr->msg.put.ptl_index),
947                        hdr->msg.put.ack_wmd.handle_cookie,
948                        hdr->msg.put.ack_wmd.handle_idx,
949                        NTOH__u64(hdr->msg.put.match_bits),
950                        NTOH__u32(hdr->msg.put.length),
951                        NTOH__u32(hdr->msg.put.offset),
952                        hdr->msg.put.hdr_data);
953                 break;
954         case PTL_MSG_GET:
955                 CERROR ("GET: <>\n");
956                 break;
957         case PTL_MSG_REPLY:
958                 CERROR ("REPLY: <>\n");
959                 break;
960         default:
961                 CERROR ("TYPE?: <>\n");
962         }
963 }
964 #endif
965
966 static int
967 kqswnal_recvmsg (nal_cb_t     *nal,
968                  void         *private,
969                  lib_msg_t    *cookie,
970                  unsigned int  niov,
971                  struct iovec *iov,
972                  ptl_kiov_t   *kiov,
973                  size_t        mlen,
974                  size_t        rlen)
975 {
976         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
977         int           page;
978         char         *page_ptr;
979         int           page_nob;
980         char         *iov_ptr;
981         int           iov_nob;
982         int           frag;
983 #if KQSW_CHECKSUM
984         kqsw_csum_t   senders_csum;
985         kqsw_csum_t   payload_csum = 0;
986         kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
987                                            sizeof(ptl_hdr_t));
988         size_t        csum_len = mlen;
989         int           csum_frags = 0;
990         int           csum_nob = 0;
991         static atomic_t csum_counter;
992         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
993
994         atomic_inc (&csum_counter);
995
996         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
997                                 sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
998         if (senders_csum != hdr_csum)
999                 kqswnal_csum_error (krx, 1);
1000 #endif
1001         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1002
1003         /* What was actually received must be >= payload.
1004          * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
1005         LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
1006         LASSERT (mlen <= rlen);
1007
1008         /* It must be OK to kmap() if required */
1009         LASSERT (kiov == NULL || !in_interrupt ());
1010         /* Either all pages or all vaddrs */
1011         LASSERT (!(kiov != NULL && iov != NULL));
1012         
1013         if (mlen != 0)
1014         {
1015                 page     = 0;
1016                 page_ptr = ((char *) page_address(krx->krx_pages[0])) +
1017                         KQSW_HDR_SIZE;
1018                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1019
1020                 LASSERT (niov > 0);
1021                 if (kiov != NULL) {
1022                         iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1023                         iov_nob = kiov->kiov_len;
1024                 } else {
1025                         iov_ptr = iov->iov_base;
1026                         iov_nob = iov->iov_len;
1027                 }
1028
1029                 for (;;)
1030                 {
1031                         /* We expect the iov to exactly match mlen */
1032                         LASSERT (iov_nob <= mlen);
1033                         
1034                         frag = MIN (page_nob, iov_nob);
1035                         memcpy (iov_ptr, page_ptr, frag);
1036 #if KQSW_CHECKSUM
1037                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1038                         csum_nob += frag;
1039                         csum_frags++;
1040 #endif
1041                         mlen -= frag;
1042                         if (mlen == 0)
1043                                 break;
1044
1045                         page_nob -= frag;
1046                         if (page_nob != 0)
1047                                 page_ptr += frag;
1048                         else
1049                         {
1050                                 page++;
1051                                 LASSERT (page < krx->krx_npages);
1052                                 page_ptr = page_address(krx->krx_pages[page]);
1053                                 page_nob = PAGE_SIZE;
1054                         }
1055
1056                         iov_nob -= frag;
1057                         if (iov_nob != 0)
1058                                 iov_ptr += frag;
1059                         else if (kiov != NULL) {
1060                                 kunmap (kiov->kiov_page);
1061                                 kiov++;
1062                                 niov--;
1063                                 LASSERT (niov > 0);
1064                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1065                                 iov_nob = kiov->kiov_len;
1066                         } else {
1067                                 iov++;
1068                                 niov--;
1069                                 LASSERT (niov > 0);
1070                                 iov_ptr = iov->iov_base;
1071                                 iov_nob = iov->iov_len;
1072                         }
1073                 }
1074
1075                 if (kiov != NULL)
1076                         kunmap (kiov->kiov_page);
1077         }
1078
1079 #if KQSW_CHECKSUM
1080         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1081                 sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
1082
1083         if (csum_len != rlen)
1084                 CERROR("Unable to checksum data in user's buffer\n");
1085         else if (senders_csum != payload_csum)
1086                 kqswnal_csum_error (krx, 0);
1087
1088         if (csum_verbose)
1089                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1090                        "csum_nob %d\n",
1091                         hdr_csum, payload_csum, csum_frags, csum_nob);
1092 #endif
1093         lib_finalize(nal, private, cookie);
1094
1095         kqswnal_requeue_rx (krx);
1096
1097         return (rlen);
1098 }
1099
1100 static int
1101 kqswnal_recv(nal_cb_t     *nal,
1102              void         *private,
1103              lib_msg_t    *cookie,
1104              unsigned int  niov,
1105              struct iovec *iov,
1106              size_t        mlen,
1107              size_t        rlen)
1108 {
1109         return (kqswnal_recvmsg (nal, private, cookie, niov, iov, NULL, mlen, rlen));
1110 }
1111
1112 static int
1113 kqswnal_recv_pages (nal_cb_t     *nal,
1114                     void         *private,
1115                     lib_msg_t    *cookie,
1116                     unsigned int  niov,
1117                     ptl_kiov_t   *kiov,
1118                     size_t        mlen,
1119                     size_t        rlen)
1120 {
1121         return (kqswnal_recvmsg (nal, private, cookie, niov, NULL, kiov, mlen, rlen));
1122 }
1123
1124 int
1125 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1126 {
1127         long    pid = kernel_thread (fn, arg, 0);
1128
1129         if (pid < 0)
1130                 return ((int)pid);
1131
1132         atomic_inc (&kqswnal_data.kqn_nthreads);
1133         return (0);
1134 }
1135
1136 void
1137 kqswnal_thread_fini (void)
1138 {
1139         atomic_dec (&kqswnal_data.kqn_nthreads);
1140 }
1141
1142 int
1143 kqswnal_scheduler (void *arg)
1144 {
1145         kqswnal_rx_t    *krx;
1146         kqswnal_tx_t    *ktx;
1147         kpr_fwd_desc_t  *fwd;
1148         long             flags;
1149         int              rc;
1150         int              counter = 0;
1151         int              did_something;
1152
1153         kportal_daemonize ("kqswnal_sched");
1154         kportal_blockallsigs ();
1155         
1156         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1157
1158         while (!kqswnal_data.kqn_shuttingdown)
1159         {
1160                 did_something = FALSE;
1161
1162                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1163                 {
1164                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1165                                          kqswnal_rx_t, krx_list);
1166                         list_del (&krx->krx_list);
1167                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1168                                                flags);
1169
1170                         kqswnal_rx (krx);
1171
1172                         did_something = TRUE;
1173                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1174                 }
1175
1176                 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1177                 {
1178                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1179                                          kqswnal_tx_t, ktx_list);
1180                         list_del_init (&ktx->ktx_delayed_list);
1181                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1182                                                flags);
1183
1184                         rc = kqswnal_launch (ktx);
1185                         if (rc != 0)          /* failed: ktx_nid down? */
1186                         {
1187                                 CERROR("Failed delayed transmit to "LPX64
1188                                        ": %d\n", ktx->ktx_nid, rc);
1189                                 kqswnal_tx_done (ktx, rc);
1190                         }
1191
1192                         did_something = TRUE;
1193                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1194                 }
1195
1196                 if (!list_empty (&kqswnal_data.kqn_delayedfwds))
1197                 {
1198                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1199                         list_del (&fwd->kprfd_list);
1200                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1201
1202                         kqswnal_fwd_packet (NULL, fwd);
1203
1204                         did_something = TRUE;
1205                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1206                 }
1207
1208                     /* nothing to do or hogging CPU */
1209                 if (!did_something || counter++ == KQSW_RESCHED) {
1210                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1211                                                flags);
1212
1213                         counter = 0;
1214
1215                         if (!did_something) {
1216                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1217                                                                kqswnal_data.kqn_shuttingdown ||
1218                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1219                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1220                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1221                                 LASSERT (rc == 0);
1222                         } else if (current->need_resched)
1223                                 schedule ();
1224
1225                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1226                 }
1227         }
1228
1229         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1230
1231         kqswnal_thread_fini ();
1232         return (0);
1233 }
1234
1235 nal_cb_t kqswnal_lib =
1236 {
1237         nal_data:       &kqswnal_data,         /* NAL private data */
1238         cb_send:        kqswnal_send,
1239         cb_send_pages:  kqswnal_send_pages,
1240         cb_recv:        kqswnal_recv,
1241         cb_recv_pages:  kqswnal_recv_pages,
1242         cb_read:        kqswnal_read,
1243         cb_write:       kqswnal_write,
1244         cb_malloc:      kqswnal_malloc,
1245         cb_free:        kqswnal_free,
1246         cb_printf:      kqswnal_printf,
1247         cb_cli:         kqswnal_cli,
1248         cb_sti:         kqswnal_sti,
1249         cb_dist:        kqswnal_dist
1250 };