Whamcloud - gitweb
* Applied the last patch in Bug 2306, which changes the portals router/NAL
[fs/lustre-release.git] / lustre / portals / knals / qswnal / qswnal_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 EP_STATUSBLK  kqswnal_rpc_success;
30 EP_STATUSBLK  kqswnal_rpc_failed;
31
32 /*
33  *  LIB functions follow
34  *
35  */
36 static ptl_err_t
37 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
38              size_t len)
39 {
40         CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
41                 nal->ni.nid, len, src_addr, dst_addr );
42         memcpy( dst_addr, src_addr, len );
43
44         return (PTL_OK);
45 }
46
47 static ptl_err_t
48 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
49               size_t len)
50 {
51         CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
52                 nal->ni.nid, len, src_addr, dst_addr );
53         memcpy( dst_addr, src_addr, len );
54
55         return (PTL_OK);
56 }
57
58 static void *
59 kqswnal_malloc(nal_cb_t *nal, size_t len)
60 {
61         void *buf;
62
63         PORTAL_ALLOC(buf, len);
64         return (buf);
65 }
66
67 static void
68 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
69 {
70         PORTAL_FREE(buf, len);
71 }
72
73 static void
74 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
75 {
76         va_list ap;
77         char msg[256];
78
79         va_start (ap, fmt);
80         vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
81         va_end (ap);
82
83         msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
84
85         CDEBUG (D_NET, "%s", msg);
86 }
87
88
89 static void
90 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
91 {
92         kqswnal_data_t *data= nal->nal_data;
93
94         spin_lock_irqsave(&data->kqn_statelock, *flags);
95 }
96
97
98 static void
99 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
100 {
101         kqswnal_data_t *data= nal->nal_data;
102
103         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
104 }
105
106
107 static int
108 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
109 {
110         if (nid == nal->ni.nid)
111                 *dist = 0;                      /* it's me */
112         else if (kqswnal_nid2elanid (nid) >= 0)
113                 *dist = 1;                      /* it's my peer */
114         else
115                 *dist = 2;                      /* via router */
116         return (0);
117 }
118
119 void
120 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
121 {
122         struct timeval     now;
123         time_t             then;
124
125         do_gettimeofday (&now);
126         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
127
128         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
129 }
130
131 void
132 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
133 {
134 #if MULTIRAIL_EKC
135         int      i;
136 #endif
137
138         if (ktx->ktx_nmappedpages == 0)
139                 return;
140         
141 #if MULTIRAIL_EKC
142         CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
143                ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
144
145         for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
146                 ep_dvma_unload(kqswnal_data.kqn_ep,
147                                kqswnal_data.kqn_ep_tx_nmh,
148                                &ktx->ktx_frags[i]);
149 #else
150         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
151                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
152
153         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
154         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
155                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
156
157         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
158                           kqswnal_data.kqn_eptxdmahandle,
159                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
160 #endif
161         ktx->ktx_nmappedpages = 0;
162 }
163
164 int
165 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
166 {
167         int       nfrags    = ktx->ktx_nfrag;
168         int       nmapped   = ktx->ktx_nmappedpages;
169         int       maxmapped = ktx->ktx_npages;
170         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
171         char     *ptr;
172 #if MULTIRAIL_EKC
173         EP_RAILMASK railmask;
174         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
175                                             EP_RAILMASK_ALL,
176                                             kqswnal_nid2elanid(ktx->ktx_nid));
177         
178         if (rail < 0) {
179                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
180                 return (-ENETDOWN);
181         }
182         railmask = 1 << rail;
183 #endif
184         LASSERT (nmapped <= maxmapped);
185         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
186         LASSERT (nfrags <= EP_MAXFRAG);
187         LASSERT (niov > 0);
188         LASSERT (nob > 0);
189
190         /* skip complete frags before 'offset' */
191         while (offset >= kiov->kiov_len) {
192                 offset -= kiov->kiov_len;
193                 kiov++;
194                 niov--;
195                 LASSERT (niov > 0);
196         }
197
198         do {
199                 int  fraglen = kiov->kiov_len - offset;
200
201                 /* nob exactly spans the iovs */
202                 LASSERT (fraglen <= nob);
203                 /* each frag fits in a page */
204                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
205
206                 nmapped++;
207                 if (nmapped > maxmapped) {
208                         CERROR("Can't map message in %d pages (max %d)\n",
209                                nmapped, maxmapped);
210                         return (-EMSGSIZE);
211                 }
212
213                 if (nfrags == EP_MAXFRAG) {
214                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
215                                EP_MAXFRAG);
216                         return (-EMSGSIZE);
217                 }
218
219                 /* XXX this is really crap, but we'll have to kmap until
220                  * EKC has a page (rather than vaddr) mapping interface */
221
222                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
223
224                 CDEBUG(D_NET,
225                        "%p[%d] loading %p for %d, page %d, %d total\n",
226                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
227
228 #if MULTIRAIL_EKC
229                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
230                              ptr, fraglen,
231                              kqswnal_data.kqn_ep_tx_nmh, basepage,
232                              &railmask, &ktx->ktx_frags[nfrags]);
233
234                 if (nfrags == ktx->ktx_firsttmpfrag ||
235                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
236                                   &ktx->ktx_frags[nfrags - 1],
237                                   &ktx->ktx_frags[nfrags])) {
238                         /* new frag if this is the first or can't merge */
239                         nfrags++;
240                 }
241 #else
242                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
243                                        kqswnal_data.kqn_eptxdmahandle,
244                                        ptr, fraglen,
245                                        basepage, &ktx->ktx_frags[nfrags].Base);
246
247                 if (nfrags > 0 &&                /* previous frag mapped */
248                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
249                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
250                         /* just extend previous */
251                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
252                 else {
253                         ktx->ktx_frags[nfrags].Len = fraglen;
254                         nfrags++;                /* new frag */
255                 }
256 #endif
257
258                 kunmap (kiov->kiov_page);
259                 
260                 /* keep in loop for failure case */
261                 ktx->ktx_nmappedpages = nmapped;
262
263                 basepage++;
264                 kiov++;
265                 niov--;
266                 nob -= fraglen;
267                 offset = 0;
268
269                 /* iov must not run out before end of data */
270                 LASSERT (nob == 0 || niov > 0);
271
272         } while (nob > 0);
273
274         ktx->ktx_nfrag = nfrags;
275         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
276                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
277
278         return (0);
279 }
280
281 int
282 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
283                     int niov, struct iovec *iov)
284 {
285         int       nfrags    = ktx->ktx_nfrag;
286         int       nmapped   = ktx->ktx_nmappedpages;
287         int       maxmapped = ktx->ktx_npages;
288         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
289 #if MULTIRAIL_EKC
290         EP_RAILMASK railmask;
291         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
292                                             EP_RAILMASK_ALL,
293                                             kqswnal_nid2elanid(ktx->ktx_nid));
294         
295         if (rail < 0) {
296                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
297                 return (-ENETDOWN);
298         }
299         railmask = 1 << rail;
300 #endif
301         LASSERT (nmapped <= maxmapped);
302         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
303         LASSERT (nfrags <= EP_MAXFRAG);
304         LASSERT (niov > 0);
305         LASSERT (nob > 0);
306
307         /* skip complete frags before offset */
308         while (offset >= iov->iov_len) {
309                 offset -= iov->iov_len;
310                 iov++;
311                 niov--;
312                 LASSERT (niov > 0);
313         }
314         
315         do {
316                 int  fraglen = iov->iov_len - offset;
317                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
318
319                 /* nob exactly spans the iovs */
320                 LASSERT (fraglen <= nob);
321                 
322                 nmapped += npages;
323                 if (nmapped > maxmapped) {
324                         CERROR("Can't map message in %d pages (max %d)\n",
325                                nmapped, maxmapped);
326                         return (-EMSGSIZE);
327                 }
328
329                 if (nfrags == EP_MAXFRAG) {
330                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
331                                EP_MAXFRAG);
332                         return (-EMSGSIZE);
333                 }
334
335                 CDEBUG(D_NET,
336                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
337                        ktx, nfrags, iov->iov_base + offset, fraglen, 
338                        basepage, npages, nmapped);
339
340 #if MULTIRAIL_EKC
341                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
342                              iov->iov_base + offset, fraglen,
343                              kqswnal_data.kqn_ep_tx_nmh, basepage,
344                              &railmask, &ktx->ktx_frags[nfrags]);
345
346                 if (nfrags == ktx->ktx_firsttmpfrag ||
347                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
348                                   &ktx->ktx_frags[nfrags - 1],
349                                   &ktx->ktx_frags[nfrags])) {
350                         /* new frag if this is the first or can't merge */
351                         nfrags++;
352                 }
353 #else
354                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
355                                        kqswnal_data.kqn_eptxdmahandle,
356                                        iov->iov_base + offset, fraglen,
357                                        basepage, &ktx->ktx_frags[nfrags].Base);
358
359                 if (nfrags > 0 &&                /* previous frag mapped */
360                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
361                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
362                         /* just extend previous */
363                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
364                 else {
365                         ktx->ktx_frags[nfrags].Len = fraglen;
366                         nfrags++;                /* new frag */
367                 }
368 #endif
369
370                 /* keep in loop for failure case */
371                 ktx->ktx_nmappedpages = nmapped;
372
373                 basepage += npages;
374                 iov++;
375                 niov--;
376                 nob -= fraglen;
377                 offset = 0;
378
379                 /* iov must not run out before end of data */
380                 LASSERT (nob == 0 || niov > 0);
381
382         } while (nob > 0);
383
384         ktx->ktx_nfrag = nfrags;
385         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
386                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
387
388         return (0);
389 }
390
391
392 void
393 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
394 {
395         kpr_fwd_desc_t   *fwd = NULL;
396         unsigned long     flags;
397
398         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
399         ktx->ktx_state = KTX_IDLE;
400
401         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
402
403         list_del (&ktx->ktx_list);              /* take off active list */
404
405         if (ktx->ktx_isnblk) {
406                 /* reserved for non-blocking tx */
407                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
408                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
409                 return;
410         }
411
412         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
413
414         /* anything blocking for a tx descriptor? */
415         if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
416         {
417                 CDEBUG(D_NET,"wakeup fwd\n");
418
419                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
420                                   kpr_fwd_desc_t, kprfd_list);
421                 list_del (&fwd->kprfd_list);
422         }
423
424         wake_up (&kqswnal_data.kqn_idletxd_waitq);
425
426         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
427
428         if (fwd == NULL)
429                 return;
430
431         /* schedule packet for forwarding again */
432         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
433
434         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
435         wake_up (&kqswnal_data.kqn_sched_waitq);
436
437         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
438 }
439
440 kqswnal_tx_t *
441 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
442 {
443         unsigned long  flags;
444         kqswnal_tx_t  *ktx = NULL;
445
446         for (;;) {
447                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
448
449                 /* "normal" descriptor is free */
450                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
451                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
452                                           kqswnal_tx_t, ktx_list);
453                         break;
454                 }
455
456                 /* "normal" descriptor pool is empty */
457
458                 if (fwd != NULL) { /* forwarded packet => queue for idle txd */
459                         CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
460                         list_add_tail (&fwd->kprfd_list,
461                                        &kqswnal_data.kqn_idletxd_fwdq);
462                         break;
463                 }
464
465                 /* doing a local transmit */
466                 if (!may_block) {
467                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
468                                 CERROR ("intr tx desc pool exhausted\n");
469                                 break;
470                         }
471
472                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
473                                           kqswnal_tx_t, ktx_list);
474                         break;
475                 }
476
477                 /* block for idle tx */
478
479                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
480
481                 CDEBUG (D_NET, "blocking for tx desc\n");
482                 wait_event (kqswnal_data.kqn_idletxd_waitq,
483                             !list_empty (&kqswnal_data.kqn_idletxds));
484         }
485
486         if (ktx != NULL) {
487                 list_del (&ktx->ktx_list);
488                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
489                 ktx->ktx_launcher = current->pid;
490         }
491
492         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
493
494         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
495         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
496
497         return (ktx);
498 }
499
500 void
501 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
502 {
503         lib_msg_t     *msg;
504         lib_msg_t     *repmsg = NULL;
505
506         switch (ktx->ktx_state) {
507         case KTX_FORWARDING:       /* router asked me to forward this packet */
508                 kpr_fwd_done (&kqswnal_data.kqn_router,
509                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
510                 break;
511
512         case KTX_SENDING:          /* packet sourced locally */
513                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
514                               (lib_msg_t *)ktx->ktx_args[1],
515                               (error == 0) ? PTL_OK : 
516                               (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
517                 break;
518
519         case KTX_GETTING:          /* Peer has DMA-ed direct? */
520                 msg = (lib_msg_t *)ktx->ktx_args[1];
521
522                 if (error == 0) {
523                         repmsg = lib_fake_reply_msg (&kqswnal_lib, 
524                                                      ktx->ktx_nid, msg->md);
525                         if (repmsg == NULL)
526                                 error = -ENOMEM;
527                 }
528                 
529                 if (error == 0) {
530                         lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
531                                       msg, PTL_OK);
532                         lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
533                 } else {
534                         lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
535                                       (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
536                 }
537                 break;
538
539         default:
540                 LASSERT (0);
541         }
542
543         kqswnal_put_idle_tx (ktx);
544 }
545
546 static void
547 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
548 {
549         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
550
551         LASSERT (txd != NULL);
552         LASSERT (ktx != NULL);
553
554         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
555
556         if (status != EP_SUCCESS) {
557
558                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
559                         ktx->ktx_nid, status);
560
561                 kqswnal_notify_peer_down(ktx);
562                 status = -EHOSTDOWN;
563
564         } else if (ktx->ktx_state == KTX_GETTING) {
565                 /* RPC completed OK; what did our peer put in the status
566                  * block? */
567 #if MULTIRAIL_EKC
568                 status = ep_txd_statusblk(txd)->Data[0];
569 #else
570                 status = ep_txd_statusblk(txd)->Status;
571 #endif
572         } else {
573                 status = 0;
574         }
575
576         kqswnal_tx_done (ktx, status);
577 }
578
579 int
580 kqswnal_launch (kqswnal_tx_t *ktx)
581 {
582         /* Don't block for transmit descriptor if we're in interrupt context */
583         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
584         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
585         long  flags;
586         int   rc;
587
588         ktx->ktx_launchtime = jiffies;
589
590         LASSERT (dest >= 0);                    /* must be a peer */
591         if (ktx->ktx_state == KTX_GETTING) {
592                 /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
593                  * other frags are the GET sink which we obviously don't
594                  * send here :) */
595 #if MULTIRAIL_EKC
596                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
597                                      ktx->ktx_port, attr,
598                                      kqswnal_txhandler, ktx,
599                                      NULL, ktx->ktx_frags, 1);
600 #else
601                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
602                                      ktx->ktx_port, attr, kqswnal_txhandler,
603                                      ktx, NULL, ktx->ktx_frags, 1);
604 #endif
605         } else {
606 #if MULTIRAIL_EKC
607                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
608                                          ktx->ktx_port, attr,
609                                          kqswnal_txhandler, ktx,
610                                          NULL, ktx->ktx_frags, ktx->ktx_nfrag);
611 #else
612                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
613                                        ktx->ktx_port, attr, 
614                                        kqswnal_txhandler, ktx, 
615                                        ktx->ktx_frags, ktx->ktx_nfrag);
616 #endif
617         }
618
619         switch (rc) {
620         case EP_SUCCESS: /* success */
621                 return (0);
622
623         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
624                 LASSERT (in_interrupt());
625
626                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
627
628                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
629                 wake_up (&kqswnal_data.kqn_sched_waitq);
630
631                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
632                 return (0);
633
634         default: /* fatal error */
635                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
636                 kqswnal_notify_peer_down(ktx);
637                 return (-EHOSTUNREACH);
638         }
639 }
640
641 static char *
642 hdr_type_string (ptl_hdr_t *hdr)
643 {
644         switch (hdr->type) {
645         case PTL_MSG_ACK:
646                 return ("ACK");
647         case PTL_MSG_PUT:
648                 return ("PUT");
649         case PTL_MSG_GET:
650                 return ("GET");
651         case PTL_MSG_REPLY:
652                 return ("REPLY");
653         default:
654                 return ("<UNKNOWN>");
655         }
656 }
657
658 static void
659 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
660 {
661         char *type_str = hdr_type_string (hdr);
662
663         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
664                NTOH__u32(hdr->payload_length));
665         CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
666                NTOH__u32(hdr->src_pid));
667         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
668                NTOH__u32(hdr->dest_pid));
669
670         switch (NTOH__u32(hdr->type)) {
671         case PTL_MSG_PUT:
672                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
673                        "match bits "LPX64"\n",
674                        NTOH__u32 (hdr->msg.put.ptl_index),
675                        hdr->msg.put.ack_wmd.wh_interface_cookie,
676                        hdr->msg.put.ack_wmd.wh_object_cookie,
677                        NTOH__u64 (hdr->msg.put.match_bits));
678                 CERROR("    offset %d, hdr data "LPX64"\n",
679                        NTOH__u32(hdr->msg.put.offset),
680                        hdr->msg.put.hdr_data);
681                 break;
682
683         case PTL_MSG_GET:
684                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
685                        "match bits "LPX64"\n",
686                        NTOH__u32 (hdr->msg.get.ptl_index),
687                        hdr->msg.get.return_wmd.wh_interface_cookie,
688                        hdr->msg.get.return_wmd.wh_object_cookie,
689                        hdr->msg.get.match_bits);
690                 CERROR("    Length %d, src offset %d\n",
691                        NTOH__u32 (hdr->msg.get.sink_length),
692                        NTOH__u32 (hdr->msg.get.src_offset));
693                 break;
694
695         case PTL_MSG_ACK:
696                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
697                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
698                        hdr->msg.ack.dst_wmd.wh_object_cookie,
699                        NTOH__u32 (hdr->msg.ack.mlength));
700                 break;
701
702         case PTL_MSG_REPLY:
703                 CERROR("    dst md "LPX64"."LPX64"\n",
704                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
705                        hdr->msg.reply.dst_wmd.wh_object_cookie);
706         }
707
708 }                               /* end of print_hdr() */
709
710 #if !MULTIRAIL_EKC
711 void
712 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
713 {
714         int          i;
715
716         CDEBUG (how, "%s: %d\n", str, n);
717         for (i = 0; i < n; i++) {
718                 CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
719         }
720 }
721
722 int
723 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
724                      int nsrc, EP_IOVEC *src,
725                      int ndst, EP_IOVEC *dst) 
726 {
727         int        count;
728         int        nob;
729
730         LASSERT (ndv > 0);
731         LASSERT (nsrc > 0);
732         LASSERT (ndst > 0);
733
734         for (count = 0; count < ndv; count++, dv++) {
735
736                 if (nsrc == 0 || ndst == 0) {
737                         if (nsrc != ndst) {
738                                 /* For now I'll barf on any left over entries */
739                                 CERROR ("mismatched src and dst iovs\n");
740                                 return (-EINVAL);
741                         }
742                         return (count);
743                 }
744
745                 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
746                 dv->Len    = nob;
747                 dv->Source = src->Base;
748                 dv->Dest   = dst->Base;
749
750                 if (nob >= src->Len) {
751                         src++;
752                         nsrc--;
753                 } else {
754                         src->Len -= nob;
755                         src->Base += nob;
756                 }
757                 
758                 if (nob >= dst->Len) {
759                         dst++;
760                         ndst--;
761                 } else {
762                         src->Len -= nob;
763                         src->Base += nob;
764                 }
765         }
766
767         CERROR ("DATAVEC too small\n");
768         return (-E2BIG);
769 }
770 #endif
771
772 int
773 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
774                    struct iovec *iov, ptl_kiov_t *kiov, 
775                    int offset, int nob)
776 {
777         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
778         char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
779         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
780         int                 rc;
781 #if MULTIRAIL_EKC
782         int                 i;
783 #else
784         EP_DATAVEC          datav[EP_MAXFRAG];
785         int                 ndatav;
786 #endif
787         LASSERT (krx->krx_rpc_reply_needed);
788         LASSERT ((iov == NULL) != (kiov == NULL));
789
790         /* see kqswnal_sendmsg comment regarding endian-ness */
791         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
792                 /* msg too small to discover rmd size */
793                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
794                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
795                 return (-EINVAL);
796         }
797         
798         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
799                 /* rmd doesn't fit in the incoming message */
800                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
801                         krx->krx_nob, rmd->kqrmd_nfrag,
802                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
803                 return (-EINVAL);
804         }
805
806         /* Map the source data... */
807         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
808         if (kiov != NULL)
809                 rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
810         else
811                 rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
812
813         if (rc != 0) {
814                 CERROR ("Can't map source data: %d\n", rc);
815                 return (rc);
816         }
817
818 #if MULTIRAIL_EKC
819         if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
820                 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
821                        ktx->ktx_nfrag, rmd->kqrmd_nfrag);
822                 return (-EINVAL);
823         }
824         
825         for (i = 0; i < rmd->kqrmd_nfrag; i++)
826                 if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
827                         CERROR("Can't cope with unequal frags %d(%d):"
828                                " %d local %d remote\n",
829                                i, rmd->kqrmd_nfrag, 
830                                ktx->ktx_frags[i].nmd_len, 
831                                rmd->kqrmd_frag[i].nmd_len);
832                         return (-EINVAL);
833                 }
834 #else
835         ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
836                                       ktx->ktx_nfrag, ktx->ktx_frags,
837                                       rmd->kqrmd_nfrag, rmd->kqrmd_frag);
838         if (ndatav < 0) {
839                 CERROR ("Can't create datavec: %d\n", ndatav);
840                 return (ndatav);
841         }
842 #endif
843
844         /* Our caller will start to race with kqswnal_dma_reply_complete... */
845         LASSERT (atomic_read (&krx->krx_refcount) == 1);
846         atomic_set (&krx->krx_refcount, 2);
847
848 #if MULTIRAIL_EKC
849         rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
850                              &kqswnal_rpc_success,
851                              ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
852         if (rc == EP_SUCCESS)
853                 return (0);
854
855         /* Well we tried... */
856         krx->krx_rpc_reply_needed = 0;
857 #else
858         rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
859                               &kqswnal_rpc_success, datav, ndatav);
860         if (rc == EP_SUCCESS)
861                 return (0);
862
863         /* "old" EKC destroys rxd on failed completion */
864         krx->krx_rxd = NULL;
865 #endif
866
867         CERROR("can't complete RPC: %d\n", rc);
868
869         /* reset refcount back to 1: we're not going to be racing with
870          * kqswnal_dma_reply_complete. */
871         atomic_set (&krx->krx_refcount, 1);
872
873         return (-ECONNABORTED);
874 }
875
876 static ptl_err_t
877 kqswnal_sendmsg (nal_cb_t     *nal,
878                  void         *private,
879                  lib_msg_t    *libmsg,
880                  ptl_hdr_t    *hdr,
881                  int           type,
882                  ptl_nid_t     nid,
883                  ptl_pid_t     pid,
884                  unsigned int  payload_niov,
885                  struct iovec *payload_iov,
886                  ptl_kiov_t   *payload_kiov,
887                  size_t        payload_offset,
888                  size_t        payload_nob)
889 {
890         kqswnal_tx_t      *ktx;
891         int                rc;
892         ptl_nid_t          targetnid;
893 #if KQSW_CHECKSUM
894         int                i;
895         kqsw_csum_t        csum;
896         int                sumoff;
897         int                sumnob;
898 #endif
899         
900         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
901                " pid %u\n", payload_nob, payload_niov, nid, pid);
902
903         LASSERT (payload_nob == 0 || payload_niov > 0);
904         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
905
906         /* It must be OK to kmap() if required */
907         LASSERT (payload_kiov == NULL || !in_interrupt ());
908         /* payload is either all vaddrs or all pages */
909         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
910         
911         if (payload_nob > KQSW_MAXPAYLOAD) {
912                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
913                         payload_nob, KQSW_MAXPAYLOAD);
914                 return (PTL_FAIL);
915         }
916
917         targetnid = nid;
918         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
919                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
920                                  sizeof (ptl_hdr_t) + payload_nob, &targetnid);
921                 if (rc != 0) {
922                         CERROR("Can't route to "LPX64": router error %d\n",
923                                nid, rc);
924                         return (PTL_FAIL);
925                 }
926                 if (kqswnal_nid2elanid (targetnid) < 0) {
927                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
928                                targetnid, nid);
929                         return (PTL_FAIL);
930                 }
931         }
932
933         /* I may not block for a transmit descriptor if I might block the
934          * receiver, or an interrupt handler. */
935         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
936                                           type == PTL_MSG_REPLY ||
937                                           in_interrupt()));
938         if (ktx == NULL) {
939                 kqswnal_cerror_hdr (hdr);
940                 return (PTL_NOSPACE);
941         }
942
943         ktx->ktx_nid     = targetnid;
944         ktx->ktx_args[0] = private;
945         ktx->ktx_args[1] = libmsg;
946
947         if (type == PTL_MSG_REPLY &&
948             ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
949                 if (nid != targetnid ||
950                     kqswnal_nid2elanid(nid) != 
951                     ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
952                         CERROR("Optimized reply nid conflict: "
953                                "nid "LPX64" via "LPX64" elanID %d\n",
954                                nid, targetnid,
955                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
956                         return (PTL_FAIL);
957                 }
958
959                 /* peer expects RPC completion with GET data */
960                 rc = kqswnal_dma_reply (ktx, payload_niov, 
961                                         payload_iov, payload_kiov, 
962                                         payload_offset, payload_nob);
963                 if (rc == 0)
964                         return (PTL_OK);
965                 
966                 CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
967                 kqswnal_put_idle_tx (ktx);
968                 return (PTL_FAIL);
969         }
970
971         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
972         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
973
974 #if KQSW_CHECKSUM
975         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
976         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
977         for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
978                 LASSERT(i < niov);
979                 if (payload_kiov != NULL) {
980                         ptl_kiov_t *kiov = &payload_kiov[i];
981
982                         if (sumoff >= kiov->kiov_len) {
983                                 sumoff -= kiov->kiov_len;
984                         } else {
985                                 char *addr = ((char *)kmap (kiov->kiov_page)) +
986                                              kiov->kiov_offset + sumoff;
987                                 int   fragnob = kiov->kiov_len - sumoff;
988
989                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
990                                 sumnob -= fragnob;
991                                 sumoff = 0;
992                                 kunmap(kiov->kiov_page);
993                         }
994                 } else {
995                         struct iovec *iov = &payload_iov[i];
996
997                         if (sumoff > iov->iov_len) {
998                                 sumoff -= iov->iov_len;
999                         } else {
1000                                 char *addr = iov->iov_base + sumoff;
1001                                 int   fragnob = iov->iov_len - sumoff;
1002                                 
1003                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1004                                 sumnob -= fragnob;
1005                                 sumoff = 0;
1006                         }
1007                 }
1008         }
1009         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
1010 #endif
1011
1012         if (kqswnal_data.kqn_optimized_gets &&
1013             type == PTL_MSG_GET &&              /* doing a GET */
1014             nid == targetnid) {                 /* not forwarding */
1015                 lib_md_t           *md = libmsg->md;
1016                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
1017                 
1018                 /* Optimised path: I send over the Elan vaddrs of the get
1019                  * sink buffers, and my peer DMAs directly into them.
1020                  *
1021                  * First I set up ktx as if it was going to send this
1022                  * payload, (it needs to map it anyway).  This fills
1023                  * ktx_frags[1] and onward with the network addresses
1024                  * of the GET sink frags.  I copy these into ktx_buffer,
1025                  * immediately after the header, and send that as my GET
1026                  * message.
1027                  *
1028                  * Note that the addresses are sent in native endian-ness.
1029                  * When EKC copes with different endian nodes, I'll fix
1030                  * this (and eat my hat :) */
1031
1032                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1033                 ktx->ktx_state = KTX_GETTING;
1034
1035                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
1036                         rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
1037                                                   md->md_niov, md->md_iov.kiov);
1038                 else
1039                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
1040                                                  md->md_niov, md->md_iov.iov);
1041
1042                 if (rc < 0) {
1043                         kqswnal_put_idle_tx (ktx);
1044                         return (PTL_FAIL);
1045                 }
1046
1047                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1048
1049                 payload_nob = offsetof(kqswnal_remotemd_t,
1050                                        kqrmd_frag[rmd->kqrmd_nfrag]);
1051                 LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
1052
1053 #if MULTIRAIL_EKC
1054                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1055                        rmd->kqrmd_nfrag * sizeof(EP_NMD));
1056
1057                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1058                               0, KQSW_HDR_SIZE + payload_nob);
1059 #else
1060                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1061                        rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
1062                 
1063                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1064                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1065 #endif
1066         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
1067
1068                 /* small message: single frag copied into the pre-mapped buffer */
1069
1070                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1071                 ktx->ktx_state = KTX_SENDING;
1072 #if MULTIRAIL_EKC
1073                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1074                               0, KQSW_HDR_SIZE + payload_nob);
1075 #else
1076                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1077                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1078 #endif
1079                 if (payload_nob > 0) {
1080                         if (payload_kiov != NULL)
1081                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1082                                                    payload_niov, payload_kiov, 
1083                                                    payload_offset, payload_nob);
1084                         else
1085                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1086                                                   payload_niov, payload_iov, 
1087                                                   payload_offset, payload_nob);
1088                 }
1089         } else {
1090
1091                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
1092
1093                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1094                 ktx->ktx_state = KTX_SENDING;
1095 #if MULTIRAIL_EKC
1096                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1097                               0, KQSW_HDR_SIZE);
1098 #else
1099                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1100                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1101 #endif
1102                 if (payload_kiov != NULL)
1103                         rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
1104                                                   payload_niov, payload_kiov);
1105                 else
1106                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
1107                                                  payload_niov, payload_iov);
1108                 if (rc != 0) {
1109                         kqswnal_put_idle_tx (ktx);
1110                         return (PTL_FAIL);
1111                 }
1112         }
1113         
1114         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
1115                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1116
1117         rc = kqswnal_launch (ktx);
1118         if (rc != 0) {                    /* failed? */
1119                 CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
1120                 kqswnal_put_idle_tx (ktx);
1121                 return (PTL_FAIL);
1122         }
1123
1124         CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
1125                payload_nob, nid, targetnid);
1126         return (PTL_OK);
1127 }
1128
1129 static ptl_err_t
1130 kqswnal_send (nal_cb_t     *nal,
1131               void         *private,
1132               lib_msg_t    *libmsg,
1133               ptl_hdr_t    *hdr,
1134               int           type,
1135               ptl_nid_t     nid,
1136               ptl_pid_t     pid,
1137               unsigned int  payload_niov,
1138               struct iovec *payload_iov,
1139               size_t        payload_offset,
1140               size_t        payload_nob)
1141 {
1142         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1143                                  payload_niov, payload_iov, NULL, 
1144                                  payload_offset, payload_nob));
1145 }
1146
1147 static ptl_err_t
1148 kqswnal_send_pages (nal_cb_t     *nal,
1149                     void         *private,
1150                     lib_msg_t    *libmsg,
1151                     ptl_hdr_t    *hdr,
1152                     int           type,
1153                     ptl_nid_t     nid,
1154                     ptl_pid_t     pid,
1155                     unsigned int  payload_niov,
1156                     ptl_kiov_t   *payload_kiov,
1157                     size_t        payload_offset,
1158                     size_t        payload_nob)
1159 {
1160         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1161                                  payload_niov, NULL, payload_kiov, 
1162                                  payload_offset, payload_nob));
1163 }
1164
1165 void
1166 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1167 {
1168         int             rc;
1169         kqswnal_tx_t   *ktx;
1170         ptl_kiov_t     *kiov = fwd->kprfd_kiov;
1171         int             niov = fwd->kprfd_niov;
1172         int             nob = fwd->kprfd_nob;
1173         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
1174
1175 #if KQSW_CHECKSUM
1176         CERROR ("checksums for forwarded packets not implemented\n");
1177         LBUG ();
1178 #endif
1179         /* The router wants this NAL to forward a packet */
1180         CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
1181                 fwd, nid, niov, nob);
1182
1183         ktx = kqswnal_get_idle_tx (fwd, 0);
1184         if (ktx == NULL)        /* can't get txd right now */
1185                 return;         /* fwd will be scheduled when tx desc freed */
1186
1187         if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
1188                 nid = fwd->kprfd_target_nid;    /* target is final dest */
1189
1190         if (kqswnal_nid2elanid (nid) < 0) {
1191                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
1192                 rc = -EHOSTUNREACH;
1193                 goto failed;
1194         }
1195
1196         /* copy hdr into pre-mapped buffer */
1197         memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
1198         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
1199
1200         ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
1201                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1202         ktx->ktx_nid     = nid;
1203         ktx->ktx_state   = KTX_FORWARDING;
1204         ktx->ktx_args[0] = fwd;
1205         ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
1206
1207         if (nob <= KQSW_TX_MAXCONTIG) 
1208         {
1209                 /* send payload from ktx's pre-mapped contiguous buffer */
1210 #if MULTIRAIL_EKC
1211                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1212                               0, KQSW_HDR_SIZE + nob);
1213 #else
1214                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1215                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
1216 #endif
1217                 if (nob > 0)
1218                         lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
1219                                           niov, kiov, 0, nob);
1220         }
1221         else
1222         {
1223                 /* zero copy payload */
1224 #if MULTIRAIL_EKC
1225                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1226                               0, KQSW_HDR_SIZE);
1227 #else
1228                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1229                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1230 #endif
1231                 rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
1232                 if (rc != 0)
1233                         goto failed;
1234         }
1235
1236         rc = kqswnal_launch (ktx);
1237         if (rc == 0)
1238                 return;
1239
1240  failed:
1241         LASSERT (rc != 0);
1242         CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
1243
1244         kqswnal_put_idle_tx (ktx);
1245         /* complete now (with failure) */
1246         kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
1247 }
1248
1249 void
1250 kqswnal_fwd_callback (void *arg, int error)
1251 {
1252         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1253
1254         /* The router has finished forwarding this packet */
1255
1256         if (error != 0)
1257         {
1258                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1259
1260                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1261                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
1262         }
1263
1264         kqswnal_requeue_rx (krx);
1265 }
1266
1267 void
1268 kqswnal_dma_reply_complete (EP_RXD *rxd) 
1269 {
1270         int           status = ep_rxd_status(rxd);
1271         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
1272         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
1273         lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
1274         
1275         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1276                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
1277
1278         LASSERT (krx->krx_rxd == rxd);
1279         LASSERT (krx->krx_rpc_reply_needed);
1280
1281         krx->krx_rpc_reply_needed = 0;
1282         kqswnal_rx_done (krx);
1283
1284         lib_finalize (&kqswnal_lib, NULL, msg,
1285                       (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
1286         kqswnal_put_idle_tx (ktx);
1287 }
1288
1289 void
1290 kqswnal_rpc_complete (EP_RXD *rxd)
1291 {
1292         int           status = ep_rxd_status(rxd);
1293         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1294         
1295         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1296                "rxd %p, krx %p, status %d\n", rxd, krx, status);
1297
1298         LASSERT (krx->krx_rxd == rxd);
1299         LASSERT (krx->krx_rpc_reply_needed);
1300         
1301         krx->krx_rpc_reply_needed = 0;
1302         kqswnal_requeue_rx (krx);
1303 }
1304
1305 void
1306 kqswnal_requeue_rx (kqswnal_rx_t *krx) 
1307 {
1308         int   rc;
1309
1310         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1311
1312         if (krx->krx_rpc_reply_needed) {
1313
1314                 /* We failed to complete the peer's optimized GET (e.g. we
1315                  * couldn't map the source buffers).  We complete the
1316                  * peer's EKC rpc now with failure. */
1317 #if MULTIRAIL_EKC
1318                 rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
1319                                      &kqswnal_rpc_failed, NULL, NULL, 0);
1320                 if (rc == EP_SUCCESS)
1321                         return;
1322                 
1323                 CERROR("can't complete RPC: %d\n", rc);
1324 #else
1325                 if (krx->krx_rxd != NULL) {
1326                         /* We didn't try (and fail) to complete earlier... */
1327                         rc = ep_complete_rpc(krx->krx_rxd, 
1328                                              kqswnal_rpc_complete, krx,
1329                                              &kqswnal_rpc_failed, NULL, 0);
1330                         if (rc == EP_SUCCESS)
1331                                 return;
1332
1333                         CERROR("can't complete RPC: %d\n", rc);
1334                 }
1335                 
1336                 /* NB the old ep_complete_rpc() frees rxd on failure, so we
1337                  * have to requeue from scratch here, unless we're shutting
1338                  * down */
1339                 if (kqswnal_data.kqn_shuttingdown)
1340                         return;
1341
1342                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
1343                                       krx->krx_elanbuffer, 
1344                                       krx->krx_npages * PAGE_SIZE, 0);
1345                 LASSERT (rc == EP_SUCCESS);
1346                 /* We don't handle failure here; it's incredibly rare
1347                  * (never reported?) and only happens with "old" EKC */
1348                 return;
1349 #endif
1350         }
1351
1352 #if MULTIRAIL_EKC
1353         if (kqswnal_data.kqn_shuttingdown) {
1354                 /* free EKC rxd on shutdown */
1355                 ep_complete_receive(krx->krx_rxd);
1356         } else {
1357                 /* repost receive */
1358                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1359                                    &krx->krx_elanbuffer, 0);
1360         }
1361 #else                
1362         /* don't actually requeue on shutdown */
1363         if (!kqswnal_data.kqn_shuttingdown) 
1364                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1365                                    krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
1366 #endif
1367 }
1368         
1369 void
1370 kqswnal_rx (kqswnal_rx_t *krx)
1371 {
1372         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
1373         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
1374         int             payload_nob;
1375         int             nob;
1376         int             niov;
1377
1378         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1379
1380         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
1381                 atomic_set(&krx->krx_refcount, 1);
1382                 lib_parse (&kqswnal_lib, hdr, krx);
1383                 kqswnal_rx_done(krx);
1384                 return;
1385         }
1386
1387 #if KQSW_CHECKSUM
1388         CERROR ("checksums for forwarded packets not implemented\n");
1389         LBUG ();
1390 #endif
1391         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
1392         {
1393                 CERROR("dropping packet from "LPX64" for "LPX64
1394                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
1395
1396                 kqswnal_requeue_rx (krx);
1397                 return;
1398         }
1399
1400         nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
1401         niov = 0;
1402         if (nob > 0) {
1403                 krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
1404                 krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
1405                 niov = 1;
1406                 nob -= PAGE_SIZE - KQSW_HDR_SIZE;
1407                 
1408                 while (nob > 0) {
1409                         LASSERT (niov < krx->krx_npages);
1410                         
1411                         krx->krx_kiov[niov].kiov_offset = 0;
1412                         krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
1413                         niov++;
1414                         nob -= PAGE_SIZE;
1415                 }
1416         }
1417
1418         kpr_fwd_init (&krx->krx_fwd, dest_nid, 
1419                       hdr, payload_nob, niov, krx->krx_kiov,
1420                       kqswnal_fwd_callback, krx);
1421
1422         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
1423 }
1424
1425 /* Receive Interrupt Handler: posts to schedulers */
1426 void 
1427 kqswnal_rxhandler(EP_RXD *rxd)
1428 {
1429         long          flags;
1430         int           nob    = ep_rxd_len (rxd);
1431         int           status = ep_rxd_status (rxd);
1432         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1433
1434         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1435                rxd, krx, nob, status);
1436
1437         LASSERT (krx != NULL);
1438
1439         krx->krx_rxd = rxd;
1440         krx->krx_nob = nob;
1441 #if MULTIRAIL_EKC
1442         krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
1443 #else
1444         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
1445 #endif
1446         
1447         /* must receive a whole header to be able to parse */
1448         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
1449         {
1450                 /* receives complete with failure when receiver is removed */
1451 #if MULTIRAIL_EKC
1452                 if (status == EP_SHUTDOWN)
1453                         LASSERT (kqswnal_data.kqn_shuttingdown);
1454                 else
1455                         CERROR("receive status failed with status %d nob %d\n",
1456                                ep_rxd_status(rxd), nob);
1457 #else
1458                 if (!kqswnal_data.kqn_shuttingdown)
1459                         CERROR("receive status failed with status %d nob %d\n",
1460                                ep_rxd_status(rxd), nob);
1461 #endif
1462                 kqswnal_requeue_rx (krx);
1463                 return;
1464         }
1465
1466         if (!in_interrupt()) {
1467                 kqswnal_rx (krx);
1468                 return;
1469         }
1470
1471         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1472
1473         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1474         wake_up (&kqswnal_data.kqn_sched_waitq);
1475
1476         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1477 }
1478
1479 #if KQSW_CHECKSUM
1480 void
1481 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1482 {
1483         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1484
1485         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1486                 ", dpid %d, spid %d, type %d\n",
1487                 ishdr ? "Header" : "Payload", krx,
1488                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
1489                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
1490                 NTOH__u32(hdr->type));
1491
1492         switch (NTOH__u32 (hdr->type))
1493         {
1494         case PTL_MSG_ACK:
1495                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
1496                        " len %u\n",
1497                        NTOH__u32(hdr->msg.ack.mlength),
1498                        hdr->msg.ack.dst_wmd.handle_cookie,
1499                        hdr->msg.ack.dst_wmd.handle_idx,
1500                        NTOH__u64(hdr->msg.ack.match_bits),
1501                        NTOH__u32(hdr->msg.ack.length));
1502                 break;
1503         case PTL_MSG_PUT:
1504                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
1505                        " len %u off %u data "LPX64"\n",
1506                        NTOH__u32(hdr->msg.put.ptl_index),
1507                        hdr->msg.put.ack_wmd.handle_cookie,
1508                        hdr->msg.put.ack_wmd.handle_idx,
1509                        NTOH__u64(hdr->msg.put.match_bits),
1510                        NTOH__u32(hdr->msg.put.length),
1511                        NTOH__u32(hdr->msg.put.offset),
1512                        hdr->msg.put.hdr_data);
1513                 break;
1514         case PTL_MSG_GET:
1515                 CERROR ("GET: <>\n");
1516                 break;
1517         case PTL_MSG_REPLY:
1518                 CERROR ("REPLY: <>\n");
1519                 break;
1520         default:
1521                 CERROR ("TYPE?: <>\n");
1522         }
1523 }
1524 #endif
1525
1526 static ptl_err_t
1527 kqswnal_recvmsg (nal_cb_t     *nal,
1528                  void         *private,
1529                  lib_msg_t    *libmsg,
1530                  unsigned int  niov,
1531                  struct iovec *iov,
1532                  ptl_kiov_t   *kiov,
1533                  size_t        offset,
1534                  size_t        mlen,
1535                  size_t        rlen)
1536 {
1537         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
1538         char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
1539         int           page;
1540         char         *page_ptr;
1541         int           page_nob;
1542         char         *iov_ptr;
1543         int           iov_nob;
1544         int           frag;
1545 #if KQSW_CHECKSUM
1546         kqsw_csum_t   senders_csum;
1547         kqsw_csum_t   payload_csum = 0;
1548         kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
1549         size_t        csum_len = mlen;
1550         int           csum_frags = 0;
1551         int           csum_nob = 0;
1552         static atomic_t csum_counter;
1553         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
1554
1555         atomic_inc (&csum_counter);
1556
1557         memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1558         if (senders_csum != hdr_csum)
1559                 kqswnal_csum_error (krx, 1);
1560 #endif
1561         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1562
1563         /* What was actually received must be >= payload. */
1564         LASSERT (mlen <= rlen);
1565         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
1566                 CERROR("Bad message size: have %d, need %d + %d\n",
1567                        krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
1568                 return (PTL_FAIL);
1569         }
1570
1571         /* It must be OK to kmap() if required */
1572         LASSERT (kiov == NULL || !in_interrupt ());
1573         /* Either all pages or all vaddrs */
1574         LASSERT (!(kiov != NULL && iov != NULL));
1575
1576         if (mlen != 0) {
1577                 page     = 0;
1578                 page_ptr = buffer + KQSW_HDR_SIZE;
1579                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1580
1581                 LASSERT (niov > 0);
1582
1583                 if (kiov != NULL) {
1584                         /* skip complete frags */
1585                         while (offset >= kiov->kiov_len) {
1586                                 offset -= kiov->kiov_len;
1587                                 kiov++;
1588                                 niov--;
1589                                 LASSERT (niov > 0);
1590                         }
1591                         iov_ptr = ((char *)kmap (kiov->kiov_page)) +
1592                                 kiov->kiov_offset + offset;
1593                         iov_nob = kiov->kiov_len - offset;
1594                 } else {
1595                         /* skip complete frags */
1596                         while (offset >= iov->iov_len) {
1597                                 offset -= iov->iov_len;
1598                                 iov++;
1599                                 niov--;
1600                                 LASSERT (niov > 0);
1601                         }
1602                         iov_ptr = iov->iov_base + offset;
1603                         iov_nob = iov->iov_len - offset;
1604                 }
1605                 
1606                 for (;;)
1607                 {
1608                         frag = mlen;
1609                         if (frag > page_nob)
1610                                 frag = page_nob;
1611                         if (frag > iov_nob)
1612                                 frag = iov_nob;
1613
1614                         memcpy (iov_ptr, page_ptr, frag);
1615 #if KQSW_CHECKSUM
1616                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1617                         csum_nob += frag;
1618                         csum_frags++;
1619 #endif
1620                         mlen -= frag;
1621                         if (mlen == 0)
1622                                 break;
1623
1624                         page_nob -= frag;
1625                         if (page_nob != 0)
1626                                 page_ptr += frag;
1627                         else
1628                         {
1629                                 page++;
1630                                 LASSERT (page < krx->krx_npages);
1631                                 page_ptr = page_address(krx->krx_kiov[page].kiov_page);
1632                                 page_nob = PAGE_SIZE;
1633                         }
1634
1635                         iov_nob -= frag;
1636                         if (iov_nob != 0)
1637                                 iov_ptr += frag;
1638                         else if (kiov != NULL) {
1639                                 kunmap (kiov->kiov_page);
1640                                 kiov++;
1641                                 niov--;
1642                                 LASSERT (niov > 0);
1643                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1644                                 iov_nob = kiov->kiov_len;
1645                         } else {
1646                                 iov++;
1647                                 niov--;
1648                                 LASSERT (niov > 0);
1649                                 iov_ptr = iov->iov_base;
1650                                 iov_nob = iov->iov_len;
1651                         }
1652                 }
1653
1654                 if (kiov != NULL)
1655                         kunmap (kiov->kiov_page);
1656         }
1657
1658 #if KQSW_CHECKSUM
1659         memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
1660                 sizeof(kqsw_csum_t));
1661
1662         if (csum_len != rlen)
1663                 CERROR("Unable to checksum data in user's buffer\n");
1664         else if (senders_csum != payload_csum)
1665                 kqswnal_csum_error (krx, 0);
1666
1667         if (csum_verbose)
1668                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1669                        "csum_nob %d\n",
1670                         hdr_csum, payload_csum, csum_frags, csum_nob);
1671 #endif
1672         lib_finalize(nal, private, libmsg, PTL_OK);
1673
1674         return (PTL_OK);
1675 }
1676
1677 static ptl_err_t
1678 kqswnal_recv(nal_cb_t     *nal,
1679              void         *private,
1680              lib_msg_t    *libmsg,
1681              unsigned int  niov,
1682              struct iovec *iov,
1683              size_t        offset,
1684              size_t        mlen,
1685              size_t        rlen)
1686 {
1687         return (kqswnal_recvmsg(nal, private, libmsg, 
1688                                 niov, iov, NULL, 
1689                                 offset, mlen, rlen));
1690 }
1691
1692 static ptl_err_t
1693 kqswnal_recv_pages (nal_cb_t     *nal,
1694                     void         *private,
1695                     lib_msg_t    *libmsg,
1696                     unsigned int  niov,
1697                     ptl_kiov_t   *kiov,
1698                     size_t        offset,
1699                     size_t        mlen,
1700                     size_t        rlen)
1701 {
1702         return (kqswnal_recvmsg(nal, private, libmsg, 
1703                                 niov, NULL, kiov, 
1704                                 offset, mlen, rlen));
1705 }
1706
1707 int
1708 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1709 {
1710         long    pid = kernel_thread (fn, arg, 0);
1711
1712         if (pid < 0)
1713                 return ((int)pid);
1714
1715         atomic_inc (&kqswnal_data.kqn_nthreads);
1716         atomic_inc (&kqswnal_data.kqn_nthreads_running);
1717         return (0);
1718 }
1719
1720 void
1721 kqswnal_thread_fini (void)
1722 {
1723         atomic_dec (&kqswnal_data.kqn_nthreads);
1724 }
1725
1726 int
1727 kqswnal_scheduler (void *arg)
1728 {
1729         kqswnal_rx_t    *krx;
1730         kqswnal_tx_t    *ktx;
1731         kpr_fwd_desc_t  *fwd;
1732         long             flags;
1733         int              rc;
1734         int              counter = 0;
1735         int              shuttingdown = 0;
1736         int              did_something;
1737
1738         kportal_daemonize ("kqswnal_sched");
1739         kportal_blockallsigs ();
1740         
1741         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1742
1743         for (;;)
1744         {
1745                 if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
1746
1747                         if (kqswnal_data.kqn_shuttingdown == 2)
1748                                 break;
1749                 
1750                         /* During stage 1 of shutdown we are still responsive
1751                          * to receives */
1752
1753                         atomic_dec (&kqswnal_data.kqn_nthreads_running);
1754                         shuttingdown = kqswnal_data.kqn_shuttingdown;
1755                 }
1756
1757                 did_something = 0;
1758
1759                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1760                 {
1761                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1762                                          kqswnal_rx_t, krx_list);
1763                         list_del (&krx->krx_list);
1764                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1765                                                flags);
1766
1767                         kqswnal_rx (krx);
1768
1769                         did_something = 1;
1770                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1771                 }
1772
1773                 if (!shuttingdown &&
1774                     !list_empty (&kqswnal_data.kqn_delayedtxds))
1775                 {
1776                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1777                                          kqswnal_tx_t, ktx_list);
1778                         list_del_init (&ktx->ktx_delayed_list);
1779                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1780                                                flags);
1781
1782                         rc = kqswnal_launch (ktx);
1783                         if (rc != 0)          /* failed: ktx_nid down? */
1784                         {
1785                                 CERROR("Failed delayed transmit to "LPX64
1786                                        ": %d\n", ktx->ktx_nid, rc);
1787                                 kqswnal_tx_done (ktx, rc);
1788                         }
1789
1790                         did_something = 1;
1791                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1792                 }
1793
1794                 if (!shuttingdown &
1795                     !list_empty (&kqswnal_data.kqn_delayedfwds))
1796                 {
1797                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1798                         list_del (&fwd->kprfd_list);
1799                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1800
1801                         kqswnal_fwd_packet (NULL, fwd);
1802
1803                         did_something = 1;
1804                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1805                 }
1806
1807                     /* nothing to do or hogging CPU */
1808                 if (!did_something || counter++ == KQSW_RESCHED) {
1809                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1810                                                flags);
1811
1812                         counter = 0;
1813
1814                         if (!did_something) {
1815                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1816                                                                kqswnal_data.kqn_shuttingdown != shuttingdown ||
1817                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1818                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1819                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1820                                 LASSERT (rc == 0);
1821                         } else if (current->need_resched)
1822                                 schedule ();
1823
1824                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1825                 }
1826         }
1827
1828         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1829
1830         kqswnal_thread_fini ();
1831         return (0);
1832 }
1833
1834 nal_cb_t kqswnal_lib =
1835 {
1836         nal_data:       &kqswnal_data,         /* NAL private data */
1837         cb_send:        kqswnal_send,
1838         cb_send_pages:  kqswnal_send_pages,
1839         cb_recv:        kqswnal_recv,
1840         cb_recv_pages:  kqswnal_recv_pages,
1841         cb_read:        kqswnal_read,
1842         cb_write:       kqswnal_write,
1843         cb_malloc:      kqswnal_malloc,
1844         cb_free:        kqswnal_free,
1845         cb_printf:      kqswnal_printf,
1846         cb_cli:         kqswnal_cli,
1847         cb_sti:         kqswnal_sti,
1848         cb_dist:        kqswnal_dist
1849 };