lnet/klnds/qswlnd/qswlnd_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.lustre.org
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "qswlnd.h"

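/* Tell LNet the peer is down.  The time of death is back-dated to when
 * this tx was launched, converting the elapsed jiffies to seconds. */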
void
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
        struct timeval     now;
        time_t             then;

        do_gettimeofday (&now);
        then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;

        lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then);
}

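/* Release any temporary DVMA mappings held by this tx and unset its rail;
 * pre-mapped frags (those before ktx_firsttmpfrag) are left alone. */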
void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
        int      i;

        ktx->ktx_rail = -1;                     /* unset rail */

        if (ktx->ktx_nmappedpages == 0)
                return;

        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);

        ktx->ktx_nmappedpages = 0;
}

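/* Map 'nob' bytes of a paged (kiov) payload, starting 'offset' bytes in,
 * into Elan DVMA space on this tx's preferred rail.  Mappings that turn
 * out contiguous in Elan VM are merged into a single frag.  Returns 0 or
 * a negative errno. */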
int
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob,
                     unsigned int niov, lnet_kiov_t *kiov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;
        char     *ptr;

        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                nmapped++;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                        ktx, nfrags, ptr, fraglen, basepage, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage++;
                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

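/* Fold 'nob' bytes of a paged (kiov) payload, starting 'offset' bytes in,
 * into the running checksum 'csum'. */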
#if KQSW_CKSUM
__u32
kqswnal_csum_kiov (__u32 csum, int offset, int nob,
                   unsigned int niov, lnet_kiov_t *kiov)
{
        char     *ptr;

        if (nob == 0)
                return csum;

        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                csum = kqswnal_csum(csum, ptr, fraglen);

                kunmap (kiov->kiov_page);

                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        return csum;
}
#endif

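/* As kqswnal_map_tx_kiov(), but for a virtually-addressed (iov) payload;
 * a single iov fragment may span several pages. */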
int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    unsigned int niov, struct iovec *iov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;

        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;
                long npages;

                if (fraglen > nob)
                        fraglen = nob;
                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);

                nmapped += npages;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base + offset, fraglen,
                       basepage, npages, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage += npages;
                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

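/* Checksum counterpart of kqswnal_csum_kiov() for iov payloads. */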
#if KQSW_CKSUM
__u32
kqswnal_csum_iov (__u32 csum, int offset, int nob,
                  unsigned int niov, struct iovec *iov)
{
        if (nob == 0)
                return csum;

        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;

                if (fraglen > nob)
                        fraglen = nob;

                csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen);

                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        return csum;
}
#endif

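/* Return a tx descriptor to the idle pool, releasing its temporary
 * mappings and moving it off the active list. */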
void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        unsigned long     flags;

        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_del (&ktx->ktx_list);              /* take off active list */
        list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
}

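/* Get a tx descriptor from the idle pool, or NULL if the pool is empty or
 * the LND is shutting down.  The descriptor moves onto the active list
 * and is counted in kqn_pending_txs. */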
kqswnal_tx_t *
kqswnal_get_idle_tx (void)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        if (kqswnal_data.kqn_shuttingdown ||
            list_empty (&kqswnal_data.kqn_idletxds)) {
                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

                return NULL;
        }

        ktx = list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t, ktx_list);
        list_del (&ktx->ktx_list);

        list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
        ktx->ktx_launcher = current->pid;
        atomic_inc(&kqswnal_data.kqn_pending_txs);

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx->ktx_nmappedpages == 0);
        return (ktx);
}

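/* Complete a tx in thread context: check RDMA checksums where enabled,
 * recycle the descriptor and finalize the lnet message(s) it carried.
 * lnet_finalize() may not be called in interrupt context, hence the
 * LASSERT below. */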
void
kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
{
        lnet_msg_t    *lnetmsg0 = NULL;
        lnet_msg_t    *lnetmsg1 = NULL;
        int            status0  = 0;
        int            status1  = 0;
        kqswnal_rx_t  *krx;

        LASSERT (!in_interrupt());

        if (ktx->ktx_status == -EHOSTDOWN)
                kqswnal_notify_peer_down(ktx);

        switch (ktx->ktx_state) {
        case KTX_RDMA_FETCH:                    /* optimized PUT/REPLY handled */
                krx      = (kqswnal_rx_t *)ktx->ktx_args[0];
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
#if KQSW_CKSUM
                if (status0 == 0) {             /* RDMA succeeded */
                        kqswnal_msg_t *msg;
                        __u32          csum;

                        msg = (kqswnal_msg_t *)
                              page_address(krx->krx_kiov[0].kiov_page);

                        csum = (lnetmsg0->msg_kiov != NULL) ?
                               kqswnal_csum_kiov(krx->krx_cksum,
                                                 lnetmsg0->msg_offset,
                                                 lnetmsg0->msg_wanted,
                                                 lnetmsg0->msg_niov,
                                                 lnetmsg0->msg_kiov) :
                               kqswnal_csum_iov(krx->krx_cksum,
                                                lnetmsg0->msg_offset,
                                                lnetmsg0->msg_wanted,
                                                lnetmsg0->msg_niov,
                                                lnetmsg0->msg_iov);

                        /* Can only check csum if I got it all */
                        if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
                            csum != msg->kqm_cksum) {
                                ktx->ktx_status = -EIO;
                                krx->krx_rpc_reply.msg.status = -EIO;
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, msg->kqm_cksum,
                                       libcfs_nid2str(kqswnal_rx_nid(krx)));
                        }
                }
#endif
                LASSERT (krx->krx_state == KRX_COMPLETING);
                kqswnal_rx_decref (krx);
                break;

        case KTX_RDMA_STORE:       /* optimized GET handled */
        case KTX_PUTTING:          /* optimized PUT sent */
        case KTX_SENDING:          /* normal send */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
                break;

        case KTX_GETTING:          /* optimized GET sent & payload received */
                /* Complete the GET with success since we can't avoid
                 * delivering a REPLY event; we committed to it when we
                 * launched the GET */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = 0;
                lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
                status1  = ktx->ktx_status;
#if KQSW_CKSUM
                if (status1 == 0) {             /* RDMA succeeded */
                        lnet_msg_t   *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                        lnet_libmd_t *md = lnetmsg0->msg_md;
                        __u32         csum;

                        csum = ((md->md_options & LNET_MD_KIOV) != 0) ?
                               kqswnal_csum_kiov(~0, 0,
                                                 md->md_length,
                                                 md->md_niov,
                                                 md->md_iov.kiov) :
                               kqswnal_csum_iov(~0, 0,
                                                md->md_length,
                                                md->md_niov,
                                                md->md_iov.iov);

                        if (csum != ktx->ktx_cksum) {
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, ktx->ktx_cksum,
                                       libcfs_nid2str(ktx->ktx_nid));
                                status1 = -EIO;
                        }
                }
#endif
                break;

        default:
                LASSERT (0);
        }

        kqswnal_put_idle_tx (ktx);

        lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
        if (lnetmsg1 != NULL)
                lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
}

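/* Record a tx's completion status.  In thread context the tx completes
 * immediately; in interrupt context it is queued for the scheduler
 * thread. */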
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
{
        unsigned long      flags;

        ktx->ktx_status = status;

        if (!in_interrupt()) {
                kqswnal_tx_done_in_thread_context(ktx);
                return;
        }

        /* Complete the send in thread context */
        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail(&ktx->ktx_schedlist,
                      &kqswnal_data.kqn_donetxds);
        wake_up(&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
}

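/* EKC transmit completion callback.  For RPCs (optimized GET/PUT) the
 * peer's status block is decoded, swabbing if the peer is opposite-endian,
 * before the tx is completed. */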
static void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t         *ktx = (kqswnal_tx_t *)arg;
        kqswnal_rpc_reply_t  *reply;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), status);

                status = -EHOSTDOWN;

        } else switch (ktx->ktx_state) {

        case KTX_GETTING:
        case KTX_PUTTING:
                /* RPC complete! */
                reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
                if (reply->msg.magic == 0) {    /* "old" peer */
                        status = reply->msg.status;
                        break;
                }

                if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
                        if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
                                CERROR("%s unexpected rpc reply magic %08x\n",
                                       libcfs_nid2str(ktx->ktx_nid),
                                       reply->msg.magic);
                                status = -EPROTO;
                                break;
                        }

                        __swab32s(&reply->msg.status);
                        __swab32s(&reply->msg.version);

                        if (ktx->ktx_state == KTX_GETTING) {
                                __swab32s(&reply->msg.u.get.len);
                                __swab32s(&reply->msg.u.get.cksum);
                        }
                }

                status = reply->msg.status;
                if (status != 0) {
                        CERROR("%s RPC status %08x\n",
                               libcfs_nid2str(ktx->ktx_nid), status);
                        break;
                }

                if (ktx->ktx_state == KTX_GETTING) {
                        lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
                                               (lnet_msg_t *)ktx->ktx_args[2],
                                               reply->msg.u.get.len);
#if KQSW_CKSUM
                        ktx->ktx_cksum = reply->msg.u.get.cksum;
#endif
                }
                break;

        case KTX_SENDING:
                status = 0;
                break;

        default:
                LBUG();
                break;
        }

        kqswnal_tx_done(ktx, status);
}

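/* Pass a tx to the EKC: as an RPC for optimized GET/PUT, or as a plain
 * message otherwise.  If the EKC has no transmit descriptors to spare the
 * tx is queued for the scheduler thread to relaunch later. */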
int
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int   rc;

        ktx->ktx_launchtime = jiffies;

        if (kqswnal_data.kqn_shuttingdown)
                return (-ESHUTDOWN);

        LASSERT (dest >= 0);                    /* must be a peer */

        if (ktx->ktx_nmappedpages != 0)
                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);

        switch (ktx->ktx_state) {
        case KTX_GETTING:
        case KTX_PUTTING:
                if (the_lnet.ln_testprotocompat != 0 &&
                    the_lnet.ln_ptlcompat == 0) {
                        kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                        /* single-shot proto test:
                         * Future version queries will use an RPC, so I'll
                         * co-opt one of the existing ones */
                        LNET_LOCK();
                        if ((the_lnet.ln_testprotocompat & 1) != 0) {
                                msg->kqm_version++;
                                the_lnet.ln_testprotocompat &= ~1;
                        }
                        if ((the_lnet.ln_testprotocompat & 2) != 0) {
                                msg->kqm_magic = LNET_PROTO_MAGIC;
                                the_lnet.ln_testprotocompat &= ~2;
                        }
                        LNET_UNLOCK();
                }

                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
                 * The other frags are the payload, awaiting RDMA */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
                break;

        case KTX_SENDING:
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
                break;

        default:
                LBUG();
                rc = -EINVAL;                   /* no compiler warning please */
                break;
        }

        switch (rc) {
        case EP_SUCCESS: /* success */
                return (0);

        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                list_add_tail (&ktx->ktx_schedlist, &kqswnal_data.kqn_delayedtxds);
                wake_up (&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
                return (0);

        default: /* fatal error */
                CDEBUG (D_NETERROR, "Tx to %s failed: %d\n", libcfs_nid2str(ktx->ktx_nid), rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
}

#if 0
static char *
hdr_type_string (lnet_hdr_t *hdr)
{
        switch (hdr->type) {
        case LNET_MSG_ACK:
                return ("ACK");
        case LNET_MSG_PUT:
                return ("PUT");
        case LNET_MSG_GET:
                return ("GET");
        case LNET_MSG_REPLY:
                return ("REPLY");
        default:
                return ("<UNKNOWN>");
        }
}

static void
kqswnal_cerror_hdr(lnet_hdr_t * hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               le32_to_cpu(hdr->payload_length));
        CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
               le32_to_cpu(hdr->src_pid));
        CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
               le32_to_cpu(hdr->dest_pid));

        switch (le32_to_cpu(hdr->type)) {
        case LNET_MSG_PUT:
                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.put.match_bits));
                CERROR("    offset %d, hdr data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case LNET_MSG_GET:
                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       hdr->msg.get.match_bits);
                CERROR("    Length %d, src offset %d\n",
                       le32_to_cpu(hdr->msg.get.sink_length),
                       le32_to_cpu(hdr->msg.get.src_offset));
                break;

        case LNET_MSG_ACK:
                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       le32_to_cpu(hdr->msg.ack.mlength));
                break;

        case LNET_MSG_REPLY:
                CERROR("    dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
        }

}                               /* end of kqswnal_cerror_hdr() */
#endif

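/* Check that local and remote RDMA descriptors agree on fragment count
 * and per-fragment length before a transfer is attempted. */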
int
kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
                    int nrfrag, EP_NMD *rfrag)
{
        int  i;

        if (nlfrag != nrfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       nlfrag, nrfrag);
                return (-EINVAL);
        }

        for (i = 0; i < nlfrag; i++)
                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
                        return (-EINVAL);
                }

        return (0);
}

kqswnal_remotemd_t *
kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
{
        /* Check that the RMD sent after the "raw" LNET header in a
         * portals-compatible QSWLND message is OK */
        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));

        /* Note RDMA addresses are sent in native endian-ness in the "old"
         * portals protocol so no swabbing... */

        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (NULL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (NULL);
        }

        return (rmd);
}

void
kqswnal_rdma_store_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_decref (krx);

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
}

void
kqswnal_rdma_fetch_complete (EP_RXD *rxd)
{
        /* Completed fetching the PUT/REPLY data */
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
        LASSERT (krx->krx_rxd == rxd);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (status == EP_SUCCESS) {
                krx->krx_rpc_reply.msg.status = 0;
                status = 0;
        } else {
                /* Abandon RPC since get failed */
                krx->krx_rpc_reply_needed = 0;
                status = -ECONNABORTED;
        }

        /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
        LASSERT (krx->krx_state == KRX_PARSE);
        krx->krx_state = KRX_COMPLETING;

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, status);
}

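/* Perform the RDMA for an optimized GET (KTX_RDMA_STORE: store the reply
 * payload in the peer's buffers) or an optimized PUT/REPLY
 * (KTX_RDMA_FETCH: fetch the payload from the peer's buffers).  The tx
 * descriptor is used only to map the local buffers. */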
int
kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
              int type, kqswnal_remotemd_t *rmd,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
              unsigned int offset, unsigned int len)
{
        kqswnal_tx_t       *ktx;
        int                 eprc;
        int                 rc;

        /* Not both mapped and paged payload */
        LASSERT (iov == NULL || kiov == NULL);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (len == 0) {
                /* data got truncated to nothing. */
                lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
                /* Let kqswnal_rx_done() complete the RPC with success */
                krx->krx_rpc_reply.msg.status = 0;
                return (0);
        }

        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
           actually sending a portals message with it */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for RDMA with %s\n",
                        libcfs_nid2str(kqswnal_rx_nid(krx)));
                return (-ENOMEM);
        }

        ktx->ktx_state   = type;
        ktx->ktx_nid     = kqswnal_rx_nid(krx);
        ktx->ktx_args[0] = krx;
        ktx->ktx_args[1] = lntmsg;

        LASSERT (atomic_read(&krx->krx_refcount) > 0);
        /* Take an extra ref for the completion callback */
        atomic_inc(&krx->krx_refcount);

        /* Map on the rail the RPC prefers */
        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
                                         ep_rxd_railmask(krx->krx_rxd));

        /* Start mapping at offset 0 (we're not mapping any headers) */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;

        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
        else
                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);

        if (rc != 0) {
                CERROR ("Can't map local RDMA data: %d\n", rc);
                goto out;
        }

        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (rc != 0) {
                CERROR ("Incompatible RDMA descriptors\n");
                goto out;
        }

        switch (type) {
        default:
                LBUG();

        case KTX_RDMA_STORE:
                krx->krx_rpc_reply.msg.status    = 0;
                krx->krx_rpc_reply.msg.magic     = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version   = QSWLND_PROTO_VERSION;
                krx->krx_rpc_reply.msg.u.get.len = len;
#if KQSW_CKSUM
                krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
                            kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
                            kqswnal_csum_iov(~0, offset, len, niov, iov);
                if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
                        krx->krx_rpc_reply.msg.u.get.cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
                eprc = ep_complete_rpc(krx->krx_rxd,
                                       kqswnal_rdma_store_complete, ktx,
                                       &krx->krx_rpc_reply.ep_statusblk,
                                       ktx->ktx_frags, rmd->kqrmd_frag,
                                       rmd->kqrmd_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("can't complete RPC: %d\n", eprc);
                        /* don't re-attempt RPC completion */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;

        case KTX_RDMA_FETCH:
                eprc = ep_rpc_get (krx->krx_rxd,
                                   kqswnal_rdma_fetch_complete, ktx,
                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("ep_rpc_get failed: %d\n", eprc);
                        /* Don't attempt RPC completion:
                         * EKC nuked it when the get failed */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;
        }

 out:
        if (rc != 0) {
                kqswnal_rx_decref(krx);                 /* drop callback's ref */
                kqswnal_put_idle_tx (ktx);
        }

        atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc);
}

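/* LND send entry point.  Chooses between the optimized RDMA path (the
 * peer DMAs directly to/from the payload buffers), a small message copied
 * whole into the pre-mapped buffer, and a large message sent as a header
 * frag plus mapped payload frags. */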
int
kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        int               nob;
        kqswnal_tx_t     *ktx;
        int               rc;

        /* NB 1. hdr is in network byte order */
        /*    2. 'private' depends on the message type */

        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (kqswnal_nid2elanid (target.nid) < 0) {
                CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
                return -EIO;
        }

        /* I may not block for a transmit descriptor if I might block the
         * router, receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for msg type %d for %s\n",
                        type, libcfs_nid2str(target.nid));
                return (-ENOMEM);
        }

        ktx->ktx_state   = KTX_SENDING;
        ktx->ktx_nid     = target.nid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = lntmsg;
        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */

        /* The first frag will be the pre-mapped buffer. */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;

        if ((!target_is_router &&               /* target.nid is final dest */
             !routing &&                        /* I'm the source */
             type == LNET_MSG_GET &&            /* optimize GET? */
             *kqswnal_tunables.kqn_optimized_gets != 0 &&
             lntmsg->msg_md->md_length >=
             *kqswnal_tunables.kqn_optimized_gets) ||
            ((type == LNET_MSG_PUT ||            /* optimize PUT? */
              type == LNET_MSG_REPLY) &&         /* optimize REPLY? */
             *kqswnal_tunables.kqn_optimized_puts != 0 &&
             payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
                lnet_libmd_t       *md = lntmsg->msg_md;
                kqswnal_msg_t      *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
                lnet_hdr_t         *mhdr;
                kqswnal_remotemd_t *rmd;

                /* Optimised path: I send over the Elan vaddrs of the local
                 * buffers, and my peer DMAs directly to/from them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the buffer frags. */

                if (the_lnet.ln_ptlcompat == 2) {
                        /* Strong portals compatibility: send "raw" LNET
                         * header + rdma descriptor */
                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;
                        rmd  = (kqswnal_remotemd_t *)(mhdr + 1);
                } else {
                        /* Send an RDMA message */
                        msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                        msg->kqm_version = QSWLND_PROTO_VERSION;
                        msg->kqm_type = QSWLND_MSG_RDMA;

                        mhdr = &msg->kqm_u.rdma.kqrm_hdr;
                        rmd  = &msg->kqm_u.rdma.kqrm_rmd;
                }

                *mhdr = *hdr;
                nob = (((char *)rmd) - ktx->ktx_buffer);

                if (type == LNET_MSG_GET) {
                        if ((md->md_options & LNET_MD_KIOV) != 0)
                                rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
                                                          md->md_niov, md->md_iov.kiov);
                        else
                                rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
                                                         md->md_niov, md->md_iov.iov);
                        ktx->ktx_state = KTX_GETTING;
                } else {
                        if (payload_kiov != NULL)
                                rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
                                                         payload_niov, payload_kiov);
                        else
                                rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
                                                        payload_niov, payload_iov);
                        ktx->ktx_state = KTX_PUTTING;
                }

                if (rc != 0)
                        goto out;

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
                nob += offsetof(kqswnal_remotemd_t,
                                kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (nob <= KQSW_TX_BUFFER_SIZE);

                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
#if KQSW_CKSUM
                LASSERT (the_lnet.ln_ptlcompat != 2);
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
#endif
                if (type == LNET_MSG_GET) {
                        /* Allocate reply message now while I'm in thread context */
                        ktx->ktx_args[2] = lnet_create_reply_msg (
                                kqswnal_data.kqn_ni, lntmsg);
                        if (ktx->ktx_args[2] == NULL)
                                goto out;

                        /* NB finalizing the REPLY message is my
                         * responsibility now, whatever happens. */
#if KQSW_CKSUM
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  3) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }

                } else if (payload_kiov != NULL) {
                        /* must checksum payload after header so receiver can
                         * compute partial header cksum before swab.  Sadly
                         * this causes 2 rounds of kmap */
                        msg->kqm_cksum =
                                kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,
                                                  payload_niov, payload_kiov);
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
                } else {
                        msg->kqm_cksum =
                                kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,
                                                 payload_niov, payload_iov);
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
#endif
                }

        } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
                lnet_hdr_t    *mhdr;
                char          *payload;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* small message: single frag copied into the pre-mapped buffer */
                if (the_lnet.ln_ptlcompat == 2) {
                        /* Strong portals compatibility: send "raw" LNET header
                         * + payload */
                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;
                        payload = (char *)(mhdr + 1);
                } else {
                        /* Send an IMMEDIATE message */
                        msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                        msg->kqm_version = QSWLND_PROTO_VERSION;
                        msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                        mhdr = &msg->kqm_u.immediate.kqim_hdr;
                        payload = msg->kqm_u.immediate.kqim_payload;
                }

                *mhdr = *hdr;
                nob = (payload - ktx->ktx_buffer) + payload_nob;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                            payload_niov, payload_kiov,
                                            payload_offset, payload_nob);
                else
                        lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                           payload_niov, payload_iov,
                                           payload_offset, payload_nob);
#if KQSW_CKSUM
                LASSERT (the_lnet.ln_ptlcompat != 2);
                msg->kqm_nob   = nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
        } else {
                lnet_hdr_t    *mhdr;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* large message: multiple frags: first is hdr in pre-mapped buffer */
                if (the_lnet.ln_ptlcompat == 2) {
                        /* Strong portals compatibility: send "raw" LNET header
                         * + payload */
                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;
                        nob = sizeof(lnet_hdr_t);
                } else {
                        /* Send an IMMEDIATE message */
                        msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                        msg->kqm_version = QSWLND_PROTO_VERSION;
                        msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                        mhdr = &msg->kqm_u.immediate.kqim_hdr;
                        nob = offsetof(kqswnal_msg_t,
                                       kqm_u.immediate.kqim_payload);
                }

                *mhdr = *hdr;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);
                if (rc != 0)
                        goto out;

#if KQSW_CKSUM
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);

                msg->kqm_cksum = (payload_kiov != NULL) ?
                                 kqswnal_csum_kiov(msg->kqm_cksum,
                                                   payload_offset, payload_nob,
                                                   payload_niov, payload_kiov) :
                                 kqswnal_csum_iov(msg->kqm_cksum,
                                                  payload_offset, payload_nob,
                                                  payload_niov, payload_iov);

                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
                nob += payload_nob;
        }

        ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        rc = kqswnal_launch (ktx);

 out:
        CDEBUG(rc == 0 ? D_NET : D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
               routing ? (rc == 0 ? "Routed" : "Failed to route") :
                         (rc == 0 ? "Sent" : "Failed to send"),
               nob, libcfs_nid2str(target.nid),
               target_is_router ? "(router)" : "", rc);

        if (rc != 0) {
                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
                int         state = ktx->ktx_state;

                kqswnal_put_idle_tx (ktx);

                if (state == KTX_GETTING && repmsg != NULL) {
                        /* We committed to reply, but there was a problem
                         * launching the GET.  We can't avoid delivering a
                         * REPLY event since we committed above, so we
                         * pretend the GET succeeded but the REPLY
                         * failed. */
                        rc = 0;
                        lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
                }

        }

        atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc == 0 ? 0 : -EIO);
}

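/* Hand a receive descriptor back to the EKC: reposted for the next
 * message, or freed if the LND is shutting down. */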
void
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        LASSERT (atomic_read(&krx->krx_refcount) == 0);
        LASSERT (!krx->krx_rpc_reply_needed);

        krx->krx_state = KRX_POSTED;

        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd,
                                   kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }
}

void
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}

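/* Called when the last reference on a receive drops: complete any RPC the
 * peer is still waiting on, then requeue the descriptor. */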
void
kqswnal_rx_done (kqswnal_rx_t *krx)
{
        int           rc;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {
                /* We've not completed the peer's RPC yet... */
                krx->krx_rpc_reply.msg.magic   = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;

                LASSERT (!in_interrupt());

                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,
                                     &krx->krx_rpc_reply.ep_statusblk,
                                     NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);
                krx->krx_rpc_reply_needed = 0;
        }

        kqswnal_requeue_rx(krx);
}

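/* Parse a newly received message: check length, magic, version and
 * (optionally) checksum, then dispatch on message type. */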
1347 void
1348 kqswnal_parse (kqswnal_rx_t *krx)
1349 {
1350         lnet_ni_t      *ni = kqswnal_data.kqn_ni;
1351         kqswnal_msg_t  *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
1352         lnet_nid_t      fromnid = kqswnal_rx_nid(krx);
1353         int             swab;
1354         int             n;
1355         int             i;
1356         int             nob;
1357         int             rc;
1358
1359         LASSERT (atomic_read(&krx->krx_refcount) == 1);
1360
1361         /* If ln_ptlcompat is set, peers may send me an "old" unencapsulated
1362          * lnet hdr */
1363         LASSERT (offsetof(kqswnal_msg_t, kqm_u) <= sizeof(lnet_hdr_t));
1364         
1365         if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {
1366                 CERROR("Short message %d received from %s\n",
1367                        krx->krx_nob, libcfs_nid2str(fromnid));
1368                 goto done;
1369         }
1370
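        /* A byte-swapped magic means the peer has the opposite endianness;
         * note it so every multi-byte header field gets swabbed below. */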
1371         swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);
1372
1373         if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {
1374 #if KQSW_CKSUM
1375                 __u32 csum0;
1376                 __u32 csum1;
1377
1378                 /* csum byte array before swab */
1379                 csum1 = msg->kqm_cksum;
1380                 msg->kqm_cksum = 0;
1381                 csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,
1382                                           krx->krx_npages, krx->krx_kiov);
1383                 msg->kqm_cksum = csum1;
1384 #endif
1385
1386                 if (swab) {
1387                         __swab16s(&msg->kqm_version);
1388                         __swab16s(&msg->kqm_type);
1389 #if KQSW_CKSUM
1390                         __swab32s(&msg->kqm_cksum);
1391                         __swab32s(&msg->kqm_nob);
1392 #endif
1393                 }
1394
1395                 if (msg->kqm_version != QSWLND_PROTO_VERSION) {
1396                         /* Future protocol version compatibility support!
1397                          * The next qswlnd-specific protocol rev will first
1398                          * send an RPC to check version.
1399                          * 1.4.6 and 1.4.7.early reply with a status
1400                          * block containing their current version.
1401                          * Later versions send a failure (-ve) status +
1402                          * magic/version */
1403
1404                         if (!krx->krx_rpc_reply_needed) {
1405                                 CERROR("Unexpected version %d from %s\n",
1406                                        msg->kqm_version, libcfs_nid2str(fromnid));
1407                                 goto done;
1408                         }
1409
1410                         LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
1411                         goto done;
1412                 }
1413
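                /* Dispatch on message type: IMMEDIATE carries its payload
                 * inline in the receive buffer, while RDMA carries a remote
                 * memory descriptor and must have arrived as an EKC RPC. */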
1414                 switch (msg->kqm_type) {
1415                 default:
1416                         CERROR("Bad request type %x from %s\n",
1417                                msg->kqm_type, libcfs_nid2str(fromnid));
1418                         goto done;
1419
1420                 case QSWLND_MSG_IMMEDIATE:
1421                         if (krx->krx_rpc_reply_needed) {
1422                                 /* Should have been a simple message */
1423                                 CERROR("IMMEDIATE sent as RPC from %s\n",
1424                                        libcfs_nid2str(fromnid));
1425                                 goto done;
1426                         }
1427
1428                         nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
1429                         if (krx->krx_nob < nob) {
1430                                 CERROR("Short IMMEDIATE %d(%d) from %s\n",
1431                                        krx->krx_nob, nob, libcfs_nid2str(fromnid));
1432                                 goto done;
1433                         }
1434
1435 #if KQSW_CKSUM
1436                         if (csum0 != msg->kqm_cksum) {
1437                                 CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",
1438                                        csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));
1439                                 CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);
1440                                 goto done;
1441                         }
1442 #endif
1443                         rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,
1444                                         fromnid, krx, 0);
1445                         if (rc < 0)
1446                                 goto done;
1447                         return;
1448
1449                 case QSWLND_MSG_RDMA:
1450                         if (!krx->krx_rpc_reply_needed) {
1451                                 /* Should have been sent as an RPC */
1452                                 CERROR("RDMA sent as simple message from %s\n",
1453                                        libcfs_nid2str(fromnid));
1454                                 goto done;
1455                         }
1456
1457                         nob = offsetof(kqswnal_msg_t,
1458                                        kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);
1459                         if (krx->krx_nob < nob) {
1460                                 CERROR("Short RDMA message %d(%d) from %s\n",
1461                                        krx->krx_nob, nob, libcfs_nid2str(fromnid));
1462                                 goto done;
1463                         }
1464
1465                         if (swab)
1466                                 __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);
1467
1468                         n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;
1469                         nob = offsetof(kqswnal_msg_t,
1470                                        kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);
1471
1472                         if (krx->krx_nob < nob) {
1473                                 CERROR("Short RDMA message %d(%d, %d frags) from %s\n",
1474                                        krx->krx_nob, nob, n, libcfs_nid2str(fromnid));
1475                                 goto done;
1476                         }
1477
1478                         if (swab) {
1479                                 for (i = 0; i < n; i++) {
1480                                         EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];
1481
1482                                         __swab32s(&nmd->nmd_addr);
1483                                         __swab32s(&nmd->nmd_len);
1484                                         __swab32s(&nmd->nmd_attr);
1485                                 }
1486                         }
1487
1488 #if KQSW_CKSUM
1489                         krx->krx_cksum = csum0; /* stash checksum so far */
1490 #endif
1491                         rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,
1492                                         fromnid, krx, 1);
1493                         if (rc < 0)
1494                                 goto done;
1495                         return;
1496                 }
1497                 /* Not Reached */
1498         }
1499
1500         if (msg->kqm_magic == LNET_PROTO_MAGIC ||
1501             msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {
1502                 /* Future protocol version compatibility support!
1503                  * When LNET unifies protocols over all LNDs, the first thing a
1504                  * peer will send will be a version query RPC.  
1505                  * 1.4.6 and 1.4.7.early reply with a status block containing
1506                  * LNET_PROTO_QSW_MAGIC.
1507                  * Later versions send a failure (-ve) status +
1508                  * magic/version */
1509
1510                 if (!krx->krx_rpc_reply_needed) {
1511                         CERROR("Unexpected magic %08x from %s\n",
1512                                msg->kqm_magic, libcfs_nid2str(fromnid));
1513                         goto done;
1514                 }
1515
1516                 LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
1517                 goto done;
1518         }
1519
1520         if (the_lnet.ln_ptlcompat != 0) {
1521                 /* Portals compatibility (strong or weak)
1522                  * This could be an unencapsulated LNET header.  If it's big
1523                  * enough, let LNET's parser sort it out */
1524
1525                 if (krx->krx_nob < sizeof(lnet_hdr_t)) {
1526                         CERROR("Short portals-compatible message from %s\n",
1527                                libcfs_nid2str(fromnid));
1528                         goto done;
1529                 }
1530
1531                 krx->krx_raw_lnet_hdr = 1;
1532                 rc = lnet_parse(ni, (lnet_hdr_t *)msg,
1533                                 fromnid, krx, krx->krx_rpc_reply_needed);
1534                 if (rc < 0)
1535                         goto done;
1536                 return;
1537         }
1538
1539         CERROR("Unrecognised magic %08x from %s\n",
1540                msg->kqm_magic, libcfs_nid2str(fromnid));
1541  done:
1542         kqswnal_rx_decref(krx);
1543 }
1544
1545 /* Receive Interrupt Handler: posts to schedulers */
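/* NB this may run in interrupt context: parsing (which can require
 * mapping pages) only happens inline when we are not in an interrupt;
 * otherwise the descriptor is queued for a scheduler thread. */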
1546 void 
1547 kqswnal_rxhandler(EP_RXD *rxd)
1548 {
1549         unsigned long flags;
1550         int           nob    = ep_rxd_len (rxd);
1551         int           status = ep_rxd_status (rxd);
1552         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1553         CDEBUG(D_NET, "rxd %p, krx %p, nob %d, status %d\n",
1554                rxd, krx, nob, status);
1555
1556         LASSERT (krx != NULL);
1557         LASSERT (krx->krx_state == KRX_POSTED);
1558         
1559         krx->krx_state = KRX_PARSE;
1560         krx->krx_rxd = rxd;
1561         krx->krx_nob = nob;
1562         krx->krx_raw_lnet_hdr = 0;
1563
1564         /* RPC reply iff RPC request received without error */
1565         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
1566                                     (status == EP_SUCCESS ||
1567                                      status == EP_MSG_TOO_BIG);
1568
1569         /* Default to failure if an RPC reply is requested but not handled */
1570         krx->krx_rpc_reply.msg.status = -EPROTO;
1571         atomic_set (&krx->krx_refcount, 1);
1572
1573         if (status != EP_SUCCESS) {
1574                 /* receives complete with failure when receiver is removed */
1575                 if (status == EP_SHUTDOWN)
1576                         LASSERT (kqswnal_data.kqn_shuttingdown);
1577                 else
1578                         CERROR("receive failed with status %d nob %d\n",
1579                                status, nob);
1580                 kqswnal_rx_decref(krx);
1581                 return;
1582         }
1583
1584         if (!in_interrupt()) {
1585                 kqswnal_parse(krx);
1586                 return;
1587         }
1588
1589         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1590
1591         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1592         wake_up (&kqswnal_data.kqn_sched_waitq);
1593
1594         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1595 }
1596
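/* LND receive entry point.  'private' is the kqswnal_rx_t that
 * kqswnal_parse() passed to lnet_parse().  Optimized (RDMA) requests
 * are completed with kqswnal_rdma(); immediate payloads are simply
 * copied out of the receive buffer. */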
1597 int
1598 kqswnal_recv (lnet_ni_t     *ni,
1599               void          *private,
1600               lnet_msg_t    *lntmsg,
1601               int            delayed,
1602               unsigned int   niov,
1603               struct iovec  *iov,
1604               lnet_kiov_t   *kiov,
1605               unsigned int   offset,
1606               unsigned int   mlen,
1607               unsigned int   rlen)
1608 {
1609         kqswnal_rx_t       *krx = (kqswnal_rx_t *)private;
1610         lnet_nid_t          fromnid;
1611         kqswnal_msg_t      *msg;
1612         lnet_hdr_t         *hdr;
1613         kqswnal_remotemd_t *rmd;
1614         int                 msg_offset;
1615         int                 rc;
1616
1617         LASSERT (!in_interrupt ());             /* OK to map */
1618         /* Either all pages or all vaddrs */
1619         LASSERT (!(kiov != NULL && iov != NULL));
1620
1621         fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
1622         msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
1623
1624         if (krx->krx_rpc_reply_needed) {
1625                 /* optimized (rdma) request sent as RPC */
1626
1627                 if (krx->krx_raw_lnet_hdr) {
1628                         LASSERT (the_lnet.ln_ptlcompat != 0);
1629                         hdr = (lnet_hdr_t *)msg;
1630                         rmd = kqswnal_get_portalscompat_rmd(krx);
1631                         if (rmd == NULL)
1632                                 return (-EPROTO);
1633                 } else {
1634                         LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);
1635                         hdr = &msg->kqm_u.rdma.kqrm_hdr;
1636                         rmd = &msg->kqm_u.rdma.kqrm_rmd;
1637                 }
1638
1639                 /* NB header is still in wire byte order */
1640
1641                 switch (le32_to_cpu(hdr->type)) {
1642                         case LNET_MSG_PUT:
1643                         case LNET_MSG_REPLY:
1644                                 /* This is an optimized PUT/REPLY */
1645                                 rc = kqswnal_rdma(krx, lntmsg, 
1646                                                   KTX_RDMA_FETCH, rmd,
1647                                                   niov, iov, kiov, offset, mlen);
1648                                 break;
1649
1650                         case LNET_MSG_GET:
1651 #if KQSW_CKSUM
1652                                 if (krx->krx_cksum != msg->kqm_cksum) {
1653                                         CERROR("Bad GET checksum %08x(%08x) from %s\n",
1654                                                krx->krx_cksum, msg->kqm_cksum,
1655                                                libcfs_nid2str(fromnid));
1656                                         rc = -EIO;
1657                                         break;
1658                                 }
1659 #endif                                
1660                                 if (lntmsg == NULL) {
1661                                         /* No buffer match: my decref will
1662                                          * complete the RPC with failure */
1663                                         rc = 0;
1664                                 } else {
1665                                         /* Matched something! */
1666                                         rc = kqswnal_rdma(krx, lntmsg,
1667                                                           KTX_RDMA_STORE, rmd,
1668                                                           lntmsg->msg_niov,
1669                                                           lntmsg->msg_iov,
1670                                                           lntmsg->msg_kiov,
1671                                                           lntmsg->msg_offset,
1672                                                           lntmsg->msg_len);
1673                                 }
1674                                 break;
1675
1676                         default:
1677                                 CERROR("Bad RPC type %d\n",
1678                                        le32_to_cpu(hdr->type));
1679                                 rc = -EPROTO;
1680                                 break;
1681                 }
1682
1683                 kqswnal_rx_decref(krx);
1684                 return rc;
1685         }
1686
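        /* Simple (non-RPC) message: locate the start of the payload and
         * copy 'mlen' bytes of it into the destination iov/kiov. */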
1687         if (krx->krx_raw_lnet_hdr) {
1688                 LASSERT (the_lnet.ln_ptlcompat != 0);
1689                 msg_offset = sizeof(lnet_hdr_t);
1690         } else {
1691                 LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);
1692                 msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
1693         }
1694         
1695         if (krx->krx_nob < msg_offset + rlen) {
1696                 CERROR("Bad message size from %s: have %d, need %d + %d\n",
1697                        libcfs_nid2str(fromnid), krx->krx_nob,
1698                        msg_offset, rlen);
1699                 kqswnal_rx_decref(krx);
1700                 return -EPROTO;
1701         }
1702
1703         if (kiov != NULL)
1704                 lnet_copy_kiov2kiov(niov, kiov, offset,
1705                                     krx->krx_npages, krx->krx_kiov, 
1706                                     msg_offset, mlen);
1707         else
1708                 lnet_copy_kiov2iov(niov, iov, offset,
1709                                    krx->krx_npages, krx->krx_kiov, 
1710                                    msg_offset, mlen);
1711
1712         lnet_finalize(ni, lntmsg, 0);
1713         kqswnal_rx_decref(krx);
1714         return 0;
1715 }
1716
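/* Spawn a kernel thread running fn(arg), counting it in kqn_nthreads so
 * shutdown can wait for every thread to call kqswnal_thread_fini().
 * Typical usage (a minimal sketch; the real call sites live in the
 * module startup code):
 *
 *      int rc = kqswnal_thread_start(kqswnal_scheduler, NULL);
 *      if (rc != 0)
 *              CERROR("failed to spawn scheduler: %d\n", rc);
 */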
1717 int
1718 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1719 {
1720         long    pid = kernel_thread (fn, arg, 0);
1721
1722         if (pid < 0)
1723                 return ((int)pid);
1724
1725         atomic_inc (&kqswnal_data.kqn_nthreads);
1726         return (0);
1727 }
1728
1729 void
1730 kqswnal_thread_fini (void)
1731 {
1732         atomic_dec (&kqswnal_data.kqn_nthreads);
1733 }
1734
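/* Scheduler thread: drains the work queued by the interrupt handler and
 * completion callbacks - receive descriptors to parse (kqn_readyrxds),
 * completed transmits to finalize (kqn_donetxds) and delayed transmits
 * to (re)launch (kqn_delayedtxds).  The lock is dropped around each work
 * item; the thread yields the CPU every KQSW_RESCHED iterations if
 * rescheduling is needed, and exits in stage 2 of shutdown once all
 * queues are empty. */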
1735 int
1736 kqswnal_scheduler (void *arg)
1737 {
1738         kqswnal_rx_t    *krx;
1739         kqswnal_tx_t    *ktx;
1740         unsigned long    flags;
1741         int              rc;
1742         int              counter = 0;
1743         int              did_something;
1744
1745         cfs_daemonize ("kqswnal_sched");
1746         cfs_block_allsigs ();
1747         
1748         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1749
1750         for (;;)
1751         {
1752                 did_something = 0;
1753
1754                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1755                 {
1756                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1757                                          kqswnal_rx_t, krx_list);
1758                         list_del (&krx->krx_list);
1759                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1760                                                flags);
1761
1762                         LASSERT (krx->krx_state == KRX_PARSE);
1763                         kqswnal_parse (krx);
1764
1765                         did_something = 1;
1766                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1767                 }
1768
1769                 if (!list_empty (&kqswnal_data.kqn_donetxds))
1770                 {
1771                         ktx = list_entry(kqswnal_data.kqn_donetxds.next,
1772                                          kqswnal_tx_t, ktx_schedlist);
1773                         list_del_init (&ktx->ktx_schedlist);
1774                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1775                                                flags);
1776
1777                         kqswnal_tx_done_in_thread_context(ktx);
1778
1779                         did_something = 1;
1780                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1781                 }
1782
1783                 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1784                 {
1785                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1786                                          kqswnal_tx_t, ktx_schedlist);
1787                         list_del_init (&ktx->ktx_schedlist);
1788                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1789                                                flags);
1790
1791                         rc = kqswnal_launch (ktx);
1792                         if (rc != 0) {
1793                                 CERROR("Failed delayed transmit to %s: %d\n", 
1794                                        libcfs_nid2str(ktx->ktx_nid), rc);
1795                                 kqswnal_tx_done (ktx, rc);
1796                         }
1797                         atomic_dec (&kqswnal_data.kqn_pending_txs);
1798
1799                         did_something = 1;
1800                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1801                 }
1802
1803                 /* nothing to do or hogging CPU */
1804                 if (!did_something || counter++ == KQSW_RESCHED) {
1805                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1806                                                flags);
1807
1808                         counter = 0;
1809
1810                         if (!did_something) {
1811                                 if (kqswnal_data.kqn_shuttingdown == 2) {
1812                                         /* We only exit in stage 2 of shutdown when 
1813                                          * there's nothing left to do */
1814                                         break;
1815                                 }
1816                                 rc = wait_event_interruptible_exclusive (
1817                                         kqswnal_data.kqn_sched_waitq,
1818                                         kqswnal_data.kqn_shuttingdown == 2 ||
1819                                         !list_empty(&kqswnal_data.kqn_readyrxds) ||
1820                                         !list_empty(&kqswnal_data.kqn_donetxds) ||
1821                                         !list_empty(&kqswnal_data.kqn_delayedtxds));
1822                                 LASSERT (rc == 0);
1823                         } else if (need_resched())
1824                                 schedule ();
1825
1826                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1827                 }
1828         }
1829
1830         kqswnal_thread_fini ();
1831         return (0);
1832 }