Whamcloud - gitweb
- merge 2 weeks of b1_4 fixes onto HEAD
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 /*
30  *  LIB functions follow
31  *
32  */
33 static int
34 kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
35 {
36         if (nid == nal->libnal_ni.ni_pid.nid)
37                 *dist = 0;                      /* it's me */
38         else if (kqswnal_nid2elanid (nid) >= 0)
39                 *dist = 1;                      /* it's my peer */
40         else
41                 *dist = 2;                      /* via router */
42         return (0);
43 }
44
45 void
46 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
47 {
48         struct timeval     now;
49         time_t             then;
50
51         do_gettimeofday (&now);
52         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
53
54         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
55 }
56
57 void
58 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
59 {
60 #if MULTIRAIL_EKC
61         int      i;
62 #endif
63
64         if (ktx->ktx_nmappedpages == 0)
65                 return;
66         
67 #if MULTIRAIL_EKC
68         CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
69                ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
70
71         for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
72                 ep_dvma_unload(kqswnal_data.kqn_ep,
73                                kqswnal_data.kqn_ep_tx_nmh,
74                                &ktx->ktx_frags[i]);
75 #else
76         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
77                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
78
79         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
80         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
81                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
82
83         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
84                           kqswnal_data.kqn_eptxdmahandle,
85                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
86 #endif
87         ktx->ktx_nmappedpages = 0;
88 }
89
90 int
91 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
92 {
93         int       nfrags    = ktx->ktx_nfrag;
94         int       nmapped   = ktx->ktx_nmappedpages;
95         int       maxmapped = ktx->ktx_npages;
96         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
97         char     *ptr;
98 #if MULTIRAIL_EKC
99         EP_RAILMASK railmask;
100         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
101                                             EP_RAILMASK_ALL,
102                                             kqswnal_nid2elanid(ktx->ktx_nid));
103         
104         if (rail < 0) {
105                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
106                 return (-ENETDOWN);
107         }
108         railmask = 1 << rail;
109 #endif
110         LASSERT (nmapped <= maxmapped);
111         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
112         LASSERT (nfrags <= EP_MAXFRAG);
113         LASSERT (niov > 0);
114         LASSERT (nob > 0);
115
116         /* skip complete frags before 'offset' */
117         while (offset >= kiov->kiov_len) {
118                 offset -= kiov->kiov_len;
119                 kiov++;
120                 niov--;
121                 LASSERT (niov > 0);
122         }
123
124         do {
125                 int  fraglen = kiov->kiov_len - offset;
126
127                 /* each page frag is contained in one page */
128                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
129
130                 if (fraglen > nob)
131                         fraglen = nob;
132
133                 nmapped++;
134                 if (nmapped > maxmapped) {
135                         CERROR("Can't map message in %d pages (max %d)\n",
136                                nmapped, maxmapped);
137                         return (-EMSGSIZE);
138                 }
139
140                 if (nfrags == EP_MAXFRAG) {
141                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
142                                EP_MAXFRAG);
143                         return (-EMSGSIZE);
144                 }
145
146                 /* XXX this is really crap, but we'll have to kmap until
147                  * EKC has a page (rather than vaddr) mapping interface */
148
149                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
150
151                 CDEBUG(D_NET,
152                        "%p[%d] loading %p for %d, page %d, %d total\n",
153                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
154
155 #if MULTIRAIL_EKC
156                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
157                              ptr, fraglen,
158                              kqswnal_data.kqn_ep_tx_nmh, basepage,
159                              &railmask, &ktx->ktx_frags[nfrags]);
160
161                 if (nfrags == ktx->ktx_firsttmpfrag ||
162                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
163                                   &ktx->ktx_frags[nfrags - 1],
164                                   &ktx->ktx_frags[nfrags])) {
165                         /* new frag if this is the first or can't merge */
166                         nfrags++;
167                 }
168 #else
169                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
170                                        kqswnal_data.kqn_eptxdmahandle,
171                                        ptr, fraglen,
172                                        basepage, &ktx->ktx_frags[nfrags].Base);
173
174                 if (nfrags > 0 &&                /* previous frag mapped */
175                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
176                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
177                         /* just extend previous */
178                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
179                 else {
180                         ktx->ktx_frags[nfrags].Len = fraglen;
181                         nfrags++;                /* new frag */
182                 }
183 #endif
184
185                 kunmap (kiov->kiov_page);
186                 
187                 /* keep in loop for failure case */
188                 ktx->ktx_nmappedpages = nmapped;
189
190                 basepage++;
191                 kiov++;
192                 niov--;
193                 nob -= fraglen;
194                 offset = 0;
195
196                 /* iov must not run out before end of data */
197                 LASSERT (nob == 0 || niov > 0);
198
199         } while (nob > 0);
200
201         ktx->ktx_nfrag = nfrags;
202         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
203                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
204
205         return (0);
206 }
207
208 int
209 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
210                     int niov, struct iovec *iov)
211 {
212         int       nfrags    = ktx->ktx_nfrag;
213         int       nmapped   = ktx->ktx_nmappedpages;
214         int       maxmapped = ktx->ktx_npages;
215         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
216 #if MULTIRAIL_EKC
217         EP_RAILMASK railmask;
218         int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
219                                             EP_RAILMASK_ALL,
220                                             kqswnal_nid2elanid(ktx->ktx_nid));
221         
222         if (rail < 0) {
223                 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
224                 return (-ENETDOWN);
225         }
226         railmask = 1 << rail;
227 #endif
228         LASSERT (nmapped <= maxmapped);
229         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
230         LASSERT (nfrags <= EP_MAXFRAG);
231         LASSERT (niov > 0);
232         LASSERT (nob > 0);
233
234         /* skip complete frags before offset */
235         while (offset >= iov->iov_len) {
236                 offset -= iov->iov_len;
237                 iov++;
238                 niov--;
239                 LASSERT (niov > 0);
240         }
241         
242         do {
243                 int  fraglen = iov->iov_len - offset;
244                 long npages;
245                 
246                 if (fraglen > nob)
247                         fraglen = nob;
248                 npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
249
250                 nmapped += npages;
251                 if (nmapped > maxmapped) {
252                         CERROR("Can't map message in %d pages (max %d)\n",
253                                nmapped, maxmapped);
254                         return (-EMSGSIZE);
255                 }
256
257                 if (nfrags == EP_MAXFRAG) {
258                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
259                                EP_MAXFRAG);
260                         return (-EMSGSIZE);
261                 }
262
263                 CDEBUG(D_NET,
264                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
265                        ktx, nfrags, iov->iov_base + offset, fraglen, 
266                        basepage, npages, nmapped);
267
268 #if MULTIRAIL_EKC
269                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
270                              iov->iov_base + offset, fraglen,
271                              kqswnal_data.kqn_ep_tx_nmh, basepage,
272                              &railmask, &ktx->ktx_frags[nfrags]);
273
274                 if (nfrags == ktx->ktx_firsttmpfrag ||
275                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
276                                   &ktx->ktx_frags[nfrags - 1],
277                                   &ktx->ktx_frags[nfrags])) {
278                         /* new frag if this is the first or can't merge */
279                         nfrags++;
280                 }
281 #else
282                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
283                                        kqswnal_data.kqn_eptxdmahandle,
284                                        iov->iov_base + offset, fraglen,
285                                        basepage, &ktx->ktx_frags[nfrags].Base);
286
287                 if (nfrags > 0 &&                /* previous frag mapped */
288                     ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
289                     (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
290                         /* just extend previous */
291                         ktx->ktx_frags[nfrags - 1].Len += fraglen;
292                 else {
293                         ktx->ktx_frags[nfrags].Len = fraglen;
294                         nfrags++;                /* new frag */
295                 }
296 #endif
297
298                 /* keep in loop for failure case */
299                 ktx->ktx_nmappedpages = nmapped;
300
301                 basepage += npages;
302                 iov++;
303                 niov--;
304                 nob -= fraglen;
305                 offset = 0;
306
307                 /* iov must not run out before end of data */
308                 LASSERT (nob == 0 || niov > 0);
309
310         } while (nob > 0);
311
312         ktx->ktx_nfrag = nfrags;
313         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
314                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
315
316         return (0);
317 }
318
319
320 void
321 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
322 {
323         kpr_fwd_desc_t   *fwd = NULL;
324         unsigned long     flags;
325
326         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
327         ktx->ktx_state = KTX_IDLE;
328
329         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
330
331         list_del (&ktx->ktx_list);              /* take off active list */
332
333         if (ktx->ktx_isnblk) {
334                 /* reserved for non-blocking tx */
335                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
336                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
337                 return;
338         }
339
340         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
341
342         /* anything blocking for a tx descriptor? */
343         if (!kqswnal_data.kqn_shuttingdown &&
344             !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
345         {
346                 CDEBUG(D_NET,"wakeup fwd\n");
347
348                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
349                                   kpr_fwd_desc_t, kprfd_list);
350                 list_del (&fwd->kprfd_list);
351         }
352
353         wake_up (&kqswnal_data.kqn_idletxd_waitq);
354
355         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
356
357         if (fwd == NULL)
358                 return;
359
360         /* schedule packet for forwarding again */
361         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
362
363         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
364         wake_up (&kqswnal_data.kqn_sched_waitq);
365
366         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
367 }
368
369 kqswnal_tx_t *
370 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
371 {
372         unsigned long  flags;
373         kqswnal_tx_t  *ktx = NULL;
374
375         for (;;) {
376                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
377
378                 if (kqswnal_data.kqn_shuttingdown)
379                         break;
380
381                 /* "normal" descriptor is free */
382                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
383                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
384                                           kqswnal_tx_t, ktx_list);
385                         break;
386                 }
387
388                 if (fwd != NULL)                /* forwarded packet? */
389                         break;
390
391                 /* doing a local transmit */
392                 if (!may_block) {
393                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
394                                 CERROR ("intr tx desc pool exhausted\n");
395                                 break;
396                         }
397
398                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
399                                           kqswnal_tx_t, ktx_list);
400                         break;
401                 }
402
403                 /* block for idle tx */
404
405                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
406
407                 CDEBUG (D_NET, "blocking for tx desc\n");
408                 wait_event (kqswnal_data.kqn_idletxd_waitq,
409                             !list_empty (&kqswnal_data.kqn_idletxds) ||
410                             kqswnal_data.kqn_shuttingdown);
411         }
412
413         if (ktx != NULL) {
414                 list_del (&ktx->ktx_list);
415                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
416                 ktx->ktx_launcher = current->pid;
417                 atomic_inc(&kqswnal_data.kqn_pending_txs);
418         } else if (fwd != NULL) {
419                 /* queue forwarded packet until idle txd available */
420                 CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
421                 list_add_tail (&fwd->kprfd_list,
422                                &kqswnal_data.kqn_idletxd_fwdq);
423         }
424
425         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
426
427         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
428         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
429
430         return (ktx);
431 }
432
433 void
434 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
435 {
436         switch (ktx->ktx_state) {
437         case KTX_FORWARDING:       /* router asked me to forward this packet */
438                 kpr_fwd_done (&kqswnal_data.kqn_router,
439                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
440                 break;
441
442         case KTX_RDMAING:          /* optimized GET/PUT handled */
443         case KTX_PUTTING:          /* optimized PUT sent */
444         case KTX_SENDING:          /* normal send */
445                 lib_finalize (&kqswnal_lib, NULL,
446                               (lib_msg_t *)ktx->ktx_args[1],
447                               (error == 0) ? PTL_OK : PTL_FAIL);
448                 break;
449
450         case KTX_GETTING:          /* optimized GET sent & REPLY received */
451                 /* Complete the GET with success since we can't avoid
452                  * delivering a REPLY event; we committed to it when we
453                  * launched the GET */
454                 lib_finalize (&kqswnal_lib, NULL, 
455                               (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
456                 lib_finalize (&kqswnal_lib, NULL,
457                               (lib_msg_t *)ktx->ktx_args[2],
458                               (error == 0) ? PTL_OK : PTL_FAIL);
459                 break;
460
461         default:
462                 LASSERT (0);
463         }
464
465         kqswnal_put_idle_tx (ktx);
466 }
467
468 static void
469 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
470 {
471         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
472
473         LASSERT (txd != NULL);
474         LASSERT (ktx != NULL);
475
476         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
477
478         if (status != EP_SUCCESS) {
479
480                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
481                         ktx->ktx_nid, status);
482
483                 kqswnal_notify_peer_down(ktx);
484                 status = -EHOSTDOWN;
485
486         } else switch (ktx->ktx_state) {
487
488         case KTX_GETTING:
489         case KTX_PUTTING:
490                 /* RPC completed OK; but what did our peer put in the status
491                  * block? */
492 #if MULTIRAIL_EKC
493                 status = ep_txd_statusblk(txd)->Data[0];
494 #else
495                 status = ep_txd_statusblk(txd)->Status;
496 #endif
497                 break;
498                 
499         case KTX_FORWARDING:
500         case KTX_SENDING:
501                 status = 0;
502                 break;
503                 
504         default:
505                 LBUG();
506                 break;
507         }
508
509         kqswnal_tx_done (ktx, status);
510 }
511
512 int
513 kqswnal_launch (kqswnal_tx_t *ktx)
514 {
515         /* Don't block for transmit descriptor if we're in interrupt context */
516         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
517         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
518         long  flags;
519         int   rc;
520
521         ktx->ktx_launchtime = jiffies;
522
523         if (kqswnal_data.kqn_shuttingdown)
524                 return (-ESHUTDOWN);
525
526         LASSERT (dest >= 0);                    /* must be a peer */
527
528         switch (ktx->ktx_state) {
529         case KTX_GETTING:
530         case KTX_PUTTING:
531                 /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
532                  * The other frags are the payload, awaiting RDMA */
533                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
534                                      ktx->ktx_port, attr,
535                                      kqswnal_txhandler, ktx,
536                                      NULL, ktx->ktx_frags, 1);
537                 break;
538
539         case KTX_FORWARDING:
540         case KTX_SENDING:
541 #if MULTIRAIL_EKC
542                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
543                                          ktx->ktx_port, attr,
544                                          kqswnal_txhandler, ktx,
545                                          NULL, ktx->ktx_frags, ktx->ktx_nfrag);
546 #else
547                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
548                                        ktx->ktx_port, attr, 
549                                        kqswnal_txhandler, ktx, 
550                                        ktx->ktx_frags, ktx->ktx_nfrag);
551 #endif
552                 break;
553                 
554         default:
555                 LBUG();
556                 rc = -EINVAL;                   /* no compiler warning please */
557                 break;
558         }
559
560         switch (rc) {
561         case EP_SUCCESS: /* success */
562                 return (0);
563
564         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
565                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
566
567                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
568                 wake_up (&kqswnal_data.kqn_sched_waitq);
569
570                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
571                 return (0);
572
573         default: /* fatal error */
574                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
575                 kqswnal_notify_peer_down(ktx);
576                 return (-EHOSTUNREACH);
577         }
578 }
579
580 #if 0
581 static char *
582 hdr_type_string (ptl_hdr_t *hdr)
583 {
584         switch (hdr->type) {
585         case PTL_MSG_ACK:
586                 return ("ACK");
587         case PTL_MSG_PUT:
588                 return ("PUT");
589         case PTL_MSG_GET:
590                 return ("GET");
591         case PTL_MSG_REPLY:
592                 return ("REPLY");
593         default:
594                 return ("<UNKNOWN>");
595         }
596 }
597
598 static void
599 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
600 {
601         char *type_str = hdr_type_string (hdr);
602
603         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
604                NTOH__u32(hdr->payload_length));
605         CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
606                NTOH__u32(hdr->src_pid));
607         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
608                NTOH__u32(hdr->dest_pid));
609
610         switch (NTOH__u32(hdr->type)) {
611         case PTL_MSG_PUT:
612                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
613                        "match bits "LPX64"\n",
614                        NTOH__u32 (hdr->msg.put.ptl_index),
615                        hdr->msg.put.ack_wmd.wh_interface_cookie,
616                        hdr->msg.put.ack_wmd.wh_object_cookie,
617                        NTOH__u64 (hdr->msg.put.match_bits));
618                 CERROR("    offset %d, hdr data "LPX64"\n",
619                        NTOH__u32(hdr->msg.put.offset),
620                        hdr->msg.put.hdr_data);
621                 break;
622
623         case PTL_MSG_GET:
624                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
625                        "match bits "LPX64"\n",
626                        NTOH__u32 (hdr->msg.get.ptl_index),
627                        hdr->msg.get.return_wmd.wh_interface_cookie,
628                        hdr->msg.get.return_wmd.wh_object_cookie,
629                        hdr->msg.get.match_bits);
630                 CERROR("    Length %d, src offset %d\n",
631                        NTOH__u32 (hdr->msg.get.sink_length),
632                        NTOH__u32 (hdr->msg.get.src_offset));
633                 break;
634
635         case PTL_MSG_ACK:
636                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
637                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
638                        hdr->msg.ack.dst_wmd.wh_object_cookie,
639                        NTOH__u32 (hdr->msg.ack.mlength));
640                 break;
641
642         case PTL_MSG_REPLY:
643                 CERROR("    dst md "LPX64"."LPX64"\n",
644                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
645                        hdr->msg.reply.dst_wmd.wh_object_cookie);
646         }
647
648 }                               /* end of print_hdr() */
649 #endif
650
651 #if !MULTIRAIL_EKC
652 void
653 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
654 {
655         int          i;
656
657         CDEBUG (how, "%s: %d\n", str, n);
658         for (i = 0; i < n; i++) {
659                 CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
660         }
661 }
662
663 int
664 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
665                      int nsrc, EP_IOVEC *src,
666                      int ndst, EP_IOVEC *dst) 
667 {
668         int        count;
669         int        nob;
670
671         LASSERT (ndv > 0);
672         LASSERT (nsrc > 0);
673         LASSERT (ndst > 0);
674
675         for (count = 0; count < ndv; count++, dv++) {
676
677                 if (nsrc == 0 || ndst == 0) {
678                         if (nsrc != ndst) {
679                                 /* For now I'll barf on any left over entries */
680                                 CERROR ("mismatched src and dst iovs\n");
681                                 return (-EINVAL);
682                         }
683                         return (count);
684                 }
685
686                 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
687                 dv->Len    = nob;
688                 dv->Source = src->Base;
689                 dv->Dest   = dst->Base;
690
691                 if (nob >= src->Len) {
692                         src++;
693                         nsrc--;
694                 } else {
695                         src->Len -= nob;
696                         src->Base += nob;
697                 }
698                 
699                 if (nob >= dst->Len) {
700                         dst++;
701                         ndst--;
702                 } else {
703                         src->Len -= nob;
704                         src->Base += nob;
705                 }
706         }
707
708         CERROR ("DATAVEC too small\n");
709         return (-E2BIG);
710 }
711 #else
712 int
713 kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
714                     int nrfrag, EP_NMD *rfrag)
715 {
716         int  i;
717
718         if (nlfrag != nrfrag) {
719                 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
720                        nlfrag, nrfrag);
721                 return (-EINVAL);
722         }
723         
724         for (i = 0; i < nlfrag; i++)
725                 if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
726                         CERROR("Can't cope with unequal frags %d(%d):"
727                                " %d local %d remote\n",
728                                i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
729                         return (-EINVAL);
730                 }
731         
732         return (0);
733 }
734 #endif
735
736 kqswnal_remotemd_t *
737 kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
738 {
739         char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
740         ptl_hdr_t          *hdr = (ptl_hdr_t *)buffer;
741         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
742         ptl_nid_t           nid = kqswnal_rx_nid(krx);
743
744         /* Note (1) lib_parse has already flipped hdr.
745          *      (2) RDMA addresses are sent in native endian-ness.  When
746          *      EKC copes with different endian nodes, I'll fix this (and
747          *      eat my hat :) */
748
749         LASSERT (krx->krx_nob >= sizeof(*hdr));
750
751         if (hdr->type != type) {
752                 CERROR ("Unexpected optimized get/put type %d (%d expected)"
753                         "from "LPX64"\n", hdr->type, type, nid);
754                 return (NULL);
755         }
756         
757         if (hdr->src_nid != nid) {
758                 CERROR ("Unexpected optimized get/put source NID "
759                         LPX64" from "LPX64"\n", hdr->src_nid, nid);
760                 return (NULL);
761         }
762
763         LASSERT (nid == expected_nid);
764
765         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
766                 /* msg too small to discover rmd size */
767                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
768                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
769                 return (NULL);
770         }
771
772         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
773                 /* rmd doesn't fit in the incoming message */
774                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
775                         krx->krx_nob, rmd->kqrmd_nfrag,
776                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
777                 return (NULL);
778         }
779
780         return (rmd);
781 }
782
783 void
784 kqswnal_rdma_store_complete (EP_RXD *rxd) 
785 {
786         int           status = ep_rxd_status(rxd);
787         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
788         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
789         
790         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
791                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
792
793         LASSERT (ktx->ktx_state == KTX_RDMAING);
794         LASSERT (krx->krx_rxd == rxd);
795         LASSERT (krx->krx_rpc_reply_needed);
796
797         krx->krx_rpc_reply_needed = 0;
798         kqswnal_rx_decref (krx);
799
800         /* free ktx & finalize() its lib_msg_t */
801         kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
802 }
803
804 void
805 kqswnal_rdma_fetch_complete (EP_RXD *rxd) 
806 {
807         /* Completed fetching the PUT data */
808         int           status = ep_rxd_status(rxd);
809         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
810         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
811         unsigned long flags;
812         
813         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
814                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
815
816         LASSERT (ktx->ktx_state == KTX_RDMAING);
817         LASSERT (krx->krx_rxd == rxd);
818         LASSERT (krx->krx_rpc_reply_needed);
819
820         /* Set the RPC completion status */
821         status = (status == EP_SUCCESS) ? 0 : -ECONNABORTED;
822         krx->krx_rpc_reply_status = status;
823
824         /* free ktx & finalize() its lib_msg_t */
825         kqswnal_tx_done(ktx, status);
826
827         if (!in_interrupt()) {
828                 /* OK to complete the RPC now (iff I had the last ref) */
829                 kqswnal_rx_decref (krx);
830                 return;
831         }
832
833         LASSERT (krx->krx_state == KRX_PARSE);
834         krx->krx_state = KRX_COMPLETING;
835
836         /* Complete the RPC in thread context */
837         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
838
839         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
840         wake_up (&kqswnal_data.kqn_sched_waitq);
841
842         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
843 }
844
845 int
846 kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
847               int niov, struct iovec *iov, ptl_kiov_t *kiov,
848               size_t offset, size_t len)
849 {
850         kqswnal_remotemd_t *rmd;
851         kqswnal_tx_t       *ktx;
852         int                 eprc;
853         int                 rc;
854 #if !MULTIRAIL_EKC
855         EP_DATAVEC          datav[EP_MAXFRAG];
856         int                 ndatav;
857 #endif
858
859         LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
860         /* Not both mapped and paged payload */
861         LASSERT (iov == NULL || kiov == NULL);
862         /* RPC completes with failure by default */
863         LASSERT (krx->krx_rpc_reply_needed);
864         LASSERT (krx->krx_rpc_reply_status != 0);
865
866         rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
867         if (rmd == NULL)
868                 return (-EPROTO);
869
870         if (len == 0) {
871                 /* data got truncated to nothing. */
872                 lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
873                 /* Let kqswnal_rx_done() complete the RPC with success */
874                 krx->krx_rpc_reply_status = 0;
875                 return (0);
876         }
877         
878         /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
879            actually sending a portals message with it */
880         ktx = kqswnal_get_idle_tx(NULL, 0);
881         if (ktx == NULL) {
882                 CERROR ("Can't get txd for RDMA with "LPX64"\n",
883                         libmsg->ev.initiator.nid);
884                 return (-ENOMEM);
885         }
886
887         ktx->ktx_state   = KTX_RDMAING;
888         ktx->ktx_nid     = libmsg->ev.initiator.nid;
889         ktx->ktx_args[0] = krx;
890         ktx->ktx_args[1] = libmsg;
891
892         /* Start mapping at offset 0 (we're not mapping any headers) */
893         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
894         
895         if (kiov != NULL)
896                 rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
897         else
898                 rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
899
900         if (rc != 0) {
901                 CERROR ("Can't map local RDMA data: %d\n", rc);
902                 goto out;
903         }
904
905 #if MULTIRAIL_EKC
906         rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
907                                  rmd->kqrmd_nfrag, rmd->kqrmd_frag);
908         if (rc != 0) {
909                 CERROR ("Incompatible RDMA descriptors\n");
910                 goto out;
911         }
912 #else
913         switch (type) {
914         default:
915                 LBUG();
916
917         case PTL_MSG_GET:
918                 ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
919                                              ktx->ktx_nfrag, ktx->ktx_frags,
920                                              rmd->kqrmd_nfrag, rmd->kqrmd_frag);
921                 break;
922
923         case PTL_MSG_PUT:
924                 ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
925                                              rmd->kqrmd_nfrag, rmd->kqrmd_frag,
926                                              ktx->ktx_nfrag, ktx->ktx_frags);
927                 break;
928         }
929                 
930         if (ndatav < 0) {
931                 CERROR ("Can't create datavec: %d\n", ndatav);
932                 rc = ndatav;
933                 goto out;
934         }
935 #endif
936
937         LASSERT (atomic_read(&krx->krx_refcount) > 0);
938         /* Take an extra ref for the completion callback */
939         atomic_inc(&krx->krx_refcount);
940
941         switch (type) {
942         default:
943                 LBUG();
944
945         case PTL_MSG_GET:
946 #if MULTIRAIL_EKC
947                 eprc = ep_complete_rpc(krx->krx_rxd, 
948                                        kqswnal_rdma_store_complete, ktx, 
949                                        &kqswnal_data.kqn_rpc_success,
950                                        ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
951 #else
952                 eprc = ep_complete_rpc (krx->krx_rxd, 
953                                         kqswnal_rdma_store_complete, ktx,
954                                         &kqswnal_data.kqn_rpc_success, 
955                                         datav, ndatav);
956                 if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
957                         krx->krx_rxd = NULL;
958 #endif
959                 if (eprc != EP_SUCCESS) {
960                         CERROR("can't complete RPC: %d\n", eprc);
961                         /* don't re-attempt RPC completion */
962                         krx->krx_rpc_reply_needed = 0;
963                         rc = -ECONNABORTED;
964                 }
965                 break;
966                 
967         case PTL_MSG_PUT:
968 #if MULTIRAIL_EKC
969                 eprc = ep_rpc_get (krx->krx_rxd, 
970                                    kqswnal_rdma_fetch_complete, ktx,
971                                    rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
972 #else
973                 eprc = ep_rpc_get (krx->krx_rxd,
974                                    kqswnal_rdma_fetch_complete, ktx,
975                                    datav, ndatav);
976 #endif
977                 if (eprc != EP_SUCCESS) {
978                         CERROR("ep_rpc_get failed: %d\n", eprc);
979                         rc = -ECONNABORTED;
980                 }
981                 break;
982         }
983
984  out:
985         if (rc != 0) {
986                 kqswnal_rx_decref(krx);                 /* drop callback's ref */
987                 kqswnal_put_idle_tx (ktx);
988         }
989
990         atomic_dec(&kqswnal_data.kqn_pending_txs);
991         return (rc);
992 }
993
994 static ptl_err_t
995 kqswnal_sendmsg (lib_nal_t    *nal,
996                  void         *private,
997                  lib_msg_t    *libmsg,
998                  ptl_hdr_t    *hdr,
999                  int           type,
1000                  ptl_nid_t     nid,
1001                  ptl_pid_t     pid,
1002                  unsigned int  payload_niov,
1003                  struct iovec *payload_iov,
1004                  ptl_kiov_t   *payload_kiov,
1005                  size_t        payload_offset,
1006                  size_t        payload_nob)
1007 {
1008         kqswnal_tx_t      *ktx;
1009         int                rc;
1010         ptl_nid_t          targetnid;
1011 #if KQSW_CHECKSUM
1012         int                i;
1013         kqsw_csum_t        csum;
1014         int                sumoff;
1015         int                sumnob;
1016 #endif
1017         /* NB 1. hdr is in network byte order */
1018         /*    2. 'private' depends on the message type */
1019         
1020         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
1021                " pid %u\n", payload_nob, payload_niov, nid, pid);
1022
1023         LASSERT (payload_nob == 0 || payload_niov > 0);
1024         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1025
1026         /* It must be OK to kmap() if required */
1027         LASSERT (payload_kiov == NULL || !in_interrupt ());
1028         /* payload is either all vaddrs or all pages */
1029         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1030
1031         if (payload_nob > KQSW_MAXPAYLOAD) {
1032                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
1033                         payload_nob, KQSW_MAXPAYLOAD);
1034                 return (PTL_FAIL);
1035         }
1036
1037         if (type == PTL_MSG_REPLY &&            /* can I look in 'private' */
1038             ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
1039                 /* Must be a REPLY for an optimized GET */
1040                 rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
1041                                    payload_niov, payload_iov, payload_kiov, 
1042                                    payload_offset, payload_nob);
1043                 return ((rc == 0) ? PTL_OK : PTL_FAIL);
1044         }
1045
1046         targetnid = nid;
1047         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
1048                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
1049                                  sizeof (ptl_hdr_t) + payload_nob, &targetnid);
1050                 if (rc != 0) {
1051                         CERROR("Can't route to "LPX64": router error %d\n",
1052                                nid, rc);
1053                         return (PTL_FAIL);
1054                 }
1055                 if (kqswnal_nid2elanid (targetnid) < 0) {
1056                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
1057                                targetnid, nid);
1058                         return (PTL_FAIL);
1059                 }
1060         }
1061
1062         /* I may not block for a transmit descriptor if I might block the
1063          * receiver, or an interrupt handler. */
1064         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
1065                                           type == PTL_MSG_REPLY ||
1066                                           in_interrupt()));
1067         if (ktx == NULL) {
1068                 CERROR ("Can't get txd for msg type %d for "LPX64"\n",
1069                         type, libmsg->ev.initiator.nid);
1070                 return (PTL_NO_SPACE);
1071         }
1072
1073         ktx->ktx_state   = KTX_SENDING;
1074         ktx->ktx_nid     = targetnid;
1075         ktx->ktx_args[0] = private;
1076         ktx->ktx_args[1] = libmsg;
1077         ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */
1078
1079         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
1080         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
1081
1082 #if KQSW_CHECKSUM
1083         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
1084         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
1085         for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
1086                 LASSERT(i < niov);
1087                 if (payload_kiov != NULL) {
1088                         ptl_kiov_t *kiov = &payload_kiov[i];
1089
1090                         if (sumoff >= kiov->kiov_len) {
1091                                 sumoff -= kiov->kiov_len;
1092                         } else {
1093                                 char *addr = ((char *)kmap (kiov->kiov_page)) +
1094                                              kiov->kiov_offset + sumoff;
1095                                 int   fragnob = kiov->kiov_len - sumoff;
1096
1097                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1098                                 sumnob -= fragnob;
1099                                 sumoff = 0;
1100                                 kunmap(kiov->kiov_page);
1101                         }
1102                 } else {
1103                         struct iovec *iov = &payload_iov[i];
1104
1105                         if (sumoff > iov->iov_len) {
1106                                 sumoff -= iov->iov_len;
1107                         } else {
1108                                 char *addr = iov->iov_base + sumoff;
1109                                 int   fragnob = iov->iov_len - sumoff;
1110                                 
1111                                 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1112                                 sumnob -= fragnob;
1113                                 sumoff = 0;
1114                         }
1115                 }
1116         }
1117         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
1118 #endif
1119
1120         /* The first frag will be the pre-mapped buffer for (at least) the
1121          * portals header. */
1122         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1123
1124         if (nid == targetnid &&                 /* not forwarding */
1125             ((type == PTL_MSG_GET &&            /* optimize GET? */
1126               kqswnal_tunables.kqn_optimized_gets != 0 &&
1127               NTOH__u32(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
1128              (type == PTL_MSG_PUT &&            /* optimize PUT? */
1129               kqswnal_tunables.kqn_optimized_puts != 0 &&
1130               payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
1131                 lib_md_t           *md = libmsg->md;
1132                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
1133                 
1134                 /* Optimised path: I send over the Elan vaddrs of the local
1135                  * buffers, and my peer DMAs directly to/from them.
1136                  *
1137                  * First I set up ktx as if it was going to send this
1138                  * payload, (it needs to map it anyway).  This fills
1139                  * ktx_frags[1] and onward with the network addresses
1140                  * of the GET sink frags.  I copy these into ktx_buffer,
1141                  * immediately after the header, and send that as my
1142                  * message. */
1143
1144                 ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;
1145
1146                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
1147                         rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
1148                                                   md->md_niov, md->md_iov.kiov);
1149                 else
1150                         rc = kqswnal_map_tx_iov (ktx, 0, md->length,
1151                                                  md->md_niov, md->md_iov.iov);
1152                 if (rc != 0)
1153                         goto out;
1154
1155                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1156
1157                 payload_nob = offsetof(kqswnal_remotemd_t,
1158                                        kqrmd_frag[rmd->kqrmd_nfrag]);
1159                 LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
1160
1161 #if MULTIRAIL_EKC
1162                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1163                        rmd->kqrmd_nfrag * sizeof(EP_NMD));
1164
1165                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1166                               0, KQSW_HDR_SIZE + payload_nob);
1167 #else
1168                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1169                        rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
1170                 
1171                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1172                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1173 #endif
1174                 if (type == PTL_MSG_GET) {
1175                         /* Allocate reply message now while I'm in thread context */
1176                         ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,
1177                                                                  nid, libmsg);
1178                         if (ktx->ktx_args[2] == NULL)
1179                                 goto out;
1180
1181                         /* NB finalizing the REPLY message is my
1182                          * responsibility now, whatever happens. */
1183                 }
1184                 
1185         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
1186
1187                 /* small message: single frag copied into the pre-mapped buffer */
1188
1189 #if MULTIRAIL_EKC
1190                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1191                               0, KQSW_HDR_SIZE + payload_nob);
1192 #else
1193                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1194                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1195 #endif
1196                 if (payload_nob > 0) {
1197                         if (payload_kiov != NULL)
1198                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1199                                                    payload_niov, payload_kiov, 
1200                                                    payload_offset, payload_nob);
1201                         else
1202                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1203                                                   payload_niov, payload_iov, 
1204                                                   payload_offset, payload_nob);
1205                 }
1206         } else {
1207
1208                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
1209
1210 #if MULTIRAIL_EKC
1211                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1212                               0, KQSW_HDR_SIZE);
1213 #else
1214                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1215                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1216 #endif
1217                 if (payload_kiov != NULL)
1218                         rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
1219                                                   payload_niov, payload_kiov);
1220                 else
1221                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
1222                                                  payload_niov, payload_iov);
1223                 if (rc != 0)
1224                         goto out;
1225         }
1226         
1227         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
1228                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1229
1230         rc = kqswnal_launch (ktx);
1231
1232  out:
1233         CDEBUG(rc == 0 ? D_NET : D_ERROR, 
1234                "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", 
1235                rc == 0 ? "Sent" : "Failed to send",
1236                payload_nob, nid, targetnid, rc);
1237
1238         if (rc != 0) {
1239                 if (ktx->ktx_state == KTX_GETTING &&
1240                     ktx->ktx_args[2] != NULL) {
1241                         /* We committed to reply, but there was a problem
1242                          * launching the GET.  We can't avoid delivering a
1243                          * REPLY event since we committed above, so we
1244                          * pretend the GET succeeded but the REPLY
1245                          * failed. */
1246                         rc = 0;
1247                         lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
1248                         lib_finalize (&kqswnal_lib, private,
1249                                       (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);
1250                 }
1251                 
1252                 kqswnal_put_idle_tx (ktx);
1253         }
1254         
1255         atomic_dec(&kqswnal_data.kqn_pending_txs);
1256         return (rc == 0 ? PTL_OK : PTL_FAIL);
1257 }
1258
1259 static ptl_err_t
1260 kqswnal_send (lib_nal_t    *nal,
1261               void         *private,
1262               lib_msg_t    *libmsg,
1263               ptl_hdr_t    *hdr,
1264               int           type,
1265               ptl_nid_t     nid,
1266               ptl_pid_t     pid,
1267               unsigned int  payload_niov,
1268               struct iovec *payload_iov,
1269               size_t        payload_offset,
1270               size_t        payload_nob)
1271 {
1272         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1273                                  payload_niov, payload_iov, NULL, 
1274                                  payload_offset, payload_nob));
1275 }
1276
1277 static ptl_err_t
1278 kqswnal_send_pages (lib_nal_t    *nal,
1279                     void         *private,
1280                     lib_msg_t    *libmsg,
1281                     ptl_hdr_t    *hdr,
1282                     int           type,
1283                     ptl_nid_t     nid,
1284                     ptl_pid_t     pid,
1285                     unsigned int  payload_niov,
1286                     ptl_kiov_t   *payload_kiov,
1287                     size_t        payload_offset,
1288                     size_t        payload_nob)
1289 {
1290         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1291                                  payload_niov, NULL, payload_kiov, 
1292                                  payload_offset, payload_nob));
1293 }
1294
1295 void
1296 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1297 {
1298         int             rc;
1299         kqswnal_tx_t   *ktx;
1300         ptl_kiov_t     *kiov = fwd->kprfd_kiov;
1301         int             niov = fwd->kprfd_niov;
1302         int             nob = fwd->kprfd_nob;
1303         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
1304
1305 #if KQSW_CHECKSUM
1306         CERROR ("checksums for forwarded packets not implemented\n");
1307         LBUG ();
1308 #endif
1309         /* The router wants this NAL to forward a packet */
1310         CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
1311                 fwd, nid, niov, nob);
1312
1313         ktx = kqswnal_get_idle_tx (fwd, 0);
1314         if (ktx == NULL)        /* can't get txd right now */
1315                 return;         /* fwd will be scheduled when tx desc freed */
1316
1317         if (nid == kqswnal_lib.libnal_ni.ni_pid.nid) /* gateway is me */
1318                 nid = fwd->kprfd_target_nid;    /* target is final dest */
1319
1320         if (kqswnal_nid2elanid (nid) < 0) {
1321                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
1322                 rc = -EHOSTUNREACH;
1323                 goto out;
1324         }
1325
1326         /* copy hdr into pre-mapped buffer */
1327         memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
1328         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
1329
1330         ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
1331                            EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1332         ktx->ktx_nid     = nid;
1333         ktx->ktx_state   = KTX_FORWARDING;
1334         ktx->ktx_args[0] = fwd;
1335         ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;
1336
1337         if (nob <= KQSW_TX_MAXCONTIG) 
1338         {
1339                 /* send payload from ktx's pre-mapped contiguous buffer */
1340 #if MULTIRAIL_EKC
1341                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1342                               0, KQSW_HDR_SIZE + nob);
1343 #else
1344                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1345                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
1346 #endif
1347                 if (nob > 0)
1348                         lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
1349                                           niov, kiov, 0, nob);
1350         }
1351         else
1352         {
1353                 /* zero copy payload */
1354 #if MULTIRAIL_EKC
1355                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1356                               0, KQSW_HDR_SIZE);
1357 #else
1358                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1359                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1360 #endif
1361                 rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
1362                 if (rc != 0)
1363                         goto out;
1364         }
1365
1366         rc = kqswnal_launch (ktx);
1367  out:
1368         if (rc != 0) {
1369                 CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
1370
1371                 /* complete now (with failure) */
1372                 kqswnal_tx_done (ktx, rc);
1373         }
1374
1375         atomic_dec(&kqswnal_data.kqn_pending_txs);
1376 }
1377
1378 void
1379 kqswnal_fwd_callback (void *arg, int error)
1380 {
1381         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1382
1383         /* The router has finished forwarding this packet */
1384
1385         if (error != 0)
1386         {
1387                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1388
1389                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1390                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
1391         }
1392
1393         LASSERT (atomic_read(&krx->krx_refcount) == 1);
1394         kqswnal_rx_decref (krx);
1395 }
1396
1397 void
1398 kqswnal_requeue_rx (kqswnal_rx_t *krx)
1399 {
1400         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1401         LASSERT (!krx->krx_rpc_reply_needed);
1402
1403         krx->krx_state = KRX_POSTED;
1404
1405 #if MULTIRAIL_EKC
1406         if (kqswnal_data.kqn_shuttingdown) {
1407                 /* free EKC rxd on shutdown */
1408                 ep_complete_receive(krx->krx_rxd);
1409         } else {
1410                 /* repost receive */
1411                 ep_requeue_receive(krx->krx_rxd, 
1412                                    kqswnal_rxhandler, krx,
1413                                    &krx->krx_elanbuffer, 0);
1414         }
1415 #else                
1416         if (kqswnal_data.kqn_shuttingdown)
1417                 return;
1418
1419         if (krx->krx_rxd == NULL) {
1420                 /* We had a failed ep_complete_rpc() which nukes the
1421                  * descriptor in "old" EKC */
1422                 int eprc = ep_queue_receive(krx->krx_eprx, 
1423                                             kqswnal_rxhandler, krx,
1424                                             krx->krx_elanbuffer, 
1425                                             krx->krx_npages * PAGE_SIZE, 0);
1426                 LASSERT (eprc == EP_SUCCESS);
1427                 /* We don't handle failure here; it's incredibly rare
1428                  * (never reported?) and only happens with "old" EKC */
1429         } else {
1430                 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1431                                    krx->krx_elanbuffer, 
1432                                    krx->krx_npages * PAGE_SIZE);
1433         }
1434 #endif
1435 }
1436
1437 void
1438 kqswnal_rpc_complete (EP_RXD *rxd)
1439 {
1440         int           status = ep_rxd_status(rxd);
1441         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1442         
1443         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1444                "rxd %p, krx %p, status %d\n", rxd, krx, status);
1445
1446         LASSERT (krx->krx_rxd == rxd);
1447         LASSERT (krx->krx_rpc_reply_needed);
1448         
1449         krx->krx_rpc_reply_needed = 0;
1450         kqswnal_requeue_rx (krx);
1451 }
1452
1453 void
1454 kqswnal_rx_done (kqswnal_rx_t *krx) 
1455 {
1456         int           rc;
1457         EP_STATUSBLK *sblk;
1458
1459         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1460
1461         if (krx->krx_rpc_reply_needed) {
1462                 /* We've not completed the peer's RPC yet... */
1463                 sblk = (krx->krx_rpc_reply_status == 0) ? 
1464                        &kqswnal_data.kqn_rpc_success : 
1465                        &kqswnal_data.kqn_rpc_failed;
1466
1467                 LASSERT (!in_interrupt());
1468 #if MULTIRAIL_EKC
1469                 rc = ep_complete_rpc(krx->krx_rxd, 
1470                                      kqswnal_rpc_complete, krx,
1471                                      sblk, NULL, NULL, 0);
1472                 if (rc == EP_SUCCESS)
1473                         return;
1474 #else
1475                 rc = ep_complete_rpc(krx->krx_rxd, 
1476                                      kqswnal_rpc_complete, krx,
1477                                      sblk, NULL, 0);
1478                 if (rc == EP_SUCCESS)
1479                         return;
1480
1481                 /* "old" EKC destroys rxd on failed completion */
1482                 krx->krx_rxd = NULL;
1483 #endif
1484                 CERROR("can't complete RPC: %d\n", rc);
1485                 krx->krx_rpc_reply_needed = 0;
1486         }
1487
1488         kqswnal_requeue_rx(krx);
1489 }
1490         
1491 void
1492 kqswnal_parse (kqswnal_rx_t *krx)
1493 {
1494         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
1495         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
1496         int             payload_nob;
1497         int             nob;
1498         int             niov;
1499
1500         LASSERT (atomic_read(&krx->krx_refcount) == 1);
1501
1502         if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
1503                 /* I ignore parse errors since I'm not consuming a byte
1504                  * stream */
1505                 (void)lib_parse (&kqswnal_lib, hdr, krx);
1506
1507                 /* Drop my ref; any RDMA activity takes an additional ref */
1508                 kqswnal_rx_decref(krx);
1509                 return;
1510         }
1511
1512 #if KQSW_CHECKSUM
1513         LASSERTF (0, "checksums for forwarded packets not implemented\n");
1514 #endif
1515
1516         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
1517         {
1518                 CERROR("dropping packet from "LPX64" for "LPX64
1519                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
1520
1521                 kqswnal_rx_decref (krx);
1522                 return;
1523         }
1524
1525         nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
1526         niov = 0;
1527         if (nob > 0) {
1528                 krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
1529                 krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
1530                 niov = 1;
1531                 nob -= PAGE_SIZE - KQSW_HDR_SIZE;
1532                 
1533                 while (nob > 0) {
1534                         LASSERT (niov < krx->krx_npages);
1535                         
1536                         krx->krx_kiov[niov].kiov_offset = 0;
1537                         krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
1538                         niov++;
1539                         nob -= PAGE_SIZE;
1540                 }
1541         }
1542
1543         kpr_fwd_init (&krx->krx_fwd, dest_nid, 
1544                       hdr, payload_nob, niov, krx->krx_kiov,
1545                       kqswnal_fwd_callback, krx);
1546
1547         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
1548 }
1549
1550 /* Receive Interrupt Handler: posts to schedulers */
1551 void 
1552 kqswnal_rxhandler(EP_RXD *rxd)
1553 {
1554         long          flags;
1555         int           nob    = ep_rxd_len (rxd);
1556         int           status = ep_rxd_status (rxd);
1557         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1558
1559         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1560                rxd, krx, nob, status);
1561
1562         LASSERT (krx != NULL);
1563         LASSERT (krx->krx_state = KRX_POSTED);
1564         
1565         krx->krx_state = KRX_PARSE;
1566         krx->krx_rxd = rxd;
1567         krx->krx_nob = nob;
1568 #if MULTIRAIL_EKC
1569         krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
1570 #else
1571         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
1572 #endif
1573         /* Default to failure if an RPC reply is requested but not handled */
1574         krx->krx_rpc_reply_status = -EPROTO;
1575         atomic_set (&krx->krx_refcount, 1);
1576
1577         /* must receive a whole header to be able to parse */
1578         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
1579         {
1580                 /* receives complete with failure when receiver is removed */
1581 #if MULTIRAIL_EKC
1582                 if (status == EP_SHUTDOWN)
1583                         LASSERT (kqswnal_data.kqn_shuttingdown);
1584                 else
1585                         CERROR("receive status failed with status %d nob %d\n",
1586                                ep_rxd_status(rxd), nob);
1587 #else
1588                 if (!kqswnal_data.kqn_shuttingdown)
1589                         CERROR("receive status failed with status %d nob %d\n",
1590                                ep_rxd_status(rxd), nob);
1591 #endif
1592                 kqswnal_rx_decref(krx);
1593                 return;
1594         }
1595
1596         if (!in_interrupt()) {
1597                 kqswnal_parse(krx);
1598                 return;
1599         }
1600
1601         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1602
1603         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1604         wake_up (&kqswnal_data.kqn_sched_waitq);
1605
1606         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1607 }
1608
1609 #if KQSW_CHECKSUM
1610 void
1611 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1612 {
1613         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1614
1615         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1616                 ", dpid %d, spid %d, type %d\n",
1617                 ishdr ? "Header" : "Payload", krx,
1618                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
1619                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
1620                 NTOH__u32(hdr->type));
1621
1622         switch (NTOH__u32 (hdr->type))
1623         {
1624         case PTL_MSG_ACK:
1625                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
1626                        " len %u\n",
1627                        NTOH__u32(hdr->msg.ack.mlength),
1628                        hdr->msg.ack.dst_wmd.handle_cookie,
1629                        hdr->msg.ack.dst_wmd.handle_idx,
1630                        NTOH__u64(hdr->msg.ack.match_bits),
1631                        NTOH__u32(hdr->msg.ack.length));
1632                 break;
1633         case PTL_MSG_PUT:
1634                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
1635                        " len %u off %u data "LPX64"\n",
1636                        NTOH__u32(hdr->msg.put.ptl_index),
1637                        hdr->msg.put.ack_wmd.handle_cookie,
1638                        hdr->msg.put.ack_wmd.handle_idx,
1639                        NTOH__u64(hdr->msg.put.match_bits),
1640                        NTOH__u32(hdr->msg.put.length),
1641                        NTOH__u32(hdr->msg.put.offset),
1642                        hdr->msg.put.hdr_data);
1643                 break;
1644         case PTL_MSG_GET:
1645                 CERROR ("GET: <>\n");
1646                 break;
1647         case PTL_MSG_REPLY:
1648                 CERROR ("REPLY: <>\n");
1649                 break;
1650         default:
1651                 CERROR ("TYPE?: <>\n");
1652         }
1653 }
1654 #endif
1655
1656 static ptl_err_t
1657 kqswnal_recvmsg (lib_nal_t    *nal,
1658                  void         *private,
1659                  lib_msg_t    *libmsg,
1660                  unsigned int  niov,
1661                  struct iovec *iov,
1662                  ptl_kiov_t   *kiov,
1663                  size_t        offset,
1664                  size_t        mlen,
1665                  size_t        rlen)
1666 {
1667         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
1668         char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
1669         ptl_hdr_t    *hdr = (ptl_hdr_t *)buffer;
1670         int           page;
1671         char         *page_ptr;
1672         int           page_nob;
1673         char         *iov_ptr;
1674         int           iov_nob;
1675         int           frag;
1676         int           rc;
1677 #if KQSW_CHECKSUM
1678         kqsw_csum_t   senders_csum;
1679         kqsw_csum_t   payload_csum = 0;
1680         kqsw_csum_t   hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
1681         size_t        csum_len = mlen;
1682         int           csum_frags = 0;
1683         int           csum_nob = 0;
1684         static atomic_t csum_counter;
1685         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
1686
1687         atomic_inc (&csum_counter);
1688
1689         memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1690         if (senders_csum != hdr_csum)
1691                 kqswnal_csum_error (krx, 1);
1692 #endif
1693         /* NB lib_parse() has already flipped *hdr */
1694
1695         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1696
1697         if (krx->krx_rpc_reply_needed &&
1698             hdr->type == PTL_MSG_PUT) {
1699                 /* This must be an optimized PUT */
1700                 rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
1701                                    niov, iov, kiov, offset, mlen);
1702                 return (rc == 0 ? PTL_OK : PTL_FAIL);
1703         }
1704
1705         /* What was actually received must be >= payload. */
1706         LASSERT (mlen <= rlen);
1707         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
1708                 CERROR("Bad message size: have %d, need %d + %d\n",
1709                        krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
1710                 return (PTL_FAIL);
1711         }
1712
1713         /* It must be OK to kmap() if required */
1714         LASSERT (kiov == NULL || !in_interrupt ());
1715         /* Either all pages or all vaddrs */
1716         LASSERT (!(kiov != NULL && iov != NULL));
1717
1718         if (mlen != 0) {
1719                 page     = 0;
1720                 page_ptr = buffer + KQSW_HDR_SIZE;
1721                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1722
1723                 LASSERT (niov > 0);
1724
1725                 if (kiov != NULL) {
1726                         /* skip complete frags */
1727                         while (offset >= kiov->kiov_len) {
1728                                 offset -= kiov->kiov_len;
1729                                 kiov++;
1730                                 niov--;
1731                                 LASSERT (niov > 0);
1732                         }
1733                         iov_ptr = ((char *)kmap (kiov->kiov_page)) +
1734                                 kiov->kiov_offset + offset;
1735                         iov_nob = kiov->kiov_len - offset;
1736                 } else {
1737                         /* skip complete frags */
1738                         while (offset >= iov->iov_len) {
1739                                 offset -= iov->iov_len;
1740                                 iov++;
1741                                 niov--;
1742                                 LASSERT (niov > 0);
1743                         }
1744                         iov_ptr = iov->iov_base + offset;
1745                         iov_nob = iov->iov_len - offset;
1746                 }
1747                 
1748                 for (;;)
1749                 {
1750                         frag = mlen;
1751                         if (frag > page_nob)
1752                                 frag = page_nob;
1753                         if (frag > iov_nob)
1754                                 frag = iov_nob;
1755
1756                         memcpy (iov_ptr, page_ptr, frag);
1757 #if KQSW_CHECKSUM
1758                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1759                         csum_nob += frag;
1760                         csum_frags++;
1761 #endif
1762                         mlen -= frag;
1763                         if (mlen == 0)
1764                                 break;
1765
1766                         page_nob -= frag;
1767                         if (page_nob != 0)
1768                                 page_ptr += frag;
1769                         else
1770                         {
1771                                 page++;
1772                                 LASSERT (page < krx->krx_npages);
1773                                 page_ptr = page_address(krx->krx_kiov[page].kiov_page);
1774                                 page_nob = PAGE_SIZE;
1775                         }
1776
1777                         iov_nob -= frag;
1778                         if (iov_nob != 0)
1779                                 iov_ptr += frag;
1780                         else if (kiov != NULL) {
1781                                 kunmap (kiov->kiov_page);
1782                                 kiov++;
1783                                 niov--;
1784                                 LASSERT (niov > 0);
1785                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1786                                 iov_nob = kiov->kiov_len;
1787                         } else {
1788                                 iov++;
1789                                 niov--;
1790                                 LASSERT (niov > 0);
1791                                 iov_ptr = iov->iov_base;
1792                                 iov_nob = iov->iov_len;
1793                         }
1794                 }
1795
1796                 if (kiov != NULL)
1797                         kunmap (kiov->kiov_page);
1798         }
1799
1800 #if KQSW_CHECKSUM
1801         memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), 
1802                 sizeof(kqsw_csum_t));
1803
1804         if (csum_len != rlen)
1805                 CERROR("Unable to checksum data in user's buffer\n");
1806         else if (senders_csum != payload_csum)
1807                 kqswnal_csum_error (krx, 0);
1808
1809         if (csum_verbose)
1810                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1811                        "csum_nob %d\n",
1812                         hdr_csum, payload_csum, csum_frags, csum_nob);
1813 #endif
1814         lib_finalize(nal, private, libmsg, PTL_OK);
1815
1816         return (PTL_OK);
1817 }
1818
1819 static ptl_err_t
1820 kqswnal_recv(lib_nal_t    *nal,
1821              void         *private,
1822              lib_msg_t    *libmsg,
1823              unsigned int  niov,
1824              struct iovec *iov,
1825              size_t        offset,
1826              size_t        mlen,
1827              size_t        rlen)
1828 {
1829         return (kqswnal_recvmsg(nal, private, libmsg, 
1830                                 niov, iov, NULL, 
1831                                 offset, mlen, rlen));
1832 }
1833
1834 static ptl_err_t
1835 kqswnal_recv_pages (lib_nal_t    *nal,
1836                     void         *private,
1837                     lib_msg_t    *libmsg,
1838                     unsigned int  niov,
1839                     ptl_kiov_t   *kiov,
1840                     size_t        offset,
1841                     size_t        mlen,
1842                     size_t        rlen)
1843 {
1844         return (kqswnal_recvmsg(nal, private, libmsg, 
1845                                 niov, NULL, kiov, 
1846                                 offset, mlen, rlen));
1847 }
1848
1849 int
1850 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1851 {
1852         long    pid = kernel_thread (fn, arg, 0);
1853
1854         if (pid < 0)
1855                 return ((int)pid);
1856
1857         atomic_inc (&kqswnal_data.kqn_nthreads);
1858         return (0);
1859 }
1860
1861 void
1862 kqswnal_thread_fini (void)
1863 {
1864         atomic_dec (&kqswnal_data.kqn_nthreads);
1865 }
1866
1867 int
1868 kqswnal_scheduler (void *arg)
1869 {
1870         kqswnal_rx_t    *krx;
1871         kqswnal_tx_t    *ktx;
1872         kpr_fwd_desc_t  *fwd;
1873         long             flags;
1874         int              rc;
1875         int              counter = 0;
1876         int              did_something;
1877
1878         kportal_daemonize ("kqswnal_sched");
1879         kportal_blockallsigs ();
1880         
1881         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1882
1883         for (;;)
1884         {
1885                 did_something = 0;
1886
1887                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1888                 {
1889                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1890                                          kqswnal_rx_t, krx_list);
1891                         list_del (&krx->krx_list);
1892                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1893                                                flags);
1894
1895                         switch (krx->krx_state) {
1896                         case KRX_PARSE:
1897                                 kqswnal_parse (krx);
1898                                 break;
1899                         case KRX_COMPLETING:
1900                                 /* Drop last ref to reply to RPC and requeue */
1901                                 LASSERT (krx->krx_rpc_reply_needed);
1902                                 kqswnal_rx_decref (krx);
1903                                 break;
1904                         default:
1905                                 LBUG();
1906                         }
1907
1908                         did_something = 1;
1909                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1910                 }
1911
1912                 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1913                 {
1914                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1915                                          kqswnal_tx_t, ktx_list);
1916                         list_del_init (&ktx->ktx_delayed_list);
1917                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1918                                                flags);
1919
1920                         rc = kqswnal_launch (ktx);
1921                         if (rc != 0) {
1922                                 CERROR("Failed delayed transmit to "LPX64
1923                                        ": %d\n", ktx->ktx_nid, rc);
1924                                 kqswnal_tx_done (ktx, rc);
1925                         }
1926                         atomic_dec (&kqswnal_data.kqn_pending_txs);
1927
1928                         did_something = 1;
1929                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1930                 }
1931
1932                 if (!list_empty (&kqswnal_data.kqn_delayedfwds))
1933                 {
1934                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1935                         list_del (&fwd->kprfd_list);
1936                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1937
1938                         /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
1939                         kqswnal_fwd_packet (NULL, fwd);
1940
1941                         did_something = 1;
1942                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1943                 }
1944
1945                 /* nothing to do or hogging CPU */
1946                 if (!did_something || counter++ == KQSW_RESCHED) {
1947                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1948                                                flags);
1949
1950                         counter = 0;
1951
1952                         if (!did_something) {
1953                                 if (kqswnal_data.kqn_shuttingdown == 2) {
1954                                         /* We only exit in stage 2 of shutdown when 
1955                                          * there's nothing left to do */
1956                                         break;
1957                                 }
1958                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1959                                                                kqswnal_data.kqn_shuttingdown == 2 ||
1960                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1961                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1962                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1963                                 LASSERT (rc == 0);
1964                         } else if (need_resched())
1965                                 schedule ();
1966
1967                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1968                 }
1969         }
1970
1971         kqswnal_thread_fini ();
1972         return (0);
1973 }
1974
1975 lib_nal_t kqswnal_lib =
1976 {
1977         libnal_data:       &kqswnal_data,         /* NAL private data */
1978         libnal_send:        kqswnal_send,
1979         libnal_send_pages:  kqswnal_send_pages,
1980         libnal_recv:        kqswnal_recv,
1981         libnal_recv_pages:  kqswnal_recv_pages,
1982         libnal_dist:        kqswnal_dist
1983 };