Whamcloud - gitweb
7aee376ce620ae644cc417d9cad9a3d72beba68a
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 /*
30  *  LIB functions follow
31  *
32  */
33 static int
34 kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
35 {
36         if (nid == nal->libnal_ni.ni_pid.nid)
37                 *dist = 0;                      /* it's me */
38         else if (kqswnal_nid2elanid (nid) >= 0)
39                 *dist = 1;                      /* it's my peer */
40         else
41                 *dist = 2;                      /* via router */
42         return (0);
43 }
44
45 void
46 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
47 {
48         struct timeval     now;
49         time_t             then;
50
51         do_gettimeofday (&now);
52         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
53
54         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
55 }
56
57 void
58 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
59 {
60 #if MULTIRAIL_EKC
61         int      i;
62
63         ktx->ktx_rail = -1;                     /* unset rail */
64 #endif
65
66         if (ktx->ktx_nmappedpages == 0)
67                 return;
68         
69 #if MULTIRAIL_EKC
70         CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
71                ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
72
73         for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
74                 ep_dvma_unload(kqswnal_data.kqn_ep,
75                                kqswnal_data.kqn_ep_tx_nmh,
76                                &ktx->ktx_frags[i]);
77 #else
78         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
79                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
80
81         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
82         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
83                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
84
85         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
86                           kqswnal_data.kqn_eptxdmahandle,
87                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
88 #endif
89         ktx->ktx_nmappedpages = 0;
90 }
91
92 int
93 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
94 {
95         int       nfrags    = ktx->ktx_nfrag;
96         int       nmapped   = ktx->ktx_nmappedpages;
97         int       maxmapped = ktx->ktx_npages;
98         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
99         char     *ptr;
100 #if MULTIRAIL_EKC
101         EP_RAILMASK railmask;
102         int         rail;
103
104         if (ktx->ktx_rail < 0)
105                 ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
106                                                  EP_RAILMASK_ALL,
107                                                  kqswnal_nid2elanid(ktx->ktx_nid));
108         rail = ktx->ktx_rail;
109         if (rail < 0) {
110                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
111                 return (-ENETDOWN);
112         }
113         railmask = 1 << rail;
114 #endif
115         LASSERT (nmapped <= maxmapped);
116         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
117         LASSERT (nfrags <= EP_MAXFRAG);
118         LASSERT (niov > 0);
119         LASSERT (nob > 0);
120
121         /* skip complete frags before 'offset' */
122         while (offset >= kiov->kiov_len) {
123                 offset -= kiov->kiov_len;
124                 kiov++;
125                 niov--;
126                 LASSERT (niov > 0);
127         }
128
129         do {
130                 int  fraglen = kiov->kiov_len - offset;
131
132                 /* each page frag is contained in one page */
133                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
134
135                 if (fraglen > nob)
136                         fraglen = nob;
137
138                 nmapped++;
139                 if (nmapped > maxmapped) {
140                         CERROR("Can't map message in %d pages (max %d)\n",
141                                nmapped, maxmapped);
142                         return (-EMSGSIZE);
143                 }
144
145                 if (nfrags == EP_MAXFRAG) {
146                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
147                                EP_MAXFRAG);
148                         return (-EMSGSIZE);
149                 }
150
151                 /* XXX this is really crap, but we'll have to kmap until
152                  * EKC has a page (rather than vaddr) mapping interface */
153
154                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
155
156                 CDEBUG(D_NET,
157                        "%p[%d] loading %p for %d, page %d, %d total\n",
158                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
159
160 #if MULTIRAIL_EKC
161                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
162                              ptr, fraglen,
163                              kqswnal_data.kqn_ep_tx_nmh, basepage,
164                              &railmask, &ktx->ktx_frags[nfrags]);
165
166                 if (nfrags == ktx->ktx_firsttmpfrag ||
167                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
168                                   &ktx->ktx_frags[nfrags - 1],
169                                   &ktx->ktx_frags[nfrags])) {
170                         /* new frag if this is the first or can't merge */
171                         nfrags++;
172                 }
173 #else
174                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
175                                        kqswnal_data.kqn_eptxdmahandle,
176                                        ptr, fraglen,
177                                        basepage, &ktx->ktx_frags[nfrags].Base);
178
179                 if (nfrags > 0 &&                /* previous frag mapped */
180                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
181                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
182                         /* just extend previous */
183                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
184                 else {
185                         ktx->ktx_frags[nfrags].Len = fraglen;
186                         nfrags++;                /* new frag */
187                 }
188 #endif
189
190                 kunmap (kiov->kiov_page);
191                 
192                 /* keep in loop for failure case */
193                 ktx->ktx_nmappedpages = nmapped;
194
195                 basepage++;
196                 kiov++;
197                 niov--;
198                 nob -= fraglen;
199                 offset = 0;
200
201                 /* iov must not run out before end of data */
202                 LASSERT (nob == 0 || niov > 0);
203
204         } while (nob > 0);
205
206         ktx->ktx_nfrag = nfrags;
207         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
208                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
209
210         return (0);
211 }
212
213 int
214 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
215                     int niov, struct iovec *iov)
216 {
217         int       nfrags    = ktx->ktx_nfrag;
218         int       nmapped   = ktx->ktx_nmappedpages;
219         int       maxmapped = ktx->ktx_npages;
220         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
221 #if MULTIRAIL_EKC
222         EP_RAILMASK railmask;
223         int         rail;
224         
225         if (ktx->ktx_rail < 0)
226                 ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
227                                                  EP_RAILMASK_ALL,
228                                                  kqswnal_nid2elanid(ktx->ktx_nid));
229         rail = ktx->ktx_rail;
230         if (rail < 0) {
231                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
232                 return (-ENETDOWN);
233         }
234         railmask = 1 << rail;
235 #endif
236         LASSERT (nmapped <= maxmapped);
237         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
238         LASSERT (nfrags <= EP_MAXFRAG);
239         LASSERT (niov > 0);
240         LASSERT (nob > 0);
241
242         /* skip complete frags before offset */
243         while (offset >= iov->iov_len) {
244                 offset -= iov->iov_len;
245                 iov++;
246                 niov--;
247                 LASSERT (niov > 0);
248         }
249         
250         do {
251                 int  fraglen = iov->iov_len - offset;
252                 long npages;
253                 
254                 if (fraglen > nob)
255                         fraglen = nob;
256                 npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
257
258                 nmapped += npages;
259                 if (nmapped > maxmapped) {
260                         CERROR("Can't map message in %d pages (max %d)\n",
261                                nmapped, maxmapped);
262                         return (-EMSGSIZE);
263                 }
264
265                 if (nfrags == EP_MAXFRAG) {
266                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
267                                EP_MAXFRAG);
268                         return (-EMSGSIZE);
269                 }
270
271                 CDEBUG(D_NET,
272                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
273                        ktx, nfrags, iov->iov_base + offset, fraglen, 
274                        basepage, npages, nmapped);
275
276 #if MULTIRAIL_EKC
277                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
278                              iov->iov_base + offset, fraglen,
279                              kqswnal_data.kqn_ep_tx_nmh, basepage,
280                              &railmask, &ktx->ktx_frags[nfrags]);
281
282                 if (nfrags == ktx->ktx_firsttmpfrag ||
283                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
284                                   &ktx->ktx_frags[nfrags - 1],
285                                   &ktx->ktx_frags[nfrags])) {
286                         /* new frag if this is the first or can't merge */
287                         nfrags++;
288                 }
289 #else
290                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
291                                        kqswnal_data.kqn_eptxdmahandle,
292                                        iov->iov_base + offset, fraglen,
293                                        basepage, &ktx->ktx_frags[nfrags].Base);
294
295                 if (nfrags > 0 &&                /* previous frag mapped */
296                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
297                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
298                         /* just extend previous */
299                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
300                 else {
301                         ktx->ktx_frags[nfrags].Len = fraglen;
302                         nfrags++;                /* new frag */
303                 }
304 #endif
305
306                 /* keep in loop for failure case */
307                 ktx->ktx_nmappedpages = nmapped;
308
309                 basepage += npages;
310                 iov++;
311                 niov--;
312                 nob -= fraglen;
313                 offset = 0;
314
315                 /* iov must not run out before end of data */
316                 LASSERT (nob == 0 || niov > 0);
317
318         } while (nob > 0);
319
320         ktx->ktx_nfrag = nfrags;
321         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
322                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
323
324         return (0);
325 }
326
327
328 void
329 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
330 {
331         kpr_fwd_desc_t   *fwd = NULL;
332         unsigned long     flags;
333
334         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
335         ktx->ktx_state = KTX_IDLE;
336
337         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
338
339         list_del (&ktx->ktx_list);              /* take off active list */
340
341         if (ktx->ktx_isnblk) {
342                 /* reserved for non-blocking tx */
343                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
344                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
345                 return;
346         }
347
348         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
349
350         /* anything blocking for a tx descriptor? */
351         if (!kqswnal_data.kqn_shuttingdown &&
352             !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
353         {
354                 CDEBUG(D_NET,"wakeup fwd\n");
355
356                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
357                                   kpr_fwd_desc_t, kprfd_list);
358                 list_del (&fwd->kprfd_list);
359         }
360
361         wake_up (&kqswnal_data.kqn_idletxd_waitq);
362
363         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
364
365         if (fwd == NULL)
366                 return;
367
368         /* schedule packet for forwarding again */
369         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
370
371         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
372         wake_up (&kqswnal_data.kqn_sched_waitq);
373
374         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
375 }
376
377 kqswnal_tx_t *
378 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
379 {
380         unsigned long  flags;
381         kqswnal_tx_t  *ktx = NULL;
382
383         for (;;) {
384                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
385
386                 if (kqswnal_data.kqn_shuttingdown)
387                         break;
388
389                 /* "normal" descriptor is free */
390                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
391                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
392                                           kqswnal_tx_t, ktx_list);
393                         break;
394                 }
395
396                 if (fwd != NULL)                /* forwarded packet? */
397                         break;
398
399                 /* doing a local transmit */
400                 if (!may_block) {
401                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
402                                 CERROR ("intr tx desc pool exhausted\n");
403                                 break;
404                         }
405
406                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
407                                           kqswnal_tx_t, ktx_list);
408                         break;
409                 }
410
411                 /* block for idle tx */
412
413                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
414
415                 CDEBUG (D_NET, "blocking for tx desc\n");
416                 wait_event (kqswnal_data.kqn_idletxd_waitq,
417                             !list_empty (&kqswnal_data.kqn_idletxds) ||
418                             kqswnal_data.kqn_shuttingdown);
419         }
420
421         if (ktx != NULL) {
422                 list_del (&ktx->ktx_list);
423                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
424                 ktx->ktx_launcher = current->pid;
425                 atomic_inc(&kqswnal_data.kqn_pending_txs);
426         } else if (fwd != NULL) {
427                 /* queue forwarded packet until idle txd available */
428                 CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
429                 list_add_tail (&fwd->kprfd_list,
430                                &kqswnal_data.kqn_idletxd_fwdq);
431         }
432
433         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
434
435         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
436         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
437
438         return (ktx);
439 }
440
441 void
442 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
443 {
444         switch (ktx->ktx_state) {
445         case KTX_FORWARDING:       /* router asked me to forward this packet */
446                 kpr_fwd_done (&kqswnal_data.kqn_router,
447                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
448                 break;
449
450         case KTX_RDMAING:          /* optimized GET/PUT handled */
451         case KTX_PUTTING:          /* optimized PUT sent */
452         case KTX_SENDING:          /* normal send */
453                 lib_finalize (&kqswnal_lib, NULL,
454                               (lib_msg_t *)ktx->ktx_args[1],
455                               (error == 0) ? PTL_OK : PTL_FAIL);
456                 break;
457
458         case KTX_GETTING:          /* optimized GET sent & REPLY received */
459                 /* Complete the GET with success since we can't avoid
460                  * delivering a REPLY event; we committed to it when we
461                  * launched the GET */
462                 lib_finalize (&kqswnal_lib, NULL, 
463                               (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
464                 lib_finalize (&kqswnal_lib, NULL,
465                               (lib_msg_t *)ktx->ktx_args[2],
466                               (error == 0) ? PTL_OK : PTL_FAIL);
467                 break;
468
469         default:
470                 LASSERT (0);
471         }
472
473         kqswnal_put_idle_tx (ktx);
474 }
475
476 static void
477 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
478 {
479         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
480
481         LASSERT (txd != NULL);
482         LASSERT (ktx != NULL);
483
484         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
485
486         if (status != EP_SUCCESS) {
487
488                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
489                         ktx->ktx_nid, status);
490
491                 kqswnal_notify_peer_down(ktx);
492                 status = -EHOSTDOWN;
493
494         } else switch (ktx->ktx_state) {
495
496         case KTX_GETTING:
497         case KTX_PUTTING:
498                 /* RPC completed OK; but what did our peer put in the status
499                  * block? */
500 #if MULTIRAIL_EKC
501                 status = ep_txd_statusblk(txd)->Data[0];
502 #else
503                 status = ep_txd_statusblk(txd)->Status;
504 #endif
505                 break;
506                 
507         case KTX_FORWARDING:
508         case KTX_SENDING:
509                 status = 0;
510                 break;
511                 
512         default:
513                 LBUG();
514                 break;
515         }
516
517         kqswnal_tx_done (ktx, status);
518 }
519
520 int
521 kqswnal_launch (kqswnal_tx_t *ktx)
522 {
523         /* Don't block for transmit descriptor if we're in interrupt context */
524         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
525         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
526         unsigned long flags;
527         int   rc;
528
529         ktx->ktx_launchtime = jiffies;
530
531         if (kqswnal_data.kqn_shuttingdown)
532                 return (-ESHUTDOWN);
533
534         LASSERT (dest >= 0);                    /* must be a peer */
535
536 #if MULTIRAIL_EKC
537         if (ktx->ktx_nmappedpages != 0)
538                 attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);
539 #endif
540
541         switch (ktx->ktx_state) {
542         case KTX_GETTING:
543         case KTX_PUTTING:
544                 /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
545                  * The other frags are the payload, awaiting RDMA */
546                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
547                                      ktx->ktx_port, attr,
548                                      kqswnal_txhandler, ktx,
549                                      NULL, ktx->ktx_frags, 1);
550                 break;
551
552         case KTX_FORWARDING:
553         case KTX_SENDING:
554 #if MULTIRAIL_EKC
555                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
556                                          ktx->ktx_port, attr,
557                                          kqswnal_txhandler, ktx,
558                                          NULL, ktx->ktx_frags, ktx->ktx_nfrag);
559 #else
560                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
561                                        ktx->ktx_port, attr, 
562                                        kqswnal_txhandler, ktx, 
563                                        ktx->ktx_frags, ktx->ktx_nfrag);
564 #endif
565                 break;
566                 
567         default:
568                 LBUG();
569                 rc = -EINVAL;                   /* no compiler warning please */
570                 break;
571         }
572
573         switch (rc) {
574         case EP_SUCCESS: /* success */
575                 return (0);
576
577         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
578                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
579
580                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
581                 wake_up (&kqswnal_data.kqn_sched_waitq);
582
583                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
584                 return (0);
585
586         default: /* fatal error */
587                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
588                 kqswnal_notify_peer_down(ktx);
589                 return (-EHOSTUNREACH);
590         }
591 }
592
593 #if 0
594 static char *
595 hdr_type_string (ptl_hdr_t *hdr)
596 {
597         switch (hdr->type) {
598         case PTL_MSG_ACK:
599                 return ("ACK");
600         case PTL_MSG_PUT:
601                 return ("PUT");
602         case PTL_MSG_GET:
603                 return ("GET");
604         case PTL_MSG_REPLY:
605                 return ("REPLY");
606         default:
607                 return ("<UNKNOWN>");
608         }
609 }
610
611 static void
612 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
613 {
614         char *type_str = hdr_type_string (hdr);
615
616         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
617                le32_to_cpu(hdr->payload_length));
618         CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
619                le32_to_cpu(hdr->src_pid));
620         CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
621                le32_to_cpu(hdr->dest_pid));
622
623         switch (le32_to_cpu(hdr->type)) {
624         case PTL_MSG_PUT:
625                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
626                        "match bits "LPX64"\n",
627                        le32_to_cpu(hdr->msg.put.ptl_index),
628                        hdr->msg.put.ack_wmd.wh_interface_cookie,
629                        hdr->msg.put.ack_wmd.wh_object_cookie,
630                        le64_to_cpu(hdr->msg.put.match_bits));
631                 CERROR("    offset %d, hdr data "LPX64"\n",
632                        le32_to_cpu(hdr->msg.put.offset),
633                        hdr->msg.put.hdr_data);
634                 break;
635
636         case PTL_MSG_GET:
637                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
638                        "match bits "LPX64"\n",
639                        le32_to_cpu(hdr->msg.get.ptl_index),
640                        hdr->msg.get.return_wmd.wh_interface_cookie,
641                        hdr->msg.get.return_wmd.wh_object_cookie,
642                        hdr->msg.get.match_bits);
643                 CERROR("    Length %d, src offset %d\n",
644                        le32_to_cpu(hdr->msg.get.sink_length),
645                        le32_to_cpu(hdr->msg.get.src_offset));
646                 break;
647
648         case PTL_MSG_ACK:
649                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
650                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
651                        hdr->msg.ack.dst_wmd.wh_object_cookie,
652                        le32_to_cpu(hdr->msg.ack.mlength));
653                 break;
654
655         case PTL_MSG_REPLY:
656                 CERROR("    dst md "LPX64"."LPX64"\n",
657                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
658                        hdr->msg.reply.dst_wmd.wh_object_cookie);
659         }
660
661 }                               /* end of print_hdr() */
662 #endif
663
664 #if !MULTIRAIL_EKC
665 void
666 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
667 {
668         int          i;
669
670         CDEBUG (how, "%s: %d\n", str, n);
671         for (i = 0; i < n; i++) {
672                 CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
673         }
674 }
675
676 int
677 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
678                      int nsrc, EP_IOVEC *src,
679                      int ndst, EP_IOVEC *dst) 
680 {
681         int        count;
682         int        nob;
683
684         LASSERT (ndv > 0);
685         LASSERT (nsrc > 0);
686         LASSERT (ndst > 0);
687
688         for (count = 0; count < ndv; count++, dv++) {
689
690                 if (nsrc == 0 || ndst == 0) {
691                         if (nsrc != ndst) {
692                                 /* For now I'll barf on any left over entries */
693                                 CERROR ("mismatched src and dst iovs\n");
694                                 return (-EINVAL);
695                         }
696                         return (count);
697                 }
698
699                 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
700                 dv->Len    = nob;
701                 dv->Source = src->Base;
702                 dv->Dest   = dst->Base;
703
704                 if (nob >= src->Len) {
705                         src++;
706                         nsrc--;
707                 } else {
708                         src->Len -= nob;
709                         src->Base += nob;
710                 }
711                 
712                 if (nob >= dst->Len) {
713                         dst++;
714                         ndst--;
715                 } else {
716                         src->Len -= nob;
717                         src->Base += nob;
718                 }
719         }
720
721         CERROR ("DATAVEC too small\n");
722         return (-E2BIG);
723 }
724 #else
725 int
726 kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
727                     int nrfrag, EP_NMD *rfrag)
728 {
729         int  i;
730
731         if (nlfrag != nrfrag) {
732                 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
733                        nlfrag, nrfrag);
734                 return (-EINVAL);
735         }
736         
737         for (i = 0; i < nlfrag; i++)
738                 if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
739                         CERROR("Can't cope with unequal frags %d(%d):"
740                                " %d local %d remote\n",
741                                i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
742                         return (-EINVAL);
743                 }
744         
745         return (0);
746 }
747 #endif
748
749 kqswnal_remotemd_t *
750 kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
751 {
752         char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
753         ptl_hdr_t          *hdr = (ptl_hdr_t *)buffer;
754         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
755         ptl_nid_t           nid = kqswnal_rx_nid(krx);
756
757         /* Note (1) lib_parse has already flipped hdr.
758          *      (2) RDMA addresses are sent in native endian-ness.  When
759          *      EKC copes with different endian nodes, I'll fix this (and
760          *      eat my hat :) */
761
762         LASSERT (krx->krx_nob >= sizeof(*hdr));
763
764         if (hdr->type != type) {
765                 CERROR ("Unexpected optimized get/put type %d (%d expected)"
766                         "from "LPX64"\n", hdr->type, type, nid);
767                 return (NULL);
768         }
769         
770         if (hdr->src_nid != nid) {
771                 CERROR ("Unexpected optimized get/put source NID "
772                         LPX64" from "LPX64"\n", hdr->src_nid, nid);
773                 return (NULL);
774         }
775
776         LASSERT (nid == expected_nid);
777
778         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
779                 /* msg too small to discover rmd size */
780                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
781                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
782                 return (NULL);
783         }
784
785         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
786                 /* rmd doesn't fit in the incoming message */
787                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
788                         krx->krx_nob, rmd->kqrmd_nfrag,
789                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
790                 return (NULL);
791         }
792
793         return (rmd);
794 }
795
796 void
797 kqswnal_rdma_store_complete (EP_RXD *rxd) 
798 {
799         int           status = ep_rxd_status(rxd);
800         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
801         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
802         
803         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
804                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
805
806         LASSERT (ktx->ktx_state == KTX_RDMAING);
807         LASSERT (krx->krx_rxd == rxd);
808         LASSERT (krx->krx_rpc_reply_needed);
809
810         krx->krx_rpc_reply_needed = 0;
811         kqswnal_rx_decref (krx);
812
813         /* free ktx & finalize() its lib_msg_t */
814         kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
815 }
816
817 void
818 kqswnal_rdma_fetch_complete (EP_RXD *rxd) 
819 {
820         /* Completed fetching the PUT data */
821         int           status = ep_rxd_status(rxd);
822         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
823         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
824         unsigned long flags;
825         
826         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
827                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
828
829         LASSERT (ktx->ktx_state == KTX_RDMAING);
830         LASSERT (krx->krx_rxd == rxd);
831         /* RPC completes with failure by default */
832         LASSERT (krx->krx_rpc_reply_needed);
833         LASSERT (krx->krx_rpc_reply_status != 0);
834
835         if (status == EP_SUCCESS) {
836                 status = krx->krx_rpc_reply_status = 0;
837         } else {
838                 /* Abandon RPC since get failed */
839                 krx->krx_rpc_reply_needed = 0;
840                 status = -ECONNABORTED;
841         }
842
843         /* free ktx & finalize() its lib_msg_t */
844         kqswnal_tx_done(ktx, status);
845
846         if (!in_interrupt()) {
847                 /* OK to complete the RPC now (iff I had the last ref) */
848                 kqswnal_rx_decref (krx);
849                 return;
850         }
851
852         LASSERT (krx->krx_state == KRX_PARSE);
853         krx->krx_state = KRX_COMPLETING;
854
855         /* Complete the RPC in thread context */
856         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
857
858         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
859         wake_up (&kqswnal_data.kqn_sched_waitq);
860
861         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
862 }
863
864 int
865 kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
866               int niov, struct iovec *iov, ptl_kiov_t *kiov,
867               size_t offset, size_t len)
868 {
869         kqswnal_remotemd_t *rmd;
870         kqswnal_tx_t       *ktx;
871         int                 eprc;
872         int                 rc;
873 #if !MULTIRAIL_EKC
874         EP_DATAVEC          datav[EP_MAXFRAG];
875         int                 ndatav;
876 #endif
877
878         LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
879         /* Not both mapped and paged payload */
880         LASSERT (iov == NULL || kiov == NULL);
881         /* RPC completes with failure by default */
882         LASSERT (krx->krx_rpc_reply_needed);
883         LASSERT (krx->krx_rpc_reply_status != 0);
884
885         rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
886         if (rmd == NULL)
887                 return (-EPROTO);
888
889         if (len == 0) {
890                 /* data got truncated to nothing. */
891                 lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
892                 /* Let kqswnal_rx_done() complete the RPC with success */
893                 krx->krx_rpc_reply_status = 0;
894                 return (0);
895         }
896         
897         /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
898            actually sending a portals message with it */
899         ktx = kqswnal_get_idle_tx(NULL, 0);
900         if (ktx == NULL) {
901                 CERROR ("Can't get txd for RDMA with "LPX64"\n",
902                         libmsg->ev.initiator.nid);
903                 return (-ENOMEM);
904         }
905
906         ktx->ktx_state   = KTX_RDMAING;
907         ktx->ktx_nid     = libmsg->ev.initiator.nid;
908         ktx->ktx_args[0] = krx;
909         ktx->ktx_args[1] = libmsg;
910
911 #if MULTIRAIL_EKC
912         /* Map on the rail the RPC prefers */
913         ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
914                                          ep_rxd_railmask(krx->krx_rxd));
915 #endif
916
917         /* Start mapping at offset 0 (we're not mapping any headers) */
918         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
919         
920         if (kiov != NULL)
921                 rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
922         else
923                 rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
924
925         if (rc != 0) {
926                 CERROR ("Can't map local RDMA data: %d\n", rc);
927                 goto out;
928         }
929
930 #if MULTIRAIL_EKC
931         rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
932                                  rmd->kqrmd_nfrag, rmd->kqrmd_frag);
933         if (rc != 0) {
934                 CERROR ("Incompatible RDMA descriptors\n");
935                 goto out;
936         }
937 #else
938         switch (type) {
939         default:
940                 LBUG();
941
942         case PTL_MSG_GET:
943                 ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
944                                              ktx->ktx_nfrag, ktx->ktx_frags,
945                                              rmd->kqrmd_nfrag, rmd->kqrmd_frag);
946                 break;
947
948         case PTL_MSG_PUT:
949                 ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
950                                              rmd->kqrmd_nfrag, rmd->kqrmd_frag,
951                                              ktx->ktx_nfrag, ktx->ktx_frags);
952                 break;
953         }
954                 
955         if (ndatav < 0) {
956                 CERROR ("Can't create datavec: %d\n", ndatav);
957                 rc = ndatav;
958                 goto out;
959         }
960 #endif
961
962         LASSERT (atomic_read(&krx->krx_refcount) > 0);
963         /* Take an extra ref for the completion callback */
964         atomic_inc(&krx->krx_refcount);
965
966         switch (type) {
967         default:
968                 LBUG();
969
970         case PTL_MSG_GET:
971 #if MULTIRAIL_EKC
972                 eprc = ep_complete_rpc(krx->krx_rxd, 
973                                        kqswnal_rdma_store_complete, ktx, 
974                                        &kqswnal_data.kqn_rpc_success,
975                                        ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
976 #else
977                 eprc = ep_complete_rpc (krx->krx_rxd, 
978                                         kqswnal_rdma_store_complete, ktx,
979                                         &kqswnal_data.kqn_rpc_success, 
980                                         datav, ndatav);
981                 if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
982                         krx->krx_rxd = NULL;
983 #endif
984                 if (eprc != EP_SUCCESS) {
985                         CERROR("can't complete RPC: %d\n", eprc);
986                         /* don't re-attempt RPC completion */
987                         krx->krx_rpc_reply_needed = 0;
988                         rc = -ECONNABORTED;
989                 }
990                 break;
991                 
992         case PTL_MSG_PUT:
993 #if MULTIRAIL_EKC
994                 eprc = ep_rpc_get (krx->krx_rxd, 
995                                    kqswnal_rdma_fetch_complete, ktx,
996                                    rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
997 #else
998                 eprc = ep_rpc_get (krx->krx_rxd,
999                                    kqswnal_rdma_fetch_complete, ktx,
1000                                    datav, ndatav);
1001 #endif
1002                 if (eprc != EP_SUCCESS) {
1003                         CERROR("ep_rpc_get failed: %d\n", eprc);
1004                         /* Don't attempt RPC completion: 
1005                          * EKC nuked it when the get failed */
1006                         krx->krx_rpc_reply_needed = 0;
1007                         rc = -ECONNABORTED;
1008                 }
1009                 break;
1010         }
1011
1012  out:
1013         if (rc != 0) {
1014                 kqswnal_rx_decref(krx);                 /* drop callback's ref */
1015                 kqswnal_put_idle_tx (ktx);
1016         }
1017
1018         atomic_dec(&kqswnal_data.kqn_pending_txs);
1019         return (rc);
1020 }
1021
1022 static ptl_err_t
1023 kqswnal_sendmsg (lib_nal_t    *nal,
1024                  void         *private,
1025                  lib_msg_t    *libmsg,
1026                  ptl_hdr_t    *hdr,
1027                  int           type,
1028                  ptl_nid_t     nid,
1029                  ptl_pid_t     pid,
1030                  unsigned int  payload_niov,
1031                  struct iovec *payload_iov,
1032                  ptl_kiov_t   *payload_kiov,
1033                  size_t        payload_offset,
1034                  size_t        payload_nob)
1035 {
1036         kqswnal_tx_t      *ktx;
1037         int                rc;
1038         ptl_nid_t          targetnid;
1039 #if KQSW_CHECKSUM
1040         int                i;
1041         kqsw_csum_t        csum;
1042         int                sumoff;
1043         int                sumnob;
1044 #endif
1045         /* NB 1. hdr is in network byte order */
1046         /*    2. 'private' depends on the message type */
1047         
1048         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
1049                " pid %u\n", payload_nob, payload_niov, nid, pid);
1050
1051         LASSERT (payload_nob == 0 || payload_niov > 0);
1052         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1053
1054         /* It must be OK to kmap() if required */
1055         LASSERT (payload_kiov == NULL || !in_interrupt ());
1056         /* payload is either all vaddrs or all pages */
1057         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1058
1059         if (payload_nob > KQSW_MAXPAYLOAD) {
1060                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
1061                         payload_nob, KQSW_MAXPAYLOAD);
1062                 return (PTL_FAIL);
1063         }
1064
1065         if (type == PTL_MSG_REPLY &&            /* can I look in 'private' */
1066             ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
1067                 /* Must be a REPLY for an optimized GET */
1068                 rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
1069                                    payload_niov, payload_iov, payload_kiov, 
1070                                    payload_offset, payload_nob);
1071                 return ((rc == 0) ? PTL_OK : PTL_FAIL);
1072         }
1073
1074         targetnid = nid;
1075         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
1076                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
1077                                  sizeof (ptl_hdr_t) + payload_nob, &targetnid);
1078                 if (rc != 0) {
1079                         CERROR("Can't route to "LPX64": router error %d\n",
1080                                nid, rc);
1081                         return (PTL_FAIL);
1082                 }
1083                 if (kqswnal_nid2elanid (targetnid) < 0) {
1084                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
1085                                targetnid, nid);
1086                         return (PTL_FAIL);
1087                 }
1088         }
1089
1090         /* I may not block for a transmit descriptor if I might block the
1091          * receiver, or an interrupt handler. */
1092         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
1093                                           type == PTL_MSG_REPLY ||
1094                                           in_interrupt()));
1095         if (ktx == NULL) {
1096                 CERROR ("Can't get txd for msg type %d for "LPX64"\n",
1097                         type, libmsg->ev.initiator.nid);
1098                 return (PTL_NO_SPACE);
1099         }
1100
1101         ktx->ktx_state   = KTX_SENDING;
1102         ktx->ktx_nid     = targetnid;
1103         ktx->ktx_args[0] = private;
1104         ktx->ktx_args[1] = libmsg;
1105         ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */
1106
1107         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
1108
1109 #if KQSW_CHECKSUM
1110         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
1111         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
1112         for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
1113                 LASSERT(i < niov);
1114                 if (payload_kiov != NULL) {
1115                         ptl_kiov_t *kiov = &payload_kiov[i];
1116
1117                         if (sumoff >= kiov->kiov_len) {
1118                                 sumoff -= kiov->kiov_len;
1119                         } else {
1120                                 char *addr = ((char *)kmap (kiov->kiov_page)) +
1121                                              kiov->kiov_offset + sumoff;
1122                                 int   fragnob = kiov->kiov_len - sumoff;
1123
1124                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1125                                 sumnob -= fragnob;
1126                                 sumoff = 0;
1127                                 kunmap(kiov->kiov_page);
1128                         }
1129                 } else {
1130                         struct iovec *iov = &payload_iov[i];
1131
1132                         if (sumoff > iov->iov_len) {
1133                                 sumoff -= iov->iov_len;
1134                         } else {
1135                                 char *addr = iov->iov_base + sumoff;
1136                                 int   fragnob = iov->iov_len - sumoff;
1137                                 
1138                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1139                                 sumnob -= fragnob;
1140                                 sumoff = 0;
1141                         }
1142                 }
1143         }
1144         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
1145 #endif
1146
1147         /* The first frag will be the pre-mapped buffer for (at least) the
1148          * portals header. */
1149         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1150
1151         if (nid == targetnid &&                 /* not forwarding */
1152             ((type == PTL_MSG_GET &&            /* optimize GET? */
1153               kqswnal_tunables.kqn_optimized_gets != 0 &&
1154               le32_to_cpu(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
1155              (type == PTL_MSG_PUT &&            /* optimize PUT? */
1156               kqswnal_tunables.kqn_optimized_puts != 0 &&
1157               payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
1158                 lib_md_t           *md = libmsg->md;
1159                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
1160                 
1161                 /* Optimised path: I send over the Elan vaddrs of the local
1162                  * buffers, and my peer DMAs directly to/from them.
1163                  *
1164                  * First I set up ktx as if it was going to send this
1165                  * payload, (it needs to map it anyway).  This fills
1166                  * ktx_frags[1] and onward with the network addresses
1167                  * of the GET sink frags.  I copy these into ktx_buffer,
1168                  * immediately after the header, and send that as my
1169                  * message. */
1170
1171                 ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;
1172
1173                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
1174                         rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
1175                                                   md->md_niov, md->md_iov.kiov);
1176                 else
1177                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
1178                                                  md->md_niov, md->md_iov.iov);
1179                 if (rc != 0)
1180                         goto out;
1181
1182                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1183
1184                 payload_nob = offsetof(kqswnal_remotemd_t,
1185                                        kqrmd_frag[rmd->kqrmd_nfrag]);
1186                 LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
1187
1188 #if MULTIRAIL_EKC
1189                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1190                        rmd->kqrmd_nfrag * sizeof(EP_NMD));
1191
1192                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1193                               0, KQSW_HDR_SIZE + payload_nob);
1194 #else
1195                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1196                        rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
1197                 
1198                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1199                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1200 #endif
1201                 if (type == PTL_MSG_GET) {
1202                         /* Allocate reply message now while I'm in thread context */
1203                         ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,
1204                                                                  nid, libmsg);
1205                         if (ktx->ktx_args[2] == NULL)
1206                                 goto out;
1207
1208                         /* NB finalizing the REPLY message is my
1209                          * responsibility now, whatever happens. */
1210                 }
1211                 
1212         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
1213
1214                 /* small message: single frag copied into the pre-mapped buffer */
1215
1216 #if MULTIRAIL_EKC
1217                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1218                               0, KQSW_HDR_SIZE + payload_nob);
1219 #else
1220                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1221                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1222 #endif
1223                 if (payload_nob > 0) {
1224                         if (payload_kiov != NULL)
1225                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1226                                                    payload_niov, payload_kiov, 
1227                                                    payload_offset, payload_nob);
1228                         else
1229                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1230                                                   payload_niov, payload_iov, 
1231                                                   payload_offset, payload_nob);
1232                 }
1233         } else {
1234
1235                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
1236
1237 #if MULTIRAIL_EKC
1238                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1239                               0, KQSW_HDR_SIZE);
1240 #else
1241                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1242                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1243 #endif
1244                 if (payload_kiov != NULL)
1245                         rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
1246                                                   payload_niov, payload_kiov);
1247                 else
1248                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
1249                                                  payload_niov, payload_iov);
1250                 if (rc != 0)
1251                         goto out;
1252         }
1253         
1254         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
1255                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1256
1257         rc = kqswnal_launch (ktx);
1258
1259  out:
1260         CDEBUG(rc == 0 ? D_NET : D_ERROR, 
1261                "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", 
1262                rc == 0 ? "Sent" : "Failed to send",
1263                payload_nob, nid, targetnid, rc);
1264
1265         if (rc != 0) {
1266                 if (ktx->ktx_state == KTX_GETTING &&
1267                     ktx->ktx_args[2] != NULL) {
1268                         /* We committed to reply, but there was a problem
1269                          * launching the GET.  We can't avoid delivering a
1270                          * REPLY event since we committed above, so we
1271                          * pretend the GET succeeded but the REPLY
1272                          * failed. */
1273                         rc = 0;
1274                         lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
1275                         lib_finalize (&kqswnal_lib, private,
1276                                       (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);
1277                 }
1278                 
1279                 kqswnal_put_idle_tx (ktx);
1280         }
1281         
1282         atomic_dec(&kqswnal_data.kqn_pending_txs);
1283         return (rc == 0 ? PTL_OK : PTL_FAIL);
1284 }
1285
1286 static ptl_err_t
1287 kqswnal_send (lib_nal_t    *nal,
1288               void         *private,
1289               lib_msg_t    *libmsg,
1290               ptl_hdr_t    *hdr,
1291               int           type,
1292               ptl_nid_t     nid,
1293               ptl_pid_t     pid,
1294               unsigned int  payload_niov,
1295               struct iovec *payload_iov,
1296               size_t        payload_offset,
1297               size_t        payload_nob)
1298 {
1299         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1300                                  payload_niov, payload_iov, NULL, 
1301                                  payload_offset, payload_nob));
1302 }
1303
1304 static ptl_err_t
1305 kqswnal_send_pages (lib_nal_t    *nal,
1306                     void         *private,
1307                     lib_msg_t    *libmsg,
1308                     ptl_hdr_t    *hdr,
1309                     int           type,
1310                     ptl_nid_t     nid,
1311                     ptl_pid_t     pid,
1312                     unsigned int  payload_niov,
1313                     ptl_kiov_t   *payload_kiov,
1314                     size_t        payload_offset,
1315                     size_t        payload_nob)
1316 {
1317         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1318                                  payload_niov, NULL, payload_kiov, 
1319                                  payload_offset, payload_nob));
1320 }
1321
1322 void
1323 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1324 {
1325         int             rc;
1326         kqswnal_tx_t   *ktx;
1327         ptl_kiov_t     *kiov = fwd->kprfd_kiov;
1328         int             niov = fwd->kprfd_niov;
1329         int             nob = fwd->kprfd_nob;
1330         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
1331
1332 #if KQSW_CHECKSUM
1333         CERROR ("checksums for forwarded packets not implemented\n");
1334         LBUG ();
1335 #endif
1336         /* The router wants this NAL to forward a packet */
1337         CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
1338                 fwd, nid, niov, nob);
1339
1340         ktx = kqswnal_get_idle_tx (fwd, 0);
1341         if (ktx == NULL)        /* can't get txd right now */
1342                 return;         /* fwd will be scheduled when tx desc freed */
1343
1344         if (nid == kqswnal_lib.libnal_ni.ni_pid.nid) /* gateway is me */
1345                 nid = fwd->kprfd_target_nid;    /* target is final dest */
1346
1347         if (kqswnal_nid2elanid (nid) < 0) {
1348                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
1349                 rc = -EHOSTUNREACH;
1350                 goto out;
1351         }
1352
1353         /* copy hdr into pre-mapped buffer */
1354         memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
1355
1356         ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
1357                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1358         ktx->ktx_nid     = nid;
1359         ktx->ktx_state   = KTX_FORWARDING;
1360         ktx->ktx_args[0] = fwd;
1361         ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
1362
1363         if (nob <= KQSW_TX_MAXCONTIG) 
1364         {
1365                 /* send payload from ktx's pre-mapped contiguous buffer */
1366 #if MULTIRAIL_EKC
1367                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1368                               0, KQSW_HDR_SIZE + nob);
1369 #else
1370                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1371                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
1372 #endif
1373                 if (nob > 0)
1374                         lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
1375                                           niov, kiov, 0, nob);
1376         }
1377         else
1378         {
1379                 /* zero copy payload */
1380 #if MULTIRAIL_EKC
1381                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1382                               0, KQSW_HDR_SIZE);
1383 #else
1384                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1385                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1386 #endif
1387                 rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
1388                 if (rc != 0)
1389                         goto out;
1390         }
1391
1392         rc = kqswnal_launch (ktx);
1393  out:
1394         if (rc != 0) {
1395                 CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
1396
1397                 /* complete now (with failure) */
1398                 kqswnal_tx_done (ktx, rc);
1399         }
1400
1401         atomic_dec(&kqswnal_data.kqn_pending_txs);
1402 }
1403
1404 void
1405 kqswnal_fwd_callback (void *arg, int error)
1406 {
1407         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1408
1409         /* The router has finished forwarding this packet */
1410
1411         if (error != 0)
1412         {
1413                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1414
1415                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1416                        le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid),error);
1417         }
1418
1419         LASSERT (atomic_read(&krx->krx_refcount) == 1);
1420         kqswnal_rx_decref (krx);
1421 }
1422
1423 void
1424 kqswnal_requeue_rx (kqswnal_rx_t *krx)
1425 {
1426         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1427         LASSERT (!krx->krx_rpc_reply_needed);
1428
1429         krx->krx_state = KRX_POSTED;
1430
1431 #if MULTIRAIL_EKC
1432         if (kqswnal_data.kqn_shuttingdown) {
1433                 /* free EKC rxd on shutdown */
1434                 ep_complete_receive(krx->krx_rxd);
1435         } else {
1436                 /* repost receive */
1437                 ep_requeue_receive(krx->krx_rxd, 
1438                                    kqswnal_rxhandler, krx,
1439                                    &krx->krx_elanbuffer, 0);
1440         }
1441 #else                
1442         if (kqswnal_data.kqn_shuttingdown)
1443                 return;
1444
1445         if (krx->krx_rxd == NULL) {
1446                 /* We had a failed ep_complete_rpc() which nukes the
1447                  * descriptor in "old" EKC */
1448                 int eprc = ep_queue_receive(krx->krx_eprx, 
1449                                             kqswnal_rxhandler, krx,
1450                                             krx->krx_elanbuffer, 
1451                                             krx->krx_npages * PAGE_SIZE, 0);
1452                 LASSERT (eprc == EP_SUCCESS);
1453                 /* We don't handle failure here; it's incredibly rare
1454                  * (never reported?) and only happens with "old" EKC */
1455         } else {
1456                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1457                                    krx->krx_elanbuffer, 
1458                                    krx->krx_npages * PAGE_SIZE);
1459         }
1460 #endif
1461 }
1462
1463 void
1464 kqswnal_rpc_complete (EP_RXD *rxd)
1465 {
1466         int           status = ep_rxd_status(rxd);
1467         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1468         
1469         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1470                "rxd %p, krx %p, status %d\n", rxd, krx, status);
1471
1472         LASSERT (krx->krx_rxd == rxd);
1473         LASSERT (krx->krx_rpc_reply_needed);
1474         
1475         krx->krx_rpc_reply_needed = 0;
1476         kqswnal_requeue_rx (krx);
1477 }
1478
1479 void
1480 kqswnal_rx_done (kqswnal_rx_t *krx) 
1481 {
1482         int           rc;
1483         EP_STATUSBLK *sblk;
1484
1485         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1486
1487         if (krx->krx_rpc_reply_needed) {
1488                 /* We've not completed the peer's RPC yet... */
1489                 sblk = (krx->krx_rpc_reply_status == 0) ? 
1490                        &kqswnal_data.kqn_rpc_success : 
1491                        &kqswnal_data.kqn_rpc_failed;
1492
1493                 LASSERT (!in_interrupt());
1494 #if MULTIRAIL_EKC
1495                 rc = ep_complete_rpc(krx->krx_rxd, 
1496                                      kqswnal_rpc_complete, krx,
1497                                      sblk, NULL, NULL, 0);
1498                 if (rc == EP_SUCCESS)
1499                         return;
1500 #else
1501                 rc = ep_complete_rpc(krx->krx_rxd, 
1502                                      kqswnal_rpc_complete, krx,
1503                                      sblk, NULL, 0);
1504                 if (rc == EP_SUCCESS)
1505                         return;
1506
1507                 /* "old" EKC destroys rxd on failed completion */
1508                 krx->krx_rxd = NULL;
1509 #endif
1510                 CERROR("can't complete RPC: %d\n", rc);
1511                 krx->krx_rpc_reply_needed = 0;
1512         }
1513
1514         kqswnal_requeue_rx(krx);
1515 }
1516         
1517 void
1518 kqswnal_parse (kqswnal_rx_t *krx)
1519 {
1520         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
1521         ptl_nid_t       dest_nid = le64_to_cpu(hdr->dest_nid);
1522         int             payload_nob;
1523         int             nob;
1524         int             niov;
1525
1526         LASSERT (atomic_read(&krx->krx_refcount) == 1);
1527
1528         if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
1529                 /* I ignore parse errors since I'm not consuming a byte
1530                  * stream */
1531                 (void)lib_parse (&kqswnal_lib, hdr, krx);
1532
1533                 /* Drop my ref; any RDMA activity takes an additional ref */
1534                 kqswnal_rx_decref(krx);
1535                 return;
1536         }
1537
1538 #if KQSW_CHECKSUM
1539         LASSERTF (0, "checksums for forwarded packets not implemented\n");
1540 #endif
1541
1542         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
1543         {
1544                 CERROR("dropping packet from "LPX64" for "LPX64
1545                        ": target is peer\n", le64_to_cpu(hdr->src_nid), dest_nid);
1546
1547                 kqswnal_rx_decref (krx);
1548                 return;
1549         }
1550
1551         nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
1552         niov = 0;
1553         if (nob > 0) {
1554                 krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
1555                 krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
1556                 niov = 1;
1557                 nob -= PAGE_SIZE - KQSW_HDR_SIZE;
1558                 
1559                 while (nob > 0) {
1560                         LASSERT (niov < krx->krx_npages);
1561                         
1562                         krx->krx_kiov[niov].kiov_offset = 0;
1563                         krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
1564                         niov++;
1565                         nob -= PAGE_SIZE;
1566                 }
1567         }
1568
1569         kpr_fwd_init (&krx->krx_fwd, dest_nid, 
1570                       hdr, payload_nob, niov, krx->krx_kiov,
1571                       kqswnal_fwd_callback, krx);
1572
1573         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
1574 }
1575
1576 /* Receive Interrupt Handler: posts to schedulers */
1577 void 
1578 kqswnal_rxhandler(EP_RXD *rxd)
1579 {
1580         unsigned long flags;
1581         int           nob    = ep_rxd_len (rxd);
1582         int           status = ep_rxd_status (rxd);
1583         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1584
1585         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1586                rxd, krx, nob, status);
1587
1588         LASSERT (krx != NULL);
1589         LASSERT (krx->krx_state = KRX_POSTED);
1590         
1591         krx->krx_state = KRX_PARSE;
1592         krx->krx_rxd = rxd;
1593         krx->krx_nob = nob;
1594
1595         /* RPC reply iff rpc request received without error */
1596         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
1597                                     (status == EP_SUCCESS ||
1598                                      status == EP_MSG_TOO_BIG);
1599
1600         /* Default to failure if an RPC reply is requested but not handled */
1601         krx->krx_rpc_reply_status = -EPROTO;
1602         atomic_set (&krx->krx_refcount, 1);
1603
1604         /* must receive a whole header to be able to parse */
1605         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
1606         {
1607                 /* receives complete with failure when receiver is removed */
1608 #if MULTIRAIL_EKC
1609                 if (status == EP_SHUTDOWN)
1610                         LASSERT (kqswnal_data.kqn_shuttingdown);
1611                 else
1612                         CERROR("receive status failed with status %d nob %d\n",
1613                                ep_rxd_status(rxd), nob);
1614 #else
1615                 if (!kqswnal_data.kqn_shuttingdown)
1616                         CERROR("receive status failed with status %d nob %d\n",
1617                                ep_rxd_status(rxd), nob);
1618 #endif
1619                 kqswnal_rx_decref(krx);
1620                 return;
1621         }
1622
1623         if (!in_interrupt()) {
1624                 kqswnal_parse(krx);
1625                 return;
1626         }
1627
1628         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1629
1630         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1631         wake_up (&kqswnal_data.kqn_sched_waitq);
1632
1633         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1634 }
1635
1636 #if KQSW_CHECKSUM
1637 void
1638 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1639 {
1640         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1641
1642         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1643                 ", dpid %d, spid %d, type %d\n",
1644                 ishdr ? "Header" : "Payload", krx,
1645                 le64_to_cpu(hdr->dest_nid), le64_to_cpu(hdr->src_nid)
1646                 le32_to_cpu(hdr->dest_pid), le32_to_cpu(hdr->src_pid),
1647                 le32_to_cpu(hdr->type));
1648
1649         switch (le32_to_cpu(hdr->type))
1650         {
1651         case PTL_MSG_ACK:
1652                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
1653                        " len %u\n",
1654                        le32_to_cpu(hdr->msg.ack.mlength),
1655                        hdr->msg.ack.dst_wmd.handle_cookie,
1656                        hdr->msg.ack.dst_wmd.handle_idx,
1657                        le64_to_cpu(hdr->msg.ack.match_bits),
1658                        le32_to_cpu(hdr->msg.ack.length));
1659                 break;
1660         case PTL_MSG_PUT:
1661                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
1662                        " len %u off %u data "LPX64"\n",
1663                        le32_to_cpu(hdr->msg.put.ptl_index),
1664                        hdr->msg.put.ack_wmd.handle_cookie,
1665                        hdr->msg.put.ack_wmd.handle_idx,
1666                        le64_to_cpu(hdr->msg.put.match_bits),
1667                        le32_to_cpu(hdr->msg.put.length),
1668                        le32_to_cpu(hdr->msg.put.offset),
1669                        hdr->msg.put.hdr_data);
1670                 break;
1671         case PTL_MSG_GET:
1672                 CERROR ("GET: <>\n");
1673                 break;
1674         case PTL_MSG_REPLY:
1675                 CERROR ("REPLY: <>\n");
1676                 break;
1677         default:
1678                 CERROR ("TYPE?: <>\n");
1679         }
1680 }
1681 #endif
1682
1683 static ptl_err_t
1684 kqswnal_recvmsg (lib_nal_t    *nal,
1685                  void         *private,
1686                  lib_msg_t    *libmsg,
1687                  unsigned int  niov,
1688                  struct iovec *iov,
1689                  ptl_kiov_t   *kiov,
1690                  size_t        offset,
1691                  size_t        mlen,
1692                  size_t        rlen)
1693 {
1694         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
1695         char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
1696         ptl_hdr_t    *hdr = (ptl_hdr_t *)buffer;
1697         int           page;
1698         char         *page_ptr;
1699         int           page_nob;
1700         char         *iov_ptr;
1701         int           iov_nob;
1702         int           frag;
1703         int           rc;
1704 #if KQSW_CHECKSUM
1705         kqsw_csum_t   senders_csum;
1706         kqsw_csum_t   payload_csum = 0;
1707         kqsw_csum_t   hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
1708         size_t        csum_len = mlen;
1709         int           csum_frags = 0;
1710         int           csum_nob = 0;
1711         static atomic_t csum_counter;
1712         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
1713
1714         atomic_inc (&csum_counter);
1715
1716         memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1717         if (senders_csum != hdr_csum)
1718                 kqswnal_csum_error (krx, 1);
1719 #endif
1720         /* NB lib_parse() has already flipped *hdr */
1721
1722         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1723
1724         if (krx->krx_rpc_reply_needed &&
1725             hdr->type == PTL_MSG_PUT) {
1726                 /* This must be an optimized PUT */
1727                 rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
1728                                    niov, iov, kiov, offset, mlen);
1729                 return (rc == 0 ? PTL_OK : PTL_FAIL);
1730         }
1731
1732         /* What was actually received must be >= payload. */
1733         LASSERT (mlen <= rlen);
1734         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
1735                 CERROR("Bad message size: have %d, need %d + %d\n",
1736                        krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
1737                 return (PTL_FAIL);
1738         }
1739
1740         /* It must be OK to kmap() if required */
1741         LASSERT (kiov == NULL || !in_interrupt ());
1742         /* Either all pages or all vaddrs */
1743         LASSERT (!(kiov != NULL && iov != NULL));
1744
1745         if (mlen != 0) {
1746                 page     = 0;
1747                 page_ptr = buffer + KQSW_HDR_SIZE;
1748                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1749
1750                 LASSERT (niov > 0);
1751
1752                 if (kiov != NULL) {
1753                         /* skip complete frags */
1754                         while (offset >= kiov->kiov_len) {
1755                                 offset -= kiov->kiov_len;
1756                                 kiov++;
1757                                 niov--;
1758                                 LASSERT (niov > 0);
1759                         }
1760                         iov_ptr = ((char *)kmap (kiov->kiov_page)) +
1761                                 kiov->kiov_offset + offset;
1762                         iov_nob = kiov->kiov_len - offset;
1763                 } else {
1764                         /* skip complete frags */
1765                         while (offset >= iov->iov_len) {
1766                                 offset -= iov->iov_len;
1767                                 iov++;
1768                                 niov--;
1769                                 LASSERT (niov > 0);
1770                         }
1771                         iov_ptr = iov->iov_base + offset;
1772                         iov_nob = iov->iov_len - offset;
1773                 }
1774                 
1775                 for (;;)
1776                 {
1777                         frag = mlen;
1778                         if (frag > page_nob)
1779                                 frag = page_nob;
1780                         if (frag > iov_nob)
1781                                 frag = iov_nob;
1782
1783                         memcpy (iov_ptr, page_ptr, frag);
1784 #if KQSW_CHECKSUM
1785                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1786                         csum_nob += frag;
1787                         csum_frags++;
1788 #endif
1789                         mlen -= frag;
1790                         if (mlen == 0)
1791                                 break;
1792
1793                         page_nob -= frag;
1794                         if (page_nob != 0)
1795                                 page_ptr += frag;
1796                         else
1797                         {
1798                                 page++;
1799                                 LASSERT (page < krx->krx_npages);
1800                                 page_ptr = page_address(krx->krx_kiov[page].kiov_page);
1801                                 page_nob = PAGE_SIZE;
1802                         }
1803
1804                         iov_nob -= frag;
1805                         if (iov_nob != 0)
1806                                 iov_ptr += frag;
1807                         else if (kiov != NULL) {
1808                                 kunmap (kiov->kiov_page);
1809                                 kiov++;
1810                                 niov--;
1811                                 LASSERT (niov > 0);
1812                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1813                                 iov_nob = kiov->kiov_len;
1814                         } else {
1815                                 iov++;
1816                                 niov--;
1817                                 LASSERT (niov > 0);
1818                                 iov_ptr = iov->iov_base;
1819                                 iov_nob = iov->iov_len;
1820                         }
1821                 }
1822
1823                 if (kiov != NULL)
1824                         kunmap (kiov->kiov_page);
1825         }
1826
1827 #if KQSW_CHECKSUM
1828         memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
1829                 sizeof(kqsw_csum_t));
1830
1831         if (csum_len != rlen)
1832                 CERROR("Unable to checksum data in user's buffer\n");
1833         else if (senders_csum != payload_csum)
1834                 kqswnal_csum_error (krx, 0);
1835
1836         if (csum_verbose)
1837                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1838                        "csum_nob %d\n",
1839                         hdr_csum, payload_csum, csum_frags, csum_nob);
1840 #endif
1841         lib_finalize(nal, private, libmsg, PTL_OK);
1842
1843         return (PTL_OK);
1844 }
1845
1846 static ptl_err_t
1847 kqswnal_recv(lib_nal_t    *nal,
1848              void         *private,
1849              lib_msg_t    *libmsg,
1850              unsigned int  niov,
1851              struct iovec *iov,
1852              size_t        offset,
1853              size_t        mlen,
1854              size_t        rlen)
1855 {
1856         return (kqswnal_recvmsg(nal, private, libmsg, 
1857                                 niov, iov, NULL, 
1858                                 offset, mlen, rlen));
1859 }
1860
1861 static ptl_err_t
1862 kqswnal_recv_pages (lib_nal_t    *nal,
1863                     void         *private,
1864                     lib_msg_t    *libmsg,
1865                     unsigned int  niov,
1866                     ptl_kiov_t   *kiov,
1867                     size_t        offset,
1868                     size_t        mlen,
1869                     size_t        rlen)
1870 {
1871         return (kqswnal_recvmsg(nal, private, libmsg, 
1872                                 niov, NULL, kiov, 
1873                                 offset, mlen, rlen));
1874 }
1875
1876 int
1877 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1878 {
1879         long    pid = kernel_thread (fn, arg, 0);
1880
1881         if (pid < 0)
1882                 return ((int)pid);
1883
1884         atomic_inc (&kqswnal_data.kqn_nthreads);
1885         return (0);
1886 }
1887
1888 void
1889 kqswnal_thread_fini (void)
1890 {
1891         atomic_dec (&kqswnal_data.kqn_nthreads);
1892 }
1893
1894 int
1895 kqswnal_scheduler (void *arg)
1896 {
1897         kqswnal_rx_t    *krx;
1898         kqswnal_tx_t    *ktx;
1899         kpr_fwd_desc_t  *fwd;
1900         unsigned long    flags;
1901         int              rc;
1902         int              counter = 0;
1903         int              did_something;
1904
1905         kportal_daemonize ("kqswnal_sched");
1906         kportal_blockallsigs ();
1907         
1908         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1909
1910         for (;;)
1911         {
1912                 did_something = 0;
1913
1914                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1915                 {
1916                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1917                                          kqswnal_rx_t, krx_list);
1918                         list_del (&krx->krx_list);
1919                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1920                                                flags);
1921
1922                         switch (krx->krx_state) {
1923                         case KRX_PARSE:
1924                                 kqswnal_parse (krx);
1925                                 break;
1926                         case KRX_COMPLETING:
1927                                 kqswnal_rx_decref (krx);
1928                                 break;
1929                         default:
1930                                 LBUG();
1931                         }
1932
1933                         did_something = 1;
1934                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1935                 }
1936
1937                 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1938                 {
1939                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1940                                          kqswnal_tx_t, ktx_list);
1941                         list_del_init (&ktx->ktx_delayed_list);
1942                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1943                                                flags);
1944
1945                         rc = kqswnal_launch (ktx);
1946                         if (rc != 0) {
1947                                 CERROR("Failed delayed transmit to "LPX64
1948                                        ": %d\n", ktx->ktx_nid, rc);
1949                                 kqswnal_tx_done (ktx, rc);
1950                         }
1951                         atomic_dec (&kqswnal_data.kqn_pending_txs);
1952
1953                         did_something = 1;
1954                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1955                 }
1956
1957                 if (!list_empty (&kqswnal_data.kqn_delayedfwds))
1958                 {
1959                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1960                         list_del (&fwd->kprfd_list);
1961                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1962
1963                         /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
1964                         kqswnal_fwd_packet (NULL, fwd);
1965
1966                         did_something = 1;
1967                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1968                 }
1969
1970                 /* nothing to do or hogging CPU */
1971                 if (!did_something || counter++ == KQSW_RESCHED) {
1972                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1973                                                flags);
1974
1975                         counter = 0;
1976
1977                         if (!did_something) {
1978                                 if (kqswnal_data.kqn_shuttingdown == 2) {
1979                                         /* We only exit in stage 2 of shutdown when 
1980                                          * there's nothing left to do */
1981                                         break;
1982                                 }
1983                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1984                                                                kqswnal_data.kqn_shuttingdown == 2 ||
1985                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1986                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1987                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1988                                 LASSERT (rc == 0);
1989                         } else if (need_resched())
1990                                 schedule ();
1991
1992                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1993                 }
1994         }
1995
1996         kqswnal_thread_fini ();
1997         return (0);
1998 }
1999
2000 lib_nal_t kqswnal_lib =
2001 {
2002         libnal_data:       &kqswnal_data,         /* NAL private data */
2003         libnal_send:        kqswnal_send,
2004         libnal_send_pages:  kqswnal_send_pages,
2005         libnal_recv:        kqswnal_recv,
2006         libnal_recv_pages:  kqswnal_recv_pages,
2007         libnal_dist:        kqswnal_dist
2008 };