Whamcloud - gitweb
file jbd-stats-2.6.9.patch was initially added on branch b1_4.
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * This file is part of Portals, http://www.lustre.org
8  *
9  * Portals is free software; you can redistribute it and/or
10  * modify it under the terms of version 2 of the GNU General Public
11  * License as published by the Free Software Foundation.
12  *
13  * Portals is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with Portals; if not, write to the Free Software
20  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "qswnal.h"
25
26 /*
27  *  LIB functions follow
28  *
29  */
30 static int
31 kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
32 {
33         if (nid == nal->libnal_ni.ni_pid.nid)
34                 *dist = 0;                      /* it's me */
35         else if (kqswnal_nid2elanid (nid) >= 0)
36                 *dist = 1;                      /* it's my peer */
37         else
38                 *dist = 2;                      /* via router */
39         return (0);
40 }
41
42 void
43 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
44 {
45         struct timeval     now;
46         time_t             then;
47
48         do_gettimeofday (&now);
49         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
50
51         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
52 }
53
54 void
55 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
56 {
57 #if MULTIRAIL_EKC
58         int      i;
59
60         ktx->ktx_rail = -1;                     /* unset rail */
61 #endif
62
63         if (ktx->ktx_nmappedpages == 0)
64                 return;
65         
66 #if MULTIRAIL_EKC
67         CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
68                ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
69
70         for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
71                 ep_dvma_unload(kqswnal_data.kqn_ep,
72                                kqswnal_data.kqn_ep_tx_nmh,
73                                &ktx->ktx_frags[i]);
74 #else
75         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
76                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
77
78         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
79         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
80                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
81
82         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
83                           kqswnal_data.kqn_eptxdmahandle,
84                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
85 #endif
86         ktx->ktx_nmappedpages = 0;
87 }
88
89 int
90 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
91 {
92         int       nfrags    = ktx->ktx_nfrag;
93         int       nmapped   = ktx->ktx_nmappedpages;
94         int       maxmapped = ktx->ktx_npages;
95         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
96         char     *ptr;
97 #if MULTIRAIL_EKC
98         EP_RAILMASK railmask;
99         int         rail;
100
101         if (ktx->ktx_rail < 0)
102                 ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
103                                                  EP_RAILMASK_ALL,
104                                                  kqswnal_nid2elanid(ktx->ktx_nid));
105         rail = ktx->ktx_rail;
106         if (rail < 0) {
107                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
108                 return (-ENETDOWN);
109         }
110         railmask = 1 << rail;
111 #endif
112         LASSERT (nmapped <= maxmapped);
113         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
114         LASSERT (nfrags <= EP_MAXFRAG);
115         LASSERT (niov > 0);
116         LASSERT (nob > 0);
117
118         /* skip complete frags before 'offset' */
119         while (offset >= kiov->kiov_len) {
120                 offset -= kiov->kiov_len;
121                 kiov++;
122                 niov--;
123                 LASSERT (niov > 0);
124         }
125
126         do {
127                 int  fraglen = kiov->kiov_len - offset;
128
129                 /* each page frag is contained in one page */
130                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
131
132                 if (fraglen > nob)
133                         fraglen = nob;
134
135                 nmapped++;
136                 if (nmapped > maxmapped) {
137                         CERROR("Can't map message in %d pages (max %d)\n",
138                                nmapped, maxmapped);
139                         return (-EMSGSIZE);
140                 }
141
142                 if (nfrags == EP_MAXFRAG) {
143                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
144                                EP_MAXFRAG);
145                         return (-EMSGSIZE);
146                 }
147
148                 /* XXX this is really crap, but we'll have to kmap until
149                  * EKC has a page (rather than vaddr) mapping interface */
150
151                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
152
153                 CDEBUG(D_NET,
154                        "%p[%d] loading %p for %d, page %d, %d total\n",
155                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
156
157 #if MULTIRAIL_EKC
158                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
159                              ptr, fraglen,
160                              kqswnal_data.kqn_ep_tx_nmh, basepage,
161                              &railmask, &ktx->ktx_frags[nfrags]);
162
163                 if (nfrags == ktx->ktx_firsttmpfrag ||
164                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
165                                   &ktx->ktx_frags[nfrags - 1],
166                                   &ktx->ktx_frags[nfrags])) {
167                         /* new frag if this is the first or can't merge */
168                         nfrags++;
169                 }
170 #else
171                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
172                                        kqswnal_data.kqn_eptxdmahandle,
173                                        ptr, fraglen,
174                                        basepage, &ktx->ktx_frags[nfrags].Base);
175
176                 if (nfrags > 0 &&                /* previous frag mapped */
177                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
178                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
179                         /* just extend previous */
180                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
181                 else {
182                         ktx->ktx_frags[nfrags].Len = fraglen;
183                         nfrags++;                /* new frag */
184                 }
185 #endif
186
187                 kunmap (kiov->kiov_page);
188                 
189                 /* keep in loop for failure case */
190                 ktx->ktx_nmappedpages = nmapped;
191
192                 basepage++;
193                 kiov++;
194                 niov--;
195                 nob -= fraglen;
196                 offset = 0;
197
198                 /* iov must not run out before end of data */
199                 LASSERT (nob == 0 || niov > 0);
200
201         } while (nob > 0);
202
203         ktx->ktx_nfrag = nfrags;
204         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
205                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
206
207         return (0);
208 }
209
210 int
211 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
212                     int niov, struct iovec *iov)
213 {
214         int       nfrags    = ktx->ktx_nfrag;
215         int       nmapped   = ktx->ktx_nmappedpages;
216         int       maxmapped = ktx->ktx_npages;
217         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
218 #if MULTIRAIL_EKC
219         EP_RAILMASK railmask;
220         int         rail;
221         
222         if (ktx->ktx_rail < 0)
223                 ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
224                                                  EP_RAILMASK_ALL,
225                                                  kqswnal_nid2elanid(ktx->ktx_nid));
226         rail = ktx->ktx_rail;
227         if (rail < 0) {
228                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
229                 return (-ENETDOWN);
230         }
231         railmask = 1 << rail;
232 #endif
233         LASSERT (nmapped <= maxmapped);
234         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
235         LASSERT (nfrags <= EP_MAXFRAG);
236         LASSERT (niov > 0);
237         LASSERT (nob > 0);
238
239         /* skip complete frags before offset */
240         while (offset >= iov->iov_len) {
241                 offset -= iov->iov_len;
242                 iov++;
243                 niov--;
244                 LASSERT (niov > 0);
245         }
246         
247         do {
248                 int  fraglen = iov->iov_len - offset;
249                 long npages;
250                 
251                 if (fraglen > nob)
252                         fraglen = nob;
253                 npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
254
255                 nmapped += npages;
256                 if (nmapped > maxmapped) {
257                         CERROR("Can't map message in %d pages (max %d)\n",
258                                nmapped, maxmapped);
259                         return (-EMSGSIZE);
260                 }
261
262                 if (nfrags == EP_MAXFRAG) {
263                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
264                                EP_MAXFRAG);
265                         return (-EMSGSIZE);
266                 }
267
268                 CDEBUG(D_NET,
269                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
270                        ktx, nfrags, iov->iov_base + offset, fraglen, 
271                        basepage, npages, nmapped);
272
273 #if MULTIRAIL_EKC
274                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
275                              iov->iov_base + offset, fraglen,
276                              kqswnal_data.kqn_ep_tx_nmh, basepage,
277                              &railmask, &ktx->ktx_frags[nfrags]);
278
279                 if (nfrags == ktx->ktx_firsttmpfrag ||
280                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
281                                   &ktx->ktx_frags[nfrags - 1],
282                                   &ktx->ktx_frags[nfrags])) {
283                         /* new frag if this is the first or can't merge */
284                         nfrags++;
285                 }
286 #else
287                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
288                                        kqswnal_data.kqn_eptxdmahandle,
289                                        iov->iov_base + offset, fraglen,
290                                        basepage, &ktx->ktx_frags[nfrags].Base);
291
292                 if (nfrags > 0 &&                /* previous frag mapped */
293                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
294                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
295                         /* just extend previous */
296                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
297                 else {
298                         ktx->ktx_frags[nfrags].Len = fraglen;
299                         nfrags++;                /* new frag */
300                 }
301 #endif
302
303                 /* keep in loop for failure case */
304                 ktx->ktx_nmappedpages = nmapped;
305
306                 basepage += npages;
307                 iov++;
308                 niov--;
309                 nob -= fraglen;
310                 offset = 0;
311
312                 /* iov must not run out before end of data */
313                 LASSERT (nob == 0 || niov > 0);
314
315         } while (nob > 0);
316
317         ktx->ktx_nfrag = nfrags;
318         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
319                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
320
321         return (0);
322 }
323
324
325 void
326 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
327 {
328         kpr_fwd_desc_t   *fwd = NULL;
329         unsigned long     flags;
330
331         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
332         ktx->ktx_state = KTX_IDLE;
333
334         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
335
336         list_del (&ktx->ktx_list);              /* take off active list */
337
338         if (ktx->ktx_isnblk) {
339                 /* reserved for non-blocking tx */
340                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
341                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
342                 return;
343         }
344
345         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
346
347         /* anything blocking for a tx descriptor? */
348         if (!kqswnal_data.kqn_shuttingdown &&
349             !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
350         {
351                 CDEBUG(D_NET,"wakeup fwd\n");
352
353                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
354                                   kpr_fwd_desc_t, kprfd_list);
355                 list_del (&fwd->kprfd_list);
356         }
357
358         wake_up (&kqswnal_data.kqn_idletxd_waitq);
359
360         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
361
362         if (fwd == NULL)
363                 return;
364
365         /* schedule packet for forwarding again */
366         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
367
368         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
369         wake_up (&kqswnal_data.kqn_sched_waitq);
370
371         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
372 }
373
374 kqswnal_tx_t *
375 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
376 {
377         unsigned long  flags;
378         kqswnal_tx_t  *ktx = NULL;
379
380         for (;;) {
381                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
382
383                 if (kqswnal_data.kqn_shuttingdown)
384                         break;
385
386                 /* "normal" descriptor is free */
387                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
388                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
389                                           kqswnal_tx_t, ktx_list);
390                         break;
391                 }
392
393                 if (fwd != NULL)                /* forwarded packet? */
394                         break;
395
396                 /* doing a local transmit */
397                 if (!may_block) {
398                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
399                                 CERROR ("intr tx desc pool exhausted\n");
400                                 break;
401                         }
402
403                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
404                                           kqswnal_tx_t, ktx_list);
405                         break;
406                 }
407
408                 /* block for idle tx */
409
410                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
411
412                 CDEBUG (D_NET, "blocking for tx desc\n");
413                 wait_event (kqswnal_data.kqn_idletxd_waitq,
414                             !list_empty (&kqswnal_data.kqn_idletxds) ||
415                             kqswnal_data.kqn_shuttingdown);
416         }
417
418         if (ktx != NULL) {
419                 list_del (&ktx->ktx_list);
420                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
421                 ktx->ktx_launcher = current->pid;
422                 atomic_inc(&kqswnal_data.kqn_pending_txs);
423         } else if (fwd != NULL) {
424                 /* queue forwarded packet until idle txd available */
425                 CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
426                 list_add_tail (&fwd->kprfd_list,
427                                &kqswnal_data.kqn_idletxd_fwdq);
428         }
429
430         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
431
432         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
433         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
434
435         return (ktx);
436 }
437
438 void
439 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
440 {
441         switch (ktx->ktx_state) {
442         case KTX_FORWARDING:       /* router asked me to forward this packet */
443                 kpr_fwd_done (&kqswnal_data.kqn_router,
444                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
445                 break;
446
447         case KTX_RDMAING:          /* optimized GET/PUT handled */
448         case KTX_PUTTING:          /* optimized PUT sent */
449         case KTX_SENDING:          /* normal send */
450                 lib_finalize (&kqswnal_lib, NULL,
451                               (lib_msg_t *)ktx->ktx_args[1],
452                               (error == 0) ? PTL_OK : PTL_FAIL);
453                 break;
454
455         case KTX_GETTING:          /* optimized GET sent & REPLY received */
456                 /* Complete the GET with success since we can't avoid
457                  * delivering a REPLY event; we committed to it when we
458                  * launched the GET */
459                 lib_finalize (&kqswnal_lib, NULL, 
460                               (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
461                 lib_finalize (&kqswnal_lib, NULL,
462                               (lib_msg_t *)ktx->ktx_args[2],
463                               (error == 0) ? PTL_OK : PTL_FAIL);
464                 break;
465
466         default:
467                 LASSERT (0);
468         }
469
470         kqswnal_put_idle_tx (ktx);
471 }
472
473 static void
474 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
475 {
476         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
477
478         LASSERT (txd != NULL);
479         LASSERT (ktx != NULL);
480
481         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
482
483         if (status != EP_SUCCESS) {
484
485                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
486                         ktx->ktx_nid, status);
487
488                 kqswnal_notify_peer_down(ktx);
489                 status = -EHOSTDOWN;
490
491         } else switch (ktx->ktx_state) {
492
493         case KTX_GETTING:
494         case KTX_PUTTING:
495                 /* RPC completed OK; but what did our peer put in the status
496                  * block? */
497 #if MULTIRAIL_EKC
498                 status = ep_txd_statusblk(txd)->Data[0];
499 #else
500                 status = ep_txd_statusblk(txd)->Status;
501 #endif
502                 break;
503                 
504         case KTX_FORWARDING:
505         case KTX_SENDING:
506                 status = 0;
507                 break;
508                 
509         default:
510                 LBUG();
511                 break;
512         }
513
514         kqswnal_tx_done (ktx, status);
515 }
516
517 int
518 kqswnal_launch (kqswnal_tx_t *ktx)
519 {
520         /* Don't block for transmit descriptor if we're in interrupt context */
521         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
522         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
523         unsigned long flags;
524         int   rc;
525
526         ktx->ktx_launchtime = jiffies;
527
528         if (kqswnal_data.kqn_shuttingdown)
529                 return (-ESHUTDOWN);
530
531         LASSERT (dest >= 0);                    /* must be a peer */
532
533 #if MULTIRAIL_EKC
534         if (ktx->ktx_nmappedpages != 0)
535                 attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);
536 #endif
537
538         switch (ktx->ktx_state) {
539         case KTX_GETTING:
540         case KTX_PUTTING:
541                 /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
542                  * The other frags are the payload, awaiting RDMA */
543                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
544                                      ktx->ktx_port, attr,
545                                      kqswnal_txhandler, ktx,
546                                      NULL, ktx->ktx_frags, 1);
547                 break;
548
549         case KTX_FORWARDING:
550         case KTX_SENDING:
551 #if MULTIRAIL_EKC
552                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
553                                          ktx->ktx_port, attr,
554                                          kqswnal_txhandler, ktx,
555                                          NULL, ktx->ktx_frags, ktx->ktx_nfrag);
556 #else
557                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
558                                        ktx->ktx_port, attr, 
559                                        kqswnal_txhandler, ktx, 
560                                        ktx->ktx_frags, ktx->ktx_nfrag);
561 #endif
562                 break;
563                 
564         default:
565                 LBUG();
566                 rc = -EINVAL;                   /* no compiler warning please */
567                 break;
568         }
569
570         switch (rc) {
571         case EP_SUCCESS: /* success */
572                 return (0);
573
574         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
575                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
576
577                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
578                 wake_up (&kqswnal_data.kqn_sched_waitq);
579
580                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
581                 return (0);
582
583         default: /* fatal error */
584                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
585                 kqswnal_notify_peer_down(ktx);
586                 return (-EHOSTUNREACH);
587         }
588 }
589
590 #if 0
591 static char *
592 hdr_type_string (ptl_hdr_t *hdr)
593 {
594         switch (hdr->type) {
595         case PTL_MSG_ACK:
596                 return ("ACK");
597         case PTL_MSG_PUT:
598                 return ("PUT");
599         case PTL_MSG_GET:
600                 return ("GET");
601         case PTL_MSG_REPLY:
602                 return ("REPLY");
603         default:
604                 return ("<UNKNOWN>");
605         }
606 }
607
608 static void
609 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
610 {
611         char *type_str = hdr_type_string (hdr);
612
613         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
614                le32_to_cpu(hdr->payload_length));
615         CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
616                le32_to_cpu(hdr->src_pid));
617         CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
618                le32_to_cpu(hdr->dest_pid));
619
620         switch (le32_to_cpu(hdr->type)) {
621         case PTL_MSG_PUT:
622                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
623                        "match bits "LPX64"\n",
624                        le32_to_cpu(hdr->msg.put.ptl_index),
625                        hdr->msg.put.ack_wmd.wh_interface_cookie,
626                        hdr->msg.put.ack_wmd.wh_object_cookie,
627                        le64_to_cpu(hdr->msg.put.match_bits));
628                 CERROR("    offset %d, hdr data "LPX64"\n",
629                        le32_to_cpu(hdr->msg.put.offset),
630                        hdr->msg.put.hdr_data);
631                 break;
632
633         case PTL_MSG_GET:
634                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
635                        "match bits "LPX64"\n",
636                        le32_to_cpu(hdr->msg.get.ptl_index),
637                        hdr->msg.get.return_wmd.wh_interface_cookie,
638                        hdr->msg.get.return_wmd.wh_object_cookie,
639                        hdr->msg.get.match_bits);
640                 CERROR("    Length %d, src offset %d\n",
641                        le32_to_cpu(hdr->msg.get.sink_length),
642                        le32_to_cpu(hdr->msg.get.src_offset));
643                 break;
644
645         case PTL_MSG_ACK:
646                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
647                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
648                        hdr->msg.ack.dst_wmd.wh_object_cookie,
649                        le32_to_cpu(hdr->msg.ack.mlength));
650                 break;
651
652         case PTL_MSG_REPLY:
653                 CERROR("    dst md "LPX64"."LPX64"\n",
654                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
655                        hdr->msg.reply.dst_wmd.wh_object_cookie);
656         }
657
658 }                               /* end of print_hdr() */
659 #endif
660
661 #if !MULTIRAIL_EKC
662 void
663 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
664 {
665         int          i;
666
667         CDEBUG (how, "%s: %d\n", str, n);
668         for (i = 0; i < n; i++) {
669                 CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
670         }
671 }
672
673 int
674 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
675                      int nsrc, EP_IOVEC *src,
676                      int ndst, EP_IOVEC *dst) 
677 {
678         int        count;
679         int        nob;
680
681         LASSERT (ndv > 0);
682         LASSERT (nsrc > 0);
683         LASSERT (ndst > 0);
684
685         for (count = 0; count < ndv; count++, dv++) {
686
687                 if (nsrc == 0 || ndst == 0) {
688                         if (nsrc != ndst) {
689                                 /* For now I'll barf on any left over entries */
690                                 CERROR ("mismatched src and dst iovs\n");
691                                 return (-EINVAL);
692                         }
693                         return (count);
694                 }
695
696                 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
697                 dv->Len    = nob;
698                 dv->Source = src->Base;
699                 dv->Dest   = dst->Base;
700
701                 if (nob >= src->Len) {
702                         src++;
703                         nsrc--;
704                 } else {
705                         src->Len -= nob;
706                         src->Base += nob;
707                 }
708                 
709                 if (nob >= dst->Len) {
710                         dst++;
711                         ndst--;
712                 } else {
713                         src->Len -= nob;
714                         src->Base += nob;
715                 }
716         }
717
718         CERROR ("DATAVEC too small\n");
719         return (-E2BIG);
720 }
721 #else
722 int
723 kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
724                     int nrfrag, EP_NMD *rfrag)
725 {
726         int  i;
727
728         if (nlfrag != nrfrag) {
729                 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
730                        nlfrag, nrfrag);
731                 return (-EINVAL);
732         }
733         
734         for (i = 0; i < nlfrag; i++)
735                 if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
736                         CERROR("Can't cope with unequal frags %d(%d):"
737                                " %d local %d remote\n",
738                                i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
739                         return (-EINVAL);
740                 }
741         
742         return (0);
743 }
744 #endif
745
746 kqswnal_remotemd_t *
747 kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
748 {
749         char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
750         ptl_hdr_t          *hdr = (ptl_hdr_t *)buffer;
751         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
752         ptl_nid_t           nid = kqswnal_rx_nid(krx);
753
754         /* Note (1) lib_parse has already flipped hdr.
755          *      (2) RDMA addresses are sent in native endian-ness.  When
756          *      EKC copes with different endian nodes, I'll fix this (and
757          *      eat my hat :) */
758
759         LASSERT (krx->krx_nob >= sizeof(*hdr));
760
761         if (hdr->type != type) {
762                 CERROR ("Unexpected optimized get/put type %d (%d expected)"
763                         "from "LPX64"\n", hdr->type, type, nid);
764                 return (NULL);
765         }
766         
767         if (hdr->src_nid != nid) {
768                 CERROR ("Unexpected optimized get/put source NID "
769                         LPX64" from "LPX64"\n", hdr->src_nid, nid);
770                 return (NULL);
771         }
772
773         LASSERT (nid == expected_nid);
774
775         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
776                 /* msg too small to discover rmd size */
777                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
778                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
779                 return (NULL);
780         }
781
782         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
783                 /* rmd doesn't fit in the incoming message */
784                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
785                         krx->krx_nob, rmd->kqrmd_nfrag,
786                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
787                 return (NULL);
788         }
789
790         return (rmd);
791 }
792
793 void
794 kqswnal_rdma_store_complete (EP_RXD *rxd) 
795 {
796         int           status = ep_rxd_status(rxd);
797         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
798         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
799         
800         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
801                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
802
803         LASSERT (ktx->ktx_state == KTX_RDMAING);
804         LASSERT (krx->krx_rxd == rxd);
805         LASSERT (krx->krx_rpc_reply_needed);
806
807         krx->krx_rpc_reply_needed = 0;
808         kqswnal_rx_decref (krx);
809
810         /* free ktx & finalize() its lib_msg_t */
811         kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
812 }
813
814 void
815 kqswnal_rdma_fetch_complete (EP_RXD *rxd) 
816 {
817         /* Completed fetching the PUT data */
818         int           status = ep_rxd_status(rxd);
819         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
820         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
821         unsigned long flags;
822         
823         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
824                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
825
826         LASSERT (ktx->ktx_state == KTX_RDMAING);
827         LASSERT (krx->krx_rxd == rxd);
828         /* RPC completes with failure by default */
829         LASSERT (krx->krx_rpc_reply_needed);
830         LASSERT (krx->krx_rpc_reply_status != 0);
831
832         if (status == EP_SUCCESS) {
833                 status = krx->krx_rpc_reply_status = 0;
834         } else {
835                 /* Abandon RPC since get failed */
836                 krx->krx_rpc_reply_needed = 0;
837                 status = -ECONNABORTED;
838         }
839
840         /* free ktx & finalize() its lib_msg_t */
841         kqswnal_tx_done(ktx, status);
842
843         if (!in_interrupt()) {
844                 /* OK to complete the RPC now (iff I had the last ref) */
845                 kqswnal_rx_decref (krx);
846                 return;
847         }
848
849         LASSERT (krx->krx_state == KRX_PARSE);
850         krx->krx_state = KRX_COMPLETING;
851
852         /* Complete the RPC in thread context */
853         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
854
855         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
856         wake_up (&kqswnal_data.kqn_sched_waitq);
857
858         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
859 }
860
861 int
862 kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
863               int niov, struct iovec *iov, ptl_kiov_t *kiov,
864               size_t offset, size_t len)
865 {
866         kqswnal_remotemd_t *rmd;
867         kqswnal_tx_t       *ktx;
868         int                 eprc;
869         int                 rc;
870 #if !MULTIRAIL_EKC
871         EP_DATAVEC          datav[EP_MAXFRAG];
872         int                 ndatav;
873 #endif
874
875         LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
876         /* Not both mapped and paged payload */
877         LASSERT (iov == NULL || kiov == NULL);
878         /* RPC completes with failure by default */
879         LASSERT (krx->krx_rpc_reply_needed);
880         LASSERT (krx->krx_rpc_reply_status != 0);
881
882         rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
883         if (rmd == NULL)
884                 return (-EPROTO);
885
886         if (len == 0) {
887                 /* data got truncated to nothing. */
888                 lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
889                 /* Let kqswnal_rx_done() complete the RPC with success */
890                 krx->krx_rpc_reply_status = 0;
891                 return (0);
892         }
893         
894         /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
895            actually sending a portals message with it */
896         ktx = kqswnal_get_idle_tx(NULL, 0);
897         if (ktx == NULL) {
898                 CERROR ("Can't get txd for RDMA with "LPX64"\n",
899                         libmsg->ev.initiator.nid);
900                 return (-ENOMEM);
901         }
902
903         ktx->ktx_state   = KTX_RDMAING;
904         ktx->ktx_nid     = libmsg->ev.initiator.nid;
905         ktx->ktx_args[0] = krx;
906         ktx->ktx_args[1] = libmsg;
907
908 #if MULTIRAIL_EKC
909         /* Map on the rail the RPC prefers */
910         ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
911                                          ep_rxd_railmask(krx->krx_rxd));
912 #endif
913
914         /* Start mapping at offset 0 (we're not mapping any headers) */
915         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
916         
917         if (kiov != NULL)
918                 rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
919         else
920                 rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
921
922         if (rc != 0) {
923                 CERROR ("Can't map local RDMA data: %d\n", rc);
924                 goto out;
925         }
926
927 #if MULTIRAIL_EKC
928         rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
929                                  rmd->kqrmd_nfrag, rmd->kqrmd_frag);
930         if (rc != 0) {
931                 CERROR ("Incompatible RDMA descriptors\n");
932                 goto out;
933         }
934 #else
935         switch (type) {
936         default:
937                 LBUG();
938
939         case PTL_MSG_GET:
940                 ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
941                                              ktx->ktx_nfrag, ktx->ktx_frags,
942                                              rmd->kqrmd_nfrag, rmd->kqrmd_frag);
943                 break;
944
945         case PTL_MSG_PUT:
946                 ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
947                                              rmd->kqrmd_nfrag, rmd->kqrmd_frag,
948                                              ktx->ktx_nfrag, ktx->ktx_frags);
949                 break;
950         }
951                 
952         if (ndatav < 0) {
953                 CERROR ("Can't create datavec: %d\n", ndatav);
954                 rc = ndatav;
955                 goto out;
956         }
957 #endif
958
959         LASSERT (atomic_read(&krx->krx_refcount) > 0);
960         /* Take an extra ref for the completion callback */
961         atomic_inc(&krx->krx_refcount);
962
963         switch (type) {
964         default:
965                 LBUG();
966
967         case PTL_MSG_GET:
968 #if MULTIRAIL_EKC
969                 eprc = ep_complete_rpc(krx->krx_rxd, 
970                                        kqswnal_rdma_store_complete, ktx, 
971                                        &kqswnal_data.kqn_rpc_success,
972                                        ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
973 #else
974                 eprc = ep_complete_rpc (krx->krx_rxd, 
975                                         kqswnal_rdma_store_complete, ktx,
976                                         &kqswnal_data.kqn_rpc_success, 
977                                         datav, ndatav);
978                 if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
979                         krx->krx_rxd = NULL;
980 #endif
981                 if (eprc != EP_SUCCESS) {
982                         CERROR("can't complete RPC: %d\n", eprc);
983                         /* don't re-attempt RPC completion */
984                         krx->krx_rpc_reply_needed = 0;
985                         rc = -ECONNABORTED;
986                 }
987                 break;
988                 
989         case PTL_MSG_PUT:
990 #if MULTIRAIL_EKC
991                 eprc = ep_rpc_get (krx->krx_rxd, 
992                                    kqswnal_rdma_fetch_complete, ktx,
993                                    rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
994 #else
995                 eprc = ep_rpc_get (krx->krx_rxd,
996                                    kqswnal_rdma_fetch_complete, ktx,
997                                    datav, ndatav);
998 #endif
999                 if (eprc != EP_SUCCESS) {
1000                         CERROR("ep_rpc_get failed: %d\n", eprc);
1001                         /* Don't attempt RPC completion: 
1002                          * EKC nuked it when the get failed */
1003                         krx->krx_rpc_reply_needed = 0;
1004                         rc = -ECONNABORTED;
1005                 }
1006                 break;
1007         }
1008
1009  out:
1010         if (rc != 0) {
1011                 kqswnal_rx_decref(krx);                 /* drop callback's ref */
1012                 kqswnal_put_idle_tx (ktx);
1013         }
1014
1015         atomic_dec(&kqswnal_data.kqn_pending_txs);
1016         return (rc);
1017 }
1018
1019 static ptl_err_t
1020 kqswnal_sendmsg (lib_nal_t    *nal,
1021                  void         *private,
1022                  lib_msg_t    *libmsg,
1023                  ptl_hdr_t    *hdr,
1024                  int           type,
1025                  ptl_nid_t     nid,
1026                  ptl_pid_t     pid,
1027                  unsigned int  payload_niov,
1028                  struct iovec *payload_iov,
1029                  ptl_kiov_t   *payload_kiov,
1030                  size_t        payload_offset,
1031                  size_t        payload_nob)
1032 {
1033         kqswnal_tx_t      *ktx;
1034         int                rc;
1035         ptl_nid_t          targetnid;
1036 #if KQSW_CHECKSUM
1037         int                i;
1038         kqsw_csum_t        csum;
1039         int                sumoff;
1040         int                sumnob;
1041 #endif
1042         /* NB 1. hdr is in network byte order */
1043         /*    2. 'private' depends on the message type */
1044         
1045         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
1046                " pid %u\n", payload_nob, payload_niov, nid, pid);
1047
1048         LASSERT (payload_nob == 0 || payload_niov > 0);
1049         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1050
1051         /* It must be OK to kmap() if required */
1052         LASSERT (payload_kiov == NULL || !in_interrupt ());
1053         /* payload is either all vaddrs or all pages */
1054         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1055
1056         if (payload_nob > KQSW_MAXPAYLOAD) {
1057                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
1058                         payload_nob, KQSW_MAXPAYLOAD);
1059                 return (PTL_FAIL);
1060         }
1061
1062         if (type == PTL_MSG_REPLY &&            /* can I look in 'private' */
1063             ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
1064                 /* Must be a REPLY for an optimized GET */
1065                 rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
1066                                    payload_niov, payload_iov, payload_kiov, 
1067                                    payload_offset, payload_nob);
1068                 return ((rc == 0) ? PTL_OK : PTL_FAIL);
1069         }
1070
1071         targetnid = nid;
1072         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
1073                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
1074                                  sizeof (ptl_hdr_t) + payload_nob, &targetnid);
1075                 if (rc != 0) {
1076                         CERROR("Can't route to "LPX64": router error %d\n",
1077                                nid, rc);
1078                         return (PTL_FAIL);
1079                 }
1080                 if (kqswnal_nid2elanid (targetnid) < 0) {
1081                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
1082                                targetnid, nid);
1083                         return (PTL_FAIL);
1084                 }
1085         }
1086
1087         /* I may not block for a transmit descriptor if I might block the
1088          * receiver, or an interrupt handler. */
1089         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
1090                                           type == PTL_MSG_REPLY ||
1091                                           in_interrupt()));
1092         if (ktx == NULL) {
1093                 CERROR ("Can't get txd for msg type %d for "LPX64"\n",
1094                         type, libmsg->ev.initiator.nid);
1095                 return (PTL_NO_SPACE);
1096         }
1097
1098         ktx->ktx_state   = KTX_SENDING;
1099         ktx->ktx_nid     = targetnid;
1100         ktx->ktx_args[0] = private;
1101         ktx->ktx_args[1] = libmsg;
1102         ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */
1103
1104         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
1105
1106 #if KQSW_CHECKSUM
1107         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
1108         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
1109         for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
1110                 LASSERT(i < niov);
1111                 if (payload_kiov != NULL) {
1112                         ptl_kiov_t *kiov = &payload_kiov[i];
1113
1114                         if (sumoff >= kiov->kiov_len) {
1115                                 sumoff -= kiov->kiov_len;
1116                         } else {
1117                                 char *addr = ((char *)kmap (kiov->kiov_page)) +
1118                                              kiov->kiov_offset + sumoff;
1119                                 int   fragnob = kiov->kiov_len - sumoff;
1120
1121                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1122                                 sumnob -= fragnob;
1123                                 sumoff = 0;
1124                                 kunmap(kiov->kiov_page);
1125                         }
1126                 } else {
1127                         struct iovec *iov = &payload_iov[i];
1128
1129                         if (sumoff > iov->iov_len) {
1130                                 sumoff -= iov->iov_len;
1131                         } else {
1132                                 char *addr = iov->iov_base + sumoff;
1133                                 int   fragnob = iov->iov_len - sumoff;
1134                                 
1135                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1136                                 sumnob -= fragnob;
1137                                 sumoff = 0;
1138                         }
1139                 }
1140         }
1141         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
1142 #endif
1143
1144         /* The first frag will be the pre-mapped buffer for (at least) the
1145          * portals header. */
1146         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1147
1148         if (nid == targetnid &&                 /* not forwarding */
1149             ((type == PTL_MSG_GET &&            /* optimize GET? */
1150               kqswnal_tunables.kqn_optimized_gets != 0 &&
1151               le32_to_cpu(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
1152              (type == PTL_MSG_PUT &&            /* optimize PUT? */
1153               kqswnal_tunables.kqn_optimized_puts != 0 &&
1154               payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
1155                 lib_md_t           *md = libmsg->md;
1156                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
1157                 
1158                 /* Optimised path: I send over the Elan vaddrs of the local
1159                  * buffers, and my peer DMAs directly to/from them.
1160                  *
1161                  * First I set up ktx as if it was going to send this
1162                  * payload, (it needs to map it anyway).  This fills
1163                  * ktx_frags[1] and onward with the network addresses
1164                  * of the GET sink frags.  I copy these into ktx_buffer,
1165                  * immediately after the header, and send that as my
1166                  * message. */
1167
1168                 ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;
1169
1170                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
1171                         rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
1172                                                   md->md_niov, md->md_iov.kiov);
1173                 else
1174                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
1175                                                  md->md_niov, md->md_iov.iov);
1176                 if (rc != 0)
1177                         goto out;
1178
1179                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1180
1181                 payload_nob = offsetof(kqswnal_remotemd_t,
1182                                        kqrmd_frag[rmd->kqrmd_nfrag]);
1183                 LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
1184
1185 #if MULTIRAIL_EKC
1186                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1187                        rmd->kqrmd_nfrag * sizeof(EP_NMD));
1188
1189                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1190                               0, KQSW_HDR_SIZE + payload_nob);
1191 #else
1192                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1193                        rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
1194                 
1195                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1196                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1197 #endif
1198                 if (type == PTL_MSG_GET) {
1199                         /* Allocate reply message now while I'm in thread context */
1200                         ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,
1201                                                                  nid, libmsg);
1202                         if (ktx->ktx_args[2] == NULL)
1203                                 goto out;
1204
1205                         /* NB finalizing the REPLY message is my
1206                          * responsibility now, whatever happens. */
1207                 }
1208                 
1209         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
1210
1211                 /* small message: single frag copied into the pre-mapped buffer */
1212
1213 #if MULTIRAIL_EKC
1214                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1215                               0, KQSW_HDR_SIZE + payload_nob);
1216 #else
1217                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1218                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1219 #endif
1220                 if (payload_nob > 0) {
1221                         if (payload_kiov != NULL)
1222                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1223                                                    payload_niov, payload_kiov, 
1224                                                    payload_offset, payload_nob);
1225                         else
1226                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1227                                                   payload_niov, payload_iov, 
1228                                                   payload_offset, payload_nob);
1229                 }
1230         } else {
1231
1232                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
1233
1234 #if MULTIRAIL_EKC
1235                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1236                               0, KQSW_HDR_SIZE);
1237 #else
1238                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1239                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1240 #endif
1241                 if (payload_kiov != NULL)
1242                         rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
1243                                                   payload_niov, payload_kiov);
1244                 else
1245                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
1246                                                  payload_niov, payload_iov);
1247                 if (rc != 0)
1248                         goto out;
1249         }
1250         
1251         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
1252                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1253
1254         rc = kqswnal_launch (ktx);
1255
1256  out:
1257         CDEBUG(rc == 0 ? D_NET : D_ERROR, 
1258                "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", 
1259                rc == 0 ? "Sent" : "Failed to send",
1260                payload_nob, nid, targetnid, rc);
1261
1262         if (rc != 0) {
1263                 if (ktx->ktx_state == KTX_GETTING &&
1264                     ktx->ktx_args[2] != NULL) {
1265                         /* We committed to reply, but there was a problem
1266                          * launching the GET.  We can't avoid delivering a
1267                          * REPLY event since we committed above, so we
1268                          * pretend the GET succeeded but the REPLY
1269                          * failed. */
1270                         rc = 0;
1271                         lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
1272                         lib_finalize (&kqswnal_lib, private,
1273                                       (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);
1274                 }
1275                 
1276                 kqswnal_put_idle_tx (ktx);
1277         }
1278         
1279         atomic_dec(&kqswnal_data.kqn_pending_txs);
1280         return (rc == 0 ? PTL_OK : PTL_FAIL);
1281 }
1282
1283 static ptl_err_t
1284 kqswnal_send (lib_nal_t    *nal,
1285               void         *private,
1286               lib_msg_t    *libmsg,
1287               ptl_hdr_t    *hdr,
1288               int           type,
1289               ptl_nid_t     nid,
1290               ptl_pid_t     pid,
1291               unsigned int  payload_niov,
1292               struct iovec *payload_iov,
1293               size_t        payload_offset,
1294               size_t        payload_nob)
1295 {
1296         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1297                                  payload_niov, payload_iov, NULL, 
1298                                  payload_offset, payload_nob));
1299 }
1300
1301 static ptl_err_t
1302 kqswnal_send_pages (lib_nal_t    *nal,
1303                     void         *private,
1304                     lib_msg_t    *libmsg,
1305                     ptl_hdr_t    *hdr,
1306                     int           type,
1307                     ptl_nid_t     nid,
1308                     ptl_pid_t     pid,
1309                     unsigned int  payload_niov,
1310                     ptl_kiov_t   *payload_kiov,
1311                     size_t        payload_offset,
1312                     size_t        payload_nob)
1313 {
1314         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1315                                  payload_niov, NULL, payload_kiov, 
1316                                  payload_offset, payload_nob));
1317 }
1318
1319 void
1320 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1321 {
1322         int             rc;
1323         kqswnal_tx_t   *ktx;
1324         ptl_kiov_t     *kiov = fwd->kprfd_kiov;
1325         int             niov = fwd->kprfd_niov;
1326         int             nob = fwd->kprfd_nob;
1327         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
1328
1329 #if KQSW_CHECKSUM
1330         CERROR ("checksums for forwarded packets not implemented\n");
1331         LBUG ();
1332 #endif
1333         /* The router wants this NAL to forward a packet */
1334         CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
1335                 fwd, nid, niov, nob);
1336
1337         ktx = kqswnal_get_idle_tx (fwd, 0);
1338         if (ktx == NULL)        /* can't get txd right now */
1339                 return;         /* fwd will be scheduled when tx desc freed */
1340
1341         if (nid == kqswnal_lib.libnal_ni.ni_pid.nid) /* gateway is me */
1342                 nid = fwd->kprfd_target_nid;    /* target is final dest */
1343
1344         /* copy hdr into pre-mapped buffer */
1345         memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
1346
1347         ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
1348                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1349         ktx->ktx_nid     = nid;
1350         ktx->ktx_state   = KTX_FORWARDING;
1351         ktx->ktx_args[0] = fwd;
1352         ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
1353
1354         if (kqswnal_nid2elanid (nid) < 0) {
1355                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
1356                 rc = -EHOSTUNREACH;
1357                 goto out;
1358         }
1359
1360         if (nob <= KQSW_TX_MAXCONTIG) 
1361         {
1362                 /* send payload from ktx's pre-mapped contiguous buffer */
1363 #if MULTIRAIL_EKC
1364                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1365                               0, KQSW_HDR_SIZE + nob);
1366 #else
1367                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1368                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
1369 #endif
1370                 if (nob > 0)
1371                         lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
1372                                           niov, kiov, 0, nob);
1373         }
1374         else
1375         {
1376                 /* zero copy payload */
1377 #if MULTIRAIL_EKC
1378                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1379                               0, KQSW_HDR_SIZE);
1380 #else
1381                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1382                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1383 #endif
1384                 rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
1385                 if (rc != 0)
1386                         goto out;
1387         }
1388
1389         rc = kqswnal_launch (ktx);
1390  out:
1391         if (rc != 0) {
1392                 CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
1393
1394                 /* complete now (with failure) */
1395                 kqswnal_tx_done (ktx, rc);
1396         }
1397
1398         atomic_dec(&kqswnal_data.kqn_pending_txs);
1399 }
1400
1401 void
1402 kqswnal_fwd_callback (void *arg, int error)
1403 {
1404         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1405
1406         /* The router has finished forwarding this packet */
1407
1408         if (error != 0)
1409         {
1410                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1411
1412                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1413                        le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid),error);
1414         }
1415
1416         LASSERT (atomic_read(&krx->krx_refcount) == 1);
1417         kqswnal_rx_decref (krx);
1418 }
1419
1420 void
1421 kqswnal_requeue_rx (kqswnal_rx_t *krx)
1422 {
1423         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1424         LASSERT (!krx->krx_rpc_reply_needed);
1425
1426         krx->krx_state = KRX_POSTED;
1427
1428 #if MULTIRAIL_EKC
1429         if (kqswnal_data.kqn_shuttingdown) {
1430                 /* free EKC rxd on shutdown */
1431                 ep_complete_receive(krx->krx_rxd);
1432         } else {
1433                 /* repost receive */
1434                 ep_requeue_receive(krx->krx_rxd, 
1435                                    kqswnal_rxhandler, krx,
1436                                    &krx->krx_elanbuffer, 0);
1437         }
1438 #else                
1439         if (kqswnal_data.kqn_shuttingdown)
1440                 return;
1441
1442         if (krx->krx_rxd == NULL) {
1443                 /* We had a failed ep_complete_rpc() which nukes the
1444                  * descriptor in "old" EKC */
1445                 int eprc = ep_queue_receive(krx->krx_eprx, 
1446                                             kqswnal_rxhandler, krx,
1447                                             krx->krx_elanbuffer, 
1448                                             krx->krx_npages * PAGE_SIZE, 0);
1449                 LASSERT (eprc == EP_SUCCESS);
1450                 /* We don't handle failure here; it's incredibly rare
1451                  * (never reported?) and only happens with "old" EKC */
1452         } else {
1453                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1454                                    krx->krx_elanbuffer, 
1455                                    krx->krx_npages * PAGE_SIZE);
1456         }
1457 #endif
1458 }
1459
1460 void
1461 kqswnal_rpc_complete (EP_RXD *rxd)
1462 {
1463         int           status = ep_rxd_status(rxd);
1464         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1465         
1466         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1467                "rxd %p, krx %p, status %d\n", rxd, krx, status);
1468
1469         LASSERT (krx->krx_rxd == rxd);
1470         LASSERT (krx->krx_rpc_reply_needed);
1471         
1472         krx->krx_rpc_reply_needed = 0;
1473         kqswnal_requeue_rx (krx);
1474 }
1475
1476 void
1477 kqswnal_rx_done (kqswnal_rx_t *krx) 
1478 {
1479         int           rc;
1480         EP_STATUSBLK *sblk;
1481
1482         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1483
1484         if (krx->krx_rpc_reply_needed) {
1485                 /* We've not completed the peer's RPC yet... */
1486                 sblk = (krx->krx_rpc_reply_status == 0) ? 
1487                        &kqswnal_data.kqn_rpc_success : 
1488                        &kqswnal_data.kqn_rpc_failed;
1489
1490                 LASSERT (!in_interrupt());
1491 #if MULTIRAIL_EKC
1492                 rc = ep_complete_rpc(krx->krx_rxd, 
1493                                      kqswnal_rpc_complete, krx,
1494                                      sblk, NULL, NULL, 0);
1495                 if (rc == EP_SUCCESS)
1496                         return;
1497 #else
1498                 rc = ep_complete_rpc(krx->krx_rxd, 
1499                                      kqswnal_rpc_complete, krx,
1500                                      sblk, NULL, 0);
1501                 if (rc == EP_SUCCESS)
1502                         return;
1503
1504                 /* "old" EKC destroys rxd on failed completion */
1505                 krx->krx_rxd = NULL;
1506 #endif
1507                 CERROR("can't complete RPC: %d\n", rc);
1508                 krx->krx_rpc_reply_needed = 0;
1509         }
1510
1511         kqswnal_requeue_rx(krx);
1512 }
1513         
1514 void
1515 kqswnal_parse (kqswnal_rx_t *krx)
1516 {
1517         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
1518         ptl_nid_t       dest_nid = le64_to_cpu(hdr->dest_nid);
1519         int             payload_nob;
1520         int             nob;
1521         int             niov;
1522
1523         LASSERT (atomic_read(&krx->krx_refcount) == 1);
1524
1525         if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
1526                 /* I ignore parse errors since I'm not consuming a byte
1527                  * stream */
1528                 (void)lib_parse (&kqswnal_lib, hdr, krx);
1529
1530                 /* Drop my ref; any RDMA activity takes an additional ref */
1531                 kqswnal_rx_decref(krx);
1532                 return;
1533         }
1534
1535 #if KQSW_CHECKSUM
1536         LASSERTF (0, "checksums for forwarded packets not implemented\n");
1537 #endif
1538
1539         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
1540         {
1541                 CERROR("dropping packet from "LPX64" for "LPX64
1542                        ": target is peer\n", le64_to_cpu(hdr->src_nid), dest_nid);
1543
1544                 kqswnal_rx_decref (krx);
1545                 return;
1546         }
1547
1548         nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
1549         niov = 0;
1550         if (nob > 0) {
1551                 krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
1552                 krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
1553                 niov = 1;
1554                 nob -= PAGE_SIZE - KQSW_HDR_SIZE;
1555                 
1556                 while (nob > 0) {
1557                         LASSERT (niov < krx->krx_npages);
1558                         
1559                         krx->krx_kiov[niov].kiov_offset = 0;
1560                         krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
1561                         niov++;
1562                         nob -= PAGE_SIZE;
1563                 }
1564         }
1565
1566         kpr_fwd_init (&krx->krx_fwd, dest_nid, 
1567                       hdr, payload_nob, niov, krx->krx_kiov,
1568                       kqswnal_fwd_callback, krx);
1569
1570         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
1571 }
1572
1573 /* Receive Interrupt Handler: posts to schedulers */
1574 void 
1575 kqswnal_rxhandler(EP_RXD *rxd)
1576 {
1577         unsigned long flags;
1578         int           nob    = ep_rxd_len (rxd);
1579         int           status = ep_rxd_status (rxd);
1580         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1581
1582         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1583                rxd, krx, nob, status);
1584
1585         LASSERT (krx != NULL);
1586         LASSERT (krx->krx_state == KRX_POSTED);
1587         
1588         krx->krx_state = KRX_PARSE;
1589         krx->krx_rxd = rxd;
1590         krx->krx_nob = nob;
1591
1592         /* RPC reply iff rpc request received without error */
1593         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
1594                                     (status == EP_SUCCESS ||
1595                                      status == EP_MSG_TOO_BIG);
1596
1597         /* Default to failure if an RPC reply is requested but not handled */
1598         krx->krx_rpc_reply_status = -EPROTO;
1599         atomic_set (&krx->krx_refcount, 1);
1600
1601         /* must receive a whole header to be able to parse */
1602         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
1603         {
1604                 /* receives complete with failure when receiver is removed */
1605 #if MULTIRAIL_EKC
1606                 if (status == EP_SHUTDOWN)
1607                         LASSERT (kqswnal_data.kqn_shuttingdown);
1608                 else
1609                         CERROR("receive status failed with status %d nob %d\n",
1610                                ep_rxd_status(rxd), nob);
1611 #else
1612                 if (!kqswnal_data.kqn_shuttingdown)
1613                         CERROR("receive status failed with status %d nob %d\n",
1614                                ep_rxd_status(rxd), nob);
1615 #endif
1616                 kqswnal_rx_decref(krx);
1617                 return;
1618         }
1619
1620         if (!in_interrupt()) {
1621                 kqswnal_parse(krx);
1622                 return;
1623         }
1624
1625         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1626
1627         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1628         wake_up (&kqswnal_data.kqn_sched_waitq);
1629
1630         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1631 }
1632
1633 #if KQSW_CHECKSUM
1634 void
1635 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1636 {
1637         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1638
1639         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1640                 ", dpid %d, spid %d, type %d\n",
1641                 ishdr ? "Header" : "Payload", krx,
1642                 le64_to_cpu(hdr->dest_nid), le64_to_cpu(hdr->src_nid)
1643                 le32_to_cpu(hdr->dest_pid), le32_to_cpu(hdr->src_pid),
1644                 le32_to_cpu(hdr->type));
1645
1646         switch (le32_to_cpu(hdr->type))
1647         {
1648         case PTL_MSG_ACK:
1649                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
1650                        " len %u\n",
1651                        le32_to_cpu(hdr->msg.ack.mlength),
1652                        hdr->msg.ack.dst_wmd.handle_cookie,
1653                        hdr->msg.ack.dst_wmd.handle_idx,
1654                        le64_to_cpu(hdr->msg.ack.match_bits),
1655                        le32_to_cpu(hdr->msg.ack.length));
1656                 break;
1657         case PTL_MSG_PUT:
1658                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
1659                        " len %u off %u data "LPX64"\n",
1660                        le32_to_cpu(hdr->msg.put.ptl_index),
1661                        hdr->msg.put.ack_wmd.handle_cookie,
1662                        hdr->msg.put.ack_wmd.handle_idx,
1663                        le64_to_cpu(hdr->msg.put.match_bits),
1664                        le32_to_cpu(hdr->msg.put.length),
1665                        le32_to_cpu(hdr->msg.put.offset),
1666                        hdr->msg.put.hdr_data);
1667                 break;
1668         case PTL_MSG_GET:
1669                 CERROR ("GET: <>\n");
1670                 break;
1671         case PTL_MSG_REPLY:
1672                 CERROR ("REPLY: <>\n");
1673                 break;
1674         default:
1675                 CERROR ("TYPE?: <>\n");
1676         }
1677 }
1678 #endif
1679
1680 static ptl_err_t
1681 kqswnal_recvmsg (lib_nal_t    *nal,
1682                  void         *private,
1683                  lib_msg_t    *libmsg,
1684                  unsigned int  niov,
1685                  struct iovec *iov,
1686                  ptl_kiov_t   *kiov,
1687                  size_t        offset,
1688                  size_t        mlen,
1689                  size_t        rlen)
1690 {
1691         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
1692         char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
1693         ptl_hdr_t    *hdr = (ptl_hdr_t *)buffer;
1694         int           page;
1695         char         *page_ptr;
1696         int           page_nob;
1697         char         *iov_ptr;
1698         int           iov_nob;
1699         int           frag;
1700         int           rc;
1701 #if KQSW_CHECKSUM
1702         kqsw_csum_t   senders_csum;
1703         kqsw_csum_t   payload_csum = 0;
1704         kqsw_csum_t   hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
1705         size_t        csum_len = mlen;
1706         int           csum_frags = 0;
1707         int           csum_nob = 0;
1708         static atomic_t csum_counter;
1709         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
1710
1711         atomic_inc (&csum_counter);
1712
1713         memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1714         if (senders_csum != hdr_csum)
1715                 kqswnal_csum_error (krx, 1);
1716 #endif
1717         /* NB lib_parse() has already flipped *hdr */
1718
1719         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1720
1721         if (libmsg == NULL) {                   /* portals is discarding. */
1722                 LASSERT (mlen == 0);
1723                 return PTL_OK;                  /* ignored by caller! */
1724         }
1725         
1726         if (krx->krx_rpc_reply_needed &&
1727             hdr->type == PTL_MSG_PUT) {
1728                 /* This must be an optimized PUT */
1729                 rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
1730                                    niov, iov, kiov, offset, mlen);
1731                 return (rc == 0 ? PTL_OK : PTL_FAIL);
1732         }
1733
1734         /* What was actually received must be >= payload. */
1735         LASSERT (mlen <= rlen);
1736         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
1737                 CERROR("Bad message size: have %d, need %d + %d\n",
1738                        krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
1739                 return (PTL_FAIL);
1740         }
1741
1742         /* It must be OK to kmap() if required */
1743         LASSERT (kiov == NULL || !in_interrupt ());
1744         /* Either all pages or all vaddrs */
1745         LASSERT (!(kiov != NULL && iov != NULL));
1746
1747         if (mlen != 0) {
1748                 page     = 0;
1749                 page_ptr = buffer + KQSW_HDR_SIZE;
1750                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1751
1752                 LASSERT (niov > 0);
1753
1754                 if (kiov != NULL) {
1755                         /* skip complete frags */
1756                         while (offset >= kiov->kiov_len) {
1757                                 offset -= kiov->kiov_len;
1758                                 kiov++;
1759                                 niov--;
1760                                 LASSERT (niov > 0);
1761                         }
1762                         iov_ptr = ((char *)kmap (kiov->kiov_page)) +
1763                                 kiov->kiov_offset + offset;
1764                         iov_nob = kiov->kiov_len - offset;
1765                 } else {
1766                         /* skip complete frags */
1767                         while (offset >= iov->iov_len) {
1768                                 offset -= iov->iov_len;
1769                                 iov++;
1770                                 niov--;
1771                                 LASSERT (niov > 0);
1772                         }
1773                         iov_ptr = iov->iov_base + offset;
1774                         iov_nob = iov->iov_len - offset;
1775                 }
1776                 
1777                 for (;;)
1778                 {
1779                         frag = mlen;
1780                         if (frag > page_nob)
1781                                 frag = page_nob;
1782                         if (frag > iov_nob)
1783                                 frag = iov_nob;
1784
1785                         memcpy (iov_ptr, page_ptr, frag);
1786 #if KQSW_CHECKSUM
1787                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1788                         csum_nob += frag;
1789                         csum_frags++;
1790 #endif
1791                         mlen -= frag;
1792                         if (mlen == 0)
1793                                 break;
1794
1795                         page_nob -= frag;
1796                         if (page_nob != 0)
1797                                 page_ptr += frag;
1798                         else
1799                         {
1800                                 page++;
1801                                 LASSERT (page < krx->krx_npages);
1802                                 page_ptr = page_address(krx->krx_kiov[page].kiov_page);
1803                                 page_nob = PAGE_SIZE;
1804                         }
1805
1806                         iov_nob -= frag;
1807                         if (iov_nob != 0)
1808                                 iov_ptr += frag;
1809                         else if (kiov != NULL) {
1810                                 kunmap (kiov->kiov_page);
1811                                 kiov++;
1812                                 niov--;
1813                                 LASSERT (niov > 0);
1814                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1815                                 iov_nob = kiov->kiov_len;
1816                         } else {
1817                                 iov++;
1818                                 niov--;
1819                                 LASSERT (niov > 0);
1820                                 iov_ptr = iov->iov_base;
1821                                 iov_nob = iov->iov_len;
1822                         }
1823                 }
1824
1825                 if (kiov != NULL)
1826                         kunmap (kiov->kiov_page);
1827         }
1828
1829 #if KQSW_CHECKSUM
1830         memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
1831                 sizeof(kqsw_csum_t));
1832
1833         if (csum_len != rlen)
1834                 CERROR("Unable to checksum data in user's buffer\n");
1835         else if (senders_csum != payload_csum)
1836                 kqswnal_csum_error (krx, 0);
1837
1838         if (csum_verbose)
1839                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1840                        "csum_nob %d\n",
1841                         hdr_csum, payload_csum, csum_frags, csum_nob);
1842 #endif
1843         lib_finalize(nal, private, libmsg, PTL_OK);
1844
1845         return (PTL_OK);
1846 }
1847
1848 static ptl_err_t
1849 kqswnal_recv(lib_nal_t    *nal,
1850              void         *private,
1851              lib_msg_t    *libmsg,
1852              unsigned int  niov,
1853              struct iovec *iov,
1854              size_t        offset,
1855              size_t        mlen,
1856              size_t        rlen)
1857 {
1858         return (kqswnal_recvmsg(nal, private, libmsg, 
1859                                 niov, iov, NULL, 
1860                                 offset, mlen, rlen));
1861 }
1862
1863 static ptl_err_t
1864 kqswnal_recv_pages (lib_nal_t    *nal,
1865                     void         *private,
1866                     lib_msg_t    *libmsg,
1867                     unsigned int  niov,
1868                     ptl_kiov_t   *kiov,
1869                     size_t        offset,
1870                     size_t        mlen,
1871                     size_t        rlen)
1872 {
1873         return (kqswnal_recvmsg(nal, private, libmsg, 
1874                                 niov, NULL, kiov, 
1875                                 offset, mlen, rlen));
1876 }
1877
1878 int
1879 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1880 {
1881         long    pid = kernel_thread (fn, arg, 0);
1882
1883         if (pid < 0)
1884                 return ((int)pid);
1885
1886         atomic_inc (&kqswnal_data.kqn_nthreads);
1887         return (0);
1888 }
1889
1890 void
1891 kqswnal_thread_fini (void)
1892 {
1893         atomic_dec (&kqswnal_data.kqn_nthreads);
1894 }
1895
1896 int
1897 kqswnal_scheduler (void *arg)
1898 {
1899         kqswnal_rx_t    *krx;
1900         kqswnal_tx_t    *ktx;
1901         kpr_fwd_desc_t  *fwd;
1902         unsigned long    flags;
1903         int              rc;
1904         int              counter = 0;
1905         int              did_something;
1906
1907         kportal_daemonize ("kqswnal_sched");
1908         kportal_blockallsigs ();
1909         
1910         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1911
1912         for (;;)
1913         {
1914                 did_something = 0;
1915
1916                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1917                 {
1918                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1919                                          kqswnal_rx_t, krx_list);
1920                         list_del (&krx->krx_list);
1921                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1922                                                flags);
1923
1924                         switch (krx->krx_state) {
1925                         case KRX_PARSE:
1926                                 kqswnal_parse (krx);
1927                                 break;
1928                         case KRX_COMPLETING:
1929                                 kqswnal_rx_decref (krx);
1930                                 break;
1931                         default:
1932                                 LBUG();
1933                         }
1934
1935                         did_something = 1;
1936                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1937                 }
1938
1939                 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1940                 {
1941                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1942                                          kqswnal_tx_t, ktx_delayed_list);
1943                         list_del_init (&ktx->ktx_delayed_list);
1944                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1945                                                flags);
1946
1947                         rc = kqswnal_launch (ktx);
1948                         if (rc != 0) {
1949                                 CERROR("Failed delayed transmit to "LPX64
1950                                        ": %d\n", ktx->ktx_nid, rc);
1951                                 kqswnal_tx_done (ktx, rc);
1952                         }
1953                         atomic_dec (&kqswnal_data.kqn_pending_txs);
1954
1955                         did_something = 1;
1956                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1957                 }
1958
1959                 if (!list_empty (&kqswnal_data.kqn_delayedfwds))
1960                 {
1961                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1962                         list_del (&fwd->kprfd_list);
1963                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1964
1965                         /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
1966                         kqswnal_fwd_packet (NULL, fwd);
1967
1968                         did_something = 1;
1969                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1970                 }
1971
1972                 /* nothing to do or hogging CPU */
1973                 if (!did_something || counter++ == KQSW_RESCHED) {
1974                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1975                                                flags);
1976
1977                         counter = 0;
1978
1979                         if (!did_something) {
1980                                 if (kqswnal_data.kqn_shuttingdown == 2) {
1981                                         /* We only exit in stage 2 of shutdown when 
1982                                          * there's nothing left to do */
1983                                         break;
1984                                 }
1985                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1986                                                                kqswnal_data.kqn_shuttingdown == 2 ||
1987                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1988                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1989                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1990                                 LASSERT (rc == 0);
1991                         } else if (need_resched())
1992                                 schedule ();
1993
1994                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1995                 }
1996         }
1997
1998         kqswnal_thread_fini ();
1999         return (0);
2000 }
2001
2002 lib_nal_t kqswnal_lib =
2003 {
2004         libnal_data:       &kqswnal_data,         /* NAL private data */
2005         libnal_send:        kqswnal_send,
2006         libnal_send_pages:  kqswnal_send_pages,
2007         libnal_recv:        kqswnal_recv,
2008         libnal_recv_pages:  kqswnal_recv_pages,
2009         libnal_dist:        kqswnal_dist
2010 };