Whamcloud - gitweb
Land b1_2_smallfix onto b1_2 (20040616_1009)
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 EP_STATUSBLK  kqswnal_rpc_success;
30 EP_STATUSBLK  kqswnal_rpc_failed;
31
32 /*
33  *  LIB functions follow
34  *
35  */
36 static ptl_err_t
37 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
38              size_t len)
39 {
40         CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
41                 nal->ni.nid, len, src_addr, dst_addr );
42         memcpy( dst_addr, src_addr, len );
43
44         return (PTL_OK);
45 }
46
47 static ptl_err_t
48 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
49               size_t len)
50 {
51         CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
52                 nal->ni.nid, len, src_addr, dst_addr );
53         memcpy( dst_addr, src_addr, len );
54
55         return (PTL_OK);
56 }
57
58 static void *
59 kqswnal_malloc(nal_cb_t *nal, size_t len)
60 {
61         void *buf;
62
63         PORTAL_ALLOC(buf, len);
64         return (buf);
65 }
66
67 static void
68 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
69 {
70         PORTAL_FREE(buf, len);
71 }
72
73 static void
74 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
75 {
76         va_list ap;
77         char msg[256];
78
79         va_start (ap, fmt);
80         vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
81         va_end (ap);
82
83         msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
84
85         CDEBUG (D_NET, "%s", msg);
86 }
87
88 #if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
89 # error "Can't save/restore irq contexts in different procedures"
90 #endif
91
92 static void
93 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
94 {
95         kqswnal_data_t *data= nal->nal_data;
96
97         spin_lock_irqsave(&data->kqn_statelock, *flags);
98 }
99
100
101 static void
102 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
103 {
104         kqswnal_data_t *data= nal->nal_data;
105
106         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
107 }
108
109
110 static int
111 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
112 {
113         if (nid == nal->ni.nid)
114                 *dist = 0;                      /* it's me */
115         else if (kqswnal_nid2elanid (nid) >= 0)
116                 *dist = 1;                      /* it's my peer */
117         else
118                 *dist = 2;                      /* via router */
119         return (0);
120 }
121
122 void
123 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
124 {
125         struct timeval     now;
126         time_t             then;
127
128         do_gettimeofday (&now);
129         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
130
131         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
132 }
133
134 void
135 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
136 {
137 #if MULTIRAIL_EKC
138         int      i;
139 #endif
140
141         if (ktx->ktx_nmappedpages == 0)
142                 return;
143         
144 #if MULTIRAIL_EKC
145         CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
146                ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
147
148         for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
149                 ep_dvma_unload(kqswnal_data.kqn_ep,
150                                kqswnal_data.kqn_ep_tx_nmh,
151                                &ktx->ktx_frags[i]);
152 #else
153         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
154                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
155
156         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
157         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
158                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
159
160         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
161                           kqswnal_data.kqn_eptxdmahandle,
162                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
163 #endif
164         ktx->ktx_nmappedpages = 0;
165 }
166
167 int
168 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
169 {
170         int       nfrags    = ktx->ktx_nfrag;
171         int       nmapped   = ktx->ktx_nmappedpages;
172         int       maxmapped = ktx->ktx_npages;
173         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
174         char     *ptr;
175 #if MULTIRAIL_EKC
176         EP_RAILMASK railmask;
177         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
178                                             EP_RAILMASK_ALL,
179                                             kqswnal_nid2elanid(ktx->ktx_nid));
180         
181         if (rail < 0) {
182                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
183                 return (-ENETDOWN);
184         }
185         railmask = 1 << rail;
186 #endif
187         LASSERT (nmapped <= maxmapped);
188         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
189         LASSERT (nfrags <= EP_MAXFRAG);
190         LASSERT (niov > 0);
191         LASSERT (nob > 0);
192
193         /* skip complete frags before 'offset' */
194         while (offset >= kiov->kiov_len) {
195                 offset -= kiov->kiov_len;
196                 kiov++;
197                 niov--;
198                 LASSERT (niov > 0);
199         }
200
201         do {
202                 int  fraglen = kiov->kiov_len - offset;
203
204                 /* nob exactly spans the iovs */
205                 LASSERT (fraglen <= nob);
206                 /* each frag fits in a page */
207                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
208
209                 nmapped++;
210                 if (nmapped > maxmapped) {
211                         CERROR("Can't map message in %d pages (max %d)\n",
212                                nmapped, maxmapped);
213                         return (-EMSGSIZE);
214                 }
215
216                 if (nfrags == EP_MAXFRAG) {
217                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
218                                EP_MAXFRAG);
219                         return (-EMSGSIZE);
220                 }
221
222                 /* XXX this is really crap, but we'll have to kmap until
223                  * EKC has a page (rather than vaddr) mapping interface */
224
225                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
226
227                 CDEBUG(D_NET,
228                        "%p[%d] loading %p for %d, page %d, %d total\n",
229                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
230
231 #if MULTIRAIL_EKC
232                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
233                              ptr, fraglen,
234                              kqswnal_data.kqn_ep_tx_nmh, basepage,
235                              &railmask, &ktx->ktx_frags[nfrags]);
236
237                 if (nfrags == ktx->ktx_firsttmpfrag ||
238                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
239                                   &ktx->ktx_frags[nfrags - 1],
240                                   &ktx->ktx_frags[nfrags])) {
241                         /* new frag if this is the first or can't merge */
242                         nfrags++;
243                 }
244 #else
245                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
246                                        kqswnal_data.kqn_eptxdmahandle,
247                                        ptr, fraglen,
248                                        basepage, &ktx->ktx_frags[nfrags].Base);
249
250                 if (nfrags > 0 &&                /* previous frag mapped */
251                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
252                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
253                         /* just extend previous */
254                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
255                 else {
256                         ktx->ktx_frags[nfrags].Len = fraglen;
257                         nfrags++;                /* new frag */
258                 }
259 #endif
260
261                 kunmap (kiov->kiov_page);
262                 
263                 /* keep in loop for failure case */
264                 ktx->ktx_nmappedpages = nmapped;
265
266                 basepage++;
267                 kiov++;
268                 niov--;
269                 nob -= fraglen;
270                 offset = 0;
271
272                 /* iov must not run out before end of data */
273                 LASSERT (nob == 0 || niov > 0);
274
275         } while (nob > 0);
276
277         ktx->ktx_nfrag = nfrags;
278         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
279                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
280
281         return (0);
282 }
283
284 int
285 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
286                     int niov, struct iovec *iov)
287 {
288         int       nfrags    = ktx->ktx_nfrag;
289         int       nmapped   = ktx->ktx_nmappedpages;
290         int       maxmapped = ktx->ktx_npages;
291         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
292 #if MULTIRAIL_EKC
293         EP_RAILMASK railmask;
294         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
295                                             EP_RAILMASK_ALL,
296                                             kqswnal_nid2elanid(ktx->ktx_nid));
297         
298         if (rail < 0) {
299                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
300                 return (-ENETDOWN);
301         }
302         railmask = 1 << rail;
303 #endif
304         LASSERT (nmapped <= maxmapped);
305         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
306         LASSERT (nfrags <= EP_MAXFRAG);
307         LASSERT (niov > 0);
308         LASSERT (nob > 0);
309
310         /* skip complete frags before offset */
311         while (offset >= iov->iov_len) {
312                 offset -= iov->iov_len;
313                 iov++;
314                 niov--;
315                 LASSERT (niov > 0);
316         }
317         
318         do {
319                 int  fraglen = iov->iov_len - offset;
320                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
321
322                 /* nob exactly spans the iovs */
323                 LASSERT (fraglen <= nob);
324                 
325                 nmapped += npages;
326                 if (nmapped > maxmapped) {
327                         CERROR("Can't map message in %d pages (max %d)\n",
328                                nmapped, maxmapped);
329                         return (-EMSGSIZE);
330                 }
331
332                 if (nfrags == EP_MAXFRAG) {
333                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
334                                EP_MAXFRAG);
335                         return (-EMSGSIZE);
336                 }
337
338                 CDEBUG(D_NET,
339                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
340                        ktx, nfrags, iov->iov_base + offset, fraglen, 
341                        basepage, npages, nmapped);
342
343 #if MULTIRAIL_EKC
344                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
345                              iov->iov_base + offset, fraglen,
346                              kqswnal_data.kqn_ep_tx_nmh, basepage,
347                              &railmask, &ktx->ktx_frags[nfrags]);
348
349                 if (nfrags == ktx->ktx_firsttmpfrag ||
350                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
351                                   &ktx->ktx_frags[nfrags - 1],
352                                   &ktx->ktx_frags[nfrags])) {
353                         /* new frag if this is the first or can't merge */
354                         nfrags++;
355                 }
356 #else
357                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
358                                        kqswnal_data.kqn_eptxdmahandle,
359                                        iov->iov_base + offset, fraglen,
360                                        basepage, &ktx->ktx_frags[nfrags].Base);
361
362                 if (nfrags > 0 &&                /* previous frag mapped */
363                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
364                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
365                         /* just extend previous */
366                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
367                 else {
368                         ktx->ktx_frags[nfrags].Len = fraglen;
369                         nfrags++;                /* new frag */
370                 }
371 #endif
372
373                 /* keep in loop for failure case */
374                 ktx->ktx_nmappedpages = nmapped;
375
376                 basepage += npages;
377                 iov++;
378                 niov--;
379                 nob -= fraglen;
380                 offset = 0;
381
382                 /* iov must not run out before end of data */
383                 LASSERT (nob == 0 || niov > 0);
384
385         } while (nob > 0);
386
387         ktx->ktx_nfrag = nfrags;
388         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
389                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
390
391         return (0);
392 }
393
394
395 void
396 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
397 {
398         kpr_fwd_desc_t   *fwd = NULL;
399         unsigned long     flags;
400
401         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
402         ktx->ktx_state = KTX_IDLE;
403
404         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
405
406         list_del (&ktx->ktx_list);              /* take off active list */
407
408         if (ktx->ktx_isnblk) {
409                 /* reserved for non-blocking tx */
410                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
411                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
412                 return;
413         }
414
415         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
416
417         /* anything blocking for a tx descriptor? */
418         if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
419         {
420                 CDEBUG(D_NET,"wakeup fwd\n");
421
422                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
423                                   kpr_fwd_desc_t, kprfd_list);
424                 list_del (&fwd->kprfd_list);
425         }
426
427         wake_up (&kqswnal_data.kqn_idletxd_waitq);
428
429         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
430
431         if (fwd == NULL)
432                 return;
433
434         /* schedule packet for forwarding again */
435         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
436
437         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
438         wake_up (&kqswnal_data.kqn_sched_waitq);
439
440         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
441 }
442
443 kqswnal_tx_t *
444 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
445 {
446         unsigned long  flags;
447         kqswnal_tx_t  *ktx = NULL;
448
449         for (;;) {
450                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
451
452                 /* "normal" descriptor is free */
453                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
454                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
455                                           kqswnal_tx_t, ktx_list);
456                         break;
457                 }
458
459                 /* "normal" descriptor pool is empty */
460
461                 if (fwd != NULL) { /* forwarded packet => queue for idle txd */
462                         CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
463                         list_add_tail (&fwd->kprfd_list,
464                                        &kqswnal_data.kqn_idletxd_fwdq);
465                         break;
466                 }
467
468                 /* doing a local transmit */
469                 if (!may_block) {
470                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
471                                 CERROR ("intr tx desc pool exhausted\n");
472                                 break;
473                         }
474
475                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
476                                           kqswnal_tx_t, ktx_list);
477                         break;
478                 }
479
480                 /* block for idle tx */
481
482                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
483
484                 CDEBUG (D_NET, "blocking for tx desc\n");
485                 wait_event (kqswnal_data.kqn_idletxd_waitq,
486                             !list_empty (&kqswnal_data.kqn_idletxds));
487         }
488
489         if (ktx != NULL) {
490                 list_del (&ktx->ktx_list);
491                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
492                 ktx->ktx_launcher = current->pid;
493         }
494
495         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
496
497         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
498         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
499
500         return (ktx);
501 }
502
503 void
504 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
505 {
506         lib_msg_t     *msg;
507         lib_msg_t     *repmsg = NULL;
508
509         switch (ktx->ktx_state) {
510         case KTX_FORWARDING:       /* router asked me to forward this packet */
511                 kpr_fwd_done (&kqswnal_data.kqn_router,
512                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
513                 break;
514
515         case KTX_SENDING:          /* packet sourced locally */
516                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
517                               (lib_msg_t *)ktx->ktx_args[1],
518                               (error == 0) ? PTL_OK : 
519                               (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
520                 break;
521
522         case KTX_GETTING:          /* Peer has DMA-ed direct? */
523                 msg = (lib_msg_t *)ktx->ktx_args[1];
524
525                 if (error == 0) {
526                         repmsg = lib_fake_reply_msg (&kqswnal_lib, 
527                                                      ktx->ktx_nid, msg->md);
528                         if (repmsg == NULL)
529                                 error = -ENOMEM;
530                 }
531                 
532                 if (error == 0) {
533                         lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
534                                       msg, PTL_OK);
535                         lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
536                 } else {
537                         lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
538                                       (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
539                 }
540                 break;
541
542         default:
543                 LASSERT (0);
544         }
545
546         kqswnal_put_idle_tx (ktx);
547 }
548
549 static void
550 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
551 {
552         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
553
554         LASSERT (txd != NULL);
555         LASSERT (ktx != NULL);
556
557         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
558
559         if (status != EP_SUCCESS) {
560
561                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
562                         ktx->ktx_nid, status);
563
564                 kqswnal_notify_peer_down(ktx);
565                 status = -EHOSTDOWN;
566
567         } else if (ktx->ktx_state == KTX_GETTING) {
568                 /* RPC completed OK; what did our peer put in the status
569                  * block? */
570 #if MULTIRAIL_EKC
571                 status = ep_txd_statusblk(txd)->Data[0];
572 #else
573                 status = ep_txd_statusblk(txd)->Status;
574 #endif
575         } else {
576                 status = 0;
577         }
578
579         kqswnal_tx_done (ktx, status);
580 }
581
582 int
583 kqswnal_launch (kqswnal_tx_t *ktx)
584 {
585         /* Don't block for transmit descriptor if we're in interrupt context */
586         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
587         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
588         unsigned long flags;
589         int   rc;
590
591         ktx->ktx_launchtime = jiffies;
592
593         LASSERT (dest >= 0);                    /* must be a peer */
594         if (ktx->ktx_state == KTX_GETTING) {
595                 /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
596                  * other frags are the GET sink which we obviously don't
597                  * send here :) */
598 #if MULTIRAIL_EKC
599                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
600                                      ktx->ktx_port, attr,
601                                      kqswnal_txhandler, ktx,
602                                      NULL, ktx->ktx_frags, 1);
603 #else
604                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
605                                      ktx->ktx_port, attr, kqswnal_txhandler,
606                                      ktx, NULL, ktx->ktx_frags, 1);
607 #endif
608         } else {
609 #if MULTIRAIL_EKC
610                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
611                                          ktx->ktx_port, attr,
612                                          kqswnal_txhandler, ktx,
613                                          NULL, ktx->ktx_frags, ktx->ktx_nfrag);
614 #else
615                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
616                                        ktx->ktx_port, attr, 
617                                        kqswnal_txhandler, ktx, 
618                                        ktx->ktx_frags, ktx->ktx_nfrag);
619 #endif
620         }
621
622         switch (rc) {
623         case EP_SUCCESS: /* success */
624                 return (0);
625
626         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
627                 LASSERT (in_interrupt());
628
629                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
630
631                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
632                 wake_up (&kqswnal_data.kqn_sched_waitq);
633
634                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
635                 return (0);
636
637         default: /* fatal error */
638                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
639                 kqswnal_notify_peer_down(ktx);
640                 return (-EHOSTUNREACH);
641         }
642 }
643
644 static char *
645 hdr_type_string (ptl_hdr_t *hdr)
646 {
647         switch (hdr->type) {
648         case PTL_MSG_ACK:
649                 return ("ACK");
650         case PTL_MSG_PUT:
651                 return ("PUT");
652         case PTL_MSG_GET:
653                 return ("GET");
654         case PTL_MSG_REPLY:
655                 return ("REPLY");
656         default:
657                 return ("<UNKNOWN>");
658         }
659 }
660
661 static void
662 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
663 {
664         char *type_str = hdr_type_string (hdr);
665
666         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
667                NTOH__u32(hdr->payload_length));
668         CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
669                NTOH__u32(hdr->src_pid));
670         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
671                NTOH__u32(hdr->dest_pid));
672
673         switch (NTOH__u32(hdr->type)) {
674         case PTL_MSG_PUT:
675                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
676                        "match bits "LPX64"\n",
677                        NTOH__u32 (hdr->msg.put.ptl_index),
678                        hdr->msg.put.ack_wmd.wh_interface_cookie,
679                        hdr->msg.put.ack_wmd.wh_object_cookie,
680                        NTOH__u64 (hdr->msg.put.match_bits));
681                 CERROR("    offset %d, hdr data "LPX64"\n",
682                        NTOH__u32(hdr->msg.put.offset),
683                        hdr->msg.put.hdr_data);
684                 break;
685
686         case PTL_MSG_GET:
687                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
688                        "match bits "LPX64"\n",
689                        NTOH__u32 (hdr->msg.get.ptl_index),
690                        hdr->msg.get.return_wmd.wh_interface_cookie,
691                        hdr->msg.get.return_wmd.wh_object_cookie,
692                        hdr->msg.get.match_bits);
693                 CERROR("    Length %d, src offset %d\n",
694                        NTOH__u32 (hdr->msg.get.sink_length),
695                        NTOH__u32 (hdr->msg.get.src_offset));
696                 break;
697
698         case PTL_MSG_ACK:
699                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
700                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
701                        hdr->msg.ack.dst_wmd.wh_object_cookie,
702                        NTOH__u32 (hdr->msg.ack.mlength));
703                 break;
704
705         case PTL_MSG_REPLY:
706                 CERROR("    dst md "LPX64"."LPX64"\n",
707                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
708                        hdr->msg.reply.dst_wmd.wh_object_cookie);
709         }
710
711 }                               /* end of print_hdr() */
712
713 #if !MULTIRAIL_EKC
714 void
715 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
716 {
717         int          i;
718
719         CDEBUG (how, "%s: %d\n", str, n);
720         for (i = 0; i < n; i++) {
721                 CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
722         }
723 }
724
725 int
726 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
727                      int nsrc, EP_IOVEC *src,
728                      int ndst, EP_IOVEC *dst) 
729 {
730         int        count;
731         int        nob;
732
733         LASSERT (ndv > 0);
734         LASSERT (nsrc > 0);
735         LASSERT (ndst > 0);
736
737         for (count = 0; count < ndv; count++, dv++) {
738
739                 if (nsrc == 0 || ndst == 0) {
740                         if (nsrc != ndst) {
741                                 /* For now I'll barf on any left over entries */
742                                 CERROR ("mismatched src and dst iovs\n");
743                                 return (-EINVAL);
744                         }
745                         return (count);
746                 }
747
748                 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
749                 dv->Len    = nob;
750                 dv->Source = src->Base;
751                 dv->Dest   = dst->Base;
752
753                 if (nob >= src->Len) {
754                         src++;
755                         nsrc--;
756                 } else {
757                         src->Len -= nob;
758                         src->Base += nob;
759                 }
760                 
761                 if (nob >= dst->Len) {
762                         dst++;
763                         ndst--;
764                 } else {
765                         src->Len -= nob;
766                         src->Base += nob;
767                 }
768         }
769
770         CERROR ("DATAVEC too small\n");
771         return (-E2BIG);
772 }
773 #endif
774
775 int
776 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
777                    struct iovec *iov, ptl_kiov_t *kiov, 
778                    int offset, int nob)
779 {
780         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
781         char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
782         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
783         int                 rc;
784 #if MULTIRAIL_EKC
785         int                 i;
786 #else
787         EP_DATAVEC          datav[EP_MAXFRAG];
788         int                 ndatav;
789 #endif
790         LASSERT (krx->krx_rpc_reply_needed);
791         LASSERT ((iov == NULL) != (kiov == NULL));
792
793         /* see kqswnal_sendmsg comment regarding endian-ness */
794         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
795                 /* msg too small to discover rmd size */
796                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
797                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
798                 return (-EINVAL);
799         }
800         
801         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
802                 /* rmd doesn't fit in the incoming message */
803                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
804                         krx->krx_nob, rmd->kqrmd_nfrag,
805                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
806                 return (-EINVAL);
807         }
808
809         /* Map the source data... */
810         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
811         if (kiov != NULL)
812                 rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
813         else
814                 rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
815
816         if (rc != 0) {
817                 CERROR ("Can't map source data: %d\n", rc);
818                 return (rc);
819         }
820
821 #if MULTIRAIL_EKC
822         if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
823                 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
824                        ktx->ktx_nfrag, rmd->kqrmd_nfrag);
825                 return (-EINVAL);
826         }
827         
828         for (i = 0; i < rmd->kqrmd_nfrag; i++)
829                 if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
830                         CERROR("Can't cope with unequal frags %d(%d):"
831                                " %d local %d remote\n",
832                                i, rmd->kqrmd_nfrag, 
833                                ktx->ktx_frags[i].nmd_len, 
834                                rmd->kqrmd_frag[i].nmd_len);
835                         return (-EINVAL);
836                 }
837 #else
838         ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
839                                       ktx->ktx_nfrag, ktx->ktx_frags,
840                                       rmd->kqrmd_nfrag, rmd->kqrmd_frag);
841         if (ndatav < 0) {
842                 CERROR ("Can't create datavec: %d\n", ndatav);
843                 return (ndatav);
844         }
845 #endif
846
847         /* Our caller will start to race with kqswnal_dma_reply_complete... */
848         LASSERT (atomic_read (&krx->krx_refcount) == 1);
849         atomic_set (&krx->krx_refcount, 2);
850
851 #if MULTIRAIL_EKC
852         rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
853                              &kqswnal_rpc_success,
854                              ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
855         if (rc == EP_SUCCESS)
856                 return (0);
857
858         /* Well we tried... */
859         krx->krx_rpc_reply_needed = 0;
860 #else
861         rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
862                               &kqswnal_rpc_success, datav, ndatav);
863         if (rc == EP_SUCCESS)
864                 return (0);
865
866         /* "old" EKC destroys rxd on failed completion */
867         krx->krx_rxd = NULL;
868 #endif
869
870         CERROR("can't complete RPC: %d\n", rc);
871
872         /* reset refcount back to 1: we're not going to be racing with
873          * kqswnal_dma_reply_complete. */
874         atomic_set (&krx->krx_refcount, 1);
875
876         return (-ECONNABORTED);
877 }
878
879 static ptl_err_t
880 kqswnal_sendmsg (nal_cb_t     *nal,
881                  void         *private,
882                  lib_msg_t    *libmsg,
883                  ptl_hdr_t    *hdr,
884                  int           type,
885                  ptl_nid_t     nid,
886                  ptl_pid_t     pid,
887                  unsigned int  payload_niov,
888                  struct iovec *payload_iov,
889                  ptl_kiov_t   *payload_kiov,
890                  size_t        payload_offset,
891                  size_t        payload_nob)
892 {
893         kqswnal_tx_t      *ktx;
894         int                rc;
895         ptl_nid_t          targetnid;
896 #if KQSW_CHECKSUM
897         int                i;
898         kqsw_csum_t        csum;
899         int                sumoff;
900         int                sumnob;
901 #endif
902         
903         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
904                " pid %u\n", payload_nob, payload_niov, nid, pid);
905
906         LASSERT (payload_nob == 0 || payload_niov > 0);
907         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
908
909         /* It must be OK to kmap() if required */
910         LASSERT (payload_kiov == NULL || !in_interrupt ());
911         /* payload is either all vaddrs or all pages */
912         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
913         
914         if (payload_nob > KQSW_MAXPAYLOAD) {
915                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
916                         payload_nob, KQSW_MAXPAYLOAD);
917                 return (PTL_FAIL);
918         }
919
920         targetnid = nid;
921         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
922                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
923                                  sizeof (ptl_hdr_t) + payload_nob, &targetnid);
924                 if (rc != 0) {
925                         CERROR("Can't route to "LPX64": router error %d\n",
926                                nid, rc);
927                         return (PTL_FAIL);
928                 }
929                 if (kqswnal_nid2elanid (targetnid) < 0) {
930                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
931                                targetnid, nid);
932                         return (PTL_FAIL);
933                 }
934         }
935
936         /* I may not block for a transmit descriptor if I might block the
937          * receiver, or an interrupt handler. */
938         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
939                                           type == PTL_MSG_REPLY ||
940                                           in_interrupt()));
941         if (ktx == NULL) {
942                 kqswnal_cerror_hdr (hdr);
943                 return (PTL_NOSPACE);
944         }
945
946         ktx->ktx_nid     = targetnid;
947         ktx->ktx_args[0] = private;
948         ktx->ktx_args[1] = libmsg;
949
950         if (type == PTL_MSG_REPLY &&
951             ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
952                 if (nid != targetnid ||
953                     kqswnal_nid2elanid(nid) != 
954                     ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
955                         CERROR("Optimized reply nid conflict: "
956                                "nid "LPX64" via "LPX64" elanID %d\n",
957                                nid, targetnid,
958                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
959                         return (PTL_FAIL);
960                 }
961
962                 /* peer expects RPC completion with GET data */
963                 rc = kqswnal_dma_reply (ktx, payload_niov, 
964                                         payload_iov, payload_kiov, 
965                                         payload_offset, payload_nob);
966                 if (rc == 0)
967                         return (PTL_OK);
968                 
969                 CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
970                 kqswnal_put_idle_tx (ktx);
971                 return (PTL_FAIL);
972         }
973
974         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
975         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
976
977 #if KQSW_CHECKSUM
978         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
979         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
980         for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
981                 LASSERT(i < niov);
982                 if (payload_kiov != NULL) {
983                         ptl_kiov_t *kiov = &payload_kiov[i];
984
985                         if (sumoff >= kiov->kiov_len) {
986                                 sumoff -= kiov->kiov_len;
987                         } else {
988                                 char *addr = ((char *)kmap (kiov->kiov_page)) +
989                                              kiov->kiov_offset + sumoff;
990                                 int   fragnob = kiov->kiov_len - sumoff;
991
992                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
993                                 sumnob -= fragnob;
994                                 sumoff = 0;
995                                 kunmap(kiov->kiov_page);
996                         }
997                 } else {
998                         struct iovec *iov = &payload_iov[i];
999
1000                         if (sumoff > iov->iov_len) {
1001                                 sumoff -= iov->iov_len;
1002                         } else {
1003                                 char *addr = iov->iov_base + sumoff;
1004                                 int   fragnob = iov->iov_len - sumoff;
1005                                 
1006                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1007                                 sumnob -= fragnob;
1008                                 sumoff = 0;
1009                         }
1010                 }
1011         }
1012         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
1013 #endif
1014
1015         if (kqswnal_data.kqn_optimized_gets &&
1016             type == PTL_MSG_GET &&              /* doing a GET */
1017             nid == targetnid) {                 /* not forwarding */
1018                 lib_md_t           *md = libmsg->md;
1019                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
1020                 
1021                 /* Optimised path: I send over the Elan vaddrs of the get
1022                  * sink buffers, and my peer DMAs directly into them.
1023                  *
1024                  * First I set up ktx as if it was going to send this
1025                  * payload, (it needs to map it anyway).  This fills
1026                  * ktx_frags[1] and onward with the network addresses
1027                  * of the GET sink frags.  I copy these into ktx_buffer,
1028                  * immediately after the header, and send that as my GET
1029                  * message.
1030                  *
1031                  * Note that the addresses are sent in native endian-ness.
1032                  * When EKC copes with different endian nodes, I'll fix
1033                  * this (and eat my hat :) */
1034
1035                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1036                 ktx->ktx_state = KTX_GETTING;
1037
1038                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
1039                         rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
1040                                                   md->md_niov, md->md_iov.kiov);
1041                 else
1042                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
1043                                                  md->md_niov, md->md_iov.iov);
1044
1045                 if (rc < 0) {
1046                         kqswnal_put_idle_tx (ktx);
1047                         return (PTL_FAIL);
1048                 }
1049
1050                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1051
1052                 payload_nob = offsetof(kqswnal_remotemd_t,
1053                                        kqrmd_frag[rmd->kqrmd_nfrag]);
1054                 LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
1055
1056 #if MULTIRAIL_EKC
1057                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1058                        rmd->kqrmd_nfrag * sizeof(EP_NMD));
1059
1060                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1061                               0, KQSW_HDR_SIZE + payload_nob);
1062 #else
1063                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1064                        rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
1065                 
1066                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1067                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1068 #endif
1069         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
1070
1071                 /* small message: single frag copied into the pre-mapped buffer */
1072
1073                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1074                 ktx->ktx_state = KTX_SENDING;
1075 #if MULTIRAIL_EKC
1076                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1077                               0, KQSW_HDR_SIZE + payload_nob);
1078 #else
1079                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1080                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1081 #endif
1082                 if (payload_nob > 0) {
1083                         if (payload_kiov != NULL)
1084                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1085                                                    payload_niov, payload_kiov, 
1086                                                    payload_offset, payload_nob);
1087                         else
1088                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1089                                                   payload_niov, payload_iov, 
1090                                                   payload_offset, payload_nob);
1091                 }
1092         } else {
1093
1094                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
1095
1096                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1097                 ktx->ktx_state = KTX_SENDING;
1098 #if MULTIRAIL_EKC
1099                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1100                               0, KQSW_HDR_SIZE);
1101 #else
1102                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1103                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1104 #endif
1105                 if (payload_kiov != NULL)
1106                         rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
1107                                                   payload_niov, payload_kiov);
1108                 else
1109                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
1110                                                  payload_niov, payload_iov);
1111                 if (rc != 0) {
1112                         kqswnal_put_idle_tx (ktx);
1113                         return (PTL_FAIL);
1114                 }
1115         }
1116         
1117         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
1118                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1119
1120         rc = kqswnal_launch (ktx);
1121         if (rc != 0) {                    /* failed? */
1122                 CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
1123                 kqswnal_put_idle_tx (ktx);
1124                 return (PTL_FAIL);
1125         }
1126
1127         CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
1128                payload_nob, nid, targetnid);
1129         return (PTL_OK);
1130 }
1131
1132 static ptl_err_t
1133 kqswnal_send (nal_cb_t     *nal,
1134               void         *private,
1135               lib_msg_t    *libmsg,
1136               ptl_hdr_t    *hdr,
1137               int           type,
1138               ptl_nid_t     nid,
1139               ptl_pid_t     pid,
1140               unsigned int  payload_niov,
1141               struct iovec *payload_iov,
1142               size_t        payload_offset,
1143               size_t        payload_nob)
1144 {
1145         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1146                                  payload_niov, payload_iov, NULL, 
1147                                  payload_offset, payload_nob));
1148 }
1149
1150 static ptl_err_t
1151 kqswnal_send_pages (nal_cb_t     *nal,
1152                     void         *private,
1153                     lib_msg_t    *libmsg,
1154                     ptl_hdr_t    *hdr,
1155                     int           type,
1156                     ptl_nid_t     nid,
1157                     ptl_pid_t     pid,
1158                     unsigned int  payload_niov,
1159                     ptl_kiov_t   *payload_kiov,
1160                     size_t        payload_offset,
1161                     size_t        payload_nob)
1162 {
1163         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1164                                  payload_niov, NULL, payload_kiov, 
1165                                  payload_offset, payload_nob));
1166 }
1167
1168 void
1169 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1170 {
1171         int             rc;
1172         kqswnal_tx_t   *ktx;
1173         ptl_kiov_t     *kiov = fwd->kprfd_kiov;
1174         int             niov = fwd->kprfd_niov;
1175         int             nob = fwd->kprfd_nob;
1176         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
1177
1178 #if KQSW_CHECKSUM
1179         CERROR ("checksums for forwarded packets not implemented\n");
1180         LBUG ();
1181 #endif
1182         /* The router wants this NAL to forward a packet */
1183         CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
1184                 fwd, nid, niov, nob);
1185
1186         ktx = kqswnal_get_idle_tx (fwd, 0);
1187         if (ktx == NULL)        /* can't get txd right now */
1188                 return;         /* fwd will be scheduled when tx desc freed */
1189
1190         if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
1191                 nid = fwd->kprfd_target_nid;    /* target is final dest */
1192
1193         if (kqswnal_nid2elanid (nid) < 0) {
1194                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
1195                 rc = -EHOSTUNREACH;
1196                 goto failed;
1197         }
1198
1199         /* copy hdr into pre-mapped buffer */
1200         memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
1201         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
1202
1203         ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
1204                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1205         ktx->ktx_nid     = nid;
1206         ktx->ktx_state   = KTX_FORWARDING;
1207         ktx->ktx_args[0] = fwd;
1208         ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
1209
1210         if (nob <= KQSW_TX_MAXCONTIG) 
1211         {
1212                 /* send payload from ktx's pre-mapped contiguous buffer */
1213 #if MULTIRAIL_EKC
1214                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1215                               0, KQSW_HDR_SIZE + nob);
1216 #else
1217                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1218                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
1219 #endif
1220                 if (nob > 0)
1221                         lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
1222                                           niov, kiov, 0, nob);
1223         }
1224         else
1225         {
1226                 /* zero copy payload */
1227 #if MULTIRAIL_EKC
1228                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1229                               0, KQSW_HDR_SIZE);
1230 #else
1231                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1232                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1233 #endif
1234                 rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
1235                 if (rc != 0)
1236                         goto failed;
1237         }
1238
1239         rc = kqswnal_launch (ktx);
1240         if (rc == 0)
1241                 return;
1242
1243  failed:
1244         LASSERT (rc != 0);
1245         CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
1246
1247         kqswnal_put_idle_tx (ktx);
1248         /* complete now (with failure) */
1249         kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
1250 }
1251
1252 void
1253 kqswnal_fwd_callback (void *arg, int error)
1254 {
1255         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1256
1257         /* The router has finished forwarding this packet */
1258
1259         if (error != 0)
1260         {
1261                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1262
1263                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1264                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
1265         }
1266
1267         kqswnal_requeue_rx (krx);
1268 }
1269
1270 void
1271 kqswnal_dma_reply_complete (EP_RXD *rxd) 
1272 {
1273         int           status = ep_rxd_status(rxd);
1274         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
1275         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
1276         lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
1277         
1278         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1279                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
1280
1281         LASSERT (krx->krx_rxd == rxd);
1282         LASSERT (krx->krx_rpc_reply_needed);
1283
1284         krx->krx_rpc_reply_needed = 0;
1285         kqswnal_rx_done (krx);
1286
1287         lib_finalize (&kqswnal_lib, NULL, msg,
1288                       (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
1289         kqswnal_put_idle_tx (ktx);
1290 }
1291
1292 void
1293 kqswnal_rpc_complete (EP_RXD *rxd)
1294 {
1295         int           status = ep_rxd_status(rxd);
1296         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1297         
1298         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1299                "rxd %p, krx %p, status %d\n", rxd, krx, status);
1300
1301         LASSERT (krx->krx_rxd == rxd);
1302         LASSERT (krx->krx_rpc_reply_needed);
1303         
1304         krx->krx_rpc_reply_needed = 0;
1305         kqswnal_requeue_rx (krx);
1306 }
1307
1308 void
1309 kqswnal_requeue_rx (kqswnal_rx_t *krx) 
1310 {
1311         int   rc;
1312
1313         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1314
1315         if (krx->krx_rpc_reply_needed) {
1316
1317                 /* We failed to complete the peer's optimized GET (e.g. we
1318                  * couldn't map the source buffers).  We complete the
1319                  * peer's EKC rpc now with failure. */
1320 #if MULTIRAIL_EKC
1321                 rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
1322                                      &kqswnal_rpc_failed, NULL, NULL, 0);
1323                 if (rc == EP_SUCCESS)
1324                         return;
1325                 
1326                 CERROR("can't complete RPC: %d\n", rc);
1327 #else
1328                 if (krx->krx_rxd != NULL) {
1329                         /* We didn't try (and fail) to complete earlier... */
1330                         rc = ep_complete_rpc(krx->krx_rxd, 
1331                                              kqswnal_rpc_complete, krx,
1332                                              &kqswnal_rpc_failed, NULL, 0);
1333                         if (rc == EP_SUCCESS)
1334                                 return;
1335
1336                         CERROR("can't complete RPC: %d\n", rc);
1337                 }
1338                 
1339                 /* NB the old ep_complete_rpc() frees rxd on failure, so we
1340                  * have to requeue from scratch here, unless we're shutting
1341                  * down */
1342                 if (kqswnal_data.kqn_shuttingdown)
1343                         return;
1344
1345                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
1346                                       krx->krx_elanbuffer, 
1347                                       krx->krx_npages * PAGE_SIZE, 0);
1348                 LASSERT (rc == EP_SUCCESS);
1349                 /* We don't handle failure here; it's incredibly rare
1350                  * (never reported?) and only happens with "old" EKC */
1351                 return;
1352 #endif
1353         }
1354
1355 #if MULTIRAIL_EKC
1356         if (kqswnal_data.kqn_shuttingdown) {
1357                 /* free EKC rxd on shutdown */
1358                 ep_complete_receive(krx->krx_rxd);
1359         } else {
1360                 /* repost receive */
1361                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1362                                    &krx->krx_elanbuffer, 0);
1363         }
1364 #else                
1365         /* don't actually requeue on shutdown */
1366         if (!kqswnal_data.kqn_shuttingdown) 
1367                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1368                                    krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
1369 #endif
1370 }
1371         
1372 void
1373 kqswnal_rx (kqswnal_rx_t *krx)
1374 {
1375         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
1376         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
1377         int             payload_nob;
1378         int             nob;
1379         int             niov;
1380
1381         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1382
1383         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
1384                 atomic_set(&krx->krx_refcount, 1);
1385                 lib_parse (&kqswnal_lib, hdr, krx);
1386                 kqswnal_rx_done(krx);
1387                 return;
1388         }
1389
1390 #if KQSW_CHECKSUM
1391         CERROR ("checksums for forwarded packets not implemented\n");
1392         LBUG ();
1393 #endif
1394         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
1395         {
1396                 CERROR("dropping packet from "LPX64" for "LPX64
1397                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
1398
1399                 kqswnal_requeue_rx (krx);
1400                 return;
1401         }
1402
1403         nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
1404         niov = 0;
1405         if (nob > 0) {
1406                 krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
1407                 krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
1408                 niov = 1;
1409                 nob -= PAGE_SIZE - KQSW_HDR_SIZE;
1410                 
1411                 while (nob > 0) {
1412                         LASSERT (niov < krx->krx_npages);
1413                         
1414                         krx->krx_kiov[niov].kiov_offset = 0;
1415                         krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
1416                         niov++;
1417                         nob -= PAGE_SIZE;
1418                 }
1419         }
1420
1421         kpr_fwd_init (&krx->krx_fwd, dest_nid, 
1422                       hdr, payload_nob, niov, krx->krx_kiov,
1423                       kqswnal_fwd_callback, krx);
1424
1425         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
1426 }
1427
1428 /* Receive Interrupt Handler: posts to schedulers */
1429 void 
1430 kqswnal_rxhandler(EP_RXD *rxd)
1431 {
1432         unsigned long flags;
1433         int           nob    = ep_rxd_len (rxd);
1434         int           status = ep_rxd_status (rxd);
1435         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1436
1437         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1438                rxd, krx, nob, status);
1439
1440         LASSERT (krx != NULL);
1441
1442         krx->krx_rxd = rxd;
1443         krx->krx_nob = nob;
1444 #if MULTIRAIL_EKC
1445         krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
1446 #else
1447         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
1448 #endif
1449         
1450         /* must receive a whole header to be able to parse */
1451         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
1452         {
1453                 /* receives complete with failure when receiver is removed */
1454 #if MULTIRAIL_EKC
1455                 if (status == EP_SHUTDOWN)
1456                         LASSERT (kqswnal_data.kqn_shuttingdown);
1457                 else
1458                         CERROR("receive status failed with status %d nob %d\n",
1459                                ep_rxd_status(rxd), nob);
1460 #else
1461                 if (!kqswnal_data.kqn_shuttingdown)
1462                         CERROR("receive status failed with status %d nob %d\n",
1463                                ep_rxd_status(rxd), nob);
1464 #endif
1465                 kqswnal_requeue_rx (krx);
1466                 return;
1467         }
1468
1469         if (!in_interrupt()) {
1470                 kqswnal_rx (krx);
1471                 return;
1472         }
1473
1474         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1475
1476         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1477         wake_up (&kqswnal_data.kqn_sched_waitq);
1478
1479         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1480 }
1481
1482 #if KQSW_CHECKSUM
1483 void
1484 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1485 {
1486         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1487
1488         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1489                 ", dpid %d, spid %d, type %d\n",
1490                 ishdr ? "Header" : "Payload", krx,
1491                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
1492                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
1493                 NTOH__u32(hdr->type));
1494
1495         switch (NTOH__u32 (hdr->type))
1496         {
1497         case PTL_MSG_ACK:
1498                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
1499                        " len %u\n",
1500                        NTOH__u32(hdr->msg.ack.mlength),
1501                        hdr->msg.ack.dst_wmd.handle_cookie,
1502                        hdr->msg.ack.dst_wmd.handle_idx,
1503                        NTOH__u64(hdr->msg.ack.match_bits),
1504                        NTOH__u32(hdr->msg.ack.length));
1505                 break;
1506         case PTL_MSG_PUT:
1507                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
1508                        " len %u off %u data "LPX64"\n",
1509                        NTOH__u32(hdr->msg.put.ptl_index),
1510                        hdr->msg.put.ack_wmd.handle_cookie,
1511                        hdr->msg.put.ack_wmd.handle_idx,
1512                        NTOH__u64(hdr->msg.put.match_bits),
1513                        NTOH__u32(hdr->msg.put.length),
1514                        NTOH__u32(hdr->msg.put.offset),
1515                        hdr->msg.put.hdr_data);
1516                 break;
1517         case PTL_MSG_GET:
1518                 CERROR ("GET: <>\n");
1519                 break;
1520         case PTL_MSG_REPLY:
1521                 CERROR ("REPLY: <>\n");
1522                 break;
1523         default:
1524                 CERROR ("TYPE?: <>\n");
1525         }
1526 }
1527 #endif
1528
1529 static ptl_err_t
1530 kqswnal_recvmsg (nal_cb_t     *nal,
1531                  void         *private,
1532                  lib_msg_t    *libmsg,
1533                  unsigned int  niov,
1534                  struct iovec *iov,
1535                  ptl_kiov_t   *kiov,
1536                  size_t        offset,
1537                  size_t        mlen,
1538                  size_t        rlen)
1539 {
1540         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
1541         char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
1542         int           page;
1543         char         *page_ptr;
1544         int           page_nob;
1545         char         *iov_ptr;
1546         int           iov_nob;
1547         int           frag;
1548 #if KQSW_CHECKSUM
1549         kqsw_csum_t   senders_csum;
1550         kqsw_csum_t   payload_csum = 0;
1551         kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
1552         size_t        csum_len = mlen;
1553         int           csum_frags = 0;
1554         int           csum_nob = 0;
1555         static atomic_t csum_counter;
1556         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
1557
1558         atomic_inc (&csum_counter);
1559
1560         memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1561         if (senders_csum != hdr_csum)
1562                 kqswnal_csum_error (krx, 1);
1563 #endif
1564         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1565
1566         /* What was actually received must be >= payload. */
1567         LASSERT (mlen <= rlen);
1568         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
1569                 CERROR("Bad message size: have %d, need %d + %d\n",
1570                        krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
1571                 return (PTL_FAIL);
1572         }
1573
1574         /* It must be OK to kmap() if required */
1575         LASSERT (kiov == NULL || !in_interrupt ());
1576         /* Either all pages or all vaddrs */
1577         LASSERT (!(kiov != NULL && iov != NULL));
1578
1579         if (mlen != 0) {
1580                 page     = 0;
1581                 page_ptr = buffer + KQSW_HDR_SIZE;
1582                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1583
1584                 LASSERT (niov > 0);
1585
1586                 if (kiov != NULL) {
1587                         /* skip complete frags */
1588                         while (offset >= kiov->kiov_len) {
1589                                 offset -= kiov->kiov_len;
1590                                 kiov++;
1591                                 niov--;
1592                                 LASSERT (niov > 0);
1593                         }
1594                         iov_ptr = ((char *)kmap (kiov->kiov_page)) +
1595                                 kiov->kiov_offset + offset;
1596                         iov_nob = kiov->kiov_len - offset;
1597                 } else {
1598                         /* skip complete frags */
1599                         while (offset >= iov->iov_len) {
1600                                 offset -= iov->iov_len;
1601                                 iov++;
1602                                 niov--;
1603                                 LASSERT (niov > 0);
1604                         }
1605                         iov_ptr = iov->iov_base + offset;
1606                         iov_nob = iov->iov_len - offset;
1607                 }
1608                 
1609                 for (;;)
1610                 {
1611                         frag = mlen;
1612                         if (frag > page_nob)
1613                                 frag = page_nob;
1614                         if (frag > iov_nob)
1615                                 frag = iov_nob;
1616
1617                         memcpy (iov_ptr, page_ptr, frag);
1618 #if KQSW_CHECKSUM
1619                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1620                         csum_nob += frag;
1621                         csum_frags++;
1622 #endif
1623                         mlen -= frag;
1624                         if (mlen == 0)
1625                                 break;
1626
1627                         page_nob -= frag;
1628                         if (page_nob != 0)
1629                                 page_ptr += frag;
1630                         else
1631                         {
1632                                 page++;
1633                                 LASSERT (page < krx->krx_npages);
1634                                 page_ptr = page_address(krx->krx_kiov[page].kiov_page);
1635                                 page_nob = PAGE_SIZE;
1636                         }
1637
1638                         iov_nob -= frag;
1639                         if (iov_nob != 0)
1640                                 iov_ptr += frag;
1641                         else if (kiov != NULL) {
1642                                 kunmap (kiov->kiov_page);
1643                                 kiov++;
1644                                 niov--;
1645                                 LASSERT (niov > 0);
1646                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1647                                 iov_nob = kiov->kiov_len;
1648                         } else {
1649                                 iov++;
1650                                 niov--;
1651                                 LASSERT (niov > 0);
1652                                 iov_ptr = iov->iov_base;
1653                                 iov_nob = iov->iov_len;
1654                         }
1655                 }
1656
1657                 if (kiov != NULL)
1658                         kunmap (kiov->kiov_page);
1659         }
1660
1661 #if KQSW_CHECKSUM
1662         memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
1663                 sizeof(kqsw_csum_t));
1664
1665         if (csum_len != rlen)
1666                 CERROR("Unable to checksum data in user's buffer\n");
1667         else if (senders_csum != payload_csum)
1668                 kqswnal_csum_error (krx, 0);
1669
1670         if (csum_verbose)
1671                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1672                        "csum_nob %d\n",
1673                         hdr_csum, payload_csum, csum_frags, csum_nob);
1674 #endif
1675         lib_finalize(nal, private, libmsg, PTL_OK);
1676
1677         return (PTL_OK);
1678 }
1679
1680 static ptl_err_t
1681 kqswnal_recv(nal_cb_t     *nal,
1682              void         *private,
1683              lib_msg_t    *libmsg,
1684              unsigned int  niov,
1685              struct iovec *iov,
1686              size_t        offset,
1687              size_t        mlen,
1688              size_t        rlen)
1689 {
1690         return (kqswnal_recvmsg(nal, private, libmsg, 
1691                                 niov, iov, NULL, 
1692                                 offset, mlen, rlen));
1693 }
1694
1695 static ptl_err_t
1696 kqswnal_recv_pages (nal_cb_t     *nal,
1697                     void         *private,
1698                     lib_msg_t    *libmsg,
1699                     unsigned int  niov,
1700                     ptl_kiov_t   *kiov,
1701                     size_t        offset,
1702                     size_t        mlen,
1703                     size_t        rlen)
1704 {
1705         return (kqswnal_recvmsg(nal, private, libmsg, 
1706                                 niov, NULL, kiov, 
1707                                 offset, mlen, rlen));
1708 }
1709
1710 int
1711 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1712 {
1713         long    pid = kernel_thread (fn, arg, 0);
1714
1715         if (pid < 0)
1716                 return ((int)pid);
1717
1718         atomic_inc (&kqswnal_data.kqn_nthreads);
1719         atomic_inc (&kqswnal_data.kqn_nthreads_running);
1720         return (0);
1721 }
1722
1723 void
1724 kqswnal_thread_fini (void)
1725 {
1726         atomic_dec (&kqswnal_data.kqn_nthreads);
1727 }
1728
1729 int
1730 kqswnal_scheduler (void *arg)
1731 {
1732         kqswnal_rx_t    *krx;
1733         kqswnal_tx_t    *ktx;
1734         kpr_fwd_desc_t  *fwd;
1735         unsigned long    flags;
1736         int              rc;
1737         int              counter = 0;
1738         int              shuttingdown = 0;
1739         int              did_something;
1740
1741         kportal_daemonize ("kqswnal_sched");
1742         kportal_blockallsigs ();
1743         
1744         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1745
1746         for (;;)
1747         {
1748                 if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
1749
1750                         if (kqswnal_data.kqn_shuttingdown == 2)
1751                                 break;
1752                 
1753                         /* During stage 1 of shutdown we are still responsive
1754                          * to receives */
1755
1756                         atomic_dec (&kqswnal_data.kqn_nthreads_running);
1757                         shuttingdown = kqswnal_data.kqn_shuttingdown;
1758                 }
1759
1760                 did_something = 0;
1761
1762                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1763                 {
1764                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1765                                          kqswnal_rx_t, krx_list);
1766                         list_del (&krx->krx_list);
1767                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1768                                                flags);
1769
1770                         kqswnal_rx (krx);
1771
1772                         did_something = 1;
1773                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1774                 }
1775
1776                 if (!shuttingdown &&
1777                     !list_empty (&kqswnal_data.kqn_delayedtxds))
1778                 {
1779                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1780                                          kqswnal_tx_t, ktx_list);
1781                         list_del_init (&ktx->ktx_delayed_list);
1782                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1783                                                flags);
1784
1785                         rc = kqswnal_launch (ktx);
1786                         if (rc != 0)          /* failed: ktx_nid down? */
1787                         {
1788                                 CERROR("Failed delayed transmit to "LPX64
1789                                        ": %d\n", ktx->ktx_nid, rc);
1790                                 kqswnal_tx_done (ktx, rc);
1791                         }
1792
1793                         did_something = 1;
1794                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1795                 }
1796
1797                 if (!shuttingdown &
1798                     !list_empty (&kqswnal_data.kqn_delayedfwds))
1799                 {
1800                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1801                         list_del (&fwd->kprfd_list);
1802                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1803
1804                         kqswnal_fwd_packet (NULL, fwd);
1805
1806                         did_something = 1;
1807                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1808                 }
1809
1810                     /* nothing to do or hogging CPU */
1811                 if (!did_something || counter++ == KQSW_RESCHED) {
1812                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1813                                                flags);
1814
1815                         counter = 0;
1816
1817                         if (!did_something) {
1818                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1819                                                                kqswnal_data.kqn_shuttingdown != shuttingdown ||
1820                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1821                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1822                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1823                                 LASSERT (rc == 0);
1824                         } else if (need_resched())
1825                                 schedule ();
1826
1827                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1828                 }
1829         }
1830
1831         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1832
1833         kqswnal_thread_fini ();
1834         return (0);
1835 }
1836
1837 nal_cb_t kqswnal_lib =
1838 {
1839         nal_data:       &kqswnal_data,         /* NAL private data */
1840         cb_send:        kqswnal_send,
1841         cb_send_pages:  kqswnal_send_pages,
1842         cb_recv:        kqswnal_recv,
1843         cb_recv_pages:  kqswnal_recv_pages,
1844         cb_read:        kqswnal_read,
1845         cb_write:       kqswnal_write,
1846         cb_malloc:      kqswnal_malloc,
1847         cb_free:        kqswnal_free,
1848         cb_printf:      kqswnal_printf,
1849         cb_cli:         kqswnal_cli,
1850         cb_sti:         kqswnal_sti,
1851         cb_dist:        kqswnal_dist
1852 };