Whamcloud - gitweb
96749cd7e0ab7205db6522bcf77d2c95e762fa17
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 EP_STATUSBLK  kqswnal_rpc_success;
30 EP_STATUSBLK  kqswnal_rpc_failed;
31
32 /*
33  *  LIB functions follow
34  *
35  */
36 static int
37 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
38              size_t len)
39 {
40         CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
41                 nal->ni.nid, len, src_addr, dst_addr );
42         memcpy( dst_addr, src_addr, len );
43
44         return (0);
45 }
46
47 static int
48 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
49               size_t len)
50 {
51         CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
52                 nal->ni.nid, len, src_addr, dst_addr );
53         memcpy( dst_addr, src_addr, len );
54
55         return (0);
56 }
57
58 static void *
59 kqswnal_malloc(nal_cb_t *nal, size_t len)
60 {
61         void *buf;
62
63         PORTAL_ALLOC(buf, len);
64         return (buf);
65 }
66
67 static void
68 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
69 {
70         PORTAL_FREE(buf, len);
71 }
72
73 static void
74 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
75 {
76         va_list ap;
77         char msg[256];
78
79         va_start (ap, fmt);
80         vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
81         va_end (ap);
82
83         msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
84
85         CDEBUG (D_NET, "%s", msg);
86 }
87
88
89 static void
90 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
91 {
92         kqswnal_data_t *data= nal->nal_data;
93
94         spin_lock_irqsave(&data->kqn_statelock, *flags);
95 }
96
97
98 static void
99 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
100 {
101         kqswnal_data_t *data= nal->nal_data;
102
103         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
104 }
105
106
107 static int
108 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
109 {
110         if (nid == nal->ni.nid)
111                 *dist = 0;                      /* it's me */
112         else if (kqswnal_nid2elanid (nid) >= 0)
113                 *dist = 1;                      /* it's my peer */
114         else
115                 *dist = 2;                      /* via router */
116         return (0);
117 }
118
119 void
120 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
121 {
122         struct timeval     now;
123         time_t             then;
124
125         do_gettimeofday (&now);
126         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
127
128         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
129 }
130
131 void
132 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
133 {
134 #if MULTIRAIL_EKC
135         int      i;
136 #endif
137
138         if (ktx->ktx_nmappedpages == 0)
139                 return;
140         
141 #if MULTIRAIL_EKC
142         CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
143                ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
144
145         for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
146                 ep_dvma_unload(kqswnal_data.kqn_ep,
147                                kqswnal_data.kqn_ep_tx_nmh,
148                                &ktx->ktx_frags[i]);
149 #else
150         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
151                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
152
153         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
154         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
155                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
156
157         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
158                           kqswnal_data.kqn_eptxdmahandle,
159                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
160
161 #endif
162         ktx->ktx_nmappedpages = 0;
163 }
164
165 int
166 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
167 {
168         int       nfrags    = ktx->ktx_nfrag;
169         int       nmapped   = ktx->ktx_nmappedpages;
170         int       maxmapped = ktx->ktx_npages;
171         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
172         char     *ptr;
173 #if MULTIRAIL_EKC
174         EP_RAILMASK railmask;
175         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
176                                             EP_RAILMASK_ALL,
177                                             kqswnal_nid2elanid(ktx->ktx_nid));
178         
179         if (rail < 0) {
180                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
181                 return (-ENETDOWN);
182         }
183         railmask = 1 << rail;
184 #endif
185         LASSERT (nmapped <= maxmapped);
186         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
187         LASSERT (nfrags <= EP_MAXFRAG);
188         LASSERT (niov > 0);
189         LASSERT (nob > 0);
190
191         do {
192                 int  fraglen = kiov->kiov_len;
193
194                 /* nob exactly spans the iovs */
195                 LASSERT (fraglen <= nob);
196                 /* each frag fits in a page */
197                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
198
199                 nmapped++;
200                 if (nmapped > maxmapped) {
201                         CERROR("Can't map message in %d pages (max %d)\n",
202                                nmapped, maxmapped);
203                         return (-EMSGSIZE);
204                 }
205
206                 if (nfrags == EP_MAXFRAG) {
207                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
208                                EP_MAXFRAG);
209                         return (-EMSGSIZE);
210                 }
211
212                 /* XXX this is really crap, but we'll have to kmap until
213                  * EKC has a page (rather than vaddr) mapping interface */
214
215                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
216
217                 CDEBUG(D_NET,
218                        "%p[%d] loading %p for %d, page %d, %d total\n",
219                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
220
221 #if MULTIRAIL_EKC
222                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
223                              ptr, fraglen,
224                              kqswnal_data.kqn_ep_tx_nmh, basepage,
225                              &railmask, &ktx->ktx_frags[nfrags]);
226
227                 if (nfrags == ktx->ktx_firsttmpfrag ||
228                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
229                                   &ktx->ktx_frags[nfrags - 1],
230                                   &ktx->ktx_frags[nfrags])) {
231                         /* new frag if this is the first or can't merge */
232                         nfrags++;
233                 }
234 #else
235                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
236                                        kqswnal_data.kqn_eptxdmahandle,
237                                        ptr, fraglen,
238                                        basepage, &ktx->ktx_frags[nfrags].Base);
239
240                 if (nfrags > 0 &&                /* previous frag mapped */
241                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
242                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
243                         /* just extend previous */
244                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
245                 else {
246                         ktx->ktx_frags[nfrags].Len = fraglen;
247                         nfrags++;                /* new frag */
248                 }
249 #endif
250
251                 kunmap (kiov->kiov_page);
252                 
253                 /* keep in loop for failure case */
254                 ktx->ktx_nmappedpages = nmapped;
255
256                 basepage++;
257                 kiov++;
258                 niov--;
259                 nob -= fraglen;
260
261                 /* iov must not run out before end of data */
262                 LASSERT (nob == 0 || niov > 0);
263
264         } while (nob > 0);
265
266         ktx->ktx_nfrag = nfrags;
267         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
268                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
269
270         return (0);
271 }
272
273 int
274 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
275 {
276         int       nfrags    = ktx->ktx_nfrag;
277         int       nmapped   = ktx->ktx_nmappedpages;
278         int       maxmapped = ktx->ktx_npages;
279         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
280 #if MULTIRAIL_EKC
281         EP_RAILMASK railmask;
282         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
283                                             EP_RAILMASK_ALL,
284                                             kqswnal_nid2elanid(ktx->ktx_nid));
285         
286         if (rail < 0) {
287                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
288                 return (-ENETDOWN);
289         }
290         railmask = 1 << rail;
291 #endif
292         LASSERT (nmapped <= maxmapped);
293         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
294         LASSERT (nfrags <= EP_MAXFRAG);
295         LASSERT (niov > 0);
296         LASSERT (nob > 0);
297
298         do {
299                 int  fraglen = iov->iov_len;
300                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
301
302                 /* nob exactly spans the iovs */
303                 LASSERT (fraglen <= nob);
304                 
305                 nmapped += npages;
306                 if (nmapped > maxmapped) {
307                         CERROR("Can't map message in %d pages (max %d)\n",
308                                nmapped, maxmapped);
309                         return (-EMSGSIZE);
310                 }
311
312                 if (nfrags == EP_MAXFRAG) {
313                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
314                                EP_MAXFRAG);
315                         return (-EMSGSIZE);
316                 }
317
318                 CDEBUG(D_NET,
319                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
320                         ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
321                         nmapped);
322
323 #if MULTIRAIL_EKC
324                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
325                              iov->iov_base, fraglen,
326                              kqswnal_data.kqn_ep_tx_nmh, basepage,
327                              &railmask, &ktx->ktx_frags[nfrags]);
328
329                 if (nfrags == ktx->ktx_firsttmpfrag ||
330                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
331                                   &ktx->ktx_frags[nfrags - 1],
332                                   &ktx->ktx_frags[nfrags])) {
333                         /* new frag if this is the first or can't merge */
334                         nfrags++;
335                 }
336 #else
337                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
338                                        kqswnal_data.kqn_eptxdmahandle,
339                                        iov->iov_base, fraglen,
340                                        basepage, &ktx->ktx_frags[nfrags].Base);
341
342                 if (nfrags > 0 &&                /* previous frag mapped */
343                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
344                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
345                         /* just extend previous */
346                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
347                 else {
348                         ktx->ktx_frags[nfrags].Len = fraglen;
349                         nfrags++;                /* new frag */
350                 }
351 #endif
352
353                 /* keep in loop for failure case */
354                 ktx->ktx_nmappedpages = nmapped;
355
356                 basepage += npages;
357                 iov++;
358                 niov--;
359                 nob -= fraglen;
360
361                 /* iov must not run out before end of data */
362                 LASSERT (nob == 0 || niov > 0);
363
364         } while (nob > 0);
365
366         ktx->ktx_nfrag = nfrags;
367         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
368                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
369
370         return (0);
371 }
372
373
374 void
375 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
376 {
377         kpr_fwd_desc_t   *fwd = NULL;
378         unsigned long     flags;
379
380         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
381         ktx->ktx_state = KTX_IDLE;
382
383         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
384
385         list_del (&ktx->ktx_list);              /* take off active list */
386
387         if (ktx->ktx_isnblk) {
388                 /* reserved for non-blocking tx */
389                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
390                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
391                 return;
392         }
393
394         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
395
396         /* anything blocking for a tx descriptor? */
397         if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
398         {
399                 CDEBUG(D_NET,"wakeup fwd\n");
400
401                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
402                                   kpr_fwd_desc_t, kprfd_list);
403                 list_del (&fwd->kprfd_list);
404         }
405
406         wake_up (&kqswnal_data.kqn_idletxd_waitq);
407
408         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
409
410         if (fwd == NULL)
411                 return;
412
413         /* schedule packet for forwarding again */
414         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
415
416         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
417         wake_up (&kqswnal_data.kqn_sched_waitq);
418
419         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
420 }
421
422 kqswnal_tx_t *
423 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
424 {
425         unsigned long  flags;
426         kqswnal_tx_t  *ktx = NULL;
427
428         for (;;) {
429                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
430
431                 /* "normal" descriptor is free */
432                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
433                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
434                                           kqswnal_tx_t, ktx_list);
435                         break;
436                 }
437
438                 /* "normal" descriptor pool is empty */
439
440                 if (fwd != NULL) { /* forwarded packet => queue for idle txd */
441                         CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
442                         list_add_tail (&fwd->kprfd_list,
443                                        &kqswnal_data.kqn_idletxd_fwdq);
444                         break;
445                 }
446
447                 /* doing a local transmit */
448                 if (!may_block) {
449                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
450                                 CERROR ("intr tx desc pool exhausted\n");
451                                 break;
452                         }
453
454                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
455                                           kqswnal_tx_t, ktx_list);
456                         break;
457                 }
458
459                 /* block for idle tx */
460
461                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
462
463                 CDEBUG (D_NET, "blocking for tx desc\n");
464                 wait_event (kqswnal_data.kqn_idletxd_waitq,
465                             !list_empty (&kqswnal_data.kqn_idletxds));
466         }
467
468         if (ktx != NULL) {
469                 list_del (&ktx->ktx_list);
470                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
471                 ktx->ktx_launcher = current->pid;
472         }
473
474         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
475
476         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
477         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
478
479         return (ktx);
480 }
481
482 void
483 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
484 {
485         lib_msg_t     *msg;
486         lib_msg_t     *repmsg;
487
488         switch (ktx->ktx_state) {
489         case KTX_FORWARDING:       /* router asked me to forward this packet */
490                 kpr_fwd_done (&kqswnal_data.kqn_router,
491                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
492                 break;
493
494         case KTX_SENDING:          /* packet sourced locally */
495                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
496                               (lib_msg_t *)ktx->ktx_args[1]);
497                 break;
498
499         case KTX_GETTING:          /* Peer has DMA-ed direct? */
500                 msg = (lib_msg_t *)ktx->ktx_args[1];
501                 repmsg = NULL;
502
503                 if (error == 0) 
504                         repmsg = lib_fake_reply_msg (&kqswnal_lib, 
505                                                      ktx->ktx_nid, msg->md);
506                 
507                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg);
508
509                 if (repmsg != NULL) 
510                         lib_finalize (&kqswnal_lib, NULL, repmsg);
511                 break;
512
513         default:
514                 LASSERT (0);
515         }
516
517         kqswnal_put_idle_tx (ktx);
518 }
519
520 static void
521 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
522 {
523         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
524
525         LASSERT (txd != NULL);
526         LASSERT (ktx != NULL);
527
528         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
529
530         if (status != EP_SUCCESS) {
531
532                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
533                         ktx->ktx_nid, status);
534
535                 kqswnal_notify_peer_down(ktx);
536                 status = -EIO;
537
538         } else if (ktx->ktx_state == KTX_GETTING) {
539                 /* RPC completed OK; what did our peer put in the status
540                  * block? */
541 #if MULTIRAIL_EKC
542                 status = ep_txd_statusblk(txd)->Data[0];
543 #else
544                 status = ep_txd_statusblk(txd)->Status;
545 #endif
546         } else {
547                 status = 0;
548         }
549
550         kqswnal_tx_done (ktx, status);
551 }
552
553 int
554 kqswnal_launch (kqswnal_tx_t *ktx)
555 {
556         /* Don't block for transmit descriptor if we're in interrupt context */
557         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
558         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
559         long  flags;
560         int   rc;
561
562         ktx->ktx_launchtime = jiffies;
563
564         LASSERT (dest >= 0);                    /* must be a peer */
565         if (ktx->ktx_state == KTX_GETTING) {
566                 /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
567                  * other frags are the GET sink which we obviously don't
568                  * send here :) */
569 #if MULTIRAIL_EKC
570                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
571                                      ktx->ktx_port, attr,
572                                      kqswnal_txhandler, ktx,
573                                      NULL, ktx->ktx_frags, 1);
574 #else
575                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
576                                      ktx->ktx_port, attr, kqswnal_txhandler,
577                                      ktx, NULL, ktx->ktx_frags, 1);
578 #endif
579         } else {
580 #if MULTIRAIL_EKC
581                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
582                                          ktx->ktx_port, attr,
583                                          kqswnal_txhandler, ktx,
584                                          NULL, ktx->ktx_frags, ktx->ktx_nfrag);
585 #else
586                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
587                                        ktx->ktx_port, attr, 
588                                        kqswnal_txhandler, ktx, 
589                                        ktx->ktx_frags, ktx->ktx_nfrag);
590 #endif
591         }
592
593         switch (rc) {
594         case EP_SUCCESS: /* success */
595                 return (0);
596
597         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
598                 LASSERT (in_interrupt());
599
600                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
601
602                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
603                 wake_up (&kqswnal_data.kqn_sched_waitq);
604
605                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
606                 return (0);
607
608         default: /* fatal error */
609                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
610                 kqswnal_notify_peer_down(ktx);
611                 return (-EHOSTUNREACH);
612         }
613 }
614
615 static char *
616 hdr_type_string (ptl_hdr_t *hdr)
617 {
618         switch (hdr->type) {
619         case PTL_MSG_ACK:
620                 return ("ACK");
621         case PTL_MSG_PUT:
622                 return ("PUT");
623         case PTL_MSG_GET:
624                 return ("GET");
625         case PTL_MSG_REPLY:
626                 return ("REPLY");
627         default:
628                 return ("<UNKNOWN>");
629         }
630 }
631
632 static void
633 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
634 {
635         char *type_str = hdr_type_string (hdr);
636
637         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
638                NTOH__u32(hdr->payload_length));
639         CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
640                NTOH__u32(hdr->src_pid));
641         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
642                NTOH__u32(hdr->dest_pid));
643
644         switch (NTOH__u32(hdr->type)) {
645         case PTL_MSG_PUT:
646                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
647                        "match bits "LPX64"\n",
648                        NTOH__u32 (hdr->msg.put.ptl_index),
649                        hdr->msg.put.ack_wmd.wh_interface_cookie,
650                        hdr->msg.put.ack_wmd.wh_object_cookie,
651                        NTOH__u64 (hdr->msg.put.match_bits));
652                 CERROR("    offset %d, hdr data "LPX64"\n",
653                        NTOH__u32(hdr->msg.put.offset),
654                        hdr->msg.put.hdr_data);
655                 break;
656
657         case PTL_MSG_GET:
658                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
659                        "match bits "LPX64"\n",
660                        NTOH__u32 (hdr->msg.get.ptl_index),
661                        hdr->msg.get.return_wmd.wh_interface_cookie,
662                        hdr->msg.get.return_wmd.wh_object_cookie,
663                        hdr->msg.get.match_bits);
664                 CERROR("    Length %d, src offset %d\n",
665                        NTOH__u32 (hdr->msg.get.sink_length),
666                        NTOH__u32 (hdr->msg.get.src_offset));
667                 break;
668
669         case PTL_MSG_ACK:
670                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
671                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
672                        hdr->msg.ack.dst_wmd.wh_object_cookie,
673                        NTOH__u32 (hdr->msg.ack.mlength));
674                 break;
675
676         case PTL_MSG_REPLY:
677                 CERROR("    dst md "LPX64"."LPX64"\n",
678                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
679                        hdr->msg.reply.dst_wmd.wh_object_cookie);
680         }
681
682 }                               /* end of print_hdr() */
683
684 #if !MULTIRAIL_EKC
685 void
686 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
687 {
688         int          i;
689
690         CDEBUG (how, "%s: %d\n", str, n);
691         for (i = 0; i < n; i++) {
692                 CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
693         }
694 }
695
696 int
697 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
698                      int nsrc, EP_IOVEC *src,
699                      int ndst, EP_IOVEC *dst) 
700 {
701         int        count;
702         int        nob;
703
704         LASSERT (ndv > 0);
705         LASSERT (nsrc > 0);
706         LASSERT (ndst > 0);
707
708         for (count = 0; count < ndv; count++, dv++) {
709
710                 if (nsrc == 0 || ndst == 0) {
711                         if (nsrc != ndst) {
712                                 /* For now I'll barf on any left over entries */
713                                 CERROR ("mismatched src and dst iovs\n");
714                                 return (-EINVAL);
715                         }
716                         return (count);
717                 }
718
719                 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
720                 dv->Len    = nob;
721                 dv->Source = src->Base;
722                 dv->Dest   = dst->Base;
723
724                 if (nob >= src->Len) {
725                         src++;
726                         nsrc--;
727                 } else {
728                         src->Len -= nob;
729                         src->Base += nob;
730                 }
731                 
732                 if (nob >= dst->Len) {
733                         dst++;
734                         ndst--;
735                 } else {
736                         src->Len -= nob;
737                         src->Base += nob;
738                 }
739         }
740
741         CERROR ("DATAVEC too small\n");
742         return (-E2BIG);
743 }
744 #endif
745
746 int
747 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
748                    struct iovec *iov, ptl_kiov_t *kiov, int nob)
749 {
750         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
751         char               *buffer = (char *)page_address(krx->krx_pages[0]);
752         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
753         int                 rc;
754 #if MULTIRAIL_EKC
755         int                 i;
756 #else
757         EP_DATAVEC          datav[EP_MAXFRAG];
758         int                 ndatav;
759 #endif
760         LASSERT (krx->krx_rpc_reply_needed);
761         LASSERT ((iov == NULL) != (kiov == NULL));
762
763         /* see kqswnal_sendmsg comment regarding endian-ness */
764         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
765                 /* msg too small to discover rmd size */
766                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
767                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
768                 return (-EINVAL);
769         }
770         
771         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
772                 /* rmd doesn't fit in the incoming message */
773                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
774                         krx->krx_nob, rmd->kqrmd_nfrag,
775                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
776                 return (-EINVAL);
777         }
778
779         /* Map the source data... */
780         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
781         if (kiov != NULL)
782                 rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
783         else
784                 rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov);
785
786         if (rc != 0) {
787                 CERROR ("Can't map source data: %d\n", rc);
788                 return (rc);
789         }
790
791 #if MULTIRAIL_EKC
792         if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
793                 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
794                        ktx->ktx_nfrag, rmd->kqrmd_nfrag);
795                 return (-EINVAL);
796         }
797         
798         for (i = 0; i < rmd->kqrmd_nfrag; i++)
799                 if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
800                         CERROR("Can't cope with unequal frags %d(%d):"
801                                " %d local %d remote\n",
802                                i, rmd->kqrmd_nfrag, 
803                                ktx->ktx_frags[i].nmd_len, 
804                                rmd->kqrmd_frag[i].nmd_len);
805                         return (-EINVAL);
806                 }
807 #else
808         ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
809                                       ktx->ktx_nfrag, ktx->ktx_frags,
810                                       rmd->kqrmd_nfrag, rmd->kqrmd_frag);
811         if (ndatav < 0) {
812                 CERROR ("Can't create datavec: %d\n", ndatav);
813                 return (ndatav);
814         }
815 #endif
816
817         /* Our caller will start to race with kqswnal_dma_reply_complete... */
818         LASSERT (atomic_read (&krx->krx_refcount) == 1);
819         atomic_set (&krx->krx_refcount, 2);
820
821 #if MULTIRAIL_EKC
822         rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
823                              &kqswnal_rpc_success,
824                              ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
825         if (rc == EP_SUCCESS)
826                 return (0);
827
828         /* Well we tried... */
829         krx->krx_rpc_reply_needed = 0;
830 #else
831         rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
832                               &kqswnal_rpc_success, datav, ndatav);
833         if (rc == EP_SUCCESS)
834                 return (0);
835
836         /* "old" EKC destroys rxd on failed completion */
837         krx->krx_rxd = NULL;
838 #endif
839
840         CERROR("can't complete RPC: %d\n", rc);
841
842         /* reset refcount back to 1: we're not going to be racing with
843          * kqswnal_dma_reply_complete. */
844         atomic_set (&krx->krx_refcount, 1);
845
846         return (-ECONNABORTED);
847 }
848
849 static int
850 kqswnal_sendmsg (nal_cb_t     *nal,
851                  void         *private,
852                  lib_msg_t    *libmsg,
853                  ptl_hdr_t    *hdr,
854                  int           type,
855                  ptl_nid_t     nid,
856                  ptl_pid_t     pid,
857                  unsigned int  payload_niov,
858                  struct iovec *payload_iov,
859                  ptl_kiov_t   *payload_kiov,
860                  size_t        payload_nob)
861 {
862         kqswnal_tx_t      *ktx;
863         int                rc;
864         ptl_nid_t          targetnid;
865 #if KQSW_CHECKSUM
866         int                i;
867         kqsw_csum_t        csum;
868         int                sumnob;
869 #endif
870         
871         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
872                " pid %u\n", payload_nob, payload_niov, nid, pid);
873
874         LASSERT (payload_nob == 0 || payload_niov > 0);
875         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
876
877         /* It must be OK to kmap() if required */
878         LASSERT (payload_kiov == NULL || !in_interrupt ());
879         /* payload is either all vaddrs or all pages */
880         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
881         
882         if (payload_nob > KQSW_MAXPAYLOAD) {
883                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
884                         payload_nob, KQSW_MAXPAYLOAD);
885                 return (PTL_FAIL);
886         }
887
888         targetnid = nid;
889         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
890                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
891                                  sizeof (ptl_hdr_t) + payload_nob, &targetnid);
892                 if (rc != 0) {
893                         CERROR("Can't route to "LPX64": router error %d\n",
894                                nid, rc);
895                         return (PTL_FAIL);
896                 }
897                 if (kqswnal_nid2elanid (targetnid) < 0) {
898                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
899                                targetnid, nid);
900                         return (PTL_FAIL);
901                 }
902         }
903
904         /* I may not block for a transmit descriptor if I might block the
905          * receiver, or an interrupt handler. */
906         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
907                                           type == PTL_MSG_REPLY ||
908                                           in_interrupt()));
909         if (ktx == NULL) {
910                 kqswnal_cerror_hdr (hdr);
911                 return (PTL_NOSPACE);
912         }
913
914         ktx->ktx_nid     = targetnid;
915         ktx->ktx_args[0] = private;
916         ktx->ktx_args[1] = libmsg;
917
918         if (type == PTL_MSG_REPLY &&
919             ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
920                 if (nid != targetnid ||
921                     kqswnal_nid2elanid(nid) != 
922                     ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
923                         CERROR("Optimized reply nid conflict: "
924                                "nid "LPX64" via "LPX64" elanID %d\n",
925                                nid, targetnid,
926                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
927                         return (PTL_FAIL);
928                 }
929
930                 /* peer expects RPC completion with GET data */
931                 rc = kqswnal_dma_reply (ktx,
932                                         payload_niov, payload_iov, 
933                                         payload_kiov, payload_nob);
934                 if (rc == 0)
935                         return (PTL_OK);
936                 
937                 CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
938                 kqswnal_put_idle_tx (ktx);
939                 return (PTL_FAIL);
940         }
941
942         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
943         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
944
945 #if KQSW_CHECKSUM
946         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
947         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
948         for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
949                 if (payload_kiov != NULL) {
950                         ptl_kiov_t *kiov = &payload_kiov[i];
951                         char       *addr = ((char *)kmap (kiov->kiov_page)) +
952                                            kiov->kiov_offset;
953                         
954                         csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
955                         sumnob -= kiov->kiov_len;
956                 } else {
957                         struct iovec *iov = &payload_iov[i];
958
959                         csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
960                         sumnob -= iov->iov_len;
961                 }
962         }
963         memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
964 #endif
965         
966         if (kqswnal_data.kqn_optimized_gets &&
967             type == PTL_MSG_GET &&              /* doing a GET */
968             nid == targetnid) {                 /* not forwarding */
969                 lib_md_t           *md = libmsg->md;
970                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
971                 
972                 /* Optimised path: I send over the Elan vaddrs of the get
973                  * sink buffers, and my peer DMAs directly into them.
974                  *
975                  * First I set up ktx as if it was going to send this
976                  * payload, (it needs to map it anyway).  This fills
977                  * ktx_frags[1] and onward with the network addresses
978                  * of the GET sink frags.  I copy these into ktx_buffer,
979                  * immediately after the header, and send that as my GET
980                  * message.
981                  *
982                  * Note that the addresses are sent in native endian-ness.
983                  * When EKC copes with different endian nodes, I'll fix
984                  * this (and eat my hat :) */
985
986                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
987                 ktx->ktx_state = KTX_GETTING;
988
989                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
990                         rc = kqswnal_map_tx_kiov (ktx, md->length,
991                                                   md->md_niov, md->md_iov.kiov);
992                 else
993                         rc = kqswnal_map_tx_iov (ktx, md->length,
994                                                  md->md_niov, md->md_iov.iov);
995
996                 if (rc < 0) {
997                         kqswnal_put_idle_tx (ktx);
998                         return (PTL_FAIL);
999                 }
1000
1001                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1002
1003                 payload_nob = offsetof(kqswnal_remotemd_t,
1004                                        kqrmd_frag[rmd->kqrmd_nfrag]);
1005                 LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
1006
1007 #if MULTIRAIL_EKC
1008                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1009                        rmd->kqrmd_nfrag * sizeof(EP_NMD));
1010
1011                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1012                               0, KQSW_HDR_SIZE + payload_nob);
1013 #else
1014                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1015                        rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
1016                 
1017                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1018                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1019 #endif
1020         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
1021
1022                 /* small message: single frag copied into the pre-mapped buffer */
1023
1024                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1025                 ktx->ktx_state = KTX_SENDING;
1026 #if MULTIRAIL_EKC
1027                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1028                               0, KQSW_HDR_SIZE + payload_nob);
1029 #else
1030                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1031                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1032 #endif
1033                 if (payload_nob > 0) {
1034                         if (payload_kiov != NULL)
1035                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1036                                                    payload_niov, payload_kiov, payload_nob);
1037                         else
1038                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1039                                                   payload_niov, payload_iov, payload_nob);
1040                 }
1041         } else {
1042
1043                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
1044
1045                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1046                 ktx->ktx_state = KTX_SENDING;
1047 #if MULTIRAIL_EKC
1048                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1049                               0, KQSW_HDR_SIZE);
1050 #else
1051                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1052                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1053 #endif
1054                 if (payload_kiov != NULL)
1055                         rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
1056                                                   payload_niov, payload_kiov);
1057                 else
1058                         rc = kqswnal_map_tx_iov (ktx, payload_nob,
1059                                                  payload_niov, payload_iov);
1060                 if (rc != 0) {
1061                         kqswnal_put_idle_tx (ktx);
1062                         return (PTL_FAIL);
1063                 }
1064         }
1065         
1066         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
1067                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1068
1069         rc = kqswnal_launch (ktx);
1070         if (rc != 0) {                    /* failed? */
1071                 CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
1072                 kqswnal_put_idle_tx (ktx);
1073                 return (PTL_FAIL);
1074         }
1075
1076         CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
1077                payload_nob, nid, targetnid);
1078         return (PTL_OK);
1079 }
1080
1081 static int
1082 kqswnal_send (nal_cb_t     *nal,
1083               void         *private,
1084               lib_msg_t    *libmsg,
1085               ptl_hdr_t    *hdr,
1086               int           type,
1087               ptl_nid_t     nid,
1088               ptl_pid_t     pid,
1089               unsigned int  payload_niov,
1090               struct iovec *payload_iov,
1091               size_t        payload_nob)
1092 {
1093         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1094                                  payload_niov, payload_iov, NULL, payload_nob));
1095 }
1096
1097 static int
1098 kqswnal_send_pages (nal_cb_t     *nal,
1099                     void         *private,
1100                     lib_msg_t    *libmsg,
1101                     ptl_hdr_t    *hdr,
1102                     int           type,
1103                     ptl_nid_t     nid,
1104                     ptl_pid_t     pid,
1105                     unsigned int  payload_niov,
1106                     ptl_kiov_t   *payload_kiov,
1107                     size_t        payload_nob)
1108 {
1109         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1110                                  payload_niov, NULL, payload_kiov, payload_nob));
1111 }
1112
1113 void
1114 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1115 {
1116         int             rc;
1117         kqswnal_tx_t   *ktx;
1118         struct iovec   *iov = fwd->kprfd_iov;
1119         int             niov = fwd->kprfd_niov;
1120         int             nob = fwd->kprfd_nob;
1121         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
1122
1123 #if KQSW_CHECKSUM
1124         CERROR ("checksums for forwarded packets not implemented\n");
1125         LBUG ();
1126 #endif
1127         /* The router wants this NAL to forward a packet */
1128         CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
1129                 fwd, nid, niov, nob);
1130
1131         LASSERT (niov > 0);
1132         
1133         ktx = kqswnal_get_idle_tx (fwd, 0);
1134         if (ktx == NULL)        /* can't get txd right now */
1135                 return;         /* fwd will be scheduled when tx desc freed */
1136
1137         if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
1138                 nid = fwd->kprfd_target_nid;    /* target is final dest */
1139
1140         if (kqswnal_nid2elanid (nid) < 0) {
1141                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
1142                 rc = -EHOSTUNREACH;
1143                 goto failed;
1144         }
1145
1146         if (nob > KQSW_NRXMSGBYTES_LARGE) {
1147                 CERROR ("Can't forward [%p] to "LPX64
1148                         ": size %d bigger than max packet size %ld\n",
1149                         fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
1150                 rc = -EMSGSIZE;
1151                 goto failed;
1152         }
1153
1154         ktx->ktx_port    = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
1155                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1156         ktx->ktx_nid     = nid;
1157         ktx->ktx_state   = KTX_FORWARDING;
1158         ktx->ktx_args[0] = fwd;
1159
1160         if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
1161             nob <= KQSW_TX_BUFFER_SIZE) 
1162         {
1163                 /* send from ktx's pre-mapped contiguous buffer? */
1164                 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
1165 #if MULTIRAIL_EKC
1166                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1167                               0, nob);
1168 #else
1169                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1170                 ktx->ktx_frags[0].Len = nob;
1171 #endif
1172                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1173                 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
1174         }
1175         else
1176         {
1177                 /* zero copy */
1178                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
1179                 rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
1180                 if (rc != 0)
1181                         goto failed;
1182
1183                 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
1184         }
1185
1186         rc = kqswnal_launch (ktx);
1187         if (rc == 0)
1188                 return;
1189
1190  failed:
1191         LASSERT (rc != 0);
1192         CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
1193
1194         kqswnal_put_idle_tx (ktx);
1195         /* complete now (with failure) */
1196         kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
1197 }
1198
1199 void
1200 kqswnal_fwd_callback (void *arg, int error)
1201 {
1202         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1203
1204         /* The router has finished forwarding this packet */
1205
1206         if (error != 0)
1207         {
1208                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
1209
1210                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1211                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
1212         }
1213
1214         kqswnal_requeue_rx (krx);
1215 }
1216
1217 void
1218 kqswnal_dma_reply_complete (EP_RXD *rxd) 
1219 {
1220         int           status = ep_rxd_status(rxd);
1221         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
1222         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
1223         lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
1224         
1225         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1226                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
1227
1228         LASSERT (krx->krx_rxd == rxd);
1229         LASSERT (krx->krx_rpc_reply_needed);
1230
1231         krx->krx_rpc_reply_needed = 0;
1232         kqswnal_rx_done (krx);
1233
1234         lib_finalize (&kqswnal_lib, NULL, msg);
1235         kqswnal_put_idle_tx (ktx);
1236 }
1237
1238 void
1239 kqswnal_rpc_complete (EP_RXD *rxd)
1240 {
1241         int           status = ep_rxd_status(rxd);
1242         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1243         
1244         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1245                "rxd %p, krx %p, status %d\n", rxd, krx, status);
1246
1247         LASSERT (krx->krx_rxd == rxd);
1248         LASSERT (krx->krx_rpc_reply_needed);
1249         
1250         krx->krx_rpc_reply_needed = 0;
1251         kqswnal_requeue_rx (krx);
1252 }
1253
1254 void
1255 kqswnal_requeue_rx (kqswnal_rx_t *krx) 
1256 {
1257         int   rc;
1258
1259         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1260
1261         if (krx->krx_rpc_reply_needed) {
1262
1263                 /* We failed to complete the peer's optimized GET (e.g. we
1264                  * couldn't map the source buffers).  We complete the
1265                  * peer's EKC rpc now with failure. */
1266 #if MULTIRAIL_EKC
1267                 rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
1268                                      &kqswnal_rpc_failed, NULL, NULL, 0);
1269                 if (rc == EP_SUCCESS)
1270                         return;
1271                 
1272                 CERROR("can't complete RPC: %d\n", rc);
1273 #else
1274                 if (krx->krx_rxd != NULL) {
1275                         /* We didn't try (and fail) to complete earlier... */
1276                         rc = ep_complete_rpc(krx->krx_rxd, 
1277                                              kqswnal_rpc_complete, krx,
1278                                              &kqswnal_rpc_failed, NULL, 0);
1279                         if (rc == EP_SUCCESS)
1280                                 return;
1281
1282                         CERROR("can't complete RPC: %d\n", rc);
1283                 }
1284                 
1285                 /* NB the old ep_complete_rpc() frees rxd on failure, so we
1286                  * have to requeue from scratch here, unless we're shutting
1287                  * down */
1288                 if (kqswnal_data.kqn_shuttingdown)
1289                         return;
1290
1291                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
1292                                       krx->krx_elanbuffer, 
1293                                       krx->krx_npages * PAGE_SIZE, 0);
1294                 LASSERT (rc == EP_SUCCESS);
1295                 /* We don't handle failure here; it's incredibly rare
1296                  * (never reported?) and only happens with "old" EKC */
1297                 return;
1298 #endif
1299         }
1300
1301 #if MULTIRAIL_EKC
1302         if (kqswnal_data.kqn_shuttingdown) {
1303                 /* free EKC rxd on shutdown */
1304                 ep_complete_receive(krx->krx_rxd);
1305         } else {
1306                 /* repost receive */
1307                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1308                                    &krx->krx_elanbuffer, 0);
1309         }
1310 #else                
1311         /* don't actually requeue on shutdown */
1312         if (!kqswnal_data.kqn_shuttingdown) 
1313                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1314                                    krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
1315 #endif
1316 }
1317         
1318 void
1319 kqswnal_rx (kqswnal_rx_t *krx)
1320 {
1321         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
1322         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
1323         int             nob;
1324         int             niov;
1325
1326         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1327
1328         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
1329                 atomic_set(&krx->krx_refcount, 1);
1330                 lib_parse (&kqswnal_lib, hdr, krx);
1331                 kqswnal_rx_done(krx);
1332                 return;
1333         }
1334
1335 #if KQSW_CHECKSUM
1336         CERROR ("checksums for forwarded packets not implemented\n");
1337         LBUG ();
1338 #endif
1339         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
1340         {
1341                 CERROR("dropping packet from "LPX64" for "LPX64
1342                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
1343
1344                 kqswnal_requeue_rx (krx);
1345                 return;
1346         }
1347
1348         /* NB forwarding may destroy iov; rebuild every time */
1349         for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
1350         {
1351                 LASSERT (niov < krx->krx_npages);
1352                 krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
1353                 krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
1354         }
1355
1356         kpr_fwd_init (&krx->krx_fwd, dest_nid,
1357                       krx->krx_nob, niov, krx->krx_iov,
1358                       kqswnal_fwd_callback, krx);
1359
1360         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
1361 }
1362
1363 /* Receive Interrupt Handler: posts to schedulers */
1364 void 
1365 kqswnal_rxhandler(EP_RXD *rxd)
1366 {
1367         long          flags;
1368         int           nob    = ep_rxd_len (rxd);
1369         int           status = ep_rxd_status (rxd);
1370         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1371
1372         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1373                rxd, krx, nob, status);
1374
1375         LASSERT (krx != NULL);
1376
1377         krx->krx_rxd = rxd;
1378         krx->krx_nob = nob;
1379 #if MULTIRAIL_EKC
1380         krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
1381 #else
1382         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
1383 #endif
1384         
1385         /* must receive a whole header to be able to parse */
1386         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
1387         {
1388                 /* receives complete with failure when receiver is removed */
1389 #if MULTIRAIL_EKC
1390                 if (status == EP_SHUTDOWN)
1391                         LASSERT (kqswnal_data.kqn_shuttingdown);
1392                 else
1393                         CERROR("receive status failed with status %d nob %d\n",
1394                                ep_rxd_status(rxd), nob);
1395 #else
1396                 if (!kqswnal_data.kqn_shuttingdown)
1397                         CERROR("receive status failed with status %d nob %d\n",
1398                                ep_rxd_status(rxd), nob);
1399 #endif
1400                 kqswnal_requeue_rx (krx);
1401                 return;
1402         }
1403
1404         if (!in_interrupt()) {
1405                 kqswnal_rx (krx);
1406                 return;
1407         }
1408
1409         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1410
1411         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1412         wake_up (&kqswnal_data.kqn_sched_waitq);
1413
1414         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1415 }
1416
1417 #if KQSW_CHECKSUM
1418 void
1419 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1420 {
1421         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
1422
1423         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1424                 ", dpid %d, spid %d, type %d\n",
1425                 ishdr ? "Header" : "Payload", krx,
1426                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
1427                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
1428                 NTOH__u32(hdr->type));
1429
1430         switch (NTOH__u32 (hdr->type))
1431         {
1432         case PTL_MSG_ACK:
1433                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
1434                        " len %u\n",
1435                        NTOH__u32(hdr->msg.ack.mlength),
1436                        hdr->msg.ack.dst_wmd.handle_cookie,
1437                        hdr->msg.ack.dst_wmd.handle_idx,
1438                        NTOH__u64(hdr->msg.ack.match_bits),
1439                        NTOH__u32(hdr->msg.ack.length));
1440                 break;
1441         case PTL_MSG_PUT:
1442                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
1443                        " len %u off %u data "LPX64"\n",
1444                        NTOH__u32(hdr->msg.put.ptl_index),
1445                        hdr->msg.put.ack_wmd.handle_cookie,
1446                        hdr->msg.put.ack_wmd.handle_idx,
1447                        NTOH__u64(hdr->msg.put.match_bits),
1448                        NTOH__u32(hdr->msg.put.length),
1449                        NTOH__u32(hdr->msg.put.offset),
1450                        hdr->msg.put.hdr_data);
1451                 break;
1452         case PTL_MSG_GET:
1453                 CERROR ("GET: <>\n");
1454                 break;
1455         case PTL_MSG_REPLY:
1456                 CERROR ("REPLY: <>\n");
1457                 break;
1458         default:
1459                 CERROR ("TYPE?: <>\n");
1460         }
1461 }
1462 #endif
1463
1464 static int
1465 kqswnal_recvmsg (nal_cb_t     *nal,
1466                  void         *private,
1467                  lib_msg_t    *libmsg,
1468                  unsigned int  niov,
1469                  struct iovec *iov,
1470                  ptl_kiov_t   *kiov,
1471                  size_t        mlen,
1472                  size_t        rlen)
1473 {
1474         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
1475         int           page;
1476         char         *page_ptr;
1477         int           page_nob;
1478         char         *iov_ptr;
1479         int           iov_nob;
1480         int           frag;
1481 #if KQSW_CHECKSUM
1482         kqsw_csum_t   senders_csum;
1483         kqsw_csum_t   payload_csum = 0;
1484         kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
1485                                            sizeof(ptl_hdr_t));
1486         size_t        csum_len = mlen;
1487         int           csum_frags = 0;
1488         int           csum_nob = 0;
1489         static atomic_t csum_counter;
1490         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
1491
1492         atomic_inc (&csum_counter);
1493
1494         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1495                                 sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1496         if (senders_csum != hdr_csum)
1497                 kqswnal_csum_error (krx, 1);
1498 #endif
1499         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1500
1501         /* What was actually received must be >= payload.
1502          * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
1503         LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
1504         LASSERT (mlen <= rlen);
1505
1506         /* It must be OK to kmap() if required */
1507         LASSERT (kiov == NULL || !in_interrupt ());
1508         /* Either all pages or all vaddrs */
1509         LASSERT (!(kiov != NULL && iov != NULL));
1510         
1511         if (mlen != 0)
1512         {
1513                 page     = 0;
1514                 page_ptr = ((char *) page_address(krx->krx_pages[0])) +
1515                         KQSW_HDR_SIZE;
1516                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1517
1518                 LASSERT (niov > 0);
1519                 if (kiov != NULL) {
1520                         iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1521                         iov_nob = kiov->kiov_len;
1522                 } else {
1523                         iov_ptr = iov->iov_base;
1524                         iov_nob = iov->iov_len;
1525                 }
1526
1527                 for (;;)
1528                 {
1529                         /* We expect the iov to exactly match mlen */
1530                         LASSERT (iov_nob <= mlen);
1531                         
1532                         frag = MIN (page_nob, iov_nob);
1533                         memcpy (iov_ptr, page_ptr, frag);
1534 #if KQSW_CHECKSUM
1535                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1536                         csum_nob += frag;
1537                         csum_frags++;
1538 #endif
1539                         mlen -= frag;
1540                         if (mlen == 0)
1541                                 break;
1542
1543                         page_nob -= frag;
1544                         if (page_nob != 0)
1545                                 page_ptr += frag;
1546                         else
1547                         {
1548                                 page++;
1549                                 LASSERT (page < krx->krx_npages);
1550                                 page_ptr = page_address(krx->krx_pages[page]);
1551                                 page_nob = PAGE_SIZE;
1552                         }
1553
1554                         iov_nob -= frag;
1555                         if (iov_nob != 0)
1556                                 iov_ptr += frag;
1557                         else if (kiov != NULL) {
1558                                 kunmap (kiov->kiov_page);
1559                                 kiov++;
1560                                 niov--;
1561                                 LASSERT (niov > 0);
1562                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1563                                 iov_nob = kiov->kiov_len;
1564                         } else {
1565                                 iov++;
1566                                 niov--;
1567                                 LASSERT (niov > 0);
1568                                 iov_ptr = iov->iov_base;
1569                                 iov_nob = iov->iov_len;
1570                         }
1571                 }
1572
1573                 if (kiov != NULL)
1574                         kunmap (kiov->kiov_page);
1575         }
1576
1577 #if KQSW_CHECKSUM
1578         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1579                 sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
1580
1581         if (csum_len != rlen)
1582                 CERROR("Unable to checksum data in user's buffer\n");
1583         else if (senders_csum != payload_csum)
1584                 kqswnal_csum_error (krx, 0);
1585
1586         if (csum_verbose)
1587                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1588                        "csum_nob %d\n",
1589                         hdr_csum, payload_csum, csum_frags, csum_nob);
1590 #endif
1591         lib_finalize(nal, private, libmsg);
1592
1593         return (rlen);
1594 }
1595
1596 static int
1597 kqswnal_recv(nal_cb_t     *nal,
1598              void         *private,
1599              lib_msg_t    *libmsg,
1600              unsigned int  niov,
1601              struct iovec *iov,
1602              size_t        mlen,
1603              size_t        rlen)
1604 {
1605         return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen));
1606 }
1607
1608 static int
1609 kqswnal_recv_pages (nal_cb_t     *nal,
1610                     void         *private,
1611                     lib_msg_t    *libmsg,
1612                     unsigned int  niov,
1613                     ptl_kiov_t   *kiov,
1614                     size_t        mlen,
1615                     size_t        rlen)
1616 {
1617         return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen));
1618 }
1619
1620 int
1621 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1622 {
1623         long    pid = kernel_thread (fn, arg, 0);
1624
1625         if (pid < 0)
1626                 return ((int)pid);
1627
1628         atomic_inc (&kqswnal_data.kqn_nthreads);
1629         atomic_inc (&kqswnal_data.kqn_nthreads_running);
1630         return (0);
1631 }
1632
1633 void
1634 kqswnal_thread_fini (void)
1635 {
1636         atomic_dec (&kqswnal_data.kqn_nthreads);
1637 }
1638
1639 int
1640 kqswnal_scheduler (void *arg)
1641 {
1642         kqswnal_rx_t    *krx;
1643         kqswnal_tx_t    *ktx;
1644         kpr_fwd_desc_t  *fwd;
1645         long             flags;
1646         int              rc;
1647         int              counter = 0;
1648         int              shuttingdown = 0;
1649         int              did_something;
1650
1651         kportal_daemonize ("kqswnal_sched");
1652         kportal_blockallsigs ();
1653         
1654         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1655
1656         for (;;)
1657         {
1658                 if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
1659
1660                         if (kqswnal_data.kqn_shuttingdown == 2)
1661                                 break;
1662                 
1663                         /* During stage 1 of shutdown we are still responsive
1664                          * to receives */
1665
1666                         atomic_dec (&kqswnal_data.kqn_nthreads_running);
1667                         shuttingdown = kqswnal_data.kqn_shuttingdown;
1668                 }
1669
1670                 did_something = 0;
1671
1672                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1673                 {
1674                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1675                                          kqswnal_rx_t, krx_list);
1676                         list_del (&krx->krx_list);
1677                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1678                                                flags);
1679
1680                         kqswnal_rx (krx);
1681
1682                         did_something = 1;
1683                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1684                 }
1685
1686                 if (!shuttingdown &&
1687                     !list_empty (&kqswnal_data.kqn_delayedtxds))
1688                 {
1689                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1690                                          kqswnal_tx_t, ktx_list);
1691                         list_del_init (&ktx->ktx_delayed_list);
1692                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1693                                                flags);
1694
1695                         rc = kqswnal_launch (ktx);
1696                         if (rc != 0)          /* failed: ktx_nid down? */
1697                         {
1698                                 CERROR("Failed delayed transmit to "LPX64
1699                                        ": %d\n", ktx->ktx_nid, rc);
1700                                 kqswnal_tx_done (ktx, rc);
1701                         }
1702
1703                         did_something = 1;
1704                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1705                 }
1706
1707                 if (!shuttingdown &
1708                     !list_empty (&kqswnal_data.kqn_delayedfwds))
1709                 {
1710                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1711                         list_del (&fwd->kprfd_list);
1712                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1713
1714                         kqswnal_fwd_packet (NULL, fwd);
1715
1716                         did_something = 1;
1717                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1718                 }
1719
1720                     /* nothing to do or hogging CPU */
1721                 if (!did_something || counter++ == KQSW_RESCHED) {
1722                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1723                                                flags);
1724
1725                         counter = 0;
1726
1727                         if (!did_something) {
1728                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1729                                                                kqswnal_data.kqn_shuttingdown != shuttingdown ||
1730                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1731                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1732                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1733                                 LASSERT (rc == 0);
1734                         } else if (current->need_resched)
1735                                 schedule ();
1736
1737                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1738                 }
1739         }
1740
1741         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1742
1743         kqswnal_thread_fini ();
1744         return (0);
1745 }
1746
1747 nal_cb_t kqswnal_lib =
1748 {
1749         nal_data:       &kqswnal_data,         /* NAL private data */
1750         cb_send:        kqswnal_send,
1751         cb_send_pages:  kqswnal_send_pages,
1752         cb_recv:        kqswnal_recv,
1753         cb_recv_pages:  kqswnal_recv_pages,
1754         cb_read:        kqswnal_read,
1755         cb_write:       kqswnal_write,
1756         cb_malloc:      kqswnal_malloc,
1757         cb_free:        kqswnal_free,
1758         cb_printf:      kqswnal_printf,
1759         cb_cli:         kqswnal_cli,
1760         cb_sti:         kqswnal_sti,
1761         cb_dist:        kqswnal_dist
1762 };