Whamcloud - gitweb
land b_eq on HEAD
[fs/lustre-release.git] / lustre / portals / knals / qswnal / qswnal_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 EP_STATUSBLK  kqswnal_rpc_success;
30 EP_STATUSBLK  kqswnal_rpc_failed;
31
32 /*
33  *  LIB functions follow
34  *
35  */
36 static ptl_err_t
37 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
38              size_t len)
39 {
40         CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
41                 nal->ni.nid, len, src_addr, dst_addr );
42         memcpy( dst_addr, src_addr, len );
43
44         return (PTL_OK);
45 }
46
47 static ptl_err_t
48 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
49               size_t len)
50 {
51         CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
52                 nal->ni.nid, len, src_addr, dst_addr );
53         memcpy( dst_addr, src_addr, len );
54
55         return (PTL_OK);
56 }
57
58 static void *
59 kqswnal_malloc(nal_cb_t *nal, size_t len)
60 {
61         void *buf;
62
63         PORTAL_ALLOC(buf, len);
64         return (buf);
65 }
66
67 static void
68 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
69 {
70         PORTAL_FREE(buf, len);
71 }
72
73 static void
74 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
75 {
76         va_list ap;
77         char msg[256];
78
79         va_start (ap, fmt);
80         vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
81         va_end (ap);
82
83         msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
84
85         CDEBUG (D_NET, "%s", msg);
86 }
87
88
89 static void
90 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
91 {
92         kqswnal_data_t *data= nal->nal_data;
93
94         spin_lock_irqsave(&data->kqn_statelock, *flags);
95 }
96
97
98 static void
99 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
100 {
101         kqswnal_data_t *data= nal->nal_data;
102
103         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
104 }
105
106
107 static int
108 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
109 {
110         if (nid == nal->ni.nid)
111                 *dist = 0;                      /* it's me */
112         else if (kqswnal_nid2elanid (nid) >= 0)
113                 *dist = 1;                      /* it's my peer */
114         else
115                 *dist = 2;                      /* via router */
116         return (0);
117 }
118
119 void
120 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
121 {
122         struct timeval     now;
123         time_t             then;
124
125         do_gettimeofday (&now);
126         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
127
128         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
129 }
130
131 void
132 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
133 {
134 #if MULTIRAIL_EKC
135         int      i;
136 #endif
137
138         if (ktx->ktx_nmappedpages == 0)
139                 return;
140         
141 #if MULTIRAIL_EKC
142         CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
143                ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
144
145         for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
146                 ep_dvma_unload(kqswnal_data.kqn_ep,
147                                kqswnal_data.kqn_ep_tx_nmh,
148                                &ktx->ktx_frags[i]);
149 #else
150         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
151                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
152
153         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
154         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
155                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
156
157         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
158                           kqswnal_data.kqn_eptxdmahandle,
159                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
160 #endif
161         ktx->ktx_nmappedpages = 0;
162 }
163
164 int
165 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
166 {
167         int       nfrags    = ktx->ktx_nfrag;
168         int       nmapped   = ktx->ktx_nmappedpages;
169         int       maxmapped = ktx->ktx_npages;
170         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
171         char     *ptr;
172 #if MULTIRAIL_EKC
173         EP_RAILMASK railmask;
174         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
175                                             EP_RAILMASK_ALL,
176                                             kqswnal_nid2elanid(ktx->ktx_nid));
177         
178         if (rail < 0) {
179                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
180                 return (-ENETDOWN);
181         }
182         railmask = 1 << rail;
183 #endif
184         LASSERT (nmapped <= maxmapped);
185         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
186         LASSERT (nfrags <= EP_MAXFRAG);
187         LASSERT (niov > 0);
188         LASSERT (nob > 0);
189
190         /* skip complete frags before 'offset' */
191         while (offset >= kiov->kiov_len) {
192                 offset -= kiov->kiov_len;
193                 kiov++;
194                 niov--;
195                 LASSERT (niov > 0);
196         }
197
198         do {
199                 int  fraglen = kiov->kiov_len - offset;
200
201                 /* nob exactly spans the iovs */
202                 LASSERT (fraglen <= nob);
203                 /* each frag fits in a page */
204                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
205
206                 nmapped++;
207                 if (nmapped > maxmapped) {
208                         CERROR("Can't map message in %d pages (max %d)\n",
209                                nmapped, maxmapped);
210                         return (-EMSGSIZE);
211                 }
212
213                 if (nfrags == EP_MAXFRAG) {
214                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
215                                EP_MAXFRAG);
216                         return (-EMSGSIZE);
217                 }
218
219                 /* XXX this is really crap, but we'll have to kmap until
220                  * EKC has a page (rather than vaddr) mapping interface */
221
222                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
223
224                 CDEBUG(D_NET,
225                        "%p[%d] loading %p for %d, page %d, %d total\n",
226                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
227
228 #if MULTIRAIL_EKC
229                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
230                              ptr, fraglen,
231                              kqswnal_data.kqn_ep_tx_nmh, basepage,
232                              &railmask, &ktx->ktx_frags[nfrags]);
233
234                 if (nfrags == ktx->ktx_firsttmpfrag ||
235                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
236                                   &ktx->ktx_frags[nfrags - 1],
237                                   &ktx->ktx_frags[nfrags])) {
238                         /* new frag if this is the first or can't merge */
239                         nfrags++;
240                 }
241 #else
242                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
243                                        kqswnal_data.kqn_eptxdmahandle,
244                                        ptr, fraglen,
245                                        basepage, &ktx->ktx_frags[nfrags].Base);
246
247                 if (nfrags > 0 &&                /* previous frag mapped */
248                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
249                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
250                         /* just extend previous */
251                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
252                 else {
253                         ktx->ktx_frags[nfrags].Len = fraglen;
254                         nfrags++;                /* new frag */
255                 }
256 #endif
257
258                 kunmap (kiov->kiov_page);
259                 
260                 /* keep in loop for failure case */
261                 ktx->ktx_nmappedpages = nmapped;
262
263                 basepage++;
264                 kiov++;
265                 niov--;
266                 nob -= fraglen;
267                 offset = 0;
268
269                 /* iov must not run out before end of data */
270                 LASSERT (nob == 0 || niov > 0);
271
272         } while (nob > 0);
273
274         ktx->ktx_nfrag = nfrags;
275         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
276                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
277
278         return (0);
279 }
280
281 int
282 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
283                     int niov, struct iovec *iov)
284 {
285         int       nfrags    = ktx->ktx_nfrag;
286         int       nmapped   = ktx->ktx_nmappedpages;
287         int       maxmapped = ktx->ktx_npages;
288         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
289 #if MULTIRAIL_EKC
290         EP_RAILMASK railmask;
291         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
292                                             EP_RAILMASK_ALL,
293                                             kqswnal_nid2elanid(ktx->ktx_nid));
294         
295         if (rail < 0) {
296                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
297                 return (-ENETDOWN);
298         }
299         railmask = 1 << rail;
300 #endif
301         LASSERT (nmapped <= maxmapped);
302         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
303         LASSERT (nfrags <= EP_MAXFRAG);
304         LASSERT (niov > 0);
305         LASSERT (nob > 0);
306
307         /* skip complete frags before offset */
308         while (offset >= iov->iov_len) {
309                 offset -= iov->iov_len;
310                 iov++;
311                 niov--;
312                 LASSERT (niov > 0);
313         }
314         
315         do {
316                 int  fraglen = iov->iov_len - offset;
317                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
318
319                 /* nob exactly spans the iovs */
320                 LASSERT (fraglen <= nob);
321                 
322                 nmapped += npages;
323                 if (nmapped > maxmapped) {
324                         CERROR("Can't map message in %d pages (max %d)\n",
325                                nmapped, maxmapped);
326                         return (-EMSGSIZE);
327                 }
328
329                 if (nfrags == EP_MAXFRAG) {
330                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
331                                EP_MAXFRAG);
332                         return (-EMSGSIZE);
333                 }
334
335                 CDEBUG(D_NET,
336                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
337                        ktx, nfrags, iov->iov_base + offset, fraglen, 
338                        basepage, npages, nmapped);
339
340 #if MULTIRAIL_EKC
341                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
342                              iov->iov_base + offset, fraglen,
343                              kqswnal_data.kqn_ep_tx_nmh, basepage,
344                              &railmask, &ktx->ktx_frags[nfrags]);
345
346                 if (nfrags == ktx->ktx_firsttmpfrag ||
347                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
348                                   &ktx->ktx_frags[nfrags - 1],
349                                   &ktx->ktx_frags[nfrags])) {
350                         /* new frag if this is the first or can't merge */
351                         nfrags++;
352                 }
353 #else
354                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
355                                        kqswnal_data.kqn_eptxdmahandle,
356                                        iov->iov_base + offset, fraglen,
357                                        basepage, &ktx->ktx_frags[nfrags].Base);
358
359                 if (nfrags > 0 &&                /* previous frag mapped */
360                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
361                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
362                         /* just extend previous */
363                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
364                 else {
365                         ktx->ktx_frags[nfrags].Len = fraglen;
366                         nfrags++;                /* new frag */
367                 }
368 #endif
369
370                 /* keep in loop for failure case */
371                 ktx->ktx_nmappedpages = nmapped;
372
373                 basepage += npages;
374                 iov++;
375                 niov--;
376                 nob -= fraglen;
377                 offset = 0;
378
379                 /* iov must not run out before end of data */
380                 LASSERT (nob == 0 || niov > 0);
381
382         } while (nob > 0);
383
384         ktx->ktx_nfrag = nfrags;
385         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
386                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
387
388         return (0);
389 }
390
391
392 void
393 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
394 {
395         kpr_fwd_desc_t   *fwd = NULL;
396         unsigned long     flags;
397
398         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
399         ktx->ktx_state = KTX_IDLE;
400
401         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
402
403         list_del (&ktx->ktx_list);              /* take off active list */
404
405         if (ktx->ktx_isnblk) {
406                 /* reserved for non-blocking tx */
407                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
408                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
409                 return;
410         }
411
412         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
413
414         /* anything blocking for a tx descriptor? */
415         if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
416         {
417                 CDEBUG(D_NET,"wakeup fwd\n");
418
419                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
420                                   kpr_fwd_desc_t, kprfd_list);
421                 list_del (&fwd->kprfd_list);
422         }
423
424         wake_up (&kqswnal_data.kqn_idletxd_waitq);
425
426         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
427
428         if (fwd == NULL)
429                 return;
430
431         /* schedule packet for forwarding again */
432         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
433
434         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
435         wake_up (&kqswnal_data.kqn_sched_waitq);
436
437         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
438 }
439
440 kqswnal_tx_t *
441 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
442 {
443         unsigned long  flags;
444         kqswnal_tx_t  *ktx = NULL;
445
446         for (;;) {
447                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
448
449                 /* "normal" descriptor is free */
450                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
451                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
452                                           kqswnal_tx_t, ktx_list);
453                         break;
454                 }
455
456                 /* "normal" descriptor pool is empty */
457
458                 if (fwd != NULL) { /* forwarded packet => queue for idle txd */
459                         CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
460                         list_add_tail (&fwd->kprfd_list,
461                                        &kqswnal_data.kqn_idletxd_fwdq);
462                         break;
463                 }
464
465                 /* doing a local transmit */
466                 if (!may_block) {
467                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
468                                 CERROR ("intr tx desc pool exhausted\n");
469                                 break;
470                         }
471
472                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
473                                           kqswnal_tx_t, ktx_list);
474                         break;
475                 }
476
477                 /* block for idle tx */
478
479                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
480
481                 CDEBUG (D_NET, "blocking for tx desc\n");
482                 wait_event (kqswnal_data.kqn_idletxd_waitq,
483                             !list_empty (&kqswnal_data.kqn_idletxds));
484         }
485
486         if (ktx != NULL) {
487                 list_del (&ktx->ktx_list);
488                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
489                 ktx->ktx_launcher = current->pid;
490         }
491
492         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
493
494         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
495         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
496
497         return (ktx);
498 }
499
500 void
501 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
502 {
503         lib_msg_t     *msg;
504         lib_msg_t     *repmsg = NULL;
505
506         switch (ktx->ktx_state) {
507         case KTX_FORWARDING:       /* router asked me to forward this packet */
508                 kpr_fwd_done (&kqswnal_data.kqn_router,
509                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
510                 break;
511
512         case KTX_SENDING:          /* packet sourced locally */
513                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
514                               (lib_msg_t *)ktx->ktx_args[1],
515                               (error == 0) ? PTL_OK : 
516                               (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
517                 break;
518
519         case KTX_GETTING:          /* Peer has DMA-ed direct? */
520                 msg = (lib_msg_t *)ktx->ktx_args[1];
521
522                 if (error == 0) {
523                         repmsg = lib_fake_reply_msg (&kqswnal_lib, 
524                                                      ktx->ktx_nid, msg->md);
525                         if (repmsg == NULL)
526                                 error = -ENOMEM;
527                 }
528                 
529                 if (error == 0) {
530                         lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
531                                       msg, PTL_OK);
532                         lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
533                 } else {
534                         lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
535                                       (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
536                 }
537                 break;
538
539         default:
540                 LASSERT (0);
541         }
542
543         kqswnal_put_idle_tx (ktx);
544 }
545
546 static void
547 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
548 {
549         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
550
551         LASSERT (txd != NULL);
552         LASSERT (ktx != NULL);
553
554         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
555
556         if (status != EP_SUCCESS) {
557
558                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
559                         ktx->ktx_nid, status);
560
561                 kqswnal_notify_peer_down(ktx);
562                 status = -EHOSTDOWN;
563
564         } else if (ktx->ktx_state == KTX_GETTING) {
565                 /* RPC completed OK; what did our peer put in the status
566                  * block? */
567 #if MULTIRAIL_EKC
568                 status = ep_txd_statusblk(txd)->Data[0];
569 #else
570                 status = ep_txd_statusblk(txd)->Status;
571 #endif
572         } else {
573                 status = 0;
574         }
575
576         kqswnal_tx_done (ktx, status);
577 }
578
579 int
580 kqswnal_launch (kqswnal_tx_t *ktx)
581 {
582         /* Don't block for transmit descriptor if we're in interrupt context */
583         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
584         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
585         long  flags;
586         int   rc;
587
588         ktx->ktx_launchtime = jiffies;
589
590         LASSERT (dest >= 0);                    /* must be a peer */
591         if (ktx->ktx_state == KTX_GETTING) {
592                 /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
593                  * other frags are the GET sink which we obviously don't
594                  * send here :) */
595 #if MULTIRAIL_EKC
596                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
597                                      ktx->ktx_port, attr,
598                                      kqswnal_txhandler, ktx,
599                                      NULL, ktx->ktx_frags, 1);
600 #else
601                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
602                                      ktx->ktx_port, attr, kqswnal_txhandler,
603                                      ktx, NULL, ktx->ktx_frags, 1);
604 #endif
605         } else {
606 #if MULTIRAIL_EKC
607                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
608                                          ktx->ktx_port, attr,
609                                          kqswnal_txhandler, ktx,
610                                          NULL, ktx->ktx_frags, ktx->ktx_nfrag);
611 #else
612                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
613                                        ktx->ktx_port, attr, 
614                                        kqswnal_txhandler, ktx, 
615                                        ktx->ktx_frags, ktx->ktx_nfrag);
616 #endif
617         }
618
619         switch (rc) {
620         case EP_SUCCESS: /* success */
621                 return (0);
622
623         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
624                 LASSERT (in_interrupt());
625
626                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
627
628                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
629                 wake_up (&kqswnal_data.kqn_sched_waitq);
630
631                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
632                 return (0);
633
634         default: /* fatal error */
635                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
636                 kqswnal_notify_peer_down(ktx);
637                 return (-EHOSTUNREACH);
638         }
639 }
640
641 static char *
642 hdr_type_string (ptl_hdr_t *hdr)
643 {
644         switch (hdr->type) {
645         case PTL_MSG_ACK:
646                 return ("ACK");
647         case PTL_MSG_PUT:
648                 return ("PUT");
649         case PTL_MSG_GET:
650                 return ("GET");
651         case PTL_MSG_REPLY:
652                 return ("REPLY");
653         default:
654                 return ("<UNKNOWN>");
655         }
656 }
657
658 static void
659 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
660 {
661         char *type_str = hdr_type_string (hdr);
662
663         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
664                NTOH__u32(hdr->payload_length));
665         CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
666                NTOH__u32(hdr->src_pid));
667         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
668                NTOH__u32(hdr->dest_pid));
669
670         switch (NTOH__u32(hdr->type)) {
671         case PTL_MSG_PUT:
672                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
673                        "match bits "LPX64"\n",
674                        NTOH__u32 (hdr->msg.put.ptl_index),
675                        hdr->msg.put.ack_wmd.wh_interface_cookie,
676                        hdr->msg.put.ack_wmd.wh_object_cookie,
677                        NTOH__u64 (hdr->msg.put.match_bits));
678                 CERROR("    offset %d, hdr data "LPX64"\n",
679                        NTOH__u32(hdr->msg.put.offset),
680                        hdr->msg.put.hdr_data);
681                 break;
682
683         case PTL_MSG_GET:
684                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
685                        "match bits "LPX64"\n",
686                        NTOH__u32 (hdr->msg.get.ptl_index),
687                        hdr->msg.get.return_wmd.wh_interface_cookie,
688                        hdr->msg.get.return_wmd.wh_object_cookie,
689                        hdr->msg.get.match_bits);
690                 CERROR("    Length %d, src offset %d\n",
691                        NTOH__u32 (hdr->msg.get.sink_length),
692                        NTOH__u32 (hdr->msg.get.src_offset));
693                 break;
694
695         case PTL_MSG_ACK:
696                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
697                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
698                        hdr->msg.ack.dst_wmd.wh_object_cookie,
699                        NTOH__u32 (hdr->msg.ack.mlength));
700                 break;
701
702         case PTL_MSG_REPLY:
703                 CERROR("    dst md "LPX64"."LPX64"\n",
704                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
705                        hdr->msg.reply.dst_wmd.wh_object_cookie);
706         }
707
708 }                               /* end of print_hdr() */
709
710 #if !MULTIRAIL_EKC
711 void
712 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
713 {
714         int          i;
715
716         CDEBUG (how, "%s: %d\n", str, n);
717         for (i = 0; i < n; i++) {
718                 CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
719         }
720 }
721
722 int
723 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
724                      int nsrc, EP_IOVEC *src,
725                      int ndst, EP_IOVEC *dst) 
726 {
727         int        count;
728         int        nob;
729
730         LASSERT (ndv > 0);
731         LASSERT (nsrc > 0);
732         LASSERT (ndst > 0);
733
734         for (count = 0; count < ndv; count++, dv++) {
735
736                 if (nsrc == 0 || ndst == 0) {
737                         if (nsrc != ndst) {
738                                 /* For now I'll barf on any left over entries */
739                                 CERROR ("mismatched src and dst iovs\n");
740                                 return (-EINVAL);
741                         }
742                         return (count);
743                 }
744
745                 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
746                 dv->Len    = nob;
747                 dv->Source = src->Base;
748                 dv->Dest   = dst->Base;
749
750                 if (nob >= src->Len) {
751                         src++;
752                         nsrc--;
753                 } else {
754                         src->Len -= nob;
755                         src->Base += nob;
756                 }
757                 
758                 if (nob >= dst->Len) {
759                         dst++;
760                         ndst--;
761                 } else {
762                         src->Len -= nob;
763                         src->Base += nob;
764                 }
765         }
766
767         CERROR ("DATAVEC too small\n");
768         return (-E2BIG);
769 }
770 #endif
771
772 int
773 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
774                    struct iovec *iov, ptl_kiov_t *kiov, 
775                    int offset, int nob)
776 {
777         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
778         char               *buffer = (char *)page_address(krx->krx_pages[0]);
779         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
780         int                 rc;
781 #if MULTIRAIL_EKC
782         int                 i;
783 #else
784         EP_DATAVEC          datav[EP_MAXFRAG];
785         int                 ndatav;
786 #endif
787         LASSERT (krx->krx_rpc_reply_needed);
788         LASSERT ((iov == NULL) != (kiov == NULL));
789
790         /* see kqswnal_sendmsg comment regarding endian-ness */
791         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
792                 /* msg too small to discover rmd size */
793                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
794                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
795                 return (-EINVAL);
796         }
797         
798         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
799                 /* rmd doesn't fit in the incoming message */
800                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
801                         krx->krx_nob, rmd->kqrmd_nfrag,
802                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
803                 return (-EINVAL);
804         }
805
806         /* Map the source data... */
807         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
808         if (kiov != NULL)
809                 rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
810         else
811                 rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
812
813         if (rc != 0) {
814                 CERROR ("Can't map source data: %d\n", rc);
815                 return (rc);
816         }
817
818 #if MULTIRAIL_EKC
819         if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
820                 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
821                        ktx->ktx_nfrag, rmd->kqrmd_nfrag);
822                 return (-EINVAL);
823         }
824         
825         for (i = 0; i < rmd->kqrmd_nfrag; i++)
826                 if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
827                         CERROR("Can't cope with unequal frags %d(%d):"
828                                " %d local %d remote\n",
829                                i, rmd->kqrmd_nfrag, 
830                                ktx->ktx_frags[i].nmd_len, 
831                                rmd->kqrmd_frag[i].nmd_len);
832                         return (-EINVAL);
833                 }
834 #else
835         ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
836                                       ktx->ktx_nfrag, ktx->ktx_frags,
837                                       rmd->kqrmd_nfrag, rmd->kqrmd_frag);
838         if (ndatav < 0) {
839                 CERROR ("Can't create datavec: %d\n", ndatav);
840                 return (ndatav);
841         }
842 #endif
843
844         /* Our caller will start to race with kqswnal_dma_reply_complete... */
845         LASSERT (atomic_read (&krx->krx_refcount) == 1);
846         atomic_set (&krx->krx_refcount, 2);
847
848 #if MULTIRAIL_EKC
849         rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
850                              &kqswnal_rpc_success,
851                              ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
852         if (rc == EP_SUCCESS)
853                 return (0);
854
855         /* Well we tried... */
856         krx->krx_rpc_reply_needed = 0;
857 #else
858         rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
859                               &kqswnal_rpc_success, datav, ndatav);
860         if (rc == EP_SUCCESS)
861                 return (0);
862
863         /* "old" EKC destroys rxd on failed completion */
864         krx->krx_rxd = NULL;
865 #endif
866
867         CERROR("can't complete RPC: %d\n", rc);
868
869         /* reset refcount back to 1: we're not going to be racing with
870          * kqswnal_dma_reply_complete. */
871         atomic_set (&krx->krx_refcount, 1);
872
873         return (-ECONNABORTED);
874 }
875
876 static ptl_err_t
877 kqswnal_sendmsg (nal_cb_t     *nal,
878                  void         *private,
879                  lib_msg_t    *libmsg,
880                  ptl_hdr_t    *hdr,
881                  int           type,
882                  ptl_nid_t     nid,
883                  ptl_pid_t     pid,
884                  unsigned int  payload_niov,
885                  struct iovec *payload_iov,
886                  ptl_kiov_t   *payload_kiov,
887                  size_t        payload_offset,
888                  size_t        payload_nob)
889 {
890         kqswnal_tx_t      *ktx;
891         int                rc;
892         ptl_nid_t          targetnid;
893 #if KQSW_CHECKSUM
894         int                i;
895         kqsw_csum_t        csum;
896         int                sumoff;
897         int                sumnob;
898 #endif
899         
900         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
901                " pid %u\n", payload_nob, payload_niov, nid, pid);
902
903         LASSERT (payload_nob == 0 || payload_niov > 0);
904         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
905
906         /* It must be OK to kmap() if required */
907         LASSERT (payload_kiov == NULL || !in_interrupt ());
908         /* payload is either all vaddrs or all pages */
909         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
910         
911         if (payload_nob > KQSW_MAXPAYLOAD) {
912                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
913                         payload_nob, KQSW_MAXPAYLOAD);
914                 return (PTL_FAIL);
915         }
916
917         targetnid = nid;
918         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
919                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
920                                  sizeof (ptl_hdr_t) + payload_nob, &targetnid);
921                 if (rc != 0) {
922                         CERROR("Can't route to "LPX64": router error %d\n",
923                                nid, rc);
924                         return (PTL_FAIL);
925                 }
926                 if (kqswnal_nid2elanid (targetnid) < 0) {
927                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
928                                targetnid, nid);
929                         return (PTL_FAIL);
930                 }
931         }
932
933         /* I may not block for a transmit descriptor if I might block the
934          * receiver, or an interrupt handler. */
935         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
936                                           type == PTL_MSG_REPLY ||
937                                           in_interrupt()));
938         if (ktx == NULL) {
939                 kqswnal_cerror_hdr (hdr);
940                 return (PTL_NOSPACE);
941         }
942
943         ktx->ktx_nid     = targetnid;
944         ktx->ktx_args[0] = private;
945         ktx->ktx_args[1] = libmsg;
946
947         if (type == PTL_MSG_REPLY &&
948             ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
949                 if (nid != targetnid ||
950                     kqswnal_nid2elanid(nid) != 
951                     ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
952                         CERROR("Optimized reply nid conflict: "
953                                "nid "LPX64" via "LPX64" elanID %d\n",
954                                nid, targetnid,
955                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
956                         return (PTL_FAIL);
957                 }
958
959                 /* peer expects RPC completion with GET data */
960                 rc = kqswnal_dma_reply (ktx, payload_niov, 
961                                         payload_iov, payload_kiov, 
962                                         payload_offset, payload_nob);
963                 if (rc == 0)
964                         return (PTL_OK);
965                 
966                 CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
967                 kqswnal_put_idle_tx (ktx);
968                 return (PTL_FAIL);
969         }
970
971         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
972         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
973
974 #if KQSW_CHECKSUM
975         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
976         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
977         for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
978                 LASSERT(i < niov);
979                 if (payload_kiov != NULL) {
980                         ptl_kiov_t *kiov = &payload_kiov[i];
981
982                         if (sumoff >= kiov->kiov_len) {
983                                 sumoff -= kiov->kiov_len;
984                         } else {
985                                 char *addr = ((char *)kmap (kiov->kiov_page)) +
986                                              kiov->kiov_offset + sumoff;
987                                 int   fragnob = kiov->kiov_len - sumoff;
988
989                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
990                                 sumnob -= fragnob;
991                                 sumoff = 0;
992                                 kunmap(kiov->kiov_page);
993                         }
994                 } else {
995                         struct iovec *iov = &payload_iov[i];
996
997                         if (sumoff > iov->iov_len) {
998                                 sumoff -= iov->iov_len;
999                         } else {
1000                                 char *addr = iov->iov_base + sumoff;
1001                                 int   fragnob = iov->iov_len - sumoff;
1002                                 
1003                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1004                                 sumnob -= fragnob;
1005                                 sumoff = 0;
1006                         }
1007                 }
1008         }
1009         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
1010 #endif
1011         
1012         if (kqswnal_data.kqn_optimized_gets &&
1013             type == PTL_MSG_GET &&              /* doing a GET */
1014             nid == targetnid) {                 /* not forwarding */
1015                 lib_md_t           *md = libmsg->md;
1016                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
1017                 
1018                 /* Optimised path: I send over the Elan vaddrs of the get
1019                  * sink buffers, and my peer DMAs directly into them.
1020                  *
1021                  * First I set up ktx as if it was going to send this
1022                  * payload, (it needs to map it anyway).  This fills
1023                  * ktx_frags[1] and onward with the network addresses
1024                  * of the GET sink frags.  I copy these into ktx_buffer,
1025                  * immediately after the header, and send that as my GET
1026                  * message.
1027                  *
1028                  * Note that the addresses are sent in native endian-ness.
1029                  * When EKC copes with different endian nodes, I'll fix
1030                  * this (and eat my hat :) */
1031
1032                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1033                 ktx->ktx_state = KTX_GETTING;
1034
1035                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
1036                         rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
1037                                                   md->md_niov, md->md_iov.kiov);
1038                 else
1039                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
1040                                                  md->md_niov, md->md_iov.iov);
1041
1042                 if (rc < 0) {
1043                         kqswnal_put_idle_tx (ktx);
1044                         return (PTL_FAIL);
1045                 }
1046
1047                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1048
1049                 payload_nob = offsetof(kqswnal_remotemd_t,
1050                                        kqrmd_frag[rmd->kqrmd_nfrag]);
1051                 LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
1052
1053 #if MULTIRAIL_EKC
1054                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1055                        rmd->kqrmd_nfrag * sizeof(EP_NMD));
1056
1057                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1058                               0, KQSW_HDR_SIZE + payload_nob);
1059 #else
1060                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1061                        rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
1062                 
1063                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1064                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1065 #endif
1066         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
1067
1068                 /* small message: single frag copied into the pre-mapped buffer */
1069
1070                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1071                 ktx->ktx_state = KTX_SENDING;
1072 #if MULTIRAIL_EKC
1073                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1074                               0, KQSW_HDR_SIZE + payload_nob);
1075 #else
1076                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1077                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1078 #endif
1079                 if (payload_nob > 0) {
1080                         if (payload_kiov != NULL)
1081                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1082                                                    payload_niov, payload_kiov, 
1083                                                    payload_offset, payload_nob);
1084                         else
1085                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1086                                                   payload_niov, payload_iov, 
1087                                                   payload_offset, payload_nob);
1088                 }
1089         } else {
1090
1091                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
1092
1093                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1094                 ktx->ktx_state = KTX_SENDING;
1095 #if MULTIRAIL_EKC
1096                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1097                               0, KQSW_HDR_SIZE);
1098 #else
1099                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1100                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1101 #endif
1102                 if (payload_kiov != NULL)
1103                         rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
1104                                                   payload_niov, payload_kiov);
1105                 else
1106                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
1107                                                  payload_niov, payload_iov);
1108                 if (rc != 0) {
1109                         kqswnal_put_idle_tx (ktx);
1110                         return (PTL_FAIL);
1111                 }
1112         }
1113         
1114         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
1115                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1116
1117         rc = kqswnal_launch (ktx);
1118         if (rc != 0) {                    /* failed? */
1119                 CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
1120                 kqswnal_put_idle_tx (ktx);
1121                 return (PTL_FAIL);
1122         }
1123
1124         CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
1125                payload_nob, nid, targetnid);
1126         return (PTL_OK);
1127 }
1128
1129 static ptl_err_t
1130 kqswnal_send (nal_cb_t     *nal,
1131               void         *private,
1132               lib_msg_t    *libmsg,
1133               ptl_hdr_t    *hdr,
1134               int           type,
1135               ptl_nid_t     nid,
1136               ptl_pid_t     pid,
1137               unsigned int  payload_niov,
1138               struct iovec *payload_iov,
1139               size_t        payload_offset,
1140               size_t        payload_nob)
1141 {
1142         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1143                                  payload_niov, payload_iov, NULL, 
1144                                  payload_offset, payload_nob));
1145 }
1146
1147 static ptl_err_t
1148 kqswnal_send_pages (nal_cb_t     *nal,
1149                     void         *private,
1150                     lib_msg_t    *libmsg,
1151                     ptl_hdr_t    *hdr,
1152                     int           type,
1153                     ptl_nid_t     nid,
1154                     ptl_pid_t     pid,
1155                     unsigned int  payload_niov,
1156                     ptl_kiov_t   *payload_kiov,
1157                     size_t        payload_offset,
1158                     size_t        payload_nob)
1159 {
1160         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1161                                  payload_niov, NULL, payload_kiov, 
1162                                  payload_offset, payload_nob));
1163 }
1164
1165 void
1166 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1167 {
1168         int             rc;
1169         kqswnal_tx_t   *ktx;
1170         struct iovec   *iov = fwd->kprfd_iov;
1171         int             niov = fwd->kprfd_niov;
1172         int             nob = fwd->kprfd_nob;
1173         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
1174
1175 #if KQSW_CHECKSUM
1176         CERROR ("checksums for forwarded packets not implemented\n");
1177         LBUG ();
1178 #endif
1179         /* The router wants this NAL to forward a packet */
1180         CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
1181                 fwd, nid, niov, nob);
1182
1183         LASSERT (niov > 0);
1184         
1185         ktx = kqswnal_get_idle_tx (fwd, 0);
1186         if (ktx == NULL)        /* can't get txd right now */
1187                 return;         /* fwd will be scheduled when tx desc freed */
1188
1189         if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
1190                 nid = fwd->kprfd_target_nid;    /* target is final dest */
1191
1192         if (kqswnal_nid2elanid (nid) < 0) {
1193                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
1194                 rc = -EHOSTUNREACH;
1195                 goto failed;
1196         }
1197
1198         if (nob > KQSW_NRXMSGBYTES_LARGE) {
1199                 CERROR ("Can't forward [%p] to "LPX64
1200                         ": size %d bigger than max packet size %ld\n",
1201                         fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
1202                 rc = -EMSGSIZE;
1203                 goto failed;
1204         }
1205
1206         ktx->ktx_port    = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
1207                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1208         ktx->ktx_nid     = nid;
1209         ktx->ktx_state   = KTX_FORWARDING;
1210         ktx->ktx_args[0] = fwd;
1211
1212         if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
1213             nob <= KQSW_TX_BUFFER_SIZE) 
1214         {
1215                 /* send from ktx's pre-mapped contiguous buffer? */
1216                 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob);
1217 #if MULTIRAIL_EKC
1218                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1219                               0, nob);
1220 #else
1221                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1222                 ktx->ktx_frags[0].Len = nob;
1223 #endif
1224                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1225                 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
1226         }
1227         else
1228         {
1229                 /* zero copy */
1230                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
1231                 rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov);
1232                 if (rc != 0)
1233                         goto failed;
1234
1235                 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
1236         }
1237
1238         rc = kqswnal_launch (ktx);
1239         if (rc == 0)
1240                 return;
1241
1242  failed:
1243         LASSERT (rc != 0);
1244         CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
1245
1246         kqswnal_put_idle_tx (ktx);
1247         /* complete now (with failure) */
1248         kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
1249 }
1250
1251 void
1252 kqswnal_fwd_callback (void *arg, int error)
1253 {
1254         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1255
1256         /* The router has finished forwarding this packet */
1257
1258         if (error != 0)
1259         {
1260                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
1261
1262                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1263                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
1264         }
1265
1266         kqswnal_requeue_rx (krx);
1267 }
1268
1269 void
1270 kqswnal_dma_reply_complete (EP_RXD *rxd) 
1271 {
1272         int           status = ep_rxd_status(rxd);
1273         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
1274         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
1275         lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
1276         
1277         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1278                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
1279
1280         LASSERT (krx->krx_rxd == rxd);
1281         LASSERT (krx->krx_rpc_reply_needed);
1282
1283         krx->krx_rpc_reply_needed = 0;
1284         kqswnal_rx_done (krx);
1285
1286         lib_finalize (&kqswnal_lib, NULL, msg,
1287                       (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
1288         kqswnal_put_idle_tx (ktx);
1289 }
1290
1291 void
1292 kqswnal_rpc_complete (EP_RXD *rxd)
1293 {
1294         int           status = ep_rxd_status(rxd);
1295         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1296         
1297         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1298                "rxd %p, krx %p, status %d\n", rxd, krx, status);
1299
1300         LASSERT (krx->krx_rxd == rxd);
1301         LASSERT (krx->krx_rpc_reply_needed);
1302         
1303         krx->krx_rpc_reply_needed = 0;
1304         kqswnal_requeue_rx (krx);
1305 }
1306
1307 void
1308 kqswnal_requeue_rx (kqswnal_rx_t *krx) 
1309 {
1310         int   rc;
1311
1312         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1313
1314         if (krx->krx_rpc_reply_needed) {
1315
1316                 /* We failed to complete the peer's optimized GET (e.g. we
1317                  * couldn't map the source buffers).  We complete the
1318                  * peer's EKC rpc now with failure. */
1319 #if MULTIRAIL_EKC
1320                 rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
1321                                      &kqswnal_rpc_failed, NULL, NULL, 0);
1322                 if (rc == EP_SUCCESS)
1323                         return;
1324                 
1325                 CERROR("can't complete RPC: %d\n", rc);
1326 #else
1327                 if (krx->krx_rxd != NULL) {
1328                         /* We didn't try (and fail) to complete earlier... */
1329                         rc = ep_complete_rpc(krx->krx_rxd, 
1330                                              kqswnal_rpc_complete, krx,
1331                                              &kqswnal_rpc_failed, NULL, 0);
1332                         if (rc == EP_SUCCESS)
1333                                 return;
1334
1335                         CERROR("can't complete RPC: %d\n", rc);
1336                 }
1337                 
1338                 /* NB the old ep_complete_rpc() frees rxd on failure, so we
1339                  * have to requeue from scratch here, unless we're shutting
1340                  * down */
1341                 if (kqswnal_data.kqn_shuttingdown)
1342                         return;
1343
1344                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
1345                                       krx->krx_elanbuffer, 
1346                                       krx->krx_npages * PAGE_SIZE, 0);
1347                 LASSERT (rc == EP_SUCCESS);
1348                 /* We don't handle failure here; it's incredibly rare
1349                  * (never reported?) and only happens with "old" EKC */
1350                 return;
1351 #endif
1352         }
1353
1354 #if MULTIRAIL_EKC
1355         if (kqswnal_data.kqn_shuttingdown) {
1356                 /* free EKC rxd on shutdown */
1357                 ep_complete_receive(krx->krx_rxd);
1358         } else {
1359                 /* repost receive */
1360                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1361                                    &krx->krx_elanbuffer, 0);
1362         }
1363 #else                
1364         /* don't actually requeue on shutdown */
1365         if (!kqswnal_data.kqn_shuttingdown) 
1366                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1367                                    krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
1368 #endif
1369 }
1370         
1371 void
1372 kqswnal_rx (kqswnal_rx_t *krx)
1373 {
1374         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
1375         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
1376         int             nob;
1377         int             niov;
1378
1379         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1380
1381         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
1382                 atomic_set(&krx->krx_refcount, 1);
1383                 lib_parse (&kqswnal_lib, hdr, krx);
1384                 kqswnal_rx_done(krx);
1385                 return;
1386         }
1387
1388 #if KQSW_CHECKSUM
1389         CERROR ("checksums for forwarded packets not implemented\n");
1390         LBUG ();
1391 #endif
1392         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
1393         {
1394                 CERROR("dropping packet from "LPX64" for "LPX64
1395                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
1396
1397                 kqswnal_requeue_rx (krx);
1398                 return;
1399         }
1400
1401         /* NB forwarding may destroy iov; rebuild every time */
1402         for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
1403         {
1404                 LASSERT (niov < krx->krx_npages);
1405                 krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
1406                 krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
1407         }
1408
1409         kpr_fwd_init (&krx->krx_fwd, dest_nid,
1410                       krx->krx_nob, niov, krx->krx_iov,
1411                       kqswnal_fwd_callback, krx);
1412
1413         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
1414 }
1415
1416 /* Receive Interrupt Handler: posts to schedulers */
1417 void 
1418 kqswnal_rxhandler(EP_RXD *rxd)
1419 {
1420         long          flags;
1421         int           nob    = ep_rxd_len (rxd);
1422         int           status = ep_rxd_status (rxd);
1423         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1424
1425         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1426                rxd, krx, nob, status);
1427
1428         LASSERT (krx != NULL);
1429
1430         krx->krx_rxd = rxd;
1431         krx->krx_nob = nob;
1432 #if MULTIRAIL_EKC
1433         krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
1434 #else
1435         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
1436 #endif
1437         
1438         /* must receive a whole header to be able to parse */
1439         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
1440         {
1441                 /* receives complete with failure when receiver is removed */
1442 #if MULTIRAIL_EKC
1443                 if (status == EP_SHUTDOWN)
1444                         LASSERT (kqswnal_data.kqn_shuttingdown);
1445                 else
1446                         CERROR("receive status failed with status %d nob %d\n",
1447                                ep_rxd_status(rxd), nob);
1448 #else
1449                 if (!kqswnal_data.kqn_shuttingdown)
1450                         CERROR("receive status failed with status %d nob %d\n",
1451                                ep_rxd_status(rxd), nob);
1452 #endif
1453                 kqswnal_requeue_rx (krx);
1454                 return;
1455         }
1456
1457         if (!in_interrupt()) {
1458                 kqswnal_rx (krx);
1459                 return;
1460         }
1461
1462         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1463
1464         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1465         wake_up (&kqswnal_data.kqn_sched_waitq);
1466
1467         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1468 }
1469
1470 #if KQSW_CHECKSUM
1471 void
1472 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1473 {
1474         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
1475
1476         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1477                 ", dpid %d, spid %d, type %d\n",
1478                 ishdr ? "Header" : "Payload", krx,
1479                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
1480                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
1481                 NTOH__u32(hdr->type));
1482
1483         switch (NTOH__u32 (hdr->type))
1484         {
1485         case PTL_MSG_ACK:
1486                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
1487                        " len %u\n",
1488                        NTOH__u32(hdr->msg.ack.mlength),
1489                        hdr->msg.ack.dst_wmd.handle_cookie,
1490                        hdr->msg.ack.dst_wmd.handle_idx,
1491                        NTOH__u64(hdr->msg.ack.match_bits),
1492                        NTOH__u32(hdr->msg.ack.length));
1493                 break;
1494         case PTL_MSG_PUT:
1495                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
1496                        " len %u off %u data "LPX64"\n",
1497                        NTOH__u32(hdr->msg.put.ptl_index),
1498                        hdr->msg.put.ack_wmd.handle_cookie,
1499                        hdr->msg.put.ack_wmd.handle_idx,
1500                        NTOH__u64(hdr->msg.put.match_bits),
1501                        NTOH__u32(hdr->msg.put.length),
1502                        NTOH__u32(hdr->msg.put.offset),
1503                        hdr->msg.put.hdr_data);
1504                 break;
1505         case PTL_MSG_GET:
1506                 CERROR ("GET: <>\n");
1507                 break;
1508         case PTL_MSG_REPLY:
1509                 CERROR ("REPLY: <>\n");
1510                 break;
1511         default:
1512                 CERROR ("TYPE?: <>\n");
1513         }
1514 }
1515 #endif
1516
1517 static ptl_err_t
1518 kqswnal_recvmsg (nal_cb_t     *nal,
1519                  void         *private,
1520                  lib_msg_t    *libmsg,
1521                  unsigned int  niov,
1522                  struct iovec *iov,
1523                  ptl_kiov_t   *kiov,
1524                  size_t        offset,
1525                  size_t        mlen,
1526                  size_t        rlen)
1527 {
1528         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
1529         int           page;
1530         char         *page_ptr;
1531         int           page_nob;
1532         char         *iov_ptr;
1533         int           iov_nob;
1534         int           frag;
1535 #if KQSW_CHECKSUM
1536         kqsw_csum_t   senders_csum;
1537         kqsw_csum_t   payload_csum = 0;
1538         kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
1539                                            sizeof(ptl_hdr_t));
1540         size_t        csum_len = mlen;
1541         int           csum_frags = 0;
1542         int           csum_nob = 0;
1543         static atomic_t csum_counter;
1544         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
1545
1546         atomic_inc (&csum_counter);
1547
1548         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1549                                 sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1550         if (senders_csum != hdr_csum)
1551                 kqswnal_csum_error (krx, 1);
1552 #endif
1553         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1554
1555         /* What was actually received must be >= payload. */
1556         LASSERT (mlen <= rlen);
1557         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
1558                 CERROR("Bad message size: have %d, need %d + %d\n",
1559                        krx->krx_nob, KQSW_HDR_SIZE, mlen);
1560                 return (PTL_FAIL);
1561         }
1562
1563         /* It must be OK to kmap() if required */
1564         LASSERT (kiov == NULL || !in_interrupt ());
1565         /* Either all pages or all vaddrs */
1566         LASSERT (!(kiov != NULL && iov != NULL));
1567         
1568         if (mlen != 0)
1569         {
1570                 page     = 0;
1571                 page_ptr = ((char *) page_address(krx->krx_pages[0])) +
1572                         KQSW_HDR_SIZE;
1573                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1574
1575                 LASSERT (niov > 0);
1576                 
1577                 if (kiov != NULL) {
1578                         /* skip complete frags */
1579                         while (offset >= kiov->kiov_len) {
1580                                 offset -= kiov->kiov_len;
1581                                 kiov++;
1582                                 niov--;
1583                                 LASSERT (niov > 0);
1584                         }
1585                         iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
1586                         iov_nob = kiov->kiov_len - offset;
1587                 } else {
1588                         /* skip complete frags */
1589                         while (offset >= iov->iov_len) {
1590                                 offset -= iov->iov_len;
1591                                 iov++;
1592                                 niov--;
1593                                 LASSERT (niov > 0);
1594                         }
1595                         iov_ptr = iov->iov_base + offset;
1596                         iov_nob = iov->iov_len - offset;
1597                 }
1598                 
1599                 for (;;)
1600                 {
1601                         frag = mlen;
1602                         if (frag > page_nob)
1603                                 frag = page_nob;
1604                         if (frag > iov_nob)
1605                                 frag = iov_nob;
1606
1607                         memcpy (iov_ptr, page_ptr, frag);
1608 #if KQSW_CHECKSUM
1609                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1610                         csum_nob += frag;
1611                         csum_frags++;
1612 #endif
1613                         mlen -= frag;
1614                         if (mlen == 0)
1615                                 break;
1616
1617                         page_nob -= frag;
1618                         if (page_nob != 0)
1619                                 page_ptr += frag;
1620                         else
1621                         {
1622                                 page++;
1623                                 LASSERT (page < krx->krx_npages);
1624                                 page_ptr = page_address(krx->krx_pages[page]);
1625                                 page_nob = PAGE_SIZE;
1626                         }
1627
1628                         iov_nob -= frag;
1629                         if (iov_nob != 0)
1630                                 iov_ptr += frag;
1631                         else if (kiov != NULL) {
1632                                 kunmap (kiov->kiov_page);
1633                                 kiov++;
1634                                 niov--;
1635                                 LASSERT (niov > 0);
1636                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1637                                 iov_nob = kiov->kiov_len;
1638                         } else {
1639                                 iov++;
1640                                 niov--;
1641                                 LASSERT (niov > 0);
1642                                 iov_ptr = iov->iov_base;
1643                                 iov_nob = iov->iov_len;
1644                         }
1645                 }
1646
1647                 if (kiov != NULL)
1648                         kunmap (kiov->kiov_page);
1649         }
1650
1651 #if KQSW_CHECKSUM
1652         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1653                 sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
1654
1655         if (csum_len != rlen)
1656                 CERROR("Unable to checksum data in user's buffer\n");
1657         else if (senders_csum != payload_csum)
1658                 kqswnal_csum_error (krx, 0);
1659
1660         if (csum_verbose)
1661                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1662                        "csum_nob %d\n",
1663                         hdr_csum, payload_csum, csum_frags, csum_nob);
1664 #endif
1665         lib_finalize(nal, private, libmsg, PTL_OK);
1666
1667         return (PTL_OK);
1668 }
1669
1670 static ptl_err_t
1671 kqswnal_recv(nal_cb_t     *nal,
1672              void         *private,
1673              lib_msg_t    *libmsg,
1674              unsigned int  niov,
1675              struct iovec *iov,
1676              size_t        offset,
1677              size_t        mlen,
1678              size_t        rlen)
1679 {
1680         return (kqswnal_recvmsg(nal, private, libmsg, 
1681                                 niov, iov, NULL, 
1682                                 offset, mlen, rlen));
1683 }
1684
1685 static ptl_err_t
1686 kqswnal_recv_pages (nal_cb_t     *nal,
1687                     void         *private,
1688                     lib_msg_t    *libmsg,
1689                     unsigned int  niov,
1690                     ptl_kiov_t   *kiov,
1691                     size_t        offset,
1692                     size_t        mlen,
1693                     size_t        rlen)
1694 {
1695         return (kqswnal_recvmsg(nal, private, libmsg, 
1696                                 niov, NULL, kiov, 
1697                                 offset, mlen, rlen));
1698 }
1699
1700 int
1701 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1702 {
1703         long    pid = kernel_thread (fn, arg, 0);
1704
1705         if (pid < 0)
1706                 return ((int)pid);
1707
1708         atomic_inc (&kqswnal_data.kqn_nthreads);
1709         atomic_inc (&kqswnal_data.kqn_nthreads_running);
1710         return (0);
1711 }
1712
1713 void
1714 kqswnal_thread_fini (void)
1715 {
1716         atomic_dec (&kqswnal_data.kqn_nthreads);
1717 }
1718
1719 int
1720 kqswnal_scheduler (void *arg)
1721 {
1722         kqswnal_rx_t    *krx;
1723         kqswnal_tx_t    *ktx;
1724         kpr_fwd_desc_t  *fwd;
1725         long             flags;
1726         int              rc;
1727         int              counter = 0;
1728         int              shuttingdown = 0;
1729         int              did_something;
1730
1731         kportal_daemonize ("kqswnal_sched");
1732         kportal_blockallsigs ();
1733         
1734         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1735
1736         for (;;)
1737         {
1738                 if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
1739
1740                         if (kqswnal_data.kqn_shuttingdown == 2)
1741                                 break;
1742                 
1743                         /* During stage 1 of shutdown we are still responsive
1744                          * to receives */
1745
1746                         atomic_dec (&kqswnal_data.kqn_nthreads_running);
1747                         shuttingdown = kqswnal_data.kqn_shuttingdown;
1748                 }
1749
1750                 did_something = 0;
1751
1752                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1753                 {
1754                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1755                                          kqswnal_rx_t, krx_list);
1756                         list_del (&krx->krx_list);
1757                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1758                                                flags);
1759
1760                         kqswnal_rx (krx);
1761
1762                         did_something = 1;
1763                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1764                 }
1765
1766                 if (!shuttingdown &&
1767                     !list_empty (&kqswnal_data.kqn_delayedtxds))
1768                 {
1769                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1770                                          kqswnal_tx_t, ktx_list);
1771                         list_del_init (&ktx->ktx_delayed_list);
1772                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1773                                                flags);
1774
1775                         rc = kqswnal_launch (ktx);
1776                         if (rc != 0)          /* failed: ktx_nid down? */
1777                         {
1778                                 CERROR("Failed delayed transmit to "LPX64
1779                                        ": %d\n", ktx->ktx_nid, rc);
1780                                 kqswnal_tx_done (ktx, rc);
1781                         }
1782
1783                         did_something = 1;
1784                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1785                 }
1786
1787                 if (!shuttingdown &
1788                     !list_empty (&kqswnal_data.kqn_delayedfwds))
1789                 {
1790                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1791                         list_del (&fwd->kprfd_list);
1792                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1793
1794                         kqswnal_fwd_packet (NULL, fwd);
1795
1796                         did_something = 1;
1797                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1798                 }
1799
1800                     /* nothing to do or hogging CPU */
1801                 if (!did_something || counter++ == KQSW_RESCHED) {
1802                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1803                                                flags);
1804
1805                         counter = 0;
1806
1807                         if (!did_something) {
1808                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1809                                                                kqswnal_data.kqn_shuttingdown != shuttingdown ||
1810                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1811                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1812                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1813                                 LASSERT (rc == 0);
1814                         } else if (current->need_resched)
1815                                 schedule ();
1816
1817                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1818                 }
1819         }
1820
1821         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1822
1823         kqswnal_thread_fini ();
1824         return (0);
1825 }
1826
1827 nal_cb_t kqswnal_lib =
1828 {
1829         nal_data:       &kqswnal_data,         /* NAL private data */
1830         cb_send:        kqswnal_send,
1831         cb_send_pages:  kqswnal_send_pages,
1832         cb_recv:        kqswnal_recv,
1833         cb_recv_pages:  kqswnal_recv_pages,
1834         cb_read:        kqswnal_read,
1835         cb_write:       kqswnal_write,
1836         cb_malloc:      kqswnal_malloc,
1837         cb_free:        kqswnal_free,
1838         cb_printf:      kqswnal_printf,
1839         cb_cli:         kqswnal_cli,
1840         cb_sti:         kqswnal_sti,
1841         cb_dist:        kqswnal_dist
1842 };