Whamcloud - gitweb
61c88f6de402dd3cb0bdc34538d980a1f4e83d75
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 EP_STATUSBLK  kqswnal_rpc_success;
30 EP_STATUSBLK  kqswnal_rpc_failed;
31
32 /*
33  *  LIB functions follow
34  *
35  */
36 static ptl_err_t
37 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
38              size_t len)
39 {
40         CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
41                 nal->ni.nid, len, src_addr, dst_addr );
42         memcpy( dst_addr, src_addr, len );
43
44         return (PTL_OK);
45 }
46
47 static ptl_err_t
48 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
49               size_t len)
50 {
51         CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
52                 nal->ni.nid, len, src_addr, dst_addr );
53         memcpy( dst_addr, src_addr, len );
54
55         return (PTL_OK);
56 }
57
58 static void *
59 kqswnal_malloc(nal_cb_t *nal, size_t len)
60 {
61         void *buf;
62
63         PORTAL_ALLOC(buf, len);
64         return (buf);
65 }
66
67 static void
68 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
69 {
70         PORTAL_FREE(buf, len);
71 }
72
73 static void
74 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
75 {
76         va_list ap;
77         char msg[256];
78
79         va_start (ap, fmt);
80         vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
81         va_end (ap);
82
83         msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
84
85         CDEBUG (D_NET, "%s", msg);
86 }
87
88 #if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
89 # error "Can't save/restore irq contexts in different procedures"
90 #endif
91
92 static void
93 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
94 {
95         kqswnal_data_t *data= nal->nal_data;
96
97         spin_lock_irqsave(&data->kqn_statelock, *flags);
98 }
99
100
101 static void
102 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
103 {
104         kqswnal_data_t *data= nal->nal_data;
105
106         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
107 }
108
109 static void
110 kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
111 {
112         /* holding kqn_statelock */
113
114         if (eq->event_callback != NULL)
115                 eq->event_callback(ev);
116
117         if (waitqueue_active(&kqswnal_data.kqn_yield_waitq))
118                 wake_up_all(&kqswnal_data.kqn_yield_waitq);
119 }
120
121 static int
122 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
123 {
124         if (nid == nal->ni.nid)
125                 *dist = 0;                      /* it's me */
126         else if (kqswnal_nid2elanid (nid) >= 0)
127                 *dist = 1;                      /* it's my peer */
128         else
129                 *dist = 2;                      /* via router */
130         return (0);
131 }
132
133 void
134 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
135 {
136         struct timeval     now;
137         time_t             then;
138
139         do_gettimeofday (&now);
140         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
141
142         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
143 }
144
145 void
146 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
147 {
148 #if MULTIRAIL_EKC
149         int      i;
150 #endif
151
152         if (ktx->ktx_nmappedpages == 0)
153                 return;
154         
155 #if MULTIRAIL_EKC
156         CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
157                ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
158
159         for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
160                 ep_dvma_unload(kqswnal_data.kqn_ep,
161                                kqswnal_data.kqn_ep_tx_nmh,
162                                &ktx->ktx_frags[i]);
163 #else
164         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
165                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
166
167         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
168         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
169                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
170
171         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
172                           kqswnal_data.kqn_eptxdmahandle,
173                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
174 #endif
175         ktx->ktx_nmappedpages = 0;
176 }
177
178 int
179 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
180 {
181         int       nfrags    = ktx->ktx_nfrag;
182         int       nmapped   = ktx->ktx_nmappedpages;
183         int       maxmapped = ktx->ktx_npages;
184         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
185         char     *ptr;
186 #if MULTIRAIL_EKC
187         EP_RAILMASK railmask;
188         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
189                                             EP_RAILMASK_ALL,
190                                             kqswnal_nid2elanid(ktx->ktx_nid));
191         
192         if (rail < 0) {
193                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
194                 return (-ENETDOWN);
195         }
196         railmask = 1 << rail;
197 #endif
198         LASSERT (nmapped <= maxmapped);
199         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
200         LASSERT (nfrags <= EP_MAXFRAG);
201         LASSERT (niov > 0);
202         LASSERT (nob > 0);
203
204         /* skip complete frags before 'offset' */
205         while (offset >= kiov->kiov_len) {
206                 offset -= kiov->kiov_len;
207                 kiov++;
208                 niov--;
209                 LASSERT (niov > 0);
210         }
211
212         do {
213                 int  fraglen = kiov->kiov_len - offset;
214
215                 /* nob exactly spans the iovs */
216                 LASSERT (fraglen <= nob);
217                 /* each frag fits in a page */
218                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
219
220                 nmapped++;
221                 if (nmapped > maxmapped) {
222                         CERROR("Can't map message in %d pages (max %d)\n",
223                                nmapped, maxmapped);
224                         return (-EMSGSIZE);
225                 }
226
227                 if (nfrags == EP_MAXFRAG) {
228                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
229                                EP_MAXFRAG);
230                         return (-EMSGSIZE);
231                 }
232
233                 /* XXX this is really crap, but we'll have to kmap until
234                  * EKC has a page (rather than vaddr) mapping interface */
235
236                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
237
238                 CDEBUG(D_NET,
239                        "%p[%d] loading %p for %d, page %d, %d total\n",
240                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
241
242 #if MULTIRAIL_EKC
243                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
244                              ptr, fraglen,
245                              kqswnal_data.kqn_ep_tx_nmh, basepage,
246                              &railmask, &ktx->ktx_frags[nfrags]);
247
248                 if (nfrags == ktx->ktx_firsttmpfrag ||
249                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
250                                   &ktx->ktx_frags[nfrags - 1],
251                                   &ktx->ktx_frags[nfrags])) {
252                         /* new frag if this is the first or can't merge */
253                         nfrags++;
254                 }
255 #else
256                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
257                                        kqswnal_data.kqn_eptxdmahandle,
258                                        ptr, fraglen,
259                                        basepage, &ktx->ktx_frags[nfrags].Base);
260
261                 if (nfrags > 0 &&                /* previous frag mapped */
262                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
263                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
264                         /* just extend previous */
265                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
266                 else {
267                         ktx->ktx_frags[nfrags].Len = fraglen;
268                         nfrags++;                /* new frag */
269                 }
270 #endif
271
272                 kunmap (kiov->kiov_page);
273                 
274                 /* keep in loop for failure case */
275                 ktx->ktx_nmappedpages = nmapped;
276
277                 basepage++;
278                 kiov++;
279                 niov--;
280                 nob -= fraglen;
281                 offset = 0;
282
283                 /* iov must not run out before end of data */
284                 LASSERT (nob == 0 || niov > 0);
285
286         } while (nob > 0);
287
288         ktx->ktx_nfrag = nfrags;
289         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
290                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
291
292         return (0);
293 }
294
295 int
296 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
297                     int niov, struct iovec *iov)
298 {
299         int       nfrags    = ktx->ktx_nfrag;
300         int       nmapped   = ktx->ktx_nmappedpages;
301         int       maxmapped = ktx->ktx_npages;
302         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
303 #if MULTIRAIL_EKC
304         EP_RAILMASK railmask;
305         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
306                                             EP_RAILMASK_ALL,
307                                             kqswnal_nid2elanid(ktx->ktx_nid));
308         
309         if (rail < 0) {
310                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
311                 return (-ENETDOWN);
312         }
313         railmask = 1 << rail;
314 #endif
315         LASSERT (nmapped <= maxmapped);
316         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
317         LASSERT (nfrags <= EP_MAXFRAG);
318         LASSERT (niov > 0);
319         LASSERT (nob > 0);
320
321         /* skip complete frags before offset */
322         while (offset >= iov->iov_len) {
323                 offset -= iov->iov_len;
324                 iov++;
325                 niov--;
326                 LASSERT (niov > 0);
327         }
328         
329         do {
330                 int  fraglen = iov->iov_len - offset;
331                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
332
333                 /* nob exactly spans the iovs */
334                 LASSERT (fraglen <= nob);
335                 
336                 nmapped += npages;
337                 if (nmapped > maxmapped) {
338                         CERROR("Can't map message in %d pages (max %d)\n",
339                                nmapped, maxmapped);
340                         return (-EMSGSIZE);
341                 }
342
343                 if (nfrags == EP_MAXFRAG) {
344                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
345                                EP_MAXFRAG);
346                         return (-EMSGSIZE);
347                 }
348
349                 CDEBUG(D_NET,
350                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
351                        ktx, nfrags, iov->iov_base + offset, fraglen, 
352                        basepage, npages, nmapped);
353
354 #if MULTIRAIL_EKC
355                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
356                              iov->iov_base + offset, fraglen,
357                              kqswnal_data.kqn_ep_tx_nmh, basepage,
358                              &railmask, &ktx->ktx_frags[nfrags]);
359
360                 if (nfrags == ktx->ktx_firsttmpfrag ||
361                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
362                                   &ktx->ktx_frags[nfrags - 1],
363                                   &ktx->ktx_frags[nfrags])) {
364                         /* new frag if this is the first or can't merge */
365                         nfrags++;
366                 }
367 #else
368                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
369                                        kqswnal_data.kqn_eptxdmahandle,
370                                        iov->iov_base + offset, fraglen,
371                                        basepage, &ktx->ktx_frags[nfrags].Base);
372
373                 if (nfrags > 0 &&                /* previous frag mapped */
374                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
375                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
376                         /* just extend previous */
377                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
378                 else {
379                         ktx->ktx_frags[nfrags].Len = fraglen;
380                         nfrags++;                /* new frag */
381                 }
382 #endif
383
384                 /* keep in loop for failure case */
385                 ktx->ktx_nmappedpages = nmapped;
386
387                 basepage += npages;
388                 iov++;
389                 niov--;
390                 nob -= fraglen;
391                 offset = 0;
392
393                 /* iov must not run out before end of data */
394                 LASSERT (nob == 0 || niov > 0);
395
396         } while (nob > 0);
397
398         ktx->ktx_nfrag = nfrags;
399         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
400                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
401
402         return (0);
403 }
404
405
406 void
407 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
408 {
409         kpr_fwd_desc_t   *fwd = NULL;
410         unsigned long     flags;
411
412         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
413         ktx->ktx_state = KTX_IDLE;
414
415         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
416
417         list_del (&ktx->ktx_list);              /* take off active list */
418
419         if (ktx->ktx_isnblk) {
420                 /* reserved for non-blocking tx */
421                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
422                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
423                 return;
424         }
425
426         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
427
428         /* anything blocking for a tx descriptor? */
429         if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
430         {
431                 CDEBUG(D_NET,"wakeup fwd\n");
432
433                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
434                                   kpr_fwd_desc_t, kprfd_list);
435                 list_del (&fwd->kprfd_list);
436         }
437
438         wake_up (&kqswnal_data.kqn_idletxd_waitq);
439
440         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
441
442         if (fwd == NULL)
443                 return;
444
445         /* schedule packet for forwarding again */
446         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
447
448         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
449         wake_up (&kqswnal_data.kqn_sched_waitq);
450
451         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
452 }
453
454 kqswnal_tx_t *
455 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
456 {
457         unsigned long  flags;
458         kqswnal_tx_t  *ktx = NULL;
459
460         for (;;) {
461                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
462
463                 /* "normal" descriptor is free */
464                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
465                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
466                                           kqswnal_tx_t, ktx_list);
467                         break;
468                 }
469
470                 /* "normal" descriptor pool is empty */
471
472                 if (fwd != NULL) { /* forwarded packet => queue for idle txd */
473                         CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
474                         list_add_tail (&fwd->kprfd_list,
475                                        &kqswnal_data.kqn_idletxd_fwdq);
476                         break;
477                 }
478
479                 /* doing a local transmit */
480                 if (!may_block) {
481                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
482                                 CERROR ("intr tx desc pool exhausted\n");
483                                 break;
484                         }
485
486                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
487                                           kqswnal_tx_t, ktx_list);
488                         break;
489                 }
490
491                 /* block for idle tx */
492
493                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
494
495                 CDEBUG (D_NET, "blocking for tx desc\n");
496                 wait_event (kqswnal_data.kqn_idletxd_waitq,
497                             !list_empty (&kqswnal_data.kqn_idletxds));
498         }
499
500         if (ktx != NULL) {
501                 list_del (&ktx->ktx_list);
502                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
503                 ktx->ktx_launcher = current->pid;
504         }
505
506         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
507
508         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
509         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
510
511         return (ktx);
512 }
513
514 void
515 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
516 {
517         lib_msg_t     *msg;
518         lib_msg_t     *repmsg = NULL;
519
520         switch (ktx->ktx_state) {
521         case KTX_FORWARDING:       /* router asked me to forward this packet */
522                 kpr_fwd_done (&kqswnal_data.kqn_router,
523                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
524                 break;
525
526         case KTX_SENDING:          /* packet sourced locally */
527                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
528                               (lib_msg_t *)ktx->ktx_args[1],
529                               (error == 0) ? PTL_OK : 
530                               (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
531                 break;
532
533         case KTX_GETTING:          /* Peer has DMA-ed direct? */
534                 msg = (lib_msg_t *)ktx->ktx_args[1];
535
536                 if (error == 0) {
537                         repmsg = lib_create_reply_msg (&kqswnal_lib, 
538                                                        ktx->ktx_nid, msg);
539                         if (repmsg == NULL)
540                                 error = -ENOMEM;
541                 }
542                 
543                 if (error == 0) {
544                         lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
545                                       msg, PTL_OK);
546                         lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
547                 } else {
548                         lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
549                                       (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
550                 }
551                 break;
552
553         default:
554                 LASSERT (0);
555         }
556
557         kqswnal_put_idle_tx (ktx);
558 }
559
560 static void
561 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
562 {
563         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
564
565         LASSERT (txd != NULL);
566         LASSERT (ktx != NULL);
567
568         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
569
570         if (status != EP_SUCCESS) {
571
572                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
573                         ktx->ktx_nid, status);
574
575                 kqswnal_notify_peer_down(ktx);
576                 status = -EHOSTDOWN;
577
578         } else if (ktx->ktx_state == KTX_GETTING) {
579                 /* RPC completed OK; what did our peer put in the status
580                  * block? */
581 #if MULTIRAIL_EKC
582                 status = ep_txd_statusblk(txd)->Data[0];
583 #else
584                 status = ep_txd_statusblk(txd)->Status;
585 #endif
586         } else {
587                 status = 0;
588         }
589
590         kqswnal_tx_done (ktx, status);
591 }
592
593 int
594 kqswnal_launch (kqswnal_tx_t *ktx)
595 {
596         /* Don't block for transmit descriptor if we're in interrupt context */
597         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
598         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
599         long  flags;
600         int   rc;
601
602         ktx->ktx_launchtime = jiffies;
603
604         LASSERT (dest >= 0);                    /* must be a peer */
605         if (ktx->ktx_state == KTX_GETTING) {
606                 /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
607                  * other frags are the GET sink which we obviously don't
608                  * send here :) */
609 #if MULTIRAIL_EKC
610                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
611                                      ktx->ktx_port, attr,
612                                      kqswnal_txhandler, ktx,
613                                      NULL, ktx->ktx_frags, 1);
614 #else
615                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
616                                      ktx->ktx_port, attr, kqswnal_txhandler,
617                                      ktx, NULL, ktx->ktx_frags, 1);
618 #endif
619         } else {
620 #if MULTIRAIL_EKC
621                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
622                                          ktx->ktx_port, attr,
623                                          kqswnal_txhandler, ktx,
624                                          NULL, ktx->ktx_frags, ktx->ktx_nfrag);
625 #else
626                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
627                                        ktx->ktx_port, attr, 
628                                        kqswnal_txhandler, ktx, 
629                                        ktx->ktx_frags, ktx->ktx_nfrag);
630 #endif
631         }
632
633         switch (rc) {
634         case EP_SUCCESS: /* success */
635                 return (0);
636
637         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
638                 LASSERT (in_interrupt());
639
640                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
641
642                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
643                 wake_up (&kqswnal_data.kqn_sched_waitq);
644
645                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
646                 return (0);
647
648         default: /* fatal error */
649                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
650                 kqswnal_notify_peer_down(ktx);
651                 return (-EHOSTUNREACH);
652         }
653 }
654
655 static char *
656 hdr_type_string (ptl_hdr_t *hdr)
657 {
658         switch (hdr->type) {
659         case PTL_MSG_ACK:
660                 return ("ACK");
661         case PTL_MSG_PUT:
662                 return ("PUT");
663         case PTL_MSG_GET:
664                 return ("GET");
665         case PTL_MSG_REPLY:
666                 return ("REPLY");
667         default:
668                 return ("<UNKNOWN>");
669         }
670 }
671
672 static void
673 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
674 {
675         char *type_str = hdr_type_string (hdr);
676
677         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
678                NTOH__u32(hdr->payload_length));
679         CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
680                NTOH__u32(hdr->src_pid));
681         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
682                NTOH__u32(hdr->dest_pid));
683
684         switch (NTOH__u32(hdr->type)) {
685         case PTL_MSG_PUT:
686                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
687                        "match bits "LPX64"\n",
688                        NTOH__u32 (hdr->msg.put.ptl_index),
689                        hdr->msg.put.ack_wmd.wh_interface_cookie,
690                        hdr->msg.put.ack_wmd.wh_object_cookie,
691                        NTOH__u64 (hdr->msg.put.match_bits));
692                 CERROR("    offset %d, hdr data "LPX64"\n",
693                        NTOH__u32(hdr->msg.put.offset),
694                        hdr->msg.put.hdr_data);
695                 break;
696
697         case PTL_MSG_GET:
698                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
699                        "match bits "LPX64"\n",
700                        NTOH__u32 (hdr->msg.get.ptl_index),
701                        hdr->msg.get.return_wmd.wh_interface_cookie,
702                        hdr->msg.get.return_wmd.wh_object_cookie,
703                        hdr->msg.get.match_bits);
704                 CERROR("    Length %d, src offset %d\n",
705                        NTOH__u32 (hdr->msg.get.sink_length),
706                        NTOH__u32 (hdr->msg.get.src_offset));
707                 break;
708
709         case PTL_MSG_ACK:
710                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
711                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
712                        hdr->msg.ack.dst_wmd.wh_object_cookie,
713                        NTOH__u32 (hdr->msg.ack.mlength));
714                 break;
715
716         case PTL_MSG_REPLY:
717                 CERROR("    dst md "LPX64"."LPX64"\n",
718                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
719                        hdr->msg.reply.dst_wmd.wh_object_cookie);
720         }
721
722 }                               /* end of print_hdr() */
723
724 #if !MULTIRAIL_EKC
725 void
726 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
727 {
728         int          i;
729
730         CDEBUG (how, "%s: %d\n", str, n);
731         for (i = 0; i < n; i++) {
732                 CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
733         }
734 }
735
736 int
737 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
738                      int nsrc, EP_IOVEC *src,
739                      int ndst, EP_IOVEC *dst) 
740 {
741         int        count;
742         int        nob;
743
744         LASSERT (ndv > 0);
745         LASSERT (nsrc > 0);
746         LASSERT (ndst > 0);
747
748         for (count = 0; count < ndv; count++, dv++) {
749
750                 if (nsrc == 0 || ndst == 0) {
751                         if (nsrc != ndst) {
752                                 /* For now I'll barf on any left over entries */
753                                 CERROR ("mismatched src and dst iovs\n");
754                                 return (-EINVAL);
755                         }
756                         return (count);
757                 }
758
759                 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
760                 dv->Len    = nob;
761                 dv->Source = src->Base;
762                 dv->Dest   = dst->Base;
763
764                 if (nob >= src->Len) {
765                         src++;
766                         nsrc--;
767                 } else {
768                         src->Len -= nob;
769                         src->Base += nob;
770                 }
771                 
772                 if (nob >= dst->Len) {
773                         dst++;
774                         ndst--;
775                 } else {
776                         src->Len -= nob;
777                         src->Base += nob;
778                 }
779         }
780
781         CERROR ("DATAVEC too small\n");
782         return (-E2BIG);
783 }
784 #endif
785
786 int
787 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
788                    struct iovec *iov, ptl_kiov_t *kiov, 
789                    int offset, int nob)
790 {
791         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
792         char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
793         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
794         int                 rc;
795 #if MULTIRAIL_EKC
796         int                 i;
797 #else
798         EP_DATAVEC          datav[EP_MAXFRAG];
799         int                 ndatav;
800 #endif
801         LASSERT (krx->krx_rpc_reply_needed);
802         LASSERT ((iov == NULL) != (kiov == NULL));
803
804         /* see kqswnal_sendmsg comment regarding endian-ness */
805         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
806                 /* msg too small to discover rmd size */
807                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
808                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
809                 return (-EINVAL);
810         }
811         
812         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
813                 /* rmd doesn't fit in the incoming message */
814                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
815                         krx->krx_nob, rmd->kqrmd_nfrag,
816                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
817                 return (-EINVAL);
818         }
819
820         /* Map the source data... */
821         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
822         if (kiov != NULL)
823                 rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
824         else
825                 rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
826
827         if (rc != 0) {
828                 CERROR ("Can't map source data: %d\n", rc);
829                 return (rc);
830         }
831
832 #if MULTIRAIL_EKC
833         if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
834                 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
835                        ktx->ktx_nfrag, rmd->kqrmd_nfrag);
836                 return (-EINVAL);
837         }
838         
839         for (i = 0; i < rmd->kqrmd_nfrag; i++)
840                 if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
841                         CERROR("Can't cope with unequal frags %d(%d):"
842                                " %d local %d remote\n",
843                                i, rmd->kqrmd_nfrag, 
844                                ktx->ktx_frags[i].nmd_len, 
845                                rmd->kqrmd_frag[i].nmd_len);
846                         return (-EINVAL);
847                 }
848 #else
849         ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
850                                       ktx->ktx_nfrag, ktx->ktx_frags,
851                                       rmd->kqrmd_nfrag, rmd->kqrmd_frag);
852         if (ndatav < 0) {
853                 CERROR ("Can't create datavec: %d\n", ndatav);
854                 return (ndatav);
855         }
856 #endif
857
858         /* Our caller will start to race with kqswnal_dma_reply_complete... */
859         LASSERT (atomic_read (&krx->krx_refcount) == 1);
860         atomic_set (&krx->krx_refcount, 2);
861
862 #if MULTIRAIL_EKC
863         rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
864                              &kqswnal_rpc_success,
865                              ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
866         if (rc == EP_SUCCESS)
867                 return (0);
868
869         /* Well we tried... */
870         krx->krx_rpc_reply_needed = 0;
871 #else
872         rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
873                               &kqswnal_rpc_success, datav, ndatav);
874         if (rc == EP_SUCCESS)
875                 return (0);
876
877         /* "old" EKC destroys rxd on failed completion */
878         krx->krx_rxd = NULL;
879 #endif
880
881         CERROR("can't complete RPC: %d\n", rc);
882
883         /* reset refcount back to 1: we're not going to be racing with
884          * kqswnal_dma_reply_complete. */
885         atomic_set (&krx->krx_refcount, 1);
886
887         return (-ECONNABORTED);
888 }
889
890 static ptl_err_t
891 kqswnal_sendmsg (nal_cb_t     *nal,
892                  void         *private,
893                  lib_msg_t    *libmsg,
894                  ptl_hdr_t    *hdr,
895                  int           type,
896                  ptl_nid_t     nid,
897                  ptl_pid_t     pid,
898                  unsigned int  payload_niov,
899                  struct iovec *payload_iov,
900                  ptl_kiov_t   *payload_kiov,
901                  size_t        payload_offset,
902                  size_t        payload_nob)
903 {
904         kqswnal_tx_t      *ktx;
905         int                rc;
906         ptl_nid_t          targetnid;
907 #if KQSW_CHECKSUM
908         int                i;
909         kqsw_csum_t        csum;
910         int                sumoff;
911         int                sumnob;
912 #endif
913         
914         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
915                " pid %u\n", payload_nob, payload_niov, nid, pid);
916
917         LASSERT (payload_nob == 0 || payload_niov > 0);
918         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
919
920         /* It must be OK to kmap() if required */
921         LASSERT (payload_kiov == NULL || !in_interrupt ());
922         /* payload is either all vaddrs or all pages */
923         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
924         
925         if (payload_nob > KQSW_MAXPAYLOAD) {
926                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
927                         payload_nob, KQSW_MAXPAYLOAD);
928                 return (PTL_FAIL);
929         }
930
931         targetnid = nid;
932         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
933                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
934                                  sizeof (ptl_hdr_t) + payload_nob, &targetnid);
935                 if (rc != 0) {
936                         CERROR("Can't route to "LPX64": router error %d\n",
937                                nid, rc);
938                         return (PTL_FAIL);
939                 }
940                 if (kqswnal_nid2elanid (targetnid) < 0) {
941                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
942                                targetnid, nid);
943                         return (PTL_FAIL);
944                 }
945         }
946
947         /* I may not block for a transmit descriptor if I might block the
948          * receiver, or an interrupt handler. */
949         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
950                                           type == PTL_MSG_REPLY ||
951                                           in_interrupt()));
952         if (ktx == NULL) {
953                 kqswnal_cerror_hdr (hdr);
954                 return (PTL_NO_SPACE);
955         }
956
957         ktx->ktx_nid     = targetnid;
958         ktx->ktx_args[0] = private;
959         ktx->ktx_args[1] = libmsg;
960
961         if (type == PTL_MSG_REPLY &&
962             ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
963                 if (nid != targetnid ||
964                     kqswnal_nid2elanid(nid) != 
965                     ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
966                         CERROR("Optimized reply nid conflict: "
967                                "nid "LPX64" via "LPX64" elanID %d\n",
968                                nid, targetnid,
969                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
970                         return (PTL_FAIL);
971                 }
972
973                 /* peer expects RPC completion with GET data */
974                 rc = kqswnal_dma_reply (ktx, payload_niov, 
975                                         payload_iov, payload_kiov, 
976                                         payload_offset, payload_nob);
977                 if (rc == 0)
978                         return (PTL_OK);
979                 
980                 CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
981                 kqswnal_put_idle_tx (ktx);
982                 return (PTL_FAIL);
983         }
984
985         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
986         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
987
988 #if KQSW_CHECKSUM
989         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
990         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
991         for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
992                 LASSERT(i < niov);
993                 if (payload_kiov != NULL) {
994                         ptl_kiov_t *kiov = &payload_kiov[i];
995
996                         if (sumoff >= kiov->kiov_len) {
997                                 sumoff -= kiov->kiov_len;
998                         } else {
999                                 char *addr = ((char *)kmap (kiov->kiov_page)) +
1000                                              kiov->kiov_offset + sumoff;
1001                                 int   fragnob = kiov->kiov_len - sumoff;
1002
1003                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1004                                 sumnob -= fragnob;
1005                                 sumoff = 0;
1006                                 kunmap(kiov->kiov_page);
1007                         }
1008                 } else {
1009                         struct iovec *iov = &payload_iov[i];
1010
1011                         if (sumoff > iov->iov_len) {
1012                                 sumoff -= iov->iov_len;
1013                         } else {
1014                                 char *addr = iov->iov_base + sumoff;
1015                                 int   fragnob = iov->iov_len - sumoff;
1016                                 
1017                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1018                                 sumnob -= fragnob;
1019                                 sumoff = 0;
1020                         }
1021                 }
1022         }
1023         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
1024 #endif
1025
1026         if (kqswnal_data.kqn_optimized_gets &&
1027             type == PTL_MSG_GET &&              /* doing a GET */
1028             nid == targetnid) {                 /* not forwarding */
1029                 lib_md_t           *md = libmsg->md;
1030                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
1031                 
1032                 /* Optimised path: I send over the Elan vaddrs of the get
1033                  * sink buffers, and my peer DMAs directly into them.
1034                  *
1035                  * First I set up ktx as if it was going to send this
1036                  * payload, (it needs to map it anyway).  This fills
1037                  * ktx_frags[1] and onward with the network addresses
1038                  * of the GET sink frags.  I copy these into ktx_buffer,
1039                  * immediately after the header, and send that as my GET
1040                  * message.
1041                  *
1042                  * Note that the addresses are sent in native endian-ness.
1043                  * When EKC copes with different endian nodes, I'll fix
1044                  * this (and eat my hat :) */
1045
1046                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1047                 ktx->ktx_state = KTX_GETTING;
1048
1049                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
1050                         rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
1051                                                   md->md_niov, md->md_iov.kiov);
1052                 else
1053                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
1054                                                  md->md_niov, md->md_iov.iov);
1055
1056                 if (rc < 0) {
1057                         kqswnal_put_idle_tx (ktx);
1058                         return (PTL_FAIL);
1059                 }
1060
1061                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1062
1063                 payload_nob = offsetof(kqswnal_remotemd_t,
1064                                        kqrmd_frag[rmd->kqrmd_nfrag]);
1065                 LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
1066
1067 #if MULTIRAIL_EKC
1068                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1069                        rmd->kqrmd_nfrag * sizeof(EP_NMD));
1070
1071                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1072                               0, KQSW_HDR_SIZE + payload_nob);
1073 #else
1074                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1075                        rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
1076                 
1077                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1078                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1079 #endif
1080         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
1081
1082                 /* small message: single frag copied into the pre-mapped buffer */
1083
1084                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1085                 ktx->ktx_state = KTX_SENDING;
1086 #if MULTIRAIL_EKC
1087                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1088                               0, KQSW_HDR_SIZE + payload_nob);
1089 #else
1090                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1091                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1092 #endif
1093                 if (payload_nob > 0) {
1094                         if (payload_kiov != NULL)
1095                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1096                                                    payload_niov, payload_kiov, 
1097                                                    payload_offset, payload_nob);
1098                         else
1099                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1100                                                   payload_niov, payload_iov, 
1101                                                   payload_offset, payload_nob);
1102                 }
1103         } else {
1104
1105                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
1106
1107                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1108                 ktx->ktx_state = KTX_SENDING;
1109 #if MULTIRAIL_EKC
1110                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1111                               0, KQSW_HDR_SIZE);
1112 #else
1113                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1114                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1115 #endif
1116                 if (payload_kiov != NULL)
1117                         rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
1118                                                   payload_niov, payload_kiov);
1119                 else
1120                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
1121                                                  payload_niov, payload_iov);
1122                 if (rc != 0) {
1123                         kqswnal_put_idle_tx (ktx);
1124                         return (PTL_FAIL);
1125                 }
1126         }
1127         
1128         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
1129                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1130
1131         rc = kqswnal_launch (ktx);
1132         if (rc != 0) {                    /* failed? */
1133                 CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
1134                 kqswnal_put_idle_tx (ktx);
1135                 return (PTL_FAIL);
1136         }
1137
1138         CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
1139                payload_nob, nid, targetnid);
1140         return (PTL_OK);
1141 }
1142
1143 static ptl_err_t
1144 kqswnal_send (nal_cb_t     *nal,
1145               void         *private,
1146               lib_msg_t    *libmsg,
1147               ptl_hdr_t    *hdr,
1148               int           type,
1149               ptl_nid_t     nid,
1150               ptl_pid_t     pid,
1151               unsigned int  payload_niov,
1152               struct iovec *payload_iov,
1153               size_t        payload_offset,
1154               size_t        payload_nob)
1155 {
1156         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1157                                  payload_niov, payload_iov, NULL, 
1158                                  payload_offset, payload_nob));
1159 }
1160
1161 static ptl_err_t
1162 kqswnal_send_pages (nal_cb_t     *nal,
1163                     void         *private,
1164                     lib_msg_t    *libmsg,
1165                     ptl_hdr_t    *hdr,
1166                     int           type,
1167                     ptl_nid_t     nid,
1168                     ptl_pid_t     pid,
1169                     unsigned int  payload_niov,
1170                     ptl_kiov_t   *payload_kiov,
1171                     size_t        payload_offset,
1172                     size_t        payload_nob)
1173 {
1174         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1175                                  payload_niov, NULL, payload_kiov, 
1176                                  payload_offset, payload_nob));
1177 }
1178
1179 void
1180 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1181 {
1182         int             rc;
1183         kqswnal_tx_t   *ktx;
1184         ptl_kiov_t     *kiov = fwd->kprfd_kiov;
1185         int             niov = fwd->kprfd_niov;
1186         int             nob = fwd->kprfd_nob;
1187         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
1188
1189 #if KQSW_CHECKSUM
1190         CERROR ("checksums for forwarded packets not implemented\n");
1191         LBUG ();
1192 #endif
1193         /* The router wants this NAL to forward a packet */
1194         CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
1195                 fwd, nid, niov, nob);
1196
1197         ktx = kqswnal_get_idle_tx (fwd, 0);
1198         if (ktx == NULL)        /* can't get txd right now */
1199                 return;         /* fwd will be scheduled when tx desc freed */
1200
1201         if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
1202                 nid = fwd->kprfd_target_nid;    /* target is final dest */
1203
1204         if (kqswnal_nid2elanid (nid) < 0) {
1205                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
1206                 rc = -EHOSTUNREACH;
1207                 goto failed;
1208         }
1209
1210         /* copy hdr into pre-mapped buffer */
1211         memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
1212         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
1213
1214         ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
1215                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1216         ktx->ktx_nid     = nid;
1217         ktx->ktx_state   = KTX_FORWARDING;
1218         ktx->ktx_args[0] = fwd;
1219         ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
1220
1221         if (nob <= KQSW_TX_MAXCONTIG) 
1222         {
1223                 /* send payload from ktx's pre-mapped contiguous buffer */
1224 #if MULTIRAIL_EKC
1225                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1226                               0, KQSW_HDR_SIZE + nob);
1227 #else
1228                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1229                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
1230 #endif
1231                 if (nob > 0)
1232                         lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
1233                                           niov, kiov, 0, nob);
1234         }
1235         else
1236         {
1237                 /* zero copy payload */
1238 #if MULTIRAIL_EKC
1239                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1240                               0, KQSW_HDR_SIZE);
1241 #else
1242                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1243                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1244 #endif
1245                 rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
1246                 if (rc != 0)
1247                         goto failed;
1248         }
1249
1250         rc = kqswnal_launch (ktx);
1251         if (rc == 0)
1252                 return;
1253
1254  failed:
1255         LASSERT (rc != 0);
1256         CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
1257
1258         kqswnal_put_idle_tx (ktx);
1259         /* complete now (with failure) */
1260         kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
1261 }
1262
1263 void
1264 kqswnal_fwd_callback (void *arg, int error)
1265 {
1266         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1267
1268         /* The router has finished forwarding this packet */
1269
1270         if (error != 0)
1271         {
1272                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1273
1274                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1275                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
1276         }
1277
1278         kqswnal_requeue_rx (krx);
1279 }
1280
1281 void
1282 kqswnal_dma_reply_complete (EP_RXD *rxd) 
1283 {
1284         int           status = ep_rxd_status(rxd);
1285         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
1286         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
1287         lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
1288         
1289         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1290                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
1291
1292         LASSERT (krx->krx_rxd == rxd);
1293         LASSERT (krx->krx_rpc_reply_needed);
1294
1295         krx->krx_rpc_reply_needed = 0;
1296         kqswnal_rx_done (krx);
1297
1298         lib_finalize (&kqswnal_lib, NULL, msg,
1299                       (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
1300         kqswnal_put_idle_tx (ktx);
1301 }
1302
1303 void
1304 kqswnal_rpc_complete (EP_RXD *rxd)
1305 {
1306         int           status = ep_rxd_status(rxd);
1307         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1308         
1309         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1310                "rxd %p, krx %p, status %d\n", rxd, krx, status);
1311
1312         LASSERT (krx->krx_rxd == rxd);
1313         LASSERT (krx->krx_rpc_reply_needed);
1314         
1315         krx->krx_rpc_reply_needed = 0;
1316         kqswnal_requeue_rx (krx);
1317 }
1318
1319 void
1320 kqswnal_requeue_rx (kqswnal_rx_t *krx) 
1321 {
1322         int   rc;
1323
1324         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1325
1326         if (krx->krx_rpc_reply_needed) {
1327
1328                 /* We failed to complete the peer's optimized GET (e.g. we
1329                  * couldn't map the source buffers).  We complete the
1330                  * peer's EKC rpc now with failure. */
1331 #if MULTIRAIL_EKC
1332                 rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
1333                                      &kqswnal_rpc_failed, NULL, NULL, 0);
1334                 if (rc == EP_SUCCESS)
1335                         return;
1336                 
1337                 CERROR("can't complete RPC: %d\n", rc);
1338 #else
1339                 if (krx->krx_rxd != NULL) {
1340                         /* We didn't try (and fail) to complete earlier... */
1341                         rc = ep_complete_rpc(krx->krx_rxd, 
1342                                              kqswnal_rpc_complete, krx,
1343                                              &kqswnal_rpc_failed, NULL, 0);
1344                         if (rc == EP_SUCCESS)
1345                                 return;
1346
1347                         CERROR("can't complete RPC: %d\n", rc);
1348                 }
1349                 
1350                 /* NB the old ep_complete_rpc() frees rxd on failure, so we
1351                  * have to requeue from scratch here, unless we're shutting
1352                  * down */
1353                 if (kqswnal_data.kqn_shuttingdown)
1354                         return;
1355
1356                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
1357                                       krx->krx_elanbuffer, 
1358                                       krx->krx_npages * PAGE_SIZE, 0);
1359                 LASSERT (rc == EP_SUCCESS);
1360                 /* We don't handle failure here; it's incredibly rare
1361                  * (never reported?) and only happens with "old" EKC */
1362                 return;
1363 #endif
1364         }
1365
1366 #if MULTIRAIL_EKC
1367         if (kqswnal_data.kqn_shuttingdown) {
1368                 /* free EKC rxd on shutdown */
1369                 ep_complete_receive(krx->krx_rxd);
1370         } else {
1371                 /* repost receive */
1372                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1373                                    &krx->krx_elanbuffer, 0);
1374         }
1375 #else                
1376         /* don't actually requeue on shutdown */
1377         if (!kqswnal_data.kqn_shuttingdown) 
1378                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1379                                    krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
1380 #endif
1381 }
1382         
1383 void
1384 kqswnal_rx (kqswnal_rx_t *krx)
1385 {
1386         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
1387         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
1388         int             payload_nob;
1389         int             nob;
1390         int             niov;
1391
1392         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1393
1394         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
1395                 atomic_set(&krx->krx_refcount, 1);
1396                 lib_parse (&kqswnal_lib, hdr, krx);
1397                 kqswnal_rx_done(krx);
1398                 return;
1399         }
1400
1401 #if KQSW_CHECKSUM
1402         CERROR ("checksums for forwarded packets not implemented\n");
1403         LBUG ();
1404 #endif
1405         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
1406         {
1407                 CERROR("dropping packet from "LPX64" for "LPX64
1408                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
1409
1410                 kqswnal_requeue_rx (krx);
1411                 return;
1412         }
1413
1414         nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
1415         niov = 0;
1416         if (nob > 0) {
1417                 krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
1418                 krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
1419                 niov = 1;
1420                 nob -= PAGE_SIZE - KQSW_HDR_SIZE;
1421                 
1422                 while (nob > 0) {
1423                         LASSERT (niov < krx->krx_npages);
1424                         
1425                         krx->krx_kiov[niov].kiov_offset = 0;
1426                         krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
1427                         niov++;
1428                         nob -= PAGE_SIZE;
1429                 }
1430         }
1431
1432         kpr_fwd_init (&krx->krx_fwd, dest_nid, 
1433                       hdr, payload_nob, niov, krx->krx_kiov,
1434                       kqswnal_fwd_callback, krx);
1435
1436         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
1437 }
1438
1439 /* Receive Interrupt Handler: posts to schedulers */
1440 void 
1441 kqswnal_rxhandler(EP_RXD *rxd)
1442 {
1443         long          flags;
1444         int           nob    = ep_rxd_len (rxd);
1445         int           status = ep_rxd_status (rxd);
1446         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1447
1448         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1449                rxd, krx, nob, status);
1450
1451         LASSERT (krx != NULL);
1452
1453         krx->krx_rxd = rxd;
1454         krx->krx_nob = nob;
1455 #if MULTIRAIL_EKC
1456         krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
1457 #else
1458         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
1459 #endif
1460         
1461         /* must receive a whole header to be able to parse */
1462         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
1463         {
1464                 /* receives complete with failure when receiver is removed */
1465 #if MULTIRAIL_EKC
1466                 if (status == EP_SHUTDOWN)
1467                         LASSERT (kqswnal_data.kqn_shuttingdown);
1468                 else
1469                         CERROR("receive status failed with status %d nob %d\n",
1470                                ep_rxd_status(rxd), nob);
1471 #else
1472                 if (!kqswnal_data.kqn_shuttingdown)
1473                         CERROR("receive status failed with status %d nob %d\n",
1474                                ep_rxd_status(rxd), nob);
1475 #endif
1476                 kqswnal_requeue_rx (krx);
1477                 return;
1478         }
1479
1480         if (!in_interrupt()) {
1481                 kqswnal_rx (krx);
1482                 return;
1483         }
1484
1485         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1486
1487         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1488         wake_up (&kqswnal_data.kqn_sched_waitq);
1489
1490         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1491 }
1492
1493 #if KQSW_CHECKSUM
1494 void
1495 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1496 {
1497         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1498
1499         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1500                 ", dpid %d, spid %d, type %d\n",
1501                 ishdr ? "Header" : "Payload", krx,
1502                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
1503                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
1504                 NTOH__u32(hdr->type));
1505
1506         switch (NTOH__u32 (hdr->type))
1507         {
1508         case PTL_MSG_ACK:
1509                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
1510                        " len %u\n",
1511                        NTOH__u32(hdr->msg.ack.mlength),
1512                        hdr->msg.ack.dst_wmd.handle_cookie,
1513                        hdr->msg.ack.dst_wmd.handle_idx,
1514                        NTOH__u64(hdr->msg.ack.match_bits),
1515                        NTOH__u32(hdr->msg.ack.length));
1516                 break;
1517         case PTL_MSG_PUT:
1518                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
1519                        " len %u off %u data "LPX64"\n",
1520                        NTOH__u32(hdr->msg.put.ptl_index),
1521                        hdr->msg.put.ack_wmd.handle_cookie,
1522                        hdr->msg.put.ack_wmd.handle_idx,
1523                        NTOH__u64(hdr->msg.put.match_bits),
1524                        NTOH__u32(hdr->msg.put.length),
1525                        NTOH__u32(hdr->msg.put.offset),
1526                        hdr->msg.put.hdr_data);
1527                 break;
1528         case PTL_MSG_GET:
1529                 CERROR ("GET: <>\n");
1530                 break;
1531         case PTL_MSG_REPLY:
1532                 CERROR ("REPLY: <>\n");
1533                 break;
1534         default:
1535                 CERROR ("TYPE?: <>\n");
1536         }
1537 }
1538 #endif
1539
1540 static ptl_err_t
1541 kqswnal_recvmsg (nal_cb_t     *nal,
1542                  void         *private,
1543                  lib_msg_t    *libmsg,
1544                  unsigned int  niov,
1545                  struct iovec *iov,
1546                  ptl_kiov_t   *kiov,
1547                  size_t        offset,
1548                  size_t        mlen,
1549                  size_t        rlen)
1550 {
1551         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
1552         char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
1553         int           page;
1554         char         *page_ptr;
1555         int           page_nob;
1556         char         *iov_ptr;
1557         int           iov_nob;
1558         int           frag;
1559 #if KQSW_CHECKSUM
1560         kqsw_csum_t   senders_csum;
1561         kqsw_csum_t   payload_csum = 0;
1562         kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
1563         size_t        csum_len = mlen;
1564         int           csum_frags = 0;
1565         int           csum_nob = 0;
1566         static atomic_t csum_counter;
1567         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
1568
1569         atomic_inc (&csum_counter);
1570
1571         memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1572         if (senders_csum != hdr_csum)
1573                 kqswnal_csum_error (krx, 1);
1574 #endif
1575         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1576
1577         /* What was actually received must be >= payload. */
1578         LASSERT (mlen <= rlen);
1579         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
1580                 CERROR("Bad message size: have %d, need %d + %d\n",
1581                        krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
1582                 return (PTL_FAIL);
1583         }
1584
1585         /* It must be OK to kmap() if required */
1586         LASSERT (kiov == NULL || !in_interrupt ());
1587         /* Either all pages or all vaddrs */
1588         LASSERT (!(kiov != NULL && iov != NULL));
1589
1590         if (mlen != 0) {
1591                 page     = 0;
1592                 page_ptr = buffer + KQSW_HDR_SIZE;
1593                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1594
1595                 LASSERT (niov > 0);
1596
1597                 if (kiov != NULL) {
1598                         /* skip complete frags */
1599                         while (offset >= kiov->kiov_len) {
1600                                 offset -= kiov->kiov_len;
1601                                 kiov++;
1602                                 niov--;
1603                                 LASSERT (niov > 0);
1604                         }
1605                         iov_ptr = ((char *)kmap (kiov->kiov_page)) +
1606                                 kiov->kiov_offset + offset;
1607                         iov_nob = kiov->kiov_len - offset;
1608                 } else {
1609                         /* skip complete frags */
1610                         while (offset >= iov->iov_len) {
1611                                 offset -= iov->iov_len;
1612                                 iov++;
1613                                 niov--;
1614                                 LASSERT (niov > 0);
1615                         }
1616                         iov_ptr = iov->iov_base + offset;
1617                         iov_nob = iov->iov_len - offset;
1618                 }
1619                 
1620                 for (;;)
1621                 {
1622                         frag = mlen;
1623                         if (frag > page_nob)
1624                                 frag = page_nob;
1625                         if (frag > iov_nob)
1626                                 frag = iov_nob;
1627
1628                         memcpy (iov_ptr, page_ptr, frag);
1629 #if KQSW_CHECKSUM
1630                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1631                         csum_nob += frag;
1632                         csum_frags++;
1633 #endif
1634                         mlen -= frag;
1635                         if (mlen == 0)
1636                                 break;
1637
1638                         page_nob -= frag;
1639                         if (page_nob != 0)
1640                                 page_ptr += frag;
1641                         else
1642                         {
1643                                 page++;
1644                                 LASSERT (page < krx->krx_npages);
1645                                 page_ptr = page_address(krx->krx_kiov[page].kiov_page);
1646                                 page_nob = PAGE_SIZE;
1647                         }
1648
1649                         iov_nob -= frag;
1650                         if (iov_nob != 0)
1651                                 iov_ptr += frag;
1652                         else if (kiov != NULL) {
1653                                 kunmap (kiov->kiov_page);
1654                                 kiov++;
1655                                 niov--;
1656                                 LASSERT (niov > 0);
1657                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1658                                 iov_nob = kiov->kiov_len;
1659                         } else {
1660                                 iov++;
1661                                 niov--;
1662                                 LASSERT (niov > 0);
1663                                 iov_ptr = iov->iov_base;
1664                                 iov_nob = iov->iov_len;
1665                         }
1666                 }
1667
1668                 if (kiov != NULL)
1669                         kunmap (kiov->kiov_page);
1670         }
1671
1672 #if KQSW_CHECKSUM
1673         memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
1674                 sizeof(kqsw_csum_t));
1675
1676         if (csum_len != rlen)
1677                 CERROR("Unable to checksum data in user's buffer\n");
1678         else if (senders_csum != payload_csum)
1679                 kqswnal_csum_error (krx, 0);
1680
1681         if (csum_verbose)
1682                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1683                        "csum_nob %d\n",
1684                         hdr_csum, payload_csum, csum_frags, csum_nob);
1685 #endif
1686         lib_finalize(nal, private, libmsg, PTL_OK);
1687
1688         return (PTL_OK);
1689 }
1690
1691 static ptl_err_t
1692 kqswnal_recv(nal_cb_t     *nal,
1693              void         *private,
1694              lib_msg_t    *libmsg,
1695              unsigned int  niov,
1696              struct iovec *iov,
1697              size_t        offset,
1698              size_t        mlen,
1699              size_t        rlen)
1700 {
1701         return (kqswnal_recvmsg(nal, private, libmsg, 
1702                                 niov, iov, NULL, 
1703                                 offset, mlen, rlen));
1704 }
1705
1706 static ptl_err_t
1707 kqswnal_recv_pages (nal_cb_t     *nal,
1708                     void         *private,
1709                     lib_msg_t    *libmsg,
1710                     unsigned int  niov,
1711                     ptl_kiov_t   *kiov,
1712                     size_t        offset,
1713                     size_t        mlen,
1714                     size_t        rlen)
1715 {
1716         return (kqswnal_recvmsg(nal, private, libmsg, 
1717                                 niov, NULL, kiov, 
1718                                 offset, mlen, rlen));
1719 }
1720
1721 int
1722 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1723 {
1724         long    pid = kernel_thread (fn, arg, 0);
1725
1726         if (pid < 0)
1727                 return ((int)pid);
1728
1729         atomic_inc (&kqswnal_data.kqn_nthreads);
1730         atomic_inc (&kqswnal_data.kqn_nthreads_running);
1731         return (0);
1732 }
1733
1734 void
1735 kqswnal_thread_fini (void)
1736 {
1737         atomic_dec (&kqswnal_data.kqn_nthreads);
1738 }
1739
1740 int
1741 kqswnal_scheduler (void *arg)
1742 {
1743         kqswnal_rx_t    *krx;
1744         kqswnal_tx_t    *ktx;
1745         kpr_fwd_desc_t  *fwd;
1746         long             flags;
1747         int              rc;
1748         int              counter = 0;
1749         int              shuttingdown = 0;
1750         int              did_something;
1751
1752         kportal_daemonize ("kqswnal_sched");
1753         kportal_blockallsigs ();
1754         
1755         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1756
1757         for (;;)
1758         {
1759                 if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
1760
1761                         if (kqswnal_data.kqn_shuttingdown == 2)
1762                                 break;
1763                 
1764                         /* During stage 1 of shutdown we are still responsive
1765                          * to receives */
1766
1767                         atomic_dec (&kqswnal_data.kqn_nthreads_running);
1768                         shuttingdown = kqswnal_data.kqn_shuttingdown;
1769                 }
1770
1771                 did_something = 0;
1772
1773                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1774                 {
1775                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1776                                          kqswnal_rx_t, krx_list);
1777                         list_del (&krx->krx_list);
1778                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1779                                                flags);
1780
1781                         kqswnal_rx (krx);
1782
1783                         did_something = 1;
1784                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1785                 }
1786
1787                 if (!shuttingdown &&
1788                     !list_empty (&kqswnal_data.kqn_delayedtxds))
1789                 {
1790                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1791                                          kqswnal_tx_t, ktx_list);
1792                         list_del_init (&ktx->ktx_delayed_list);
1793                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1794                                                flags);
1795
1796                         rc = kqswnal_launch (ktx);
1797                         if (rc != 0)          /* failed: ktx_nid down? */
1798                         {
1799                                 CERROR("Failed delayed transmit to "LPX64
1800                                        ": %d\n", ktx->ktx_nid, rc);
1801                                 kqswnal_tx_done (ktx, rc);
1802                         }
1803
1804                         did_something = 1;
1805                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1806                 }
1807
1808                 if (!shuttingdown &
1809                     !list_empty (&kqswnal_data.kqn_delayedfwds))
1810                 {
1811                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1812                         list_del (&fwd->kprfd_list);
1813                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1814
1815                         kqswnal_fwd_packet (NULL, fwd);
1816
1817                         did_something = 1;
1818                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1819                 }
1820
1821                     /* nothing to do or hogging CPU */
1822                 if (!did_something || counter++ == KQSW_RESCHED) {
1823                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1824                                                flags);
1825
1826                         counter = 0;
1827
1828                         if (!did_something) {
1829                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1830                                                                kqswnal_data.kqn_shuttingdown != shuttingdown ||
1831                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1832                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1833                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1834                                 LASSERT (rc == 0);
1835                         } else if (current->need_resched)
1836                                 schedule ();
1837
1838                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1839                 }
1840         }
1841
1842         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1843
1844         kqswnal_thread_fini ();
1845         return (0);
1846 }
1847
1848 nal_cb_t kqswnal_lib =
1849 {
1850         nal_data:       &kqswnal_data,         /* NAL private data */
1851         cb_send:        kqswnal_send,
1852         cb_send_pages:  kqswnal_send_pages,
1853         cb_recv:        kqswnal_recv,
1854         cb_recv_pages:  kqswnal_recv_pages,
1855         cb_read:        kqswnal_read,
1856         cb_write:       kqswnal_write,
1857         cb_malloc:      kqswnal_malloc,
1858         cb_free:        kqswnal_free,
1859         cb_printf:      kqswnal_printf,
1860         cb_cli:         kqswnal_cli,
1861         cb_sti:         kqswnal_sti,
1862         cb_callback:    kqswnal_callback,
1863         cb_dist:        kqswnal_dist
1864 };