Whamcloud - gitweb
577c578018f903ebb5ab9ec6689f812ee504030b
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 EP_STATUSBLK  kqswnal_rpc_success;
30 EP_STATUSBLK  kqswnal_rpc_failed;
31
32 /*
33  *  LIB functions follow
34  *
35  */
36 static ptl_err_t
37 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
38              size_t len)
39 {
40         CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
41                 nal->ni.nid, len, src_addr, dst_addr );
42         memcpy( dst_addr, src_addr, len );
43
44         return (PTL_OK);
45 }
46
47 static ptl_err_t
48 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
49               size_t len)
50 {
51         CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
52                 nal->ni.nid, len, src_addr, dst_addr );
53         memcpy( dst_addr, src_addr, len );
54
55         return (PTL_OK);
56 }
57
58 static void *
59 kqswnal_malloc(nal_cb_t *nal, size_t len)
60 {
61         void *buf;
62
63         PORTAL_ALLOC(buf, len);
64         return (buf);
65 }
66
67 static void
68 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
69 {
70         PORTAL_FREE(buf, len);
71 }
72
73 static void
74 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
75 {
76         va_list ap;
77         char msg[256];
78
79         va_start (ap, fmt);
80         vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
81         va_end (ap);
82
83         msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
84
85         CDEBUG (D_NET, "%s", msg);
86 }
87
88 #if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
89 # error "Can't save/restore irq contexts in different procedures"
90 #endif
91
92 static void
93 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
94 {
95         kqswnal_data_t *data= nal->nal_data;
96
97         spin_lock_irqsave(&data->kqn_statelock, *flags);
98 }
99
100
101 static void
102 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
103 {
104         kqswnal_data_t *data= nal->nal_data;
105
106         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
107 }
108
109 static void
110 kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
111 {
112         /* holding kqn_statelock */
113
114         if (eq->event_callback != NULL)
115                 eq->event_callback(ev);
116
117         if (waitqueue_active(&kqswnal_data.kqn_yield_waitq))
118                 wake_up_all(&kqswnal_data.kqn_yield_waitq);
119 }
120
121 static int
122 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
123 {
124         if (nid == nal->ni.nid)
125                 *dist = 0;                      /* it's me */
126         else if (kqswnal_nid2elanid (nid) >= 0)
127                 *dist = 1;                      /* it's my peer */
128         else
129                 *dist = 2;                      /* via router */
130         return (0);
131 }
132
133 void
134 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
135 {
136         struct timeval     now;
137         time_t             then;
138
139         do_gettimeofday (&now);
140         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
141
142         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
143 }
144
145 void
146 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
147 {
148 #if MULTIRAIL_EKC
149         int      i;
150 #endif
151
152         if (ktx->ktx_nmappedpages == 0)
153                 return;
154         
155 #if MULTIRAIL_EKC
156         CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
157                ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
158
159         for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
160                 ep_dvma_unload(kqswnal_data.kqn_ep,
161                                kqswnal_data.kqn_ep_tx_nmh,
162                                &ktx->ktx_frags[i]);
163 #else
164         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
165                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
166
167         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
168         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
169                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
170
171         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
172                           kqswnal_data.kqn_eptxdmahandle,
173                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
174 #endif
175         ktx->ktx_nmappedpages = 0;
176 }
177
178 int
179 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
180 {
181         int       nfrags    = ktx->ktx_nfrag;
182         int       nmapped   = ktx->ktx_nmappedpages;
183         int       maxmapped = ktx->ktx_npages;
184         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
185         char     *ptr;
186 #if MULTIRAIL_EKC
187         EP_RAILMASK railmask;
188         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
189                                             EP_RAILMASK_ALL,
190                                             kqswnal_nid2elanid(ktx->ktx_nid));
191         
192         if (rail < 0) {
193                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
194                 return (-ENETDOWN);
195         }
196         railmask = 1 << rail;
197 #endif
198         LASSERT (nmapped <= maxmapped);
199         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
200         LASSERT (nfrags <= EP_MAXFRAG);
201         LASSERT (niov > 0);
202         LASSERT (nob > 0);
203
204         /* skip complete frags before 'offset' */
205         while (offset >= kiov->kiov_len) {
206                 offset -= kiov->kiov_len;
207                 kiov++;
208                 niov--;
209                 LASSERT (niov > 0);
210         }
211
212         do {
213                 int  fraglen = kiov->kiov_len - offset;
214
215                 /* nob exactly spans the iovs */
216                 LASSERT (fraglen <= nob);
217                 /* each frag fits in a page */
218                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
219
220                 nmapped++;
221                 if (nmapped > maxmapped) {
222                         CERROR("Can't map message in %d pages (max %d)\n",
223                                nmapped, maxmapped);
224                         return (-EMSGSIZE);
225                 }
226
227                 if (nfrags == EP_MAXFRAG) {
228                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
229                                EP_MAXFRAG);
230                         return (-EMSGSIZE);
231                 }
232
233                 /* XXX this is really crap, but we'll have to kmap until
234                  * EKC has a page (rather than vaddr) mapping interface */
235
236                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
237
238                 CDEBUG(D_NET,
239                        "%p[%d] loading %p for %d, page %d, %d total\n",
240                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
241
242 #if MULTIRAIL_EKC
243                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
244                              ptr, fraglen,
245                              kqswnal_data.kqn_ep_tx_nmh, basepage,
246                              &railmask, &ktx->ktx_frags[nfrags]);
247
248                 if (nfrags == ktx->ktx_firsttmpfrag ||
249                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
250                                   &ktx->ktx_frags[nfrags - 1],
251                                   &ktx->ktx_frags[nfrags])) {
252                         /* new frag if this is the first or can't merge */
253                         nfrags++;
254                 }
255 #else
256                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
257                                        kqswnal_data.kqn_eptxdmahandle,
258                                        ptr, fraglen,
259                                        basepage, &ktx->ktx_frags[nfrags].Base);
260
261                 if (nfrags > 0 &&                /* previous frag mapped */
262                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
263                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
264                         /* just extend previous */
265                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
266                 else {
267                         ktx->ktx_frags[nfrags].Len = fraglen;
268                         nfrags++;                /* new frag */
269                 }
270 #endif
271
272                 kunmap (kiov->kiov_page);
273                 
274                 /* keep in loop for failure case */
275                 ktx->ktx_nmappedpages = nmapped;
276
277                 basepage++;
278                 kiov++;
279                 niov--;
280                 nob -= fraglen;
281                 offset = 0;
282
283                 /* iov must not run out before end of data */
284                 LASSERT (nob == 0 || niov > 0);
285
286         } while (nob > 0);
287
288         ktx->ktx_nfrag = nfrags;
289         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
290                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
291
292         return (0);
293 }
294
295 int
296 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
297                     int niov, struct iovec *iov)
298 {
299         int       nfrags    = ktx->ktx_nfrag;
300         int       nmapped   = ktx->ktx_nmappedpages;
301         int       maxmapped = ktx->ktx_npages;
302         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
303 #if MULTIRAIL_EKC
304         EP_RAILMASK railmask;
305         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
306                                             EP_RAILMASK_ALL,
307                                             kqswnal_nid2elanid(ktx->ktx_nid));
308         
309         if (rail < 0) {
310                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
311                 return (-ENETDOWN);
312         }
313         railmask = 1 << rail;
314 #endif
315         LASSERT (nmapped <= maxmapped);
316         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
317         LASSERT (nfrags <= EP_MAXFRAG);
318         LASSERT (niov > 0);
319         LASSERT (nob > 0);
320
321         /* skip complete frags before offset */
322         while (offset >= iov->iov_len) {
323                 offset -= iov->iov_len;
324                 iov++;
325                 niov--;
326                 LASSERT (niov > 0);
327         }
328         
329         do {
330                 int  fraglen = iov->iov_len - offset;
331                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
332
333                 /* nob exactly spans the iovs */
334                 LASSERT (fraglen <= nob);
335                 
336                 nmapped += npages;
337                 if (nmapped > maxmapped) {
338                         CERROR("Can't map message in %d pages (max %d)\n",
339                                nmapped, maxmapped);
340                         return (-EMSGSIZE);
341                 }
342
343                 if (nfrags == EP_MAXFRAG) {
344                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
345                                EP_MAXFRAG);
346                         return (-EMSGSIZE);
347                 }
348
349                 CDEBUG(D_NET,
350                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
351                        ktx, nfrags, iov->iov_base + offset, fraglen, 
352                        basepage, npages, nmapped);
353
354 #if MULTIRAIL_EKC
355                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
356                              iov->iov_base + offset, fraglen,
357                              kqswnal_data.kqn_ep_tx_nmh, basepage,
358                              &railmask, &ktx->ktx_frags[nfrags]);
359
360                 if (nfrags == ktx->ktx_firsttmpfrag ||
361                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
362                                   &ktx->ktx_frags[nfrags - 1],
363                                   &ktx->ktx_frags[nfrags])) {
364                         /* new frag if this is the first or can't merge */
365                         nfrags++;
366                 }
367 #else
368                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
369                                        kqswnal_data.kqn_eptxdmahandle,
370                                        iov->iov_base + offset, fraglen,
371                                        basepage, &ktx->ktx_frags[nfrags].Base);
372
373                 if (nfrags > 0 &&                /* previous frag mapped */
374                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
375                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
376                         /* just extend previous */
377                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
378                 else {
379                         ktx->ktx_frags[nfrags].Len = fraglen;
380                         nfrags++;                /* new frag */
381                 }
382 #endif
383
384                 /* keep in loop for failure case */
385                 ktx->ktx_nmappedpages = nmapped;
386
387                 basepage += npages;
388                 iov++;
389                 niov--;
390                 nob -= fraglen;
391                 offset = 0;
392
393                 /* iov must not run out before end of data */
394                 LASSERT (nob == 0 || niov > 0);
395
396         } while (nob > 0);
397
398         ktx->ktx_nfrag = nfrags;
399         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
400                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
401
402         return (0);
403 }
404
405
406 void
407 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
408 {
409         kpr_fwd_desc_t   *fwd = NULL;
410         unsigned long     flags;
411
412         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
413         ktx->ktx_state = KTX_IDLE;
414
415         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
416
417         list_del (&ktx->ktx_list);              /* take off active list */
418
419         if (ktx->ktx_isnblk) {
420                 /* reserved for non-blocking tx */
421                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
422                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
423                 return;
424         }
425
426         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
427
428         /* anything blocking for a tx descriptor? */
429         if (!kqswnal_data.kqn_shuttingdown &&
430             !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
431         {
432                 CDEBUG(D_NET,"wakeup fwd\n");
433
434                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
435                                   kpr_fwd_desc_t, kprfd_list);
436                 list_del (&fwd->kprfd_list);
437         }
438
439         wake_up (&kqswnal_data.kqn_idletxd_waitq);
440
441         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
442
443         if (fwd == NULL)
444                 return;
445
446         /* schedule packet for forwarding again */
447         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
448
449         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
450         wake_up (&kqswnal_data.kqn_sched_waitq);
451
452         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
453 }
454
455 kqswnal_tx_t *
456 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
457 {
458         unsigned long  flags;
459         kqswnal_tx_t  *ktx = NULL;
460
461         for (;;) {
462                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
463
464                 if (kqswnal_data.kqn_shuttingdown)
465                         break;
466
467                 /* "normal" descriptor is free */
468                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
469                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
470                                           kqswnal_tx_t, ktx_list);
471                         break;
472                 }
473
474                 if (fwd != NULL)                /* forwarded packet? */
475                         break;
476
477                 /* doing a local transmit */
478                 if (!may_block) {
479                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
480                                 CERROR ("intr tx desc pool exhausted\n");
481                                 break;
482                         }
483
484                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
485                                           kqswnal_tx_t, ktx_list);
486                         break;
487                 }
488
489                 /* block for idle tx */
490
491                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
492
493                 CDEBUG (D_NET, "blocking for tx desc\n");
494                 wait_event (kqswnal_data.kqn_idletxd_waitq,
495                             !list_empty (&kqswnal_data.kqn_idletxds) ||
496                             kqswnal_data.kqn_shuttingdown);
497         }
498
499         if (ktx != NULL) {
500                 list_del (&ktx->ktx_list);
501                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
502                 ktx->ktx_launcher = current->pid;
503                 atomic_inc(&kqswnal_data.kqn_pending_txs);
504         } else if (fwd != NULL) {
505                 /* queue forwarded packet until idle txd available */
506                 CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
507                 list_add_tail (&fwd->kprfd_list,
508                                &kqswnal_data.kqn_idletxd_fwdq);
509         }
510
511         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
512
513         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
514         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
515
516         return (ktx);
517 }
518
519 void
520 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
521 {
522         lib_msg_t     *msg;
523         lib_msg_t     *repmsg = NULL;
524
525         switch (ktx->ktx_state) {
526         case KTX_FORWARDING:       /* router asked me to forward this packet */
527                 kpr_fwd_done (&kqswnal_data.kqn_router,
528                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
529                 break;
530
531         case KTX_SENDING:          /* packet sourced locally */
532                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
533                               (lib_msg_t *)ktx->ktx_args[1],
534                               (error == 0) ? PTL_OK : 
535                               (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
536                 break;
537
538         case KTX_GETTING:          /* Peer has DMA-ed direct? */
539                 msg = (lib_msg_t *)ktx->ktx_args[1];
540
541                 if (error == 0) {
542                         repmsg = lib_create_reply_msg (&kqswnal_lib, 
543                                                        ktx->ktx_nid, msg);
544                         if (repmsg == NULL)
545                                 error = -ENOMEM;
546                 }
547                 
548                 if (error == 0) {
549                         lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
550                                       msg, PTL_OK);
551                         lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
552                 } else {
553                         lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
554                                       (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
555                 }
556                 break;
557
558         default:
559                 LASSERT (0);
560         }
561
562         kqswnal_put_idle_tx (ktx);
563 }
564
565 static void
566 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
567 {
568         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
569
570         LASSERT (txd != NULL);
571         LASSERT (ktx != NULL);
572
573         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
574
575         if (status != EP_SUCCESS) {
576
577                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
578                         ktx->ktx_nid, status);
579
580                 kqswnal_notify_peer_down(ktx);
581                 status = -EHOSTDOWN;
582
583         } else if (ktx->ktx_state == KTX_GETTING) {
584                 /* RPC completed OK; what did our peer put in the status
585                  * block? */
586 #if MULTIRAIL_EKC
587                 status = ep_txd_statusblk(txd)->Data[0];
588 #else
589                 status = ep_txd_statusblk(txd)->Status;
590 #endif
591         } else {
592                 status = 0;
593         }
594
595         kqswnal_tx_done (ktx, status);
596 }
597
598 int
599 kqswnal_launch (kqswnal_tx_t *ktx)
600 {
601         /* Don't block for transmit descriptor if we're in interrupt context */
602         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
603         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
604         long  flags;
605         int   rc;
606
607         ktx->ktx_launchtime = jiffies;
608
609         if (kqswnal_data.kqn_shuttingdown)
610                 return (-ESHUTDOWN);
611
612         LASSERT (dest >= 0);                    /* must be a peer */
613         if (ktx->ktx_state == KTX_GETTING) {
614                 /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
615                  * other frags are the GET sink which we obviously don't
616                  * send here :) */
617 #if MULTIRAIL_EKC
618                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
619                                      ktx->ktx_port, attr,
620                                      kqswnal_txhandler, ktx,
621                                      NULL, ktx->ktx_frags, 1);
622 #else
623                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
624                                      ktx->ktx_port, attr, kqswnal_txhandler,
625                                      ktx, NULL, ktx->ktx_frags, 1);
626 #endif
627         } else {
628 #if MULTIRAIL_EKC
629                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
630                                          ktx->ktx_port, attr,
631                                          kqswnal_txhandler, ktx,
632                                          NULL, ktx->ktx_frags, ktx->ktx_nfrag);
633 #else
634                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
635                                        ktx->ktx_port, attr, 
636                                        kqswnal_txhandler, ktx, 
637                                        ktx->ktx_frags, ktx->ktx_nfrag);
638 #endif
639         }
640
641         switch (rc) {
642         case EP_SUCCESS: /* success */
643                 return (0);
644
645         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
646                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
647
648                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
649                 wake_up (&kqswnal_data.kqn_sched_waitq);
650
651                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
652                 return (0);
653
654         default: /* fatal error */
655                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
656                 kqswnal_notify_peer_down(ktx);
657                 return (-EHOSTUNREACH);
658         }
659 }
660
661 static char *
662 hdr_type_string (ptl_hdr_t *hdr)
663 {
664         switch (hdr->type) {
665         case PTL_MSG_ACK:
666                 return ("ACK");
667         case PTL_MSG_PUT:
668                 return ("PUT");
669         case PTL_MSG_GET:
670                 return ("GET");
671         case PTL_MSG_REPLY:
672                 return ("REPLY");
673         default:
674                 return ("<UNKNOWN>");
675         }
676 }
677
678 static void
679 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
680 {
681         char *type_str = hdr_type_string (hdr);
682
683         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
684                NTOH__u32(hdr->payload_length));
685         CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
686                NTOH__u32(hdr->src_pid));
687         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
688                NTOH__u32(hdr->dest_pid));
689
690         switch (NTOH__u32(hdr->type)) {
691         case PTL_MSG_PUT:
692                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
693                        "match bits "LPX64"\n",
694                        NTOH__u32 (hdr->msg.put.ptl_index),
695                        hdr->msg.put.ack_wmd.wh_interface_cookie,
696                        hdr->msg.put.ack_wmd.wh_object_cookie,
697                        NTOH__u64 (hdr->msg.put.match_bits));
698                 CERROR("    offset %d, hdr data "LPX64"\n",
699                        NTOH__u32(hdr->msg.put.offset),
700                        hdr->msg.put.hdr_data);
701                 break;
702
703         case PTL_MSG_GET:
704                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
705                        "match bits "LPX64"\n",
706                        NTOH__u32 (hdr->msg.get.ptl_index),
707                        hdr->msg.get.return_wmd.wh_interface_cookie,
708                        hdr->msg.get.return_wmd.wh_object_cookie,
709                        hdr->msg.get.match_bits);
710                 CERROR("    Length %d, src offset %d\n",
711                        NTOH__u32 (hdr->msg.get.sink_length),
712                        NTOH__u32 (hdr->msg.get.src_offset));
713                 break;
714
715         case PTL_MSG_ACK:
716                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
717                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
718                        hdr->msg.ack.dst_wmd.wh_object_cookie,
719                        NTOH__u32 (hdr->msg.ack.mlength));
720                 break;
721
722         case PTL_MSG_REPLY:
723                 CERROR("    dst md "LPX64"."LPX64"\n",
724                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
725                        hdr->msg.reply.dst_wmd.wh_object_cookie);
726         }
727
728 }                               /* end of print_hdr() */
729
730 #if !MULTIRAIL_EKC
731 void
732 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
733 {
734         int          i;
735
736         CDEBUG (how, "%s: %d\n", str, n);
737         for (i = 0; i < n; i++) {
738                 CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
739         }
740 }
741
742 int
743 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
744                      int nsrc, EP_IOVEC *src,
745                      int ndst, EP_IOVEC *dst) 
746 {
747         int        count;
748         int        nob;
749
750         LASSERT (ndv > 0);
751         LASSERT (nsrc > 0);
752         LASSERT (ndst > 0);
753
754         for (count = 0; count < ndv; count++, dv++) {
755
756                 if (nsrc == 0 || ndst == 0) {
757                         if (nsrc != ndst) {
758                                 /* For now I'll barf on any left over entries */
759                                 CERROR ("mismatched src and dst iovs\n");
760                                 return (-EINVAL);
761                         }
762                         return (count);
763                 }
764
765                 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
766                 dv->Len    = nob;
767                 dv->Source = src->Base;
768                 dv->Dest   = dst->Base;
769
770                 if (nob >= src->Len) {
771                         src++;
772                         nsrc--;
773                 } else {
774                         src->Len -= nob;
775                         src->Base += nob;
776                 }
777                 
778                 if (nob >= dst->Len) {
779                         dst++;
780                         ndst--;
781                 } else {
782                         src->Len -= nob;
783                         src->Base += nob;
784                 }
785         }
786
787         CERROR ("DATAVEC too small\n");
788         return (-E2BIG);
789 }
790 #endif
791
792 int
793 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
794                    struct iovec *iov, ptl_kiov_t *kiov, 
795                    int offset, int nob)
796 {
797         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
798         char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
799         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
800         int                 rc;
801 #if MULTIRAIL_EKC
802         int                 i;
803 #else
804         EP_DATAVEC          datav[EP_MAXFRAG];
805         int                 ndatav;
806 #endif
807         LASSERT (krx->krx_rpc_reply_needed);
808         LASSERT ((iov == NULL) != (kiov == NULL));
809
810         /* see kqswnal_sendmsg comment regarding endian-ness */
811         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
812                 /* msg too small to discover rmd size */
813                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
814                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
815                 return (-EINVAL);
816         }
817         
818         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
819                 /* rmd doesn't fit in the incoming message */
820                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
821                         krx->krx_nob, rmd->kqrmd_nfrag,
822                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
823                 return (-EINVAL);
824         }
825
826         /* Map the source data... */
827         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
828         if (kiov != NULL)
829                 rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
830         else
831                 rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
832
833         if (rc != 0) {
834                 CERROR ("Can't map source data: %d\n", rc);
835                 return (rc);
836         }
837
838 #if MULTIRAIL_EKC
839         if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
840                 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
841                        ktx->ktx_nfrag, rmd->kqrmd_nfrag);
842                 return (-EINVAL);
843         }
844         
845         for (i = 0; i < rmd->kqrmd_nfrag; i++)
846                 if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
847                         CERROR("Can't cope with unequal frags %d(%d):"
848                                " %d local %d remote\n",
849                                i, rmd->kqrmd_nfrag, 
850                                ktx->ktx_frags[i].nmd_len, 
851                                rmd->kqrmd_frag[i].nmd_len);
852                         return (-EINVAL);
853                 }
854 #else
855         ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
856                                       ktx->ktx_nfrag, ktx->ktx_frags,
857                                       rmd->kqrmd_nfrag, rmd->kqrmd_frag);
858         if (ndatav < 0) {
859                 CERROR ("Can't create datavec: %d\n", ndatav);
860                 return (ndatav);
861         }
862 #endif
863
864         /* Our caller will start to race with kqswnal_dma_reply_complete... */
865         LASSERT (atomic_read (&krx->krx_refcount) == 1);
866         atomic_set (&krx->krx_refcount, 2);
867
868 #if MULTIRAIL_EKC
869         rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
870                              &kqswnal_rpc_success,
871                              ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
872         if (rc == EP_SUCCESS)
873                 return (0);
874
875         /* Well we tried... */
876         krx->krx_rpc_reply_needed = 0;
877 #else
878         rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
879                               &kqswnal_rpc_success, datav, ndatav);
880         if (rc == EP_SUCCESS)
881                 return (0);
882
883         /* "old" EKC destroys rxd on failed completion */
884         krx->krx_rxd = NULL;
885 #endif
886
887         CERROR("can't complete RPC: %d\n", rc);
888
889         /* reset refcount back to 1: we're not going to be racing with
890          * kqswnal_dma_reply_complete. */
891         atomic_set (&krx->krx_refcount, 1);
892
893         return (-ECONNABORTED);
894 }
895
896 static ptl_err_t
897 kqswnal_sendmsg (nal_cb_t     *nal,
898                  void         *private,
899                  lib_msg_t    *libmsg,
900                  ptl_hdr_t    *hdr,
901                  int           type,
902                  ptl_nid_t     nid,
903                  ptl_pid_t     pid,
904                  unsigned int  payload_niov,
905                  struct iovec *payload_iov,
906                  ptl_kiov_t   *payload_kiov,
907                  size_t        payload_offset,
908                  size_t        payload_nob)
909 {
910         kqswnal_tx_t      *ktx;
911         int                rc;
912         ptl_nid_t          targetnid;
913 #if KQSW_CHECKSUM
914         int                i;
915         kqsw_csum_t        csum;
916         int                sumoff;
917         int                sumnob;
918 #endif
919         
920         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
921                " pid %u\n", payload_nob, payload_niov, nid, pid);
922
923         LASSERT (payload_nob == 0 || payload_niov > 0);
924         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
925
926         /* It must be OK to kmap() if required */
927         LASSERT (payload_kiov == NULL || !in_interrupt ());
928         /* payload is either all vaddrs or all pages */
929         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
930
931         if (payload_nob > KQSW_MAXPAYLOAD) {
932                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
933                         payload_nob, KQSW_MAXPAYLOAD);
934                 return (PTL_FAIL);
935         }
936
937         targetnid = nid;
938         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
939                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
940                                  sizeof (ptl_hdr_t) + payload_nob, &targetnid);
941                 if (rc != 0) {
942                         CERROR("Can't route to "LPX64": router error %d\n",
943                                nid, rc);
944                         return (PTL_FAIL);
945                 }
946                 if (kqswnal_nid2elanid (targetnid) < 0) {
947                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
948                                targetnid, nid);
949                         return (PTL_FAIL);
950                 }
951         }
952
953         /* I may not block for a transmit descriptor if I might block the
954          * receiver, or an interrupt handler. */
955         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
956                                           type == PTL_MSG_REPLY ||
957                                           in_interrupt()));
958         if (ktx == NULL) {
959                 kqswnal_cerror_hdr (hdr);
960                 return (PTL_NO_SPACE);
961         }
962
963         ktx->ktx_nid     = targetnid;
964         ktx->ktx_args[0] = private;
965         ktx->ktx_args[1] = libmsg;
966
967         if (type == PTL_MSG_REPLY &&
968             ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
969                 if (nid != targetnid ||
970                     kqswnal_nid2elanid(nid) != 
971                     ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
972                         CERROR("Optimized reply nid conflict: "
973                                "nid "LPX64" via "LPX64" elanID %d\n",
974                                nid, targetnid,
975                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
976                         rc = -EINVAL;
977                         goto out;
978                 }
979
980                 /* peer expects RPC completion with GET data */
981                 rc = kqswnal_dma_reply (ktx, payload_niov, 
982                                         payload_iov, payload_kiov, 
983                                         payload_offset, payload_nob);
984                 if (rc != 0)
985                         CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
986                 goto out;
987         }
988
989         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
990         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
991
992 #if KQSW_CHECKSUM
993         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
994         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
995         for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
996                 LASSERT(i < niov);
997                 if (payload_kiov != NULL) {
998                         ptl_kiov_t *kiov = &payload_kiov[i];
999
1000                         if (sumoff >= kiov->kiov_len) {
1001                                 sumoff -= kiov->kiov_len;
1002                         } else {
1003                                 char *addr = ((char *)kmap (kiov->kiov_page)) +
1004                                              kiov->kiov_offset + sumoff;
1005                                 int   fragnob = kiov->kiov_len - sumoff;
1006
1007                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1008                                 sumnob -= fragnob;
1009                                 sumoff = 0;
1010                                 kunmap(kiov->kiov_page);
1011                         }
1012                 } else {
1013                         struct iovec *iov = &payload_iov[i];
1014
1015                         if (sumoff > iov->iov_len) {
1016                                 sumoff -= iov->iov_len;
1017                         } else {
1018                                 char *addr = iov->iov_base + sumoff;
1019                                 int   fragnob = iov->iov_len - sumoff;
1020                                 
1021                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1022                                 sumnob -= fragnob;
1023                                 sumoff = 0;
1024                         }
1025                 }
1026         }
1027         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
1028 #endif
1029
1030         if (kqswnal_data.kqn_optimized_gets &&
1031             type == PTL_MSG_GET &&              /* doing a GET */
1032             nid == targetnid) {                 /* not forwarding */
1033                 lib_md_t           *md = libmsg->md;
1034                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
1035                 
1036                 /* Optimised path: I send over the Elan vaddrs of the get
1037                  * sink buffers, and my peer DMAs directly into them.
1038                  *
1039                  * First I set up ktx as if it was going to send this
1040                  * payload, (it needs to map it anyway).  This fills
1041                  * ktx_frags[1] and onward with the network addresses
1042                  * of the GET sink frags.  I copy these into ktx_buffer,
1043                  * immediately after the header, and send that as my GET
1044                  * message.
1045                  *
1046                  * Note that the addresses are sent in native endian-ness.
1047                  * When EKC copes with different endian nodes, I'll fix
1048                  * this (and eat my hat :) */
1049
1050                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1051                 ktx->ktx_state = KTX_GETTING;
1052
1053                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
1054                         rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
1055                                                   md->md_niov, md->md_iov.kiov);
1056                 else
1057                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
1058                                                  md->md_niov, md->md_iov.iov);
1059                 if (rc != 0)
1060                         goto out;
1061
1062                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1063
1064                 payload_nob = offsetof(kqswnal_remotemd_t,
1065                                        kqrmd_frag[rmd->kqrmd_nfrag]);
1066                 LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
1067
1068 #if MULTIRAIL_EKC
1069                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1070                        rmd->kqrmd_nfrag * sizeof(EP_NMD));
1071
1072                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1073                               0, KQSW_HDR_SIZE + payload_nob);
1074 #else
1075                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1076                        rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
1077                 
1078                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1079                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1080 #endif
1081         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
1082
1083                 /* small message: single frag copied into the pre-mapped buffer */
1084
1085                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1086                 ktx->ktx_state = KTX_SENDING;
1087 #if MULTIRAIL_EKC
1088                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1089                               0, KQSW_HDR_SIZE + payload_nob);
1090 #else
1091                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1092                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1093 #endif
1094                 if (payload_nob > 0) {
1095                         if (payload_kiov != NULL)
1096                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1097                                                    payload_niov, payload_kiov, 
1098                                                    payload_offset, payload_nob);
1099                         else
1100                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1101                                                   payload_niov, payload_iov, 
1102                                                   payload_offset, payload_nob);
1103                 }
1104         } else {
1105
1106                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
1107
1108                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1109                 ktx->ktx_state = KTX_SENDING;
1110 #if MULTIRAIL_EKC
1111                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1112                               0, KQSW_HDR_SIZE);
1113 #else
1114                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1115                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1116 #endif
1117                 if (payload_kiov != NULL)
1118                         rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
1119                                                   payload_niov, payload_kiov);
1120                 else
1121                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
1122                                                  payload_niov, payload_iov);
1123                 if (rc != 0)
1124                         goto out;
1125         }
1126         
1127         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
1128                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1129
1130         rc = kqswnal_launch (ktx);
1131
1132  out:
1133         CDEBUG(rc == 0 ? D_NET : D_ERROR, 
1134                "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", 
1135                rc == 0 ? "Sent" : "Failed to send",
1136                payload_nob, nid, targetnid, rc);
1137
1138         if (rc != 0)
1139                 kqswnal_put_idle_tx (ktx);
1140
1141         atomic_dec(&kqswnal_data.kqn_pending_txs);
1142         return (rc == 0 ? PTL_OK : PTL_FAIL);
1143 }
1144
1145 static ptl_err_t
1146 kqswnal_send (nal_cb_t     *nal,
1147               void         *private,
1148               lib_msg_t    *libmsg,
1149               ptl_hdr_t    *hdr,
1150               int           type,
1151               ptl_nid_t     nid,
1152               ptl_pid_t     pid,
1153               unsigned int  payload_niov,
1154               struct iovec *payload_iov,
1155               size_t        payload_offset,
1156               size_t        payload_nob)
1157 {
1158         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1159                                  payload_niov, payload_iov, NULL, 
1160                                  payload_offset, payload_nob));
1161 }
1162
1163 static ptl_err_t
1164 kqswnal_send_pages (nal_cb_t     *nal,
1165                     void         *private,
1166                     lib_msg_t    *libmsg,
1167                     ptl_hdr_t    *hdr,
1168                     int           type,
1169                     ptl_nid_t     nid,
1170                     ptl_pid_t     pid,
1171                     unsigned int  payload_niov,
1172                     ptl_kiov_t   *payload_kiov,
1173                     size_t        payload_offset,
1174                     size_t        payload_nob)
1175 {
1176         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1177                                  payload_niov, NULL, payload_kiov, 
1178                                  payload_offset, payload_nob));
1179 }
1180
1181 void
1182 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1183 {
1184         int             rc;
1185         kqswnal_tx_t   *ktx;
1186         ptl_kiov_t     *kiov = fwd->kprfd_kiov;
1187         int             niov = fwd->kprfd_niov;
1188         int             nob = fwd->kprfd_nob;
1189         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
1190
1191 #if KQSW_CHECKSUM
1192         CERROR ("checksums for forwarded packets not implemented\n");
1193         LBUG ();
1194 #endif
1195         /* The router wants this NAL to forward a packet */
1196         CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
1197                 fwd, nid, niov, nob);
1198
1199         ktx = kqswnal_get_idle_tx (fwd, 0);
1200         if (ktx == NULL)        /* can't get txd right now */
1201                 return;         /* fwd will be scheduled when tx desc freed */
1202
1203         if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
1204                 nid = fwd->kprfd_target_nid;    /* target is final dest */
1205
1206         if (kqswnal_nid2elanid (nid) < 0) {
1207                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
1208                 rc = -EHOSTUNREACH;
1209                 goto out;
1210         }
1211
1212         /* copy hdr into pre-mapped buffer */
1213         memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
1214         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
1215
1216         ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
1217                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1218         ktx->ktx_nid     = nid;
1219         ktx->ktx_state   = KTX_FORWARDING;
1220         ktx->ktx_args[0] = fwd;
1221         ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
1222
1223         if (nob <= KQSW_TX_MAXCONTIG) 
1224         {
1225                 /* send payload from ktx's pre-mapped contiguous buffer */
1226 #if MULTIRAIL_EKC
1227                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1228                               0, KQSW_HDR_SIZE + nob);
1229 #else
1230                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1231                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
1232 #endif
1233                 if (nob > 0)
1234                         lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
1235                                           niov, kiov, 0, nob);
1236         }
1237         else
1238         {
1239                 /* zero copy payload */
1240 #if MULTIRAIL_EKC
1241                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1242                               0, KQSW_HDR_SIZE);
1243 #else
1244                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1245                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1246 #endif
1247                 rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
1248                 if (rc != 0)
1249                         goto out;
1250         }
1251
1252         rc = kqswnal_launch (ktx);
1253  out:
1254         if (rc != 0) {
1255                 CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
1256
1257                 kqswnal_put_idle_tx (ktx);
1258                 /* complete now (with failure) */
1259                 kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
1260         }
1261
1262         atomic_dec(&kqswnal_data.kqn_pending_txs);
1263 }
1264
1265 void
1266 kqswnal_fwd_callback (void *arg, int error)
1267 {
1268         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1269
1270         /* The router has finished forwarding this packet */
1271
1272         if (error != 0)
1273         {
1274                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1275
1276                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1277                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
1278         }
1279
1280         kqswnal_requeue_rx (krx);
1281 }
1282
1283 void
1284 kqswnal_dma_reply_complete (EP_RXD *rxd) 
1285 {
1286         int           status = ep_rxd_status(rxd);
1287         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
1288         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
1289         lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
1290         
1291         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1292                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
1293
1294         LASSERT (krx->krx_rxd == rxd);
1295         LASSERT (krx->krx_rpc_reply_needed);
1296
1297         krx->krx_rpc_reply_needed = 0;
1298         kqswnal_rx_done (krx);
1299
1300         lib_finalize (&kqswnal_lib, NULL, msg,
1301                       (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
1302         kqswnal_put_idle_tx (ktx);
1303 }
1304
1305 void
1306 kqswnal_rpc_complete (EP_RXD *rxd)
1307 {
1308         int           status = ep_rxd_status(rxd);
1309         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1310         
1311         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1312                "rxd %p, krx %p, status %d\n", rxd, krx, status);
1313
1314         LASSERT (krx->krx_rxd == rxd);
1315         LASSERT (krx->krx_rpc_reply_needed);
1316         
1317         krx->krx_rpc_reply_needed = 0;
1318         kqswnal_requeue_rx (krx);
1319 }
1320
1321 void
1322 kqswnal_requeue_rx (kqswnal_rx_t *krx) 
1323 {
1324         int   rc;
1325
1326         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1327
1328         if (krx->krx_rpc_reply_needed) {
1329
1330                 /* We failed to complete the peer's optimized GET (e.g. we
1331                  * couldn't map the source buffers).  We complete the
1332                  * peer's EKC rpc now with failure. */
1333 #if MULTIRAIL_EKC
1334                 rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
1335                                      &kqswnal_rpc_failed, NULL, NULL, 0);
1336                 if (rc == EP_SUCCESS)
1337                         return;
1338                 
1339                 CERROR("can't complete RPC: %d\n", rc);
1340 #else
1341                 if (krx->krx_rxd != NULL) {
1342                         /* We didn't try (and fail) to complete earlier... */
1343                         rc = ep_complete_rpc(krx->krx_rxd, 
1344                                              kqswnal_rpc_complete, krx,
1345                                              &kqswnal_rpc_failed, NULL, 0);
1346                         if (rc == EP_SUCCESS)
1347                                 return;
1348
1349                         CERROR("can't complete RPC: %d\n", rc);
1350                 }
1351                 
1352                 /* NB the old ep_complete_rpc() frees rxd on failure, so we
1353                  * have to requeue from scratch here, unless we're shutting
1354                  * down */
1355                 if (kqswnal_data.kqn_shuttingdown)
1356                         return;
1357
1358                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
1359                                       krx->krx_elanbuffer, 
1360                                       krx->krx_npages * PAGE_SIZE, 0);
1361                 LASSERT (rc == EP_SUCCESS);
1362                 /* We don't handle failure here; it's incredibly rare
1363                  * (never reported?) and only happens with "old" EKC */
1364                 return;
1365 #endif
1366         }
1367
1368 #if MULTIRAIL_EKC
1369         if (kqswnal_data.kqn_shuttingdown) {
1370                 /* free EKC rxd on shutdown */
1371                 ep_complete_receive(krx->krx_rxd);
1372         } else {
1373                 /* repost receive */
1374                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1375                                    &krx->krx_elanbuffer, 0);
1376         }
1377 #else                
1378         /* don't actually requeue on shutdown */
1379         if (!kqswnal_data.kqn_shuttingdown) 
1380                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1381                                    krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
1382 #endif
1383 }
1384         
1385 void
1386 kqswnal_rx (kqswnal_rx_t *krx)
1387 {
1388         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
1389         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
1390         int             payload_nob;
1391         int             nob;
1392         int             niov;
1393
1394         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1395
1396         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
1397                 atomic_set(&krx->krx_refcount, 1);
1398                 lib_parse (&kqswnal_lib, hdr, krx);
1399                 kqswnal_rx_done(krx);
1400                 return;
1401         }
1402
1403 #if KQSW_CHECKSUM
1404         CERROR ("checksums for forwarded packets not implemented\n");
1405         LBUG ();
1406 #endif
1407         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
1408         {
1409                 CERROR("dropping packet from "LPX64" for "LPX64
1410                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
1411
1412                 kqswnal_requeue_rx (krx);
1413                 return;
1414         }
1415
1416         nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
1417         niov = 0;
1418         if (nob > 0) {
1419                 krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
1420                 krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
1421                 niov = 1;
1422                 nob -= PAGE_SIZE - KQSW_HDR_SIZE;
1423                 
1424                 while (nob > 0) {
1425                         LASSERT (niov < krx->krx_npages);
1426                         
1427                         krx->krx_kiov[niov].kiov_offset = 0;
1428                         krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
1429                         niov++;
1430                         nob -= PAGE_SIZE;
1431                 }
1432         }
1433
1434         kpr_fwd_init (&krx->krx_fwd, dest_nid, 
1435                       hdr, payload_nob, niov, krx->krx_kiov,
1436                       kqswnal_fwd_callback, krx);
1437
1438         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
1439 }
1440
1441 /* Receive Interrupt Handler: posts to schedulers */
1442 void 
1443 kqswnal_rxhandler(EP_RXD *rxd)
1444 {
1445         long          flags;
1446         int           nob    = ep_rxd_len (rxd);
1447         int           status = ep_rxd_status (rxd);
1448         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1449
1450         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1451                rxd, krx, nob, status);
1452
1453         LASSERT (krx != NULL);
1454
1455         krx->krx_rxd = rxd;
1456         krx->krx_nob = nob;
1457 #if MULTIRAIL_EKC
1458         krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
1459 #else
1460         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
1461 #endif
1462         
1463         /* must receive a whole header to be able to parse */
1464         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
1465         {
1466                 /* receives complete with failure when receiver is removed */
1467 #if MULTIRAIL_EKC
1468                 if (status == EP_SHUTDOWN)
1469                         LASSERT (kqswnal_data.kqn_shuttingdown);
1470                 else
1471                         CERROR("receive status failed with status %d nob %d\n",
1472                                ep_rxd_status(rxd), nob);
1473 #else
1474                 if (!kqswnal_data.kqn_shuttingdown)
1475                         CERROR("receive status failed with status %d nob %d\n",
1476                                ep_rxd_status(rxd), nob);
1477 #endif
1478                 kqswnal_requeue_rx (krx);
1479                 return;
1480         }
1481
1482         if (!in_interrupt()) {
1483                 kqswnal_rx (krx);
1484                 return;
1485         }
1486
1487         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1488
1489         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1490         wake_up (&kqswnal_data.kqn_sched_waitq);
1491
1492         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1493 }
1494
1495 #if KQSW_CHECKSUM
1496 void
1497 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1498 {
1499         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1500
1501         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1502                 ", dpid %d, spid %d, type %d\n",
1503                 ishdr ? "Header" : "Payload", krx,
1504                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
1505                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
1506                 NTOH__u32(hdr->type));
1507
1508         switch (NTOH__u32 (hdr->type))
1509         {
1510         case PTL_MSG_ACK:
1511                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
1512                        " len %u\n",
1513                        NTOH__u32(hdr->msg.ack.mlength),
1514                        hdr->msg.ack.dst_wmd.handle_cookie,
1515                        hdr->msg.ack.dst_wmd.handle_idx,
1516                        NTOH__u64(hdr->msg.ack.match_bits),
1517                        NTOH__u32(hdr->msg.ack.length));
1518                 break;
1519         case PTL_MSG_PUT:
1520                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
1521                        " len %u off %u data "LPX64"\n",
1522                        NTOH__u32(hdr->msg.put.ptl_index),
1523                        hdr->msg.put.ack_wmd.handle_cookie,
1524                        hdr->msg.put.ack_wmd.handle_idx,
1525                        NTOH__u64(hdr->msg.put.match_bits),
1526                        NTOH__u32(hdr->msg.put.length),
1527                        NTOH__u32(hdr->msg.put.offset),
1528                        hdr->msg.put.hdr_data);
1529                 break;
1530         case PTL_MSG_GET:
1531                 CERROR ("GET: <>\n");
1532                 break;
1533         case PTL_MSG_REPLY:
1534                 CERROR ("REPLY: <>\n");
1535                 break;
1536         default:
1537                 CERROR ("TYPE?: <>\n");
1538         }
1539 }
1540 #endif
1541
1542 static ptl_err_t
1543 kqswnal_recvmsg (nal_cb_t     *nal,
1544                  void         *private,
1545                  lib_msg_t    *libmsg,
1546                  unsigned int  niov,
1547                  struct iovec *iov,
1548                  ptl_kiov_t   *kiov,
1549                  size_t        offset,
1550                  size_t        mlen,
1551                  size_t        rlen)
1552 {
1553         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
1554         char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
1555         int           page;
1556         char         *page_ptr;
1557         int           page_nob;
1558         char         *iov_ptr;
1559         int           iov_nob;
1560         int           frag;
1561 #if KQSW_CHECKSUM
1562         kqsw_csum_t   senders_csum;
1563         kqsw_csum_t   payload_csum = 0;
1564         kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
1565         size_t        csum_len = mlen;
1566         int           csum_frags = 0;
1567         int           csum_nob = 0;
1568         static atomic_t csum_counter;
1569         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
1570
1571         atomic_inc (&csum_counter);
1572
1573         memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1574         if (senders_csum != hdr_csum)
1575                 kqswnal_csum_error (krx, 1);
1576 #endif
1577         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1578
1579         /* What was actually received must be >= payload. */
1580         LASSERT (mlen <= rlen);
1581         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
1582                 CERROR("Bad message size: have %d, need %d + %d\n",
1583                        krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
1584                 return (PTL_FAIL);
1585         }
1586
1587         /* It must be OK to kmap() if required */
1588         LASSERT (kiov == NULL || !in_interrupt ());
1589         /* Either all pages or all vaddrs */
1590         LASSERT (!(kiov != NULL && iov != NULL));
1591
1592         if (mlen != 0) {
1593                 page     = 0;
1594                 page_ptr = buffer + KQSW_HDR_SIZE;
1595                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1596
1597                 LASSERT (niov > 0);
1598
1599                 if (kiov != NULL) {
1600                         /* skip complete frags */
1601                         while (offset >= kiov->kiov_len) {
1602                                 offset -= kiov->kiov_len;
1603                                 kiov++;
1604                                 niov--;
1605                                 LASSERT (niov > 0);
1606                         }
1607                         iov_ptr = ((char *)kmap (kiov->kiov_page)) +
1608                                 kiov->kiov_offset + offset;
1609                         iov_nob = kiov->kiov_len - offset;
1610                 } else {
1611                         /* skip complete frags */
1612                         while (offset >= iov->iov_len) {
1613                                 offset -= iov->iov_len;
1614                                 iov++;
1615                                 niov--;
1616                                 LASSERT (niov > 0);
1617                         }
1618                         iov_ptr = iov->iov_base + offset;
1619                         iov_nob = iov->iov_len - offset;
1620                 }
1621                 
1622                 for (;;)
1623                 {
1624                         frag = mlen;
1625                         if (frag > page_nob)
1626                                 frag = page_nob;
1627                         if (frag > iov_nob)
1628                                 frag = iov_nob;
1629
1630                         memcpy (iov_ptr, page_ptr, frag);
1631 #if KQSW_CHECKSUM
1632                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1633                         csum_nob += frag;
1634                         csum_frags++;
1635 #endif
1636                         mlen -= frag;
1637                         if (mlen == 0)
1638                                 break;
1639
1640                         page_nob -= frag;
1641                         if (page_nob != 0)
1642                                 page_ptr += frag;
1643                         else
1644                         {
1645                                 page++;
1646                                 LASSERT (page < krx->krx_npages);
1647                                 page_ptr = page_address(krx->krx_kiov[page].kiov_page);
1648                                 page_nob = PAGE_SIZE;
1649                         }
1650
1651                         iov_nob -= frag;
1652                         if (iov_nob != 0)
1653                                 iov_ptr += frag;
1654                         else if (kiov != NULL) {
1655                                 kunmap (kiov->kiov_page);
1656                                 kiov++;
1657                                 niov--;
1658                                 LASSERT (niov > 0);
1659                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1660                                 iov_nob = kiov->kiov_len;
1661                         } else {
1662                                 iov++;
1663                                 niov--;
1664                                 LASSERT (niov > 0);
1665                                 iov_ptr = iov->iov_base;
1666                                 iov_nob = iov->iov_len;
1667                         }
1668                 }
1669
1670                 if (kiov != NULL)
1671                         kunmap (kiov->kiov_page);
1672         }
1673
1674 #if KQSW_CHECKSUM
1675         memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
1676                 sizeof(kqsw_csum_t));
1677
1678         if (csum_len != rlen)
1679                 CERROR("Unable to checksum data in user's buffer\n");
1680         else if (senders_csum != payload_csum)
1681                 kqswnal_csum_error (krx, 0);
1682
1683         if (csum_verbose)
1684                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1685                        "csum_nob %d\n",
1686                         hdr_csum, payload_csum, csum_frags, csum_nob);
1687 #endif
1688         lib_finalize(nal, private, libmsg, PTL_OK);
1689
1690         return (PTL_OK);
1691 }
1692
1693 static ptl_err_t
1694 kqswnal_recv(nal_cb_t     *nal,
1695              void         *private,
1696              lib_msg_t    *libmsg,
1697              unsigned int  niov,
1698              struct iovec *iov,
1699              size_t        offset,
1700              size_t        mlen,
1701              size_t        rlen)
1702 {
1703         return (kqswnal_recvmsg(nal, private, libmsg, 
1704                                 niov, iov, NULL, 
1705                                 offset, mlen, rlen));
1706 }
1707
1708 static ptl_err_t
1709 kqswnal_recv_pages (nal_cb_t     *nal,
1710                     void         *private,
1711                     lib_msg_t    *libmsg,
1712                     unsigned int  niov,
1713                     ptl_kiov_t   *kiov,
1714                     size_t        offset,
1715                     size_t        mlen,
1716                     size_t        rlen)
1717 {
1718         return (kqswnal_recvmsg(nal, private, libmsg, 
1719                                 niov, NULL, kiov, 
1720                                 offset, mlen, rlen));
1721 }
1722
1723 int
1724 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1725 {
1726         long    pid = kernel_thread (fn, arg, 0);
1727
1728         if (pid < 0)
1729                 return ((int)pid);
1730
1731         atomic_inc (&kqswnal_data.kqn_nthreads);
1732         return (0);
1733 }
1734
1735 void
1736 kqswnal_thread_fini (void)
1737 {
1738         atomic_dec (&kqswnal_data.kqn_nthreads);
1739 }
1740
1741 int
1742 kqswnal_scheduler (void *arg)
1743 {
1744         kqswnal_rx_t    *krx;
1745         kqswnal_tx_t    *ktx;
1746         kpr_fwd_desc_t  *fwd;
1747         long             flags;
1748         int              rc;
1749         int              counter = 0;
1750         int              did_something;
1751
1752         kportal_daemonize ("kqswnal_sched");
1753         kportal_blockallsigs ();
1754         
1755         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1756
1757         for (;;)
1758         {
1759                 did_something = 0;
1760
1761                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1762                 {
1763                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1764                                          kqswnal_rx_t, krx_list);
1765                         list_del (&krx->krx_list);
1766                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1767                                                flags);
1768
1769                         kqswnal_rx (krx);
1770
1771                         did_something = 1;
1772                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1773                 }
1774
1775                 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1776                 {
1777                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1778                                          kqswnal_tx_t, ktx_list);
1779                         list_del_init (&ktx->ktx_delayed_list);
1780                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1781                                                flags);
1782
1783                         rc = kqswnal_launch (ktx);
1784                         if (rc != 0) {
1785                                 CERROR("Failed delayed transmit to "LPX64
1786                                        ": %d\n", ktx->ktx_nid, rc);
1787                                 kqswnal_tx_done (ktx, rc);
1788                         }
1789                         atomic_dec (&kqswnal_data.kqn_pending_txs);
1790
1791                         did_something = 1;
1792                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1793                 }
1794
1795                 if (!list_empty (&kqswnal_data.kqn_delayedfwds))
1796                 {
1797                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1798                         list_del (&fwd->kprfd_list);
1799                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1800
1801                         /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
1802                         kqswnal_fwd_packet (NULL, fwd);
1803
1804                         did_something = 1;
1805                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1806                 }
1807
1808                 /* nothing to do or hogging CPU */
1809                 if (!did_something || counter++ == KQSW_RESCHED) {
1810                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1811                                                flags);
1812
1813                         counter = 0;
1814
1815                         if (!did_something) {
1816                                 if (kqswnal_data.kqn_shuttingdown == 2) {
1817                                         /* We only exit in stage 2 of shutdown when 
1818                                          * there's nothing left to do */
1819                                         break;
1820                                 }
1821                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1822                                                                kqswnal_data.kqn_shuttingdown == 2 ||
1823                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1824                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1825                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1826                                 LASSERT (rc == 0);
1827                         } else if (current->need_resched)
1828                                 schedule ();
1829
1830                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1831                 }
1832         }
1833
1834         kqswnal_thread_fini ();
1835         return (0);
1836 }
1837
1838 nal_cb_t kqswnal_lib =
1839 {
1840         nal_data:       &kqswnal_data,         /* NAL private data */
1841         cb_send:        kqswnal_send,
1842         cb_send_pages:  kqswnal_send_pages,
1843         cb_recv:        kqswnal_recv,
1844         cb_recv_pages:  kqswnal_recv_pages,
1845         cb_read:        kqswnal_read,
1846         cb_write:       kqswnal_write,
1847         cb_malloc:      kqswnal_malloc,
1848         cb_free:        kqswnal_free,
1849         cb_printf:      kqswnal_printf,
1850         cb_cli:         kqswnal_cli,
1851         cb_sti:         kqswnal_sti,
1852         cb_callback:    kqswnal_callback,
1853         cb_dist:        kqswnal_dist
1854 };