Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "qswnal.h"
28
29 /*
30  *  LIB functions follow
31  *
32  */
33 static int
34 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
35              size_t len)
36 {
37         CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
38                 nal->ni.nid, len, src_addr, dst_addr );
39         memcpy( dst_addr, src_addr, len );
40
41         return (0);
42 }
43
44 static int
45 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
46               size_t len)
47 {
48         CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
49                 nal->ni.nid, len, src_addr, dst_addr );
50         memcpy( dst_addr, src_addr, len );
51
52         return (0);
53 }
54
55 static void *
56 kqswnal_malloc(nal_cb_t *nal, size_t len)
57 {
58         void *buf;
59
60         PORTAL_ALLOC(buf, len);
61         return (buf);
62 }
63
64 static void
65 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
66 {
67         PORTAL_FREE(buf, len);
68 }
69
70 static void
71 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
72 {
73         va_list ap;
74         char msg[256];
75
76         va_start (ap, fmt);
77         vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
78         va_end (ap);
79
80         msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
81
82         CDEBUG (D_NET, "%s", msg);
83 }
84
85
86 static void
87 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
88 {
89         kqswnal_data_t *data= nal->nal_data;
90
91         spin_lock_irqsave(&data->kqn_statelock, *flags);
92 }
93
94
95 static void
96 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
97 {
98         kqswnal_data_t *data= nal->nal_data;
99
100         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
101 }
102
103
104 static int
105 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
106 {
107         if (nid == nal->ni.nid)
108                 *dist = 0;                      /* it's me */
109         else if (kqswnal_nid2elanid (nid) >= 0)
110                 *dist = 1;                      /* it's my peer */
111         else
112                 *dist = 2;                      /* via router */
113         return (0);
114 }
115
116 void
117 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
118 {
119         struct timeval     now;
120         time_t             then;
121
122         do_gettimeofday (&now);
123         then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
124
125         kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
126 }
127
128 void
129 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
130 {
131         if (ktx->ktx_nmappedpages == 0)
132                 return;
133
134         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
135                 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
136
137         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
138         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
139                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
140
141         elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
142                           kqswnal_data.kqn_eptxdmahandle,
143                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
144         ktx->ktx_nmappedpages = 0;
145 }
146
147 int
148 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
149 {
150         int       nfrags    = ktx->ktx_nfrag;
151         int       nmapped   = ktx->ktx_nmappedpages;
152         int       maxmapped = ktx->ktx_npages;
153         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
154         char     *ptr;
155         
156         LASSERT (nmapped <= maxmapped);
157         LASSERT (nfrags <= EP_MAXFRAG);
158         LASSERT (niov > 0);
159         LASSERT (nob > 0);
160         
161         do {
162                 int  fraglen = kiov->kiov_len;
163
164                 /* nob exactly spans the iovs */
165                 LASSERT (fraglen <= nob);
166                 /* each frag fits in a page */
167                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
168
169                 nmapped++;
170                 if (nmapped > maxmapped) {
171                         CERROR("Can't map message in %d pages (max %d)\n",
172                                nmapped, maxmapped);
173                         return (-EMSGSIZE);
174                 }
175
176                 if (nfrags == EP_MAXFRAG) {
177                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
178                                EP_MAXFRAG);
179                         return (-EMSGSIZE);
180                 }
181
182                 /* XXX this is really crap, but we'll have to kmap until
183                  * EKC has a page (rather than vaddr) mapping interface */
184
185                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
186
187                 CDEBUG(D_NET,
188                        "%p[%d] loading %p for %d, page %d, %d total\n",
189                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
190
191                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
192                                        kqswnal_data.kqn_eptxdmahandle,
193                                        ptr, fraglen,
194                                        basepage, &ktx->ktx_frags.iov[nfrags].Base);
195
196                 kunmap (kiov->kiov_page);
197                 
198                 /* keep in loop for failure case */
199                 ktx->ktx_nmappedpages = nmapped;
200
201                 if (nfrags > 0 &&                /* previous frag mapped */
202                     ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
203                     (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
204                         /* just extend previous */
205                         ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
206                 else {
207                         ktx->ktx_frags.iov[nfrags].Len = fraglen;
208                         nfrags++;                /* new frag */
209                 }
210
211                 basepage++;
212                 kiov++;
213                 niov--;
214                 nob -= fraglen;
215
216                 /* iov must not run out before end of data */
217                 LASSERT (nob == 0 || niov > 0);
218
219         } while (nob > 0);
220
221         ktx->ktx_nfrag = nfrags;
222         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
223                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
224
225         return (0);
226 }
227
228 int
229 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
230 {
231         int       nfrags    = ktx->ktx_nfrag;
232         int       nmapped   = ktx->ktx_nmappedpages;
233         int       maxmapped = ktx->ktx_npages;
234         uint32_t  basepage  = ktx->ktx_basepage + nmapped;
235
236         LASSERT (nmapped <= maxmapped);
237         LASSERT (nfrags <= EP_MAXFRAG);
238         LASSERT (niov > 0);
239         LASSERT (nob > 0);
240
241         do {
242                 int  fraglen = iov->iov_len;
243                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
244
245                 /* nob exactly spans the iovs */
246                 LASSERT (fraglen <= nob);
247                 
248                 nmapped += npages;
249                 if (nmapped > maxmapped) {
250                         CERROR("Can't map message in %d pages (max %d)\n",
251                                nmapped, maxmapped);
252                         return (-EMSGSIZE);
253                 }
254
255                 if (nfrags == EP_MAXFRAG) {
256                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
257                                EP_MAXFRAG);
258                         return (-EMSGSIZE);
259                 }
260
261                 CDEBUG(D_NET,
262                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
263                         ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
264                         nmapped);
265
266                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
267                                        kqswnal_data.kqn_eptxdmahandle,
268                                        iov->iov_base, fraglen,
269                                        basepage, &ktx->ktx_frags.iov[nfrags].Base);
270                 /* keep in loop for failure case */
271                 ktx->ktx_nmappedpages = nmapped;
272
273                 if (nfrags > 0 &&                /* previous frag mapped */
274                     ktx->ktx_frags.iov[nfrags].Base == /* contiguous with this one */
275                     (ktx->ktx_frags.iov[nfrags-1].Base + ktx->ktx_frags.iov[nfrags-1].Len))
276                         /* just extend previous */
277                         ktx->ktx_frags.iov[nfrags - 1].Len += fraglen;
278                 else {
279                         ktx->ktx_frags.iov[nfrags].Len = fraglen;
280                         nfrags++;                /* new frag */
281                 }
282
283                 basepage += npages;
284                 iov++;
285                 niov--;
286                 nob -= fraglen;
287
288                 /* iov must not run out before end of data */
289                 LASSERT (nob == 0 || niov > 0);
290
291         } while (nob > 0);
292
293         ktx->ktx_nfrag = nfrags;
294         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
295                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
296
297         return (0);
298 }
299
300
301 void
302 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
303 {
304         kpr_fwd_desc_t   *fwd = NULL;
305         unsigned long     flags;
306
307         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
308         ktx->ktx_state = KTX_IDLE;
309
310         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
311
312         list_del (&ktx->ktx_list);              /* take off active list */
313
314         if (ktx->ktx_isnblk) {
315                 /* reserved for non-blocking tx */
316                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
317                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
318                 return;
319         }
320
321         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
322
323         /* anything blocking for a tx descriptor? */
324         if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
325         {
326                 CDEBUG(D_NET,"wakeup fwd\n");
327
328                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
329                                   kpr_fwd_desc_t, kprfd_list);
330                 list_del (&fwd->kprfd_list);
331         }
332
333         wake_up (&kqswnal_data.kqn_idletxd_waitq);
334
335         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
336
337         if (fwd == NULL)
338                 return;
339
340         /* schedule packet for forwarding again */
341         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
342
343         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
344         wake_up (&kqswnal_data.kqn_sched_waitq);
345
346         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
347 }
348
349 kqswnal_tx_t *
350 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
351 {
352         unsigned long  flags;
353         kqswnal_tx_t  *ktx = NULL;
354
355         for (;;) {
356                 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
357
358                 /* "normal" descriptor is free */
359                 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
360                         ktx = list_entry (kqswnal_data.kqn_idletxds.next,
361                                           kqswnal_tx_t, ktx_list);
362                         break;
363                 }
364
365                 /* "normal" descriptor pool is empty */
366
367                 if (fwd != NULL) { /* forwarded packet => queue for idle txd */
368                         CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
369                         list_add_tail (&fwd->kprfd_list,
370                                        &kqswnal_data.kqn_idletxd_fwdq);
371                         break;
372                 }
373
374                 /* doing a local transmit */
375                 if (!may_block) {
376                         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
377                                 CERROR ("intr tx desc pool exhausted\n");
378                                 break;
379                         }
380
381                         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
382                                           kqswnal_tx_t, ktx_list);
383                         break;
384                 }
385
386                 /* block for idle tx */
387
388                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
389
390                 CDEBUG (D_NET, "blocking for tx desc\n");
391                 wait_event (kqswnal_data.kqn_idletxd_waitq,
392                             !list_empty (&kqswnal_data.kqn_idletxds));
393         }
394
395         if (ktx != NULL) {
396                 list_del (&ktx->ktx_list);
397                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
398                 ktx->ktx_launcher = current->pid;
399         }
400
401         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
402
403         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
404         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
405
406         return (ktx);
407 }
408
409 void
410 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
411 {
412         lib_msg_t     *msg;
413         lib_msg_t     *repmsg;
414
415         switch (ktx->ktx_state) {
416         case KTX_FORWARDING:       /* router asked me to forward this packet */
417                 kpr_fwd_done (&kqswnal_data.kqn_router,
418                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
419                 break;
420
421         case KTX_SENDING:          /* packet sourced locally */
422                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
423                               (lib_msg_t *)ktx->ktx_args[1]);
424                 break;
425
426         case KTX_GETTING:          /* Peer has DMA-ed direct? */
427                 LASSERT (KQSW_OPTIMIZE_GETS);
428                 msg = (lib_msg_t *)ktx->ktx_args[1];
429                 repmsg = NULL;
430
431                 if (error == 0) 
432                         repmsg = lib_fake_reply_msg (&kqswnal_lib, 
433                                                      ktx->ktx_nid, msg->md);
434                 
435                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg);
436
437                 if (repmsg != NULL) 
438                         lib_finalize (&kqswnal_lib, NULL, repmsg);
439                 break;
440
441         default:
442                 LASSERT (0);
443         }
444
445         kqswnal_put_idle_tx (ktx);
446 }
447
448 static void
449 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
450 {
451         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
452
453         LASSERT (txd != NULL);
454         LASSERT (ktx != NULL);
455
456         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
457
458         if (status != EP_SUCCESS)
459         {
460                 CERROR ("Tx completion to "LPX64" failed: %d\n", 
461                         ktx->ktx_nid, status);
462
463                 kqswnal_notify_peer_down(ktx);
464                 status = -EIO;
465
466         } else if (ktx->ktx_state == KTX_GETTING) {
467                 /* RPC completed OK; what did our peer put in the status
468                  * block? */
469                 LASSERT (KQSW_OPTIMIZE_GETS);
470                 status = ep_txd_statusblk(txd)->Status;
471         } else {
472                 status = 0;
473         }
474
475         kqswnal_tx_done (ktx, status);
476 }
477
478 int
479 kqswnal_launch (kqswnal_tx_t *ktx)
480 {
481         /* Don't block for transmit descriptor if we're in interrupt context */
482         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
483         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
484         long  flags;
485         int   rc;
486
487         ktx->ktx_launchtime = jiffies;
488
489         LASSERT (dest >= 0);                    /* must be a peer */
490         if (ktx->ktx_state == KTX_GETTING) {
491                 LASSERT (KQSW_OPTIMIZE_GETS);
492                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
493                                      ktx->ktx_port, attr, kqswnal_txhandler,
494                                      ktx, NULL, ktx->ktx_frags.iov, ktx->ktx_nfrag);
495         } else {
496                 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
497                                        ktx->ktx_port, attr, kqswnal_txhandler,
498                                        ktx, ktx->ktx_frags.iov, ktx->ktx_nfrag);
499         }
500
501         switch (rc) {
502         case ESUCCESS: /* success */
503                 return (0);
504
505         case ENOMEM: /* can't allocate ep txd => queue for later */
506                 LASSERT (in_interrupt());
507
508                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
509
510                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
511                 wake_up (&kqswnal_data.kqn_sched_waitq);
512
513                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
514                 return (0);
515
516         default: /* fatal error */
517                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
518                 kqswnal_notify_peer_down(ktx);
519                 return (rc);
520         }
521 }
522
523 static char *
524 hdr_type_string (ptl_hdr_t *hdr)
525 {
526         switch (hdr->type) {
527         case PTL_MSG_ACK:
528                 return ("ACK");
529         case PTL_MSG_PUT:
530                 return ("PUT");
531         case PTL_MSG_GET:
532                 return ("GET");
533         case PTL_MSG_REPLY:
534                 return ("REPLY");
535         default:
536                 return ("<UNKNOWN>");
537         }
538 }
539
540 static void
541 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
542 {
543         char *type_str = hdr_type_string (hdr);
544
545         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
546                NTOH__u32(hdr->payload_length));
547         CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
548                NTOH__u32(hdr->src_pid));
549         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
550                NTOH__u32(hdr->dest_pid));
551
552         switch (NTOH__u32(hdr->type)) {
553         case PTL_MSG_PUT:
554                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
555                        "match bits "LPX64"\n",
556                        NTOH__u32 (hdr->msg.put.ptl_index),
557                        hdr->msg.put.ack_wmd.wh_interface_cookie,
558                        hdr->msg.put.ack_wmd.wh_object_cookie,
559                        NTOH__u64 (hdr->msg.put.match_bits));
560                 CERROR("    offset %d, hdr data "LPX64"\n",
561                        NTOH__u32(hdr->msg.put.offset),
562                        hdr->msg.put.hdr_data);
563                 break;
564
565         case PTL_MSG_GET:
566                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
567                        "match bits "LPX64"\n",
568                        NTOH__u32 (hdr->msg.get.ptl_index),
569                        hdr->msg.get.return_wmd.wh_interface_cookie,
570                        hdr->msg.get.return_wmd.wh_object_cookie,
571                        hdr->msg.get.match_bits);
572                 CERROR("    Length %d, src offset %d\n",
573                        NTOH__u32 (hdr->msg.get.sink_length),
574                        NTOH__u32 (hdr->msg.get.src_offset));
575                 break;
576
577         case PTL_MSG_ACK:
578                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
579                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
580                        hdr->msg.ack.dst_wmd.wh_object_cookie,
581                        NTOH__u32 (hdr->msg.ack.mlength));
582                 break;
583
584         case PTL_MSG_REPLY:
585                 CERROR("    dst md "LPX64"."LPX64"\n",
586                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
587                        hdr->msg.reply.dst_wmd.wh_object_cookie);
588         }
589
590 }                               /* end of print_hdr() */
591
592 void
593 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov) 
594 {
595         int          i;
596
597         CDEBUG (how, "%s: %d\n", str, n);
598         for (i = 0; i < n; i++) {
599                 CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
600         }
601 }
602
603 int
604 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
605                      int nsrc, EP_IOVEC *src,
606                      int ndst, EP_IOVEC *dst) 
607 {
608         int        count;
609         int        nob;
610
611         LASSERT (ndv > 0);
612         LASSERT (nsrc > 0);
613         LASSERT (ndst > 0);
614
615         for (count = 0; count < ndv; count++, dv++) {
616
617                 if (nsrc == 0 || ndst == 0) {
618                         if (nsrc != ndst) {
619                                 /* For now I'll barf on any left over entries */
620                                 CERROR ("mismatched src and dst iovs\n");
621                                 return (-EINVAL);
622                         }
623                         return (count);
624                 }
625
626                 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
627                 dv->Len    = nob;
628                 dv->Source = src->Base;
629                 dv->Dest   = dst->Base;
630
631                 if (nob >= src->Len) {
632                         src++;
633                         nsrc--;
634                 } else {
635                         src->Len -= nob;
636                         src->Base += nob;
637                 }
638                 
639                 if (nob >= dst->Len) {
640                         dst++;
641                         ndst--;
642                 } else {
643                         src->Len -= nob;
644                         src->Base += nob;
645                 }
646         }
647
648         CERROR ("DATAVEC too small\n");
649         return (-E2BIG);
650 }
651
652 int
653 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
654                    struct iovec *iov, ptl_kiov_t *kiov, int nob)
655 {
656         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
657         char               *buffer = (char *)page_address(krx->krx_pages[0]);
658         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
659         EP_IOVEC            eiov[EP_MAXFRAG];
660         EP_STATUSBLK        blk;
661         int                 rc;
662
663         LASSERT (ep_rxd_isrpc(krx->krx_rxd) && !krx->krx_rpc_completed);
664         LASSERT ((iov == NULL) != (kiov == NULL));
665
666         /* see .*_pack_k?iov comment regarding endian-ness */
667         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
668                 /* msg too small to discover rmd size */
669                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
670                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
671                 return (-EINVAL);
672         }
673         
674         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) {
675                 /* rmd doesn't fit in the incoming message */
676                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
677                         krx->krx_nob, rmd->kqrmd_neiov,
678                         (int)(((char *)&rmd->kqrmd_eiov[rmd->kqrmd_neiov]) - buffer));
679                 return (-EINVAL);
680         }
681
682         /* Ghastly hack part 1, uses the existing procedures to map the source data... */
683         ktx->ktx_nfrag = 0;
684         if (kiov != NULL)
685                 rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
686         else
687                 rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov);
688
689         if (rc != 0) {
690                 CERROR ("Can't map source data: %d\n", rc);
691                 return (rc);
692         }
693
694         /* Ghastly hack part 2, copy out eiov so we can create the datav; Ugghh... */
695         memcpy (eiov, ktx->ktx_frags.iov, ktx->ktx_nfrag * sizeof (eiov[0]));
696
697         rc = kqswnal_eiovs2datav (EP_MAXFRAG, ktx->ktx_frags.datav,
698                                   ktx->ktx_nfrag, eiov,
699                                   rmd->kqrmd_neiov, rmd->kqrmd_eiov);
700         if (rc < 0) {
701                 CERROR ("Can't create datavec: %d\n", rc);
702                 return (rc);
703         }
704         ktx->ktx_nfrag = rc;
705
706         memset (&blk, 0, sizeof (blk));         /* zero blk.Status */
707
708         /* Our caller will start to race with kqswnal_rpc_complete... */
709         LASSERT (atomic_read (&krx->krx_refcount) == 1);
710         atomic_set (&krx->krx_refcount, 2);
711
712         rc = ep_complete_rpc (krx->krx_rxd, kqswnal_reply_complete, ktx,
713                               &blk, ktx->ktx_frags.datav, ktx->ktx_nfrag);
714         if (rc == ESUCCESS)
715                 return (0);
716
717         /* reset refcount back to 1: we're not going to be racing with
718          * kqswnal_rely_complete. */
719         atomic_set (&krx->krx_refcount, 1);
720         return (-ECONNABORTED);
721 }
722
723 static int
724 kqswnal_sendmsg (nal_cb_t     *nal,
725                  void         *private,
726                  lib_msg_t    *libmsg,
727                  ptl_hdr_t    *hdr,
728                  int           type,
729                  ptl_nid_t     nid,
730                  ptl_pid_t     pid,
731                  unsigned int  payload_niov,
732                  struct iovec *payload_iov,
733                  ptl_kiov_t   *payload_kiov,
734                  size_t        payload_nob)
735 {
736         kqswnal_tx_t      *ktx;
737         int                rc;
738         ptl_nid_t          targetnid;
739 #if KQSW_CHECKSUM
740         int                i;
741         kqsw_csum_t        csum;
742         int                sumnob;
743 #endif
744         
745         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
746                " pid %u\n", payload_nob, payload_niov, nid, pid);
747
748         LASSERT (payload_nob == 0 || payload_niov > 0);
749         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
750
751         /* It must be OK to kmap() if required */
752         LASSERT (payload_kiov == NULL || !in_interrupt ());
753         /* payload is either all vaddrs or all pages */
754         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
755         
756         if (payload_nob > KQSW_MAXPAYLOAD) {
757                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
758                         payload_nob, KQSW_MAXPAYLOAD);
759                 return (PTL_FAIL);
760         }
761
762         targetnid = nid;
763         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
764                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
765                                  sizeof (ptl_hdr_t) + payload_nob, &targetnid);
766                 if (rc != 0) {
767                         CERROR("Can't route to "LPX64": router error %d\n",
768                                nid, rc);
769                         return (PTL_FAIL);
770                 }
771                 if (kqswnal_nid2elanid (targetnid) < 0) {
772                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
773                                targetnid, nid);
774                         return (PTL_FAIL);
775                 }
776         }
777
778         /* I may not block for a transmit descriptor if I might block the
779          * receiver, or an interrupt handler. */
780         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
781                                           type == PTL_MSG_REPLY ||
782                                           in_interrupt()));
783         if (ktx == NULL) {
784                 kqswnal_cerror_hdr (hdr);
785                 return (PTL_NOSPACE);
786         }
787
788         ktx->ktx_args[0] = private;
789         ktx->ktx_args[1] = libmsg;
790
791 #if KQSW_OPTIMIZE_GETS
792         if (type == PTL_MSG_REPLY &&
793             ep_rxd_isrpc(((kqswnal_rx_t *)private)->krx_rxd)) {
794                 if (nid != targetnid ||
795                     kqswnal_nid2elanid(nid) != 
796                     ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
797                         CERROR("Optimized reply nid conflict: "
798                                "nid "LPX64" via "LPX64" elanID %d\n",
799                                nid, targetnid,
800                                ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
801                         return(PTL_FAIL);
802                 }
803
804                 /* peer expects RPC completion with GET data */
805                 rc = kqswnal_dma_reply (ktx,
806                                         payload_niov, payload_iov, 
807                                         payload_kiov, payload_nob);
808                 if (rc == 0)
809                         return (0);
810                 
811                 CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
812                 kqswnal_put_idle_tx (ktx);
813                 return (PTL_FAIL);
814         }
815 #endif
816
817         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
818         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
819
820 #if KQSW_CHECKSUM
821         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
822         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
823         for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
824                 if (payload_kiov != NULL) {
825                         ptl_kiov_t *kiov = &payload_kiov[i];
826                         char       *addr = ((char *)kmap (kiov->kiov_page)) +
827                                            kiov->kiov_offset;
828                         
829                         csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
830                         sumnob -= kiov->kiov_len;
831                 } else {
832                         struct iovec *iov = &payload_iov[i];
833
834                         csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
835                         sumnob -= iov->iov_len;
836                 }
837         }
838         memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
839 #endif
840         
841         /* Set up first frag from pre-mapped buffer (it's at least the
842          * portals header) */
843         ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer;
844         ktx->ktx_frags.iov[0].Len = KQSW_HDR_SIZE;
845         ktx->ktx_nfrag = 1;
846         ktx->ktx_state = KTX_SENDING;   /* => lib_finalize() on completion */
847
848 #if KQSW_OPTIMIZE_GETS
849         if (type == PTL_MSG_GET &&              /* doing a GET */
850             nid == targetnid) {                 /* not forwarding */
851                 lib_md_t           *md = libmsg->md;
852                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
853                 
854                 /* Optimised path: I send over the Elan vaddrs of the get
855                  * sink buffers, and my peer DMAs directly into them.
856                  *
857                  * First I set up ktx as if it was going to send this
858                  * payload, (it needs to map it anyway).  This fills
859                  * ktx_frags.iov[1] and onward with the network addresses
860                  * of the get sink frags.  I copy these into ktx_buffer,
861                  * immediately after the header, and send that as my GET
862                  * message.
863                  *
864                  * Note that the addresses are sent in native endian-ness.
865                  * When EKC copes with different endian nodes, I'll fix
866                  * this (and eat my hat :) */
867
868                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
869                         rc = kqswnal_map_tx_kiov (ktx, md->length,
870                                                   md->md_niov, md->md_iov.kiov);
871                 else
872                         rc = kqswnal_map_tx_iov (ktx, md->length,
873                                                  md->md_niov, md->md_iov.iov);
874
875                 if (rc < 0) {
876                         kqswnal_put_idle_tx (ktx);
877                         return (PTL_FAIL);
878                 }
879
880                 rmd->kqrmd_neiov = ktx->ktx_nfrag - 1;
881                 memcpy (&rmd->kqrmd_eiov[0], &ktx->ktx_frags.iov[1],
882                         rmd->kqrmd_neiov * sizeof (EP_IOVEC));
883
884                 ktx->ktx_nfrag = 1;
885                 ktx->ktx_frags.iov[0].Len += offsetof (kqswnal_remotemd_t,
886                                                        kqrmd_eiov[rmd->kqrmd_neiov]);
887                 payload_nob = ktx->ktx_frags.iov[0].Len;
888                 ktx->ktx_state = KTX_GETTING;
889         } else 
890 #endif
891         if (payload_nob > 0) { /* got some payload (something more to do) */
892                 /* make a single contiguous message? */
893                 if (payload_nob <= KQSW_TX_MAXCONTIG) {
894                         /* copy payload to ktx_buffer, immediately after hdr */
895                         if (payload_kiov != NULL)
896                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
897                                                    payload_niov, payload_kiov, payload_nob);
898                         else
899                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
900                                                   payload_niov, payload_iov, payload_nob);
901                         /* first frag includes payload */
902                         ktx->ktx_frags.iov[0].Len += payload_nob;
903                 } else {
904                         if (payload_kiov != NULL)
905                                 rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
906                                                           payload_niov, payload_kiov);
907                         else
908                                 rc = kqswnal_map_tx_iov (ktx, payload_nob,
909                                                          payload_niov, payload_iov);
910                         if (rc != 0) {
911                                 kqswnal_put_idle_tx (ktx);
912                                 return (PTL_FAIL);
913                         }
914                 } 
915         }
916
917         ktx->ktx_nid  = targetnid;
918         ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
919                         EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
920
921         rc = kqswnal_launch (ktx);
922         if (rc != 0) {                    /* failed? */
923                 CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
924                 kqswnal_put_idle_tx (ktx);
925                 return (PTL_FAIL);
926         }
927
928         CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", 
929                payload_nob, nid, targetnid);
930         return (PTL_OK);
931 }
932
933 static int
934 kqswnal_send (nal_cb_t     *nal,
935               void         *private,
936               lib_msg_t    *libmsg,
937               ptl_hdr_t    *hdr,
938               int           type,
939               ptl_nid_t     nid,
940               ptl_pid_t     pid,
941               unsigned int  payload_niov,
942               struct iovec *payload_iov,
943               size_t        payload_nob)
944 {
945         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
946                                  payload_niov, payload_iov, NULL, payload_nob));
947 }
948
949 static int
950 kqswnal_send_pages (nal_cb_t     *nal,
951                     void         *private,
952                     lib_msg_t    *libmsg,
953                     ptl_hdr_t    *hdr,
954                     int           type,
955                     ptl_nid_t     nid,
956                     ptl_pid_t     pid,
957                     unsigned int  payload_niov,
958                     ptl_kiov_t   *payload_kiov,
959                     size_t        payload_nob)
960 {
961         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
962                                  payload_niov, NULL, payload_kiov, payload_nob));
963 }
964
965 int kqswnal_fwd_copy_contig = 0;
966
967 void
968 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
969 {
970         int             rc;
971         kqswnal_tx_t   *ktx;
972         struct iovec   *iov = fwd->kprfd_iov;
973         int             niov = fwd->kprfd_niov;
974         int             nob = fwd->kprfd_nob;
975         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
976
977 #if KQSW_CHECKSUM
978         CERROR ("checksums for forwarded packets not implemented\n");
979         LBUG ();
980 #endif
981         /* The router wants this NAL to forward a packet */
982         CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
983                 fwd, nid, niov, nob);
984
985         LASSERT (niov > 0);
986         
987         ktx = kqswnal_get_idle_tx (fwd, FALSE);
988         if (ktx == NULL)        /* can't get txd right now */
989                 return;         /* fwd will be scheduled when tx desc freed */
990
991         if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
992                 nid = fwd->kprfd_target_nid;    /* target is final dest */
993
994         if (kqswnal_nid2elanid (nid) < 0) {
995                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
996                 rc = -EHOSTUNREACH;
997                 goto failed;
998         }
999
1000         if (nob > KQSW_NRXMSGBYTES_LARGE) {
1001                 CERROR ("Can't forward [%p] to "LPX64
1002                         ": size %d bigger than max packet size %ld\n",
1003                         fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
1004                 rc = -EMSGSIZE;
1005                 goto failed;
1006         }
1007
1008         if ((kqswnal_fwd_copy_contig || niov > 1) &&
1009             nob <= KQSW_TX_BUFFER_SIZE) 
1010         {
1011                 /* send from ktx's pre-allocated/mapped contiguous buffer? */
1012                 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
1013                 ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
1014                 ktx->ktx_frags.iov[0].Len = nob;
1015                 ktx->ktx_nfrag = 1;
1016                 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
1017         }
1018         else
1019         {
1020                 /* zero copy */
1021                 ktx->ktx_nfrag = 0;       /* no frags mapped yet */
1022                 rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
1023                 if (rc != 0)
1024                         goto failed;
1025
1026                 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
1027         }
1028
1029         ktx->ktx_port    = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
1030                         EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
1031         ktx->ktx_nid     = nid;
1032         ktx->ktx_state   = KTX_FORWARDING; /* kpr_put_packet() on completion */
1033         ktx->ktx_args[0] = fwd;
1034
1035         rc = kqswnal_launch (ktx);
1036         if (rc == 0)
1037                 return;
1038
1039  failed:
1040         LASSERT (rc != 0);
1041         CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
1042
1043         kqswnal_put_idle_tx (ktx);
1044         /* complete now (with failure) */
1045         kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
1046 }
1047
1048 void
1049 kqswnal_fwd_callback (void *arg, int error)
1050 {
1051         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1052
1053         /* The router has finished forwarding this packet */
1054
1055         if (error != 0)
1056         {
1057                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
1058
1059                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1060                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
1061         }
1062
1063         kqswnal_requeue_rx (krx);
1064 }
1065
1066 void
1067 kqswnal_reply_complete (EP_RXD *rxd) 
1068 {
1069         int           status = ep_rxd_status(rxd);
1070         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
1071         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
1072         lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
1073         
1074         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1075                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
1076
1077         LASSERT (krx->krx_rxd == rxd);
1078
1079         krx->krx_rpc_completed = 1;
1080         kqswnal_requeue_rx (krx);
1081
1082         lib_finalize (&kqswnal_lib, NULL, msg);
1083         kqswnal_put_idle_tx (ktx);
1084 }
1085
1086 void
1087 kqswnal_rpc_complete (EP_RXD *rxd)
1088 {
1089         int           status = ep_rxd_status(rxd);
1090         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1091         
1092         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1093                "rxd %p, krx %p, status %d\n", rxd, krx, status);
1094
1095         LASSERT (krx->krx_rxd == rxd);
1096         
1097         krx->krx_rpc_completed = 1;
1098         kqswnal_requeue_rx (krx);
1099 }
1100
1101 void
1102 kqswnal_requeue_rx (kqswnal_rx_t *krx)
1103 {
1104         EP_STATUSBLK  blk;
1105         int           rc;
1106
1107         LASSERT (atomic_read (&krx->krx_refcount) > 0);
1108         if (!atomic_dec_and_test (&krx->krx_refcount))
1109                 return;
1110
1111         if (!ep_rxd_isrpc(krx->krx_rxd) ||
1112             krx->krx_rpc_completed) {
1113
1114                 /* don't actually requeue on shutdown */
1115                 if (kqswnal_data.kqn_shuttingdown)
1116                         return;
1117                 
1118                 ep_requeue_receive (krx->krx_rxd, kqswnal_rxhandler, krx,
1119                                     krx->krx_elanaddr, krx->krx_npages * PAGE_SIZE);
1120                 return;
1121         }
1122
1123         /* Sender wanted an RPC, but we didn't complete it (we must have
1124          * dropped the sender's message).  We complete it now with
1125          * failure... */
1126         memset (&blk, 0, sizeof (blk));
1127         blk.Status = -ECONNREFUSED;
1128
1129         atomic_set (&krx->krx_refcount, 1);
1130
1131         rc = ep_complete_rpc (krx->krx_rxd, 
1132                               kqswnal_rpc_complete, krx,
1133                               &blk, NULL, 0);
1134         if (rc == ESUCCESS) {
1135                 /* callback will call me again to requeue, having set
1136                  * krx_rpc_completed... */
1137                 return;
1138         }
1139
1140         CERROR("can't complete RPC: %d\n", rc);
1141
1142         /* we don't actually requeue on shutdown */
1143         if (kqswnal_data.kqn_shuttingdown)
1144                 return;
1145
1146         /* NB ep_complete_rpc() frees rxd on failure, so we have to requeue
1147          * from scratch here... */
1148         rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
1149                               krx->krx_elanaddr, 
1150                               krx->krx_npages * PAGE_SIZE, 0);
1151
1152         LASSERT (rc == ESUCCESS);
1153         /* This needs to be fixed by ep_complete_rpc NOT freeing
1154          * krx->krx_rxd on failure so we can just ep_requeue_receive() */
1155 }
1156
1157 void
1158 kqswnal_rx (kqswnal_rx_t *krx)
1159 {
1160         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
1161         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
1162         int             nob;
1163         int             niov;
1164
1165         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
1166                 /* NB krx requeued when lib_parse() calls back kqswnal_recv */
1167                 lib_parse (&kqswnal_lib, hdr, krx);
1168                 return;
1169         }
1170
1171 #if KQSW_CHECKSUM
1172         CERROR ("checksums for forwarded packets not implemented\n");
1173         LBUG ();
1174 #endif
1175         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
1176         {
1177                 CERROR("dropping packet from "LPX64" for "LPX64
1178                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
1179
1180                 kqswnal_requeue_rx (krx);
1181                 return;
1182         }
1183
1184         /* NB forwarding may destroy iov; rebuild every time */
1185         for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
1186         {
1187                 LASSERT (niov < krx->krx_npages);
1188                 krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
1189                 krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
1190         }
1191
1192         kpr_fwd_init (&krx->krx_fwd, dest_nid,
1193                       krx->krx_nob, niov, krx->krx_iov,
1194                       kqswnal_fwd_callback, krx);
1195
1196         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
1197 }
1198
1199 /* Receive Interrupt Handler: posts to schedulers */
1200 void 
1201 kqswnal_rxhandler(EP_RXD *rxd)
1202 {
1203         long          flags;
1204         int           nob    = ep_rxd_len (rxd);
1205         int           status = ep_rxd_status (rxd);
1206         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1207
1208         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1209                rxd, krx, nob, status);
1210
1211         LASSERT (krx != NULL);
1212
1213         krx->krx_rxd = rxd;
1214         krx->krx_nob = nob;
1215         LASSERT (atomic_read (&krx->krx_refcount) == 0);
1216         atomic_set (&krx->krx_refcount, 1);
1217         krx->krx_rpc_completed = 0;
1218         
1219         /* must receive a whole header to be able to parse */
1220         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
1221         {
1222                 /* receives complete with failure when receiver is removed */
1223                 if (!kqswnal_data.kqn_shuttingdown)
1224                         CERROR("receive status failed with status %d nob %d\n",
1225                                ep_rxd_status(rxd), nob);
1226
1227                 kqswnal_requeue_rx (krx);
1228                 return;
1229         }
1230
1231         if (!in_interrupt()) {
1232                 kqswnal_rx (krx);
1233                 return;
1234         }
1235
1236         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1237
1238         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1239         wake_up (&kqswnal_data.kqn_sched_waitq);
1240
1241         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1242 }
1243
1244 #if KQSW_CHECKSUM
1245 void
1246 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1247 {
1248         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
1249
1250         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1251                 ", dpid %d, spid %d, type %d\n",
1252                 ishdr ? "Header" : "Payload", krx,
1253                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid)
1254                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
1255                 NTOH__u32(hdr->type));
1256
1257         switch (NTOH__u32 (hdr->type))
1258         {
1259         case PTL_MSG_ACK:
1260                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
1261                        " len %u\n",
1262                        NTOH__u32(hdr->msg.ack.mlength),
1263                        hdr->msg.ack.dst_wmd.handle_cookie,
1264                        hdr->msg.ack.dst_wmd.handle_idx,
1265                        NTOH__u64(hdr->msg.ack.match_bits),
1266                        NTOH__u32(hdr->msg.ack.length));
1267                 break;
1268         case PTL_MSG_PUT:
1269                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
1270                        " len %u off %u data "LPX64"\n",
1271                        NTOH__u32(hdr->msg.put.ptl_index),
1272                        hdr->msg.put.ack_wmd.handle_cookie,
1273                        hdr->msg.put.ack_wmd.handle_idx,
1274                        NTOH__u64(hdr->msg.put.match_bits),
1275                        NTOH__u32(hdr->msg.put.length),
1276                        NTOH__u32(hdr->msg.put.offset),
1277                        hdr->msg.put.hdr_data);
1278                 break;
1279         case PTL_MSG_GET:
1280                 CERROR ("GET: <>\n");
1281                 break;
1282         case PTL_MSG_REPLY:
1283                 CERROR ("REPLY: <>\n");
1284                 break;
1285         default:
1286                 CERROR ("TYPE?: <>\n");
1287         }
1288 }
1289 #endif
1290
1291 static int
1292 kqswnal_recvmsg (nal_cb_t     *nal,
1293                  void         *private,
1294                  lib_msg_t    *libmsg,
1295                  unsigned int  niov,
1296                  struct iovec *iov,
1297                  ptl_kiov_t   *kiov,
1298                  size_t        mlen,
1299                  size_t        rlen)
1300 {
1301         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
1302         int           page;
1303         char         *page_ptr;
1304         int           page_nob;
1305         char         *iov_ptr;
1306         int           iov_nob;
1307         int           frag;
1308 #if KQSW_CHECKSUM
1309         kqsw_csum_t   senders_csum;
1310         kqsw_csum_t   payload_csum = 0;
1311         kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
1312                                            sizeof(ptl_hdr_t));
1313         size_t        csum_len = mlen;
1314         int           csum_frags = 0;
1315         int           csum_nob = 0;
1316         static atomic_t csum_counter;
1317         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
1318
1319         atomic_inc (&csum_counter);
1320
1321         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1322                                 sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1323         if (senders_csum != hdr_csum)
1324                 kqswnal_csum_error (krx, 1);
1325 #endif
1326         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1327
1328         /* What was actually received must be >= payload.
1329          * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
1330         LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
1331         LASSERT (mlen <= rlen);
1332
1333         /* It must be OK to kmap() if required */
1334         LASSERT (kiov == NULL || !in_interrupt ());
1335         /* Either all pages or all vaddrs */
1336         LASSERT (!(kiov != NULL && iov != NULL));
1337         
1338         if (mlen != 0)
1339         {
1340                 page     = 0;
1341                 page_ptr = ((char *) page_address(krx->krx_pages[0])) +
1342                         KQSW_HDR_SIZE;
1343                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1344
1345                 LASSERT (niov > 0);
1346                 if (kiov != NULL) {
1347                         iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1348                         iov_nob = kiov->kiov_len;
1349                 } else {
1350                         iov_ptr = iov->iov_base;
1351                         iov_nob = iov->iov_len;
1352                 }
1353
1354                 for (;;)
1355                 {
1356                         /* We expect the iov to exactly match mlen */
1357                         LASSERT (iov_nob <= mlen);
1358                         
1359                         frag = MIN (page_nob, iov_nob);
1360                         memcpy (iov_ptr, page_ptr, frag);
1361 #if KQSW_CHECKSUM
1362                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1363                         csum_nob += frag;
1364                         csum_frags++;
1365 #endif
1366                         mlen -= frag;
1367                         if (mlen == 0)
1368                                 break;
1369
1370                         page_nob -= frag;
1371                         if (page_nob != 0)
1372                                 page_ptr += frag;
1373                         else
1374                         {
1375                                 page++;
1376                                 LASSERT (page < krx->krx_npages);
1377                                 page_ptr = page_address(krx->krx_pages[page]);
1378                                 page_nob = PAGE_SIZE;
1379                         }
1380
1381                         iov_nob -= frag;
1382                         if (iov_nob != 0)
1383                                 iov_ptr += frag;
1384                         else if (kiov != NULL) {
1385                                 kunmap (kiov->kiov_page);
1386                                 kiov++;
1387                                 niov--;
1388                                 LASSERT (niov > 0);
1389                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1390                                 iov_nob = kiov->kiov_len;
1391                         } else {
1392                                 iov++;
1393                                 niov--;
1394                                 LASSERT (niov > 0);
1395                                 iov_ptr = iov->iov_base;
1396                                 iov_nob = iov->iov_len;
1397                         }
1398                 }
1399
1400                 if (kiov != NULL)
1401                         kunmap (kiov->kiov_page);
1402         }
1403
1404 #if KQSW_CHECKSUM
1405         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1406                 sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
1407
1408         if (csum_len != rlen)
1409                 CERROR("Unable to checksum data in user's buffer\n");
1410         else if (senders_csum != payload_csum)
1411                 kqswnal_csum_error (krx, 0);
1412
1413         if (csum_verbose)
1414                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1415                        "csum_nob %d\n",
1416                         hdr_csum, payload_csum, csum_frags, csum_nob);
1417 #endif
1418         lib_finalize(nal, private, libmsg);
1419
1420         kqswnal_requeue_rx (krx);
1421
1422         return (rlen);
1423 }
1424
1425 static int
1426 kqswnal_recv(nal_cb_t     *nal,
1427              void         *private,
1428              lib_msg_t    *libmsg,
1429              unsigned int  niov,
1430              struct iovec *iov,
1431              size_t        mlen,
1432              size_t        rlen)
1433 {
1434         return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen));
1435 }
1436
1437 static int
1438 kqswnal_recv_pages (nal_cb_t     *nal,
1439                     void         *private,
1440                     lib_msg_t    *libmsg,
1441                     unsigned int  niov,
1442                     ptl_kiov_t   *kiov,
1443                     size_t        mlen,
1444                     size_t        rlen)
1445 {
1446         return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen));
1447 }
1448
1449 int
1450 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1451 {
1452         long    pid = kernel_thread (fn, arg, 0);
1453
1454         if (pid < 0)
1455                 return ((int)pid);
1456
1457         atomic_inc (&kqswnal_data.kqn_nthreads);
1458         return (0);
1459 }
1460
1461 void
1462 kqswnal_thread_fini (void)
1463 {
1464         atomic_dec (&kqswnal_data.kqn_nthreads);
1465 }
1466
1467 int
1468 kqswnal_scheduler (void *arg)
1469 {
1470         kqswnal_rx_t    *krx;
1471         kqswnal_tx_t    *ktx;
1472         kpr_fwd_desc_t  *fwd;
1473         long             flags;
1474         int              rc;
1475         int              counter = 0;
1476         int              did_something;
1477
1478         kportal_daemonize ("kqswnal_sched");
1479         kportal_blockallsigs ();
1480         
1481         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1482
1483         while (!kqswnal_data.kqn_shuttingdown)
1484         {
1485                 did_something = FALSE;
1486
1487                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1488                 {
1489                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1490                                          kqswnal_rx_t, krx_list);
1491                         list_del (&krx->krx_list);
1492                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1493                                                flags);
1494
1495                         kqswnal_rx (krx);
1496
1497                         did_something = TRUE;
1498                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1499                 }
1500
1501                 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1502                 {
1503                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1504                                          kqswnal_tx_t, ktx_list);
1505                         list_del_init (&ktx->ktx_delayed_list);
1506                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1507                                                flags);
1508
1509                         rc = kqswnal_launch (ktx);
1510                         if (rc != 0)          /* failed: ktx_nid down? */
1511                         {
1512                                 CERROR("Failed delayed transmit to "LPX64
1513                                        ": %d\n", ktx->ktx_nid, rc);
1514                                 kqswnal_tx_done (ktx, rc);
1515                         }
1516
1517                         did_something = TRUE;
1518                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1519                 }
1520
1521                 if (!list_empty (&kqswnal_data.kqn_delayedfwds))
1522                 {
1523                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1524                         list_del (&fwd->kprfd_list);
1525                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1526
1527                         kqswnal_fwd_packet (NULL, fwd);
1528
1529                         did_something = TRUE;
1530                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1531                 }
1532
1533                     /* nothing to do or hogging CPU */
1534                 if (!did_something || counter++ == KQSW_RESCHED) {
1535                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1536                                                flags);
1537
1538                         counter = 0;
1539
1540                         if (!did_something) {
1541                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1542                                                                kqswnal_data.kqn_shuttingdown ||
1543                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1544                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1545                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1546                                 LASSERT (rc == 0);
1547                         } else if (current->need_resched)
1548                                 schedule ();
1549
1550                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1551                 }
1552         }
1553
1554         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1555
1556         kqswnal_thread_fini ();
1557         return (0);
1558 }
1559
1560 nal_cb_t kqswnal_lib =
1561 {
1562         nal_data:       &kqswnal_data,         /* NAL private data */
1563         cb_send:        kqswnal_send,
1564         cb_send_pages:  kqswnal_send_pages,
1565         cb_recv:        kqswnal_recv,
1566         cb_recv_pages:  kqswnal_recv_pages,
1567         cb_read:        kqswnal_read,
1568         cb_write:       kqswnal_write,
1569         cb_malloc:      kqswnal_malloc,
1570         cb_free:        kqswnal_free,
1571         cb_printf:      kqswnal_printf,
1572         cb_cli:         kqswnal_cli,
1573         cb_sti:         kqswnal_sti,
1574         cb_dist:        kqswnal_dist
1575 };