Whamcloud - gitweb
* Fixed qswnal peer death notification bug
[fs/lustre-release.git] / lustre / portals / knals / qswnal / qswnal_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 atomic_t kqswnal_packets_launched;
30 atomic_t kqswnal_packets_transmitted;
31 atomic_t kqswnal_packets_received;
32
33
34 /*
35  *  LIB functions follow
36  *
37  */
38 static int
39 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
40              size_t len)
41 {
42         CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
43                 nal->ni.nid, len, src_addr, dst_addr );
44         memcpy( dst_addr, src_addr, len );
45
46         return (0);
47 }
48
49 static int
50 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
51               size_t len)
52 {
53         CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
54                 nal->ni.nid, len, src_addr, dst_addr );
55         memcpy( dst_addr, src_addr, len );
56
57         return (0);
58 }
59
60 static void *
61 kqswnal_malloc(nal_cb_t *nal, size_t len)
62 {
63         void *buf;
64
65         PORTAL_ALLOC(buf, len);
66         return (buf);
67 }
68
69 static void
70 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
71 {
72         PORTAL_FREE(buf, len);
73 }
74
75 static void
76 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
77 {
78         va_list ap;
79         char msg[256];
80
81         va_start (ap, fmt);
82         vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
83         va_end (ap);
84
85         msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
86
87         CDEBUG (D_NET, "%s", msg);
88 }
89
90
91 static void
92 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
93 {
94         kqswnal_data_t *data= nal->nal_data;
95
96         spin_lock_irqsave(&data->kqn_statelock, *flags);
97 }
98
99
100 static void
101 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
102 {
103         kqswnal_data_t *data= nal->nal_data;
104
105         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
106 }
107
108
109 static int
110 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
111 {
112         if (nid == nal->ni.nid)
113                 *dist = 0;                      /* it's me */
114         else if (kqswnal_nid2elanid (nid) >= 0)
115                 *dist = 1;                      /* it's my peer */
116         else
117                 *dist = 2;                      /* via router */
118         return (0);
119 }
120
121 void
122 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
123 {
124         struct timeval     now;
125         time_t             then;
126
127         do_gettimeofday (&now);
128         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
129
130         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
131 }
132
133 void
134 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
135 {
136         if (ktx->ktx_nmappedpages == 0)
137                 return;
138
139         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
140                 ktx, ktx->ktx_niov, ktx->ktx_basepage, ktx->ktx_nmappedpages);
141
142         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
143         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
144                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
145
146         elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
147                           kqswnal_data.kqn_eptxdmahandle,
148                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
149         ktx->ktx_nmappedpages = 0;
150 }
151
152 int
153 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
154 {
155         int       nfrags    = ktx->ktx_niov;
156         const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
157         int       nmapped   = ktx->ktx_nmappedpages;
158         int       maxmapped = ktx->ktx_npages;
159         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
160         char     *ptr;
161         
162         LASSERT (nmapped <= maxmapped);
163         LASSERT (nfrags <= maxfrags);
164         LASSERT (niov > 0);
165         LASSERT (nob > 0);
166         
167         do {
168                 int  fraglen = kiov->kiov_len;
169
170                 /* nob exactly spans the iovs */
171                 LASSERT (fraglen <= nob);
172                 /* each frag fits in a page */
173                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
174
175                 nmapped++;
176                 if (nmapped > maxmapped) {
177                         CERROR("Can't map message in %d pages (max %d)\n",
178                                nmapped, maxmapped);
179                         return (-EMSGSIZE);
180                 }
181
182                 if (nfrags == maxfrags) {
183                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
184                                maxfrags);
185                         return (-EMSGSIZE);
186                 }
187
188                 /* XXX this is really crap, but we'll have to kmap until
189                  * EKC has a page (rather than vaddr) mapping interface */
190
191                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
192
193                 CDEBUG(D_NET,
194                        "%p[%d] loading %p for %d, page %d, %d total\n",
195                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
196
197                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
198                                        kqswnal_data.kqn_eptxdmahandle,
199                                        ptr, fraglen,
200                                        basepage, &ktx->ktx_iov[nfrags].Base);
201
202                 kunmap (kiov->kiov_page);
203                 
204                 /* keep in loop for failure case */
205                 ktx->ktx_nmappedpages = nmapped;
206
207                 if (nfrags > 0 &&                /* previous frag mapped */
208                     ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
209                     (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
210                         /* just extend previous */
211                         ktx->ktx_iov[nfrags - 1].Len += fraglen;
212                 else {
213                         ktx->ktx_iov[nfrags].Len = fraglen;
214                         nfrags++;                /* new frag */
215                 }
216
217                 basepage++;
218                 kiov++;
219                 niov--;
220                 nob -= fraglen;
221
222                 /* iov must not run out before end of data */
223                 LASSERT (nob == 0 || niov > 0);
224
225         } while (nob > 0);
226
227         ktx->ktx_niov = nfrags;
228         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
229                 ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
230
231         return (0);
232 }
233
234 int
235 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
236 {
237         int       nfrags    = ktx->ktx_niov;
238         const int maxfrags  = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
239         int       nmapped   = ktx->ktx_nmappedpages;
240         int       maxmapped = ktx->ktx_npages;
241         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
242
243         LASSERT (nmapped <= maxmapped);
244         LASSERT (nfrags <= maxfrags);
245         LASSERT (niov > 0);
246         LASSERT (nob > 0);
247
248         do {
249                 int  fraglen = iov->iov_len;
250                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
251
252                 /* nob exactly spans the iovs */
253                 LASSERT (fraglen <= nob);
254                 
255                 nmapped += npages;
256                 if (nmapped > maxmapped) {
257                         CERROR("Can't map message in %d pages (max %d)\n",
258                                nmapped, maxmapped);
259                         return (-EMSGSIZE);
260                 }
261
262                 if (nfrags == maxfrags) {
263                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
264                                maxfrags);
265                         return (-EMSGSIZE);
266                 }
267
268                 CDEBUG(D_NET,
269                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
270                         ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
271                         nmapped);
272
273                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
274                                        kqswnal_data.kqn_eptxdmahandle,
275                                        iov->iov_base, fraglen,
276                                        basepage, &ktx->ktx_iov[nfrags].Base);
277                 /* keep in loop for failure case */
278                 ktx->ktx_nmappedpages = nmapped;
279
280                 if (nfrags > 0 &&                /* previous frag mapped */
281                     ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
282                     (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
283                         /* just extend previous */
284                         ktx->ktx_iov[nfrags - 1].Len += fraglen;
285                 else {
286                         ktx->ktx_iov[nfrags].Len = fraglen;
287                         nfrags++;                /* new frag */
288                 }
289
290                 basepage += npages;
291                 iov++;
292                 niov--;
293                 nob -= fraglen;
294
295                 /* iov must not run out before end of data */
296                 LASSERT (nob == 0 || niov > 0);
297
298         } while (nob > 0);
299
300         ktx->ktx_niov = nfrags;
301         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
302                 ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
303
304         return (0);
305 }
306
307 void
308 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
309 {
310         kpr_fwd_desc_t   *fwd = NULL;
311         unsigned long     flags;
312
313         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
314
315         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
316
317         list_del (&ktx->ktx_list);              /* take off active list */
318
319         if (ktx->ktx_isnblk) {
320                 /* reserved for non-blocking tx */
321                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
322                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
323                 return;
324         }
325
326         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
327
328         /* anything blocking for a tx descriptor? */
329         if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
330         {
331                 CDEBUG(D_NET,"wakeup fwd\n");
332
333                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
334                                   kpr_fwd_desc_t, kprfd_list);
335                 list_del (&fwd->kprfd_list);
336         }
337
338         if (waitqueue_active (&kqswnal_data.kqn_idletxd_waitq))  /* process? */
339         {
340                 /* local sender waiting for tx desc */
341                 CDEBUG(D_NET,"wakeup process\n");
342                 wake_up (&kqswnal_data.kqn_idletxd_waitq);
343         }
344
345         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
346
347         if (fwd == NULL)
348                 return;
349
350         /* schedule packet for forwarding again */
351         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
352
353         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
354         if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
355                 wake_up (&kqswnal_data.kqn_sched_waitq);
356
357         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
358 }
359
360 kqswnal_tx_t *
361 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
362 {
363         unsigned long  flags;
364         kqswnal_tx_t  *ktx = NULL;
365
366         for (;;) {
367                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
368
369                 /* "normal" descriptor is free */
370                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
371                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
372                                           kqswnal_tx_t, ktx_list);
373                         break;
374                 }
375
376                 /* "normal" descriptor pool is empty */
377
378                 if (fwd != NULL) { /* forwarded packet => queue for idle txd */
379                         CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
380                         list_add_tail (&fwd->kprfd_list,
381                                        &kqswnal_data.kqn_idletxd_fwdq);
382                         break;
383                 }
384
385                 /* doing a local transmit */
386                 if (!may_block) {
387                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
388                                 CERROR ("intr tx desc pool exhausted\n");
389                                 break;
390                         }
391
392                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
393                                           kqswnal_tx_t, ktx_list);
394                         break;
395                 }
396
397                 /* block for idle tx */
398
399                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
400
401                 CDEBUG (D_NET, "blocking for tx desc\n");
402                 wait_event (kqswnal_data.kqn_idletxd_waitq,
403                             !list_empty (&kqswnal_data.kqn_idletxds));
404         }
405
406         if (ktx != NULL) {
407                 list_del (&ktx->ktx_list);
408                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
409                 ktx->ktx_launcher = current->pid;
410         }
411
412         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
413
414         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
415         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
416         return (ktx);
417 }
418
419 void
420 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
421 {
422         if (ktx->ktx_forwarding)                /* router asked me to forward this packet */
423                 kpr_fwd_done (&kqswnal_data.kqn_router,
424                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
425         else                                    /* packet sourced locally */
426                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
427                               (lib_msg_t *)ktx->ktx_args[1]);
428
429         kqswnal_put_idle_tx (ktx);
430 }
431
432 static void
433 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
434 {
435         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
436         
437         LASSERT (txd != NULL);
438         LASSERT (ktx != NULL);
439
440         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
441
442         if (status == EP_SUCCESS)
443                 atomic_inc (&kqswnal_packets_transmitted);
444
445         if (status != EP_SUCCESS)
446         {
447                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
448                         ktx->ktx_nid, status);
449
450                 kqswnal_notify_peer_down(ktx);
451                 status = -EIO;
452         }
453
454         kqswnal_tx_done (ktx, status);
455 }
456
457 int
458 kqswnal_launch (kqswnal_tx_t *ktx)
459 {
460         /* Don't block for transmit descriptor if we're in interrupt context */
461         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
462         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
463         long  flags;
464         int   rc;
465
466         ktx->ktx_launchtime = jiffies;
467
468         LASSERT (dest >= 0);                    /* must be a peer */
469         rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
470                                ktx->ktx_port, attr, kqswnal_txhandler,
471                                ktx, ktx->ktx_iov, ktx->ktx_niov);
472         switch (rc) {
473         case 0: /* success */
474                 atomic_inc (&kqswnal_packets_launched);
475                 return (0);
476
477         case ENOMEM: /* can't allocate ep txd => queue for later */
478                 LASSERT (in_interrupt());
479
480                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
481
482                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
483                 if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
484                         wake_up (&kqswnal_data.kqn_sched_waitq);
485
486                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
487                 return (0);
488
489         default: /* fatal error */
490                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
491                 kqswnal_notify_peer_down(ktx);
492                 return (rc);
493         }
494 }
495
496 static char *
497 hdr_type_string (ptl_hdr_t *hdr)
498 {
499         switch (hdr->type) {
500         case PTL_MSG_ACK:
501                 return ("ACK");
502         case PTL_MSG_PUT:
503                 return ("PUT");
504         case PTL_MSG_GET:
505                 return ("GET");
506         case PTL_MSG_REPLY:
507                 return ("REPLY");
508         default:
509                 return ("<UNKNOWN>");
510         }
511 }
512
513 static void
514 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
515 {
516         char *type_str = hdr_type_string (hdr);
517
518         CERROR("P3 Header at %p of type %s\n", hdr, type_str);
519         CERROR("    From nid/pid "LPU64"/%u", NTOH__u64(hdr->src_nid),
520                NTOH__u32(hdr->src_pid));
521         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
522                NTOH__u32(hdr->dest_pid));
523
524         switch (NTOH__u32(hdr->type)) {
525         case PTL_MSG_PUT:
526                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
527                        "match bits "LPX64"\n",
528                        NTOH__u32 (hdr->msg.put.ptl_index),
529                        hdr->msg.put.ack_wmd.wh_interface_cookie,
530                        hdr->msg.put.ack_wmd.wh_object_cookie,
531                        NTOH__u64 (hdr->msg.put.match_bits));
532                 CERROR("    Length %d, offset %d, hdr data "LPX64"\n",
533                        NTOH__u32(PTL_HDR_LENGTH(hdr)),
534                        NTOH__u32(hdr->msg.put.offset),
535                        hdr->msg.put.hdr_data);
536                 break;
537
538         case PTL_MSG_GET:
539                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
540                        "match bits "LPX64"\n",
541                        NTOH__u32 (hdr->msg.get.ptl_index),
542                        hdr->msg.get.return_wmd.wh_interface_cookie,
543                        hdr->msg.get.return_wmd.wh_object_cookie,
544                        hdr->msg.get.match_bits);
545                 CERROR("    Length %d, src offset %d\n",
546                        NTOH__u32 (hdr->msg.get.sink_length),
547                        NTOH__u32 (hdr->msg.get.src_offset));
548                 break;
549
550         case PTL_MSG_ACK:
551                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
552                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
553                        hdr->msg.ack.dst_wmd.wh_object_cookie,
554                        NTOH__u32 (hdr->msg.ack.mlength));
555                 break;
556
557         case PTL_MSG_REPLY:
558                 CERROR("    dst md "LPX64"."LPX64", length %d\n",
559                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
560                        hdr->msg.reply.dst_wmd.wh_object_cookie,
561                        NTOH__u32 (PTL_HDR_LENGTH(hdr)));
562         }
563
564 }                               /* end of print_hdr() */
565
566 static int
567 kqswnal_sendmsg (nal_cb_t     *nal,
568                  void         *private,
569                  lib_msg_t    *cookie,
570                  ptl_hdr_t    *hdr,
571                  int           type,
572                  ptl_nid_t     nid,
573                  ptl_pid_t     pid,
574                  unsigned int  payload_niov,
575                  struct iovec *payload_iov,
576                  ptl_kiov_t   *payload_kiov,
577                  size_t        payload_nob)
578 {
579         kqswnal_tx_t      *ktx;
580         int                rc;
581         ptl_nid_t          gatewaynid;
582 #if KQSW_CHECKSUM
583         int                i;
584         kqsw_csum_t        csum;
585         int                sumnob;
586 #endif
587         
588         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
589                " pid %u\n", payload_nob, payload_niov, nid, pid);
590
591         LASSERT (payload_nob == 0 || payload_niov > 0);
592         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
593
594         /* It must be OK to kmap() if required */
595         LASSERT (payload_kiov == NULL || !in_interrupt ());
596         /* payload is either all vaddrs or all pages */
597         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
598         
599         if (payload_nob > KQSW_MAXPAYLOAD) {
600                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
601                         payload_nob, KQSW_MAXPAYLOAD);
602                 return (PTL_FAIL);
603         }
604
605         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
606                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
607                                  sizeof (ptl_hdr_t) + payload_nob, &gatewaynid);
608                 if (rc != 0) {
609                         CERROR("Can't route to "LPX64": router error %d\n",
610                                nid, rc);
611                         return (PTL_FAIL);
612                 }
613                 if (kqswnal_nid2elanid (gatewaynid) < 0) {
614                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
615                                gatewaynid, nid);
616                         return (PTL_FAIL);
617                 }
618                 nid = gatewaynid;
619         }
620
621         /* I may not block for a transmit descriptor if I might block the
622          * receiver, or an interrupt handler. */
623         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
624                                           type == PTL_MSG_REPLY ||
625                                           in_interrupt()));
626         if (ktx == NULL) {
627                 kqswnal_cerror_hdr (hdr);
628                 return (PTL_NOSPACE);
629         }
630
631         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
632         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
633
634 #if KQSW_CHECKSUM
635         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
636         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
637         for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
638                 if (payload_kiov != NULL) {
639                         ptl_kiov_t *kiov = &payload_kiov[i];
640                         char       *addr = ((char *)kmap (kiov->kiov_page)) +
641                                            kiov->kiov_offset;
642                         
643                         csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
644                         sumnob -= kiov->kiov_len;
645                 } else {
646                         struct iovec *iov = &payload_iov[i];
647
648                         csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
649                         sumnob -= iov->iov_len;
650                 }
651         }
652         memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
653 #endif
654
655         /* Set up first frag from pre-mapped buffer (it's at least the
656          * portals header) */
657         ktx->ktx_iov[0].Base = ktx->ktx_ebuffer;
658         ktx->ktx_iov[0].Len = KQSW_HDR_SIZE;
659         ktx->ktx_niov = 1;
660
661         if (payload_nob > 0) { /* got some payload (something more to do) */
662                 /* make a single contiguous message? */
663                 if (payload_nob <= KQSW_TX_MAXCONTIG) {
664                         /* copy payload to ktx_buffer, immediately after hdr */
665                         if (payload_kiov != NULL)
666                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
667                                                    payload_niov, payload_kiov, payload_nob);
668                         else
669                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
670                                                   payload_niov, payload_iov, payload_nob);
671                         /* first frag includes payload */
672                         ktx->ktx_iov[0].Len += payload_nob;
673                 } else {
674                         if (payload_kiov != NULL)
675                                 rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
676                                                           payload_niov, payload_kiov);
677                         else
678                                 rc = kqswnal_map_tx_iov (ktx, payload_nob,
679                                                          payload_niov, payload_iov);
680                         if (rc != 0) {
681                                 kqswnal_put_idle_tx (ktx);
682                                 return (PTL_FAIL);
683                         }
684                 } 
685         }
686
687         ktx->ktx_port       = (payload_nob <= KQSW_SMALLPAYLOAD) ?
688                               EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
689         ktx->ktx_nid        = nid;
690         ktx->ktx_forwarding = 0;   /* => lib_finalize() on completion */
691         ktx->ktx_args[0]    = private;
692         ktx->ktx_args[1]    = cookie;
693
694         rc = kqswnal_launch (ktx);
695         if (rc != 0) {                    /* failed? */
696                 CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc);
697                 kqswnal_put_idle_tx (ktx);
698                 return (PTL_FAIL);
699         }
700
701         CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
702         return (PTL_OK);
703 }
704
705 static int
706 kqswnal_send (nal_cb_t     *nal,
707               void         *private,
708               lib_msg_t    *cookie,
709               ptl_hdr_t    *hdr,
710               int           type,
711               ptl_nid_t     nid,
712               ptl_pid_t     pid,
713               unsigned int  payload_niov,
714               struct iovec *payload_iov,
715               size_t        payload_nob)
716 {
717         return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
718                                  payload_niov, payload_iov, NULL, payload_nob));
719 }
720
721 static int
722 kqswnal_send_pages (nal_cb_t     *nal,
723                     void         *private,
724                     lib_msg_t    *cookie,
725                     ptl_hdr_t    *hdr,
726                     int           type,
727                     ptl_nid_t     nid,
728                     ptl_pid_t     pid,
729                     unsigned int  payload_niov,
730                     ptl_kiov_t   *payload_kiov,
731                     size_t        payload_nob)
732 {
733         return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
734                                  payload_niov, NULL, payload_kiov, payload_nob));
735 }
736
737 int kqswnal_fwd_copy_contig = 0;
738
739 void
740 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
741 {
742         int             rc;
743         kqswnal_tx_t   *ktx;
744         struct iovec   *iov = fwd->kprfd_iov;
745         int             niov = fwd->kprfd_niov;
746         int             nob = fwd->kprfd_nob;
747         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
748
749 #if KQSW_CHECKSUM
750         CERROR ("checksums for forwarded packets not implemented\n");
751         LBUG ();
752 #endif
753         /* The router wants this NAL to forward a packet */
754         CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
755                 fwd, nid, niov, nob);
756
757         LASSERT (niov > 0);
758         
759         ktx = kqswnal_get_idle_tx (fwd, FALSE);
760         if (ktx == NULL)        /* can't get txd right now */
761                 return;         /* fwd will be scheduled when tx desc freed */
762
763         if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
764                 nid = fwd->kprfd_target_nid;    /* target is final dest */
765
766         if (kqswnal_nid2elanid (nid) < 0) {
767                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
768                 rc = -EHOSTUNREACH;
769                 goto failed;
770         }
771
772         if (nob > KQSW_NRXMSGBYTES_LARGE) {
773                 CERROR ("Can't forward [%p] to "LPX64
774                         ": size %d bigger than max packet size %ld\n",
775                         fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
776                 rc = -EMSGSIZE;
777                 goto failed;
778         }
779
780         if ((kqswnal_fwd_copy_contig || niov > 1) &&
781             nob <= KQSW_TX_BUFFER_SIZE) 
782         {
783                 /* send from ktx's pre-allocated/mapped contiguous buffer? */
784                 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
785                 ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
786                 ktx->ktx_iov[0].Len = nob;
787                 ktx->ktx_niov = 1;
788
789                 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
790         }
791         else
792         {
793                 /* zero copy */
794                 ktx->ktx_niov = 0;        /* no frags mapped yet */
795                 rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
796                 if (rc != 0)
797                         goto failed;
798
799                 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
800         }
801
802         ktx->ktx_port       = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
803                               EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
804         ktx->ktx_nid        = nid;
805         ktx->ktx_forwarding = 1;
806         ktx->ktx_args[0]    = fwd;
807
808         rc = kqswnal_launch (ktx);
809         if (rc == 0)
810                 return;
811
812  failed:
813         LASSERT (rc != 0);
814         CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
815
816         kqswnal_put_idle_tx (ktx);
817         /* complete now (with failure) */
818         kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
819 }
820
821 void
822 kqswnal_fwd_callback (void *arg, int error)
823 {
824         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
825
826         /* The router has finished forwarding this packet */
827
828         if (error != 0)
829         {
830                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
831
832                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
833                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
834         }
835
836         kqswnal_requeue_rx (krx);
837 }
838
839 void
840 kqswnal_rx (kqswnal_rx_t *krx)
841 {
842         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
843         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
844         int             nob;
845         int             niov;
846
847         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
848                 /* NB krx requeued when lib_parse() calls back kqswnal_recv */
849                 lib_parse (&kqswnal_lib, hdr, krx);
850                 return;
851         }
852
853 #if KQSW_CHECKSUM
854         CERROR ("checksums for forwarded packets not implemented\n");
855         LBUG ();
856 #endif
857         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
858         {
859                 CERROR("dropping packet from "LPX64" for "LPX64
860                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
861                 kqswnal_requeue_rx (krx);
862                 return;
863         }
864
865         /* NB forwarding may destroy iov; rebuild every time */
866         for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
867         {
868                 LASSERT (niov < krx->krx_npages);
869                 krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
870                 krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
871         }
872
873         kpr_fwd_init (&krx->krx_fwd, dest_nid,
874                       krx->krx_nob, niov, krx->krx_iov,
875                       kqswnal_fwd_callback, krx);
876
877         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
878 }
879
880 /* Receive Interrupt Handler: posts to schedulers */
881 void 
882 kqswnal_rxhandler(EP_RXD *rxd)
883 {
884         long          flags;
885         int           nob    = ep_rxd_len (rxd);
886         int           status = ep_rxd_status (rxd);
887         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
888
889         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
890                rxd, krx, nob, status);
891
892         LASSERT (krx != NULL);
893
894         krx->krx_rxd = rxd;
895         krx->krx_nob = nob;
896
897         /* must receive a whole header to be able to parse */
898         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
899         {
900                 /* receives complete with failure when receiver is removed */
901                 if (kqswnal_data.kqn_shuttingdown)
902                         return;
903
904                 CERROR("receive status failed with status %d nob %d\n",
905                        ep_rxd_status(rxd), nob);
906                 kqswnal_requeue_rx (krx);
907                 return;
908         }
909
910         atomic_inc (&kqswnal_packets_received);
911
912         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
913
914         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
915         if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
916                 wake_up (&kqswnal_data.kqn_sched_waitq);
917
918         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
919 }
920
921 #if KQSW_CHECKSUM
922 void
923 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
924 {
925         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
926
927         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
928                 ", dpid %d, spid %d, type %d\n",
929                 ishdr ? "Header" : "Payload", krx,
930                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
931                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
932                 NTOH__u32(hdr->type));
933
934         switch (NTOH__u32 (hdr->type))
935         {
936         case PTL_MSG_ACK:
937                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
938                        " len %u\n",
939                        NTOH__u32(hdr->msg.ack.mlength),
940                        hdr->msg.ack.dst_wmd.handle_cookie,
941                        hdr->msg.ack.dst_wmd.handle_idx,
942                        NTOH__u64(hdr->msg.ack.match_bits),
943                        NTOH__u32(hdr->msg.ack.length));
944                 break;
945         case PTL_MSG_PUT:
946                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
947                        " len %u off %u data "LPX64"\n",
948                        NTOH__u32(hdr->msg.put.ptl_index),
949                        hdr->msg.put.ack_wmd.handle_cookie,
950                        hdr->msg.put.ack_wmd.handle_idx,
951                        NTOH__u64(hdr->msg.put.match_bits),
952                        NTOH__u32(hdr->msg.put.length),
953                        NTOH__u32(hdr->msg.put.offset),
954                        hdr->msg.put.hdr_data);
955                 break;
956         case PTL_MSG_GET:
957                 CERROR ("GET: <>\n");
958                 break;
959         case PTL_MSG_REPLY:
960                 CERROR ("REPLY: <>\n");
961                 break;
962         default:
963                 CERROR ("TYPE?: <>\n");
964         }
965 }
966 #endif
967
968 static int
969 kqswnal_recvmsg (nal_cb_t     *nal,
970                  void         *private,
971                  lib_msg_t    *cookie,
972                  unsigned int  niov,
973                  struct iovec *iov,
974                  ptl_kiov_t   *kiov,
975                  size_t        mlen,
976                  size_t        rlen)
977 {
978         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
979         int           page;
980         char         *page_ptr;
981         int           page_nob;
982         char         *iov_ptr;
983         int           iov_nob;
984         int           frag;
985 #if KQSW_CHECKSUM
986         kqsw_csum_t   senders_csum;
987         kqsw_csum_t   payload_csum = 0;
988         kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
989                                            sizeof(ptl_hdr_t));
990         size_t        csum_len = mlen;
991         int           csum_frags = 0;
992         int           csum_nob = 0;
993         static atomic_t csum_counter;
994         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
995
996         atomic_inc (&csum_counter);
997
998         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
999                                 sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1000         if (senders_csum != hdr_csum)
1001                 kqswnal_csum_error (krx, 1);
1002 #endif
1003         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1004
1005         /* What was actually received must be >= payload.
1006          * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
1007         LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
1008         LASSERT (mlen <= rlen);
1009
1010         /* It must be OK to kmap() if required */
1011         LASSERT (kiov == NULL || !in_interrupt ());
1012         /* Either all pages or all vaddrs */
1013         LASSERT (!(kiov != NULL && iov != NULL));
1014         
1015         if (mlen != 0)
1016         {
1017                 page     = 0;
1018                 page_ptr = ((char *) page_address(krx->krx_pages[0])) +
1019                         KQSW_HDR_SIZE;
1020                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1021
1022                 LASSERT (niov > 0);
1023                 if (kiov != NULL) {
1024                         iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1025                         iov_nob = kiov->kiov_len;
1026                 } else {
1027                         iov_ptr = iov->iov_base;
1028                         iov_nob = iov->iov_len;
1029                 }
1030
1031                 for (;;)
1032                 {
1033                         /* We expect the iov to exactly match mlen */
1034                         LASSERT (iov_nob <= mlen);
1035                         
1036                         frag = MIN (page_nob, iov_nob);
1037                         memcpy (iov_ptr, page_ptr, frag);
1038 #if KQSW_CHECKSUM
1039                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1040                         csum_nob += frag;
1041                         csum_frags++;
1042 #endif
1043                         mlen -= frag;
1044                         if (mlen == 0)
1045                                 break;
1046
1047                         page_nob -= frag;
1048                         if (page_nob != 0)
1049                                 page_ptr += frag;
1050                         else
1051                         {
1052                                 page++;
1053                                 LASSERT (page < krx->krx_npages);
1054                                 page_ptr = page_address(krx->krx_pages[page]);
1055                                 page_nob = PAGE_SIZE;
1056                         }
1057
1058                         iov_nob -= frag;
1059                         if (iov_nob != 0)
1060                                 iov_ptr += frag;
1061                         else if (kiov != NULL) {
1062                                 kunmap (kiov->kiov_page);
1063                                 kiov++;
1064                                 niov--;
1065                                 LASSERT (niov > 0);
1066                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1067                                 iov_nob = kiov->kiov_len;
1068                         } else {
1069                                 iov++;
1070                                 niov--;
1071                                 LASSERT (niov > 0);
1072                                 iov_ptr = iov->iov_base;
1073                                 iov_nob = iov->iov_len;
1074                         }
1075                 }
1076
1077                 if (kiov != NULL)
1078                         kunmap (kiov->kiov_page);
1079         }
1080
1081 #if KQSW_CHECKSUM
1082         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1083                 sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
1084
1085         if (csum_len != rlen)
1086                 CERROR("Unable to checksum data in user's buffer\n");
1087         else if (senders_csum != payload_csum)
1088                 kqswnal_csum_error (krx, 0);
1089
1090         if (csum_verbose)
1091                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1092                        "csum_nob %d\n",
1093                         hdr_csum, payload_csum, csum_frags, csum_nob);
1094 #endif
1095         lib_finalize(nal, private, cookie);
1096
1097         kqswnal_requeue_rx (krx);
1098
1099         return (rlen);
1100 }
1101
1102 static int
1103 kqswnal_recv(nal_cb_t     *nal,
1104              void         *private,
1105              lib_msg_t    *cookie,
1106              unsigned int  niov,
1107              struct iovec *iov,
1108              size_t        mlen,
1109              size_t        rlen)
1110 {
1111         return (kqswnal_recvmsg (nal, private, cookie, niov, iov, NULL, mlen, rlen));
1112 }
1113
1114 static int
1115 kqswnal_recv_pages (nal_cb_t     *nal,
1116                     void         *private,
1117                     lib_msg_t    *cookie,
1118                     unsigned int  niov,
1119                     ptl_kiov_t   *kiov,
1120                     size_t        mlen,
1121                     size_t        rlen)
1122 {
1123         return (kqswnal_recvmsg (nal, private, cookie, niov, NULL, kiov, mlen, rlen));
1124 }
1125
1126 int
1127 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1128 {
1129         long    pid = kernel_thread (fn, arg, 0);
1130
1131         if (pid < 0)
1132                 return ((int)pid);
1133
1134         atomic_inc (&kqswnal_data.kqn_nthreads);
1135         return (0);
1136 }
1137
1138 void
1139 kqswnal_thread_fini (void)
1140 {
1141         atomic_dec (&kqswnal_data.kqn_nthreads);
1142 }
1143
1144 int
1145 kqswnal_scheduler (void *arg)
1146 {
1147         kqswnal_rx_t    *krx;
1148         kqswnal_tx_t    *ktx;
1149         kpr_fwd_desc_t  *fwd;
1150         long             flags;
1151         int              rc;
1152         int              counter = 0;
1153         int              did_something;
1154
1155         kportal_daemonize ("kqswnal_sched");
1156         kportal_blockallsigs ();
1157         
1158         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1159
1160         while (!kqswnal_data.kqn_shuttingdown)
1161         {
1162                 did_something = FALSE;
1163
1164                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1165                 {
1166                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1167                                          kqswnal_rx_t, krx_list);
1168                         list_del (&krx->krx_list);
1169                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1170                                                flags);
1171
1172                         kqswnal_rx (krx);
1173
1174                         did_something = TRUE;
1175                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1176                 }
1177
1178                 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1179                 {
1180                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1181                                          kqswnal_tx_t, ktx_list);
1182                         list_del_init (&ktx->ktx_delayed_list);
1183                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1184                                                flags);
1185
1186                         rc = kqswnal_launch (ktx);
1187                         if (rc != 0)          /* failed: ktx_nid down? */
1188                         {
1189                                 CERROR("Failed delayed transmit to "LPX64
1190                                        ": %d\n", ktx->ktx_nid, rc);
1191                                 kqswnal_tx_done (ktx, rc);
1192                         }
1193
1194                         did_something = TRUE;
1195                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1196                 }
1197
1198                 if (!list_empty (&kqswnal_data.kqn_delayedfwds))
1199                 {
1200                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1201                         list_del (&fwd->kprfd_list);
1202                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1203
1204                         kqswnal_fwd_packet (NULL, fwd);
1205
1206                         did_something = TRUE;
1207                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1208                 }
1209
1210                     /* nothing to do or hogging CPU */
1211                 if (!did_something || counter++ == KQSW_RESCHED) {
1212                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1213                                                flags);
1214
1215                         counter = 0;
1216
1217                         if (!did_something) {
1218                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1219                                                                kqswnal_data.kqn_shuttingdown ||
1220                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1221                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1222                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1223                                 LASSERT (rc == 0);
1224                         } else if (current->need_resched)
1225                                 schedule ();
1226
1227                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1228                 }
1229         }
1230
1231         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1232
1233         kqswnal_thread_fini ();
1234         return (0);
1235 }
1236
1237 nal_cb_t kqswnal_lib =
1238 {
1239         nal_data:       &kqswnal_data,         /* NAL private data */
1240         cb_send:        kqswnal_send,
1241         cb_send_pages:  kqswnal_send_pages,
1242         cb_recv:        kqswnal_recv,
1243         cb_recv_pages:  kqswnal_recv_pages,
1244         cb_read:        kqswnal_read,
1245         cb_write:       kqswnal_write,
1246         cb_malloc:      kqswnal_malloc,
1247         cb_free:        kqswnal_free,
1248         cb_printf:      kqswnal_printf,
1249         cb_cli:         kqswnal_cli,
1250         cb_sti:         kqswnal_sti,
1251         cb_dist:        kqswnal_dist
1252 };