/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.lustre.org
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "qswlnd.h"

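/* Tell LNET the peer is down, back-dating the event to when this
 * transmit was launched: the elapsed jiffies since ktx_launchtime are
 * converted to seconds and subtracted from the current wall-clock
 * time. */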
void
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
        time_t             then;

        then = cfs_time_current_sec() -
                cfs_duration_sec(cfs_time_current() -
                                 ktx->ktx_launchtime);

        lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then);
}

void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
        int      i;

        ktx->ktx_rail = -1;                     /* unset rail */

        if (ktx->ktx_nmappedpages == 0)
                return;

        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);

        ktx->ktx_nmappedpages = 0;
}

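/* Map a page-based (kiov) payload into Elan DVMA space for transmit.
 * Starting 'offset' bytes into the vector, 'nob' bytes are loaded page
 * by page onto the preferred rail, merging adjacent pages into the
 * previous NMD frag where ep_nmd_merge() allows.  On failure the pages
 * mapped so far remain recorded in ktx_nmappedpages, so
 * kqswnal_unmap_tx() can still release them. */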
int
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob,
                     unsigned int niov, lnet_kiov_t *kiov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        __u32     basepage  = ktx->ktx_basepage + nmapped;
        char     *ptr;

        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                nmapped++;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                        ktx, nfrags, ptr, fraglen, basepage, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage++;
                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

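/* When checksumming is compiled in (KQSW_CKSUM), payloads are folded
 * into a 32-bit checksum using the same frag-walking pattern as the
 * mapping routines: skip whole frags before 'offset', then accumulate
 * 'nob' bytes.  kqswnal_csum_kiov() kmaps each page as it goes. */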
#if KQSW_CKSUM
__u32
kqswnal_csum_kiov (__u32 csum, int offset, int nob,
                   unsigned int niov, lnet_kiov_t *kiov)
{
        char     *ptr;

        if (nob == 0)
                return csum;

        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                csum = kqswnal_csum(csum, ptr, fraglen);

                kunmap (kiov->kiov_page);

                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        return csum;
}
#endif

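/* As kqswnal_map_tx_kiov(), but for virtually-addressed (iovec)
 * payloads: each fragment may span several pages, so the page budget
 * is charged via kqswnal_pages_spanned() rather than one page per
 * iteration. */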
int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    unsigned int niov, struct iovec *iov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        __u32     basepage  = ktx->ktx_basepage + nmapped;

        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;
                long npages;

                if (fraglen > nob)
                        fraglen = nob;
                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);

                nmapped += npages;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base + offset, fraglen,
                       basepage, npages, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage += npages;
                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

#if KQSW_CKSUM
__u32
kqswnal_csum_iov (__u32 csum, int offset, int nob,
                  unsigned int niov, struct iovec *iov)
{
        if (nob == 0)
                return csum;

        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;

                if (fraglen > nob)
                        fraglen = nob;

                csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen);

                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        return csum;
}
#endif

void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        unsigned long     flags;

        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        cfs_spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        cfs_list_del (&ktx->ktx_list);              /* take off active list */
        cfs_list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
}

kqswnal_tx_t *
kqswnal_get_idle_tx (void)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx;

        cfs_spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        if (kqswnal_data.kqn_shuttingdown ||
            cfs_list_empty (&kqswnal_data.kqn_idletxds)) {
                cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock,
                                            flags);

                return NULL;
        }

        ktx = cfs_list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t,
                              ktx_list);
        cfs_list_del (&ktx->ktx_list);

        cfs_list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
        ktx->ktx_launcher = current->pid;
        cfs_atomic_inc(&kqswnal_data.kqn_pending_txs);

        cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx->ktx_nmappedpages == 0);
        return (ktx);
}

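/* Transmit completion, run where kmap() and lnet_finalize() are safe.
 * The ktx_args[] slots follow a fixed convention: [0] is per-state
 * private data (the krx for RDMA states), [1] is the lnet_msg_t being
 * completed, and [2] is the REPLY message a GET committed to, so up
 * to two LNET messages may need finalizing here. */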
void
kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
{
        lnet_msg_t    *lnetmsg0 = NULL;
        lnet_msg_t    *lnetmsg1 = NULL;
        int            status0  = 0;
        int            status1  = 0;
        kqswnal_rx_t  *krx;

        LASSERT (!cfs_in_interrupt());

        if (ktx->ktx_status == -EHOSTDOWN)
                kqswnal_notify_peer_down(ktx);

        switch (ktx->ktx_state) {
        case KTX_RDMA_FETCH:                    /* optimized PUT/REPLY handled */
                krx      = (kqswnal_rx_t *)ktx->ktx_args[0];
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
#if KQSW_CKSUM
                if (status0 == 0) {             /* RDMA succeeded */
                        kqswnal_msg_t *msg;
                        __u32          csum;

                        msg = (kqswnal_msg_t *)
                              page_address(krx->krx_kiov[0].kiov_page);

                        csum = (lnetmsg0->msg_kiov != NULL) ?
                               kqswnal_csum_kiov(krx->krx_cksum,
                                                 lnetmsg0->msg_offset,
                                                 lnetmsg0->msg_wanted,
                                                 lnetmsg0->msg_niov,
                                                 lnetmsg0->msg_kiov) :
                               kqswnal_csum_iov(krx->krx_cksum,
                                                lnetmsg0->msg_offset,
                                                lnetmsg0->msg_wanted,
                                                lnetmsg0->msg_niov,
                                                lnetmsg0->msg_iov);

                        /* Can only check csum if I got it all */
                        if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
                            csum != msg->kqm_cksum) {
                                ktx->ktx_status = -EIO;
                                krx->krx_rpc_reply.msg.status = -EIO;
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, msg->kqm_cksum,
                                       libcfs_nid2str(kqswnal_rx_nid(krx)));
                        }
                }
#endif
                LASSERT (krx->krx_state == KRX_COMPLETING);
                kqswnal_rx_decref (krx);
                break;

        case KTX_RDMA_STORE:       /* optimized GET handled */
        case KTX_PUTTING:          /* optimized PUT sent */
        case KTX_SENDING:          /* normal send */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
                break;

        case KTX_GETTING:          /* optimized GET sent & payload received */
                /* Complete the GET with success since we can't avoid
                 * delivering a REPLY event; we committed to it when we
                 * launched the GET */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = 0;
                lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
                status1  = ktx->ktx_status;
#if KQSW_CKSUM
                if (status1 == 0) {             /* RDMA succeeded */
                        lnet_msg_t   *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                        lnet_libmd_t *md = lnetmsg0->msg_md;
                        __u32         csum;

                        csum = ((md->md_options & LNET_MD_KIOV) != 0) ?
                               kqswnal_csum_kiov(~0, 0,
                                                 md->md_length,
                                                 md->md_niov,
                                                 md->md_iov.kiov) :
                               kqswnal_csum_iov(~0, 0,
                                                md->md_length,
                                                md->md_niov,
                                                md->md_iov.iov);

                        if (csum != ktx->ktx_cksum) {
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, ktx->ktx_cksum,
                                       libcfs_nid2str(ktx->ktx_nid));
                                status1 = -EIO;
                        }
                }
#endif
                break;

        default:
                LASSERT (0);
        }

        kqswnal_put_idle_tx (ktx);

        lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
        if (lnetmsg1 != NULL)
                lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
}

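/* Record the transmit status and complete it.  In thread context the
 * completion runs inline; from an interrupt handler the ktx is queued
 * on kqn_donetxds for the scheduler thread, since completion may need
 * to kmap() and call lnet_finalize(). */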
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
{
        unsigned long      flags;

        ktx->ktx_status = status;

        if (!cfs_in_interrupt()) {
                kqswnal_tx_done_in_thread_context(ktx);
                return;
        }

        /* Complete the send in thread context */
        cfs_spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

        cfs_list_add_tail(&ktx->ktx_schedlist,
                          &kqswnal_data.kqn_donetxds);
        cfs_waitq_signal(&kqswnal_data.kqn_sched_waitq);

        cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
}

static void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t         *ktx = (kqswnal_tx_t *)arg;
        kqswnal_rpc_reply_t  *reply;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CNETERR("Tx completion to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), status);

                status = -EHOSTDOWN;

        } else switch (ktx->ktx_state) {

        case KTX_GETTING:
        case KTX_PUTTING:
                /* RPC complete! */
                reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
                if (reply->msg.magic == 0) {    /* "old" peer */
                        status = reply->msg.status;
                        break;
                }

                if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
                        if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
                                CERROR("%s unexpected rpc reply magic %08x\n",
                                       libcfs_nid2str(ktx->ktx_nid),
                                       reply->msg.magic);
                                status = -EPROTO;
                                break;
                        }

                        __swab32s(&reply->msg.status);
                        __swab32s(&reply->msg.version);

                        if (ktx->ktx_state == KTX_GETTING) {
                                __swab32s(&reply->msg.u.get.len);
                                __swab32s(&reply->msg.u.get.cksum);
                        }
                }

                status = reply->msg.status;
                if (status != 0) {
                        CERROR("%s RPC status %08x\n",
                               libcfs_nid2str(ktx->ktx_nid), status);
                        break;
                }

                if (ktx->ktx_state == KTX_GETTING) {
                        lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
                                               (lnet_msg_t *)ktx->ktx_args[2],
                                               reply->msg.u.get.len);
#if KQSW_CKSUM
                        ktx->ktx_cksum = reply->msg.u.get.cksum;
#endif
                }
                break;

        case KTX_SENDING:
                status = 0;
                break;

        default:
                LBUG();
                break;
        }

        kqswnal_tx_done(ktx, status);
}

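/* Hand the ktx to the Elan kernel comms (EKC) layer.  RPCs (optimized
 * GET/PUT) go via ep_transmit_rpc() with only frag 0 (the header +
 * RMD); plain sends use ep_transmit_message() with all frags.
 * EP_ENOMEM is not fatal: the ktx is queued on kqn_delayedtxds for the
 * scheduler thread to relaunch when transmit descriptors free up. */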
int
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = cfs_in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int   rc;

        ktx->ktx_launchtime = cfs_time_current();

        if (kqswnal_data.kqn_shuttingdown)
                return (-ESHUTDOWN);

        LASSERT (dest >= 0);                    /* must be a peer */

        if (ktx->ktx_nmappedpages != 0)
                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);

        switch (ktx->ktx_state) {
        case KTX_GETTING:
        case KTX_PUTTING:
                if (the_lnet.ln_testprotocompat != 0) {
                        kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                        /* single-shot proto test:
                         * Future version queries will use an RPC, so I'll
                         * co-opt one of the existing ones */
                        LNET_LOCK();
                        if ((the_lnet.ln_testprotocompat & 1) != 0) {
                                msg->kqm_version++;
                                the_lnet.ln_testprotocompat &= ~1;
                        }
                        if ((the_lnet.ln_testprotocompat & 2) != 0) {
                                msg->kqm_magic = LNET_PROTO_MAGIC;
                                the_lnet.ln_testprotocompat &= ~2;
                        }
                        LNET_UNLOCK();
                }

                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
                 * The other frags are the payload, awaiting RDMA */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
                break;

        case KTX_SENDING:
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
                break;

        default:
                LBUG();
                rc = -EINVAL;                   /* no compiler warning please */
                break;
        }

        switch (rc) {
        case EP_SUCCESS: /* success */
                return (0);

        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                cfs_list_add_tail (&ktx->ktx_schedlist,
                                   &kqswnal_data.kqn_delayedtxds);
                cfs_waitq_signal (&kqswnal_data.kqn_sched_waitq);

                cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock,
                                            flags);
                return (0);

        default: /* fatal error */
                CNETERR ("Tx to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
}

#if 0
static char *
hdr_type_string (lnet_hdr_t *hdr)
{
        switch (hdr->type) {
        case LNET_MSG_ACK:
                return ("ACK");
        case LNET_MSG_PUT:
                return ("PUT");
        case LNET_MSG_GET:
                return ("GET");
        case LNET_MSG_REPLY:
                return ("REPLY");
        default:
                return ("<UNKNOWN>");
        }
}

static void
kqswnal_cerror_hdr(lnet_hdr_t * hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               le32_to_cpu(hdr->payload_length));
        CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
               le32_to_cpu(hdr->src_pid));
        CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
               le32_to_cpu(hdr->dest_pid));

        switch (le32_to_cpu(hdr->type)) {
        case LNET_MSG_PUT:
                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.put.match_bits));
                CERROR("    offset %d, hdr data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case LNET_MSG_GET:
                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       hdr->msg.get.match_bits);
                CERROR("    Length %d, src offset %d\n",
                       le32_to_cpu(hdr->msg.get.sink_length),
                       le32_to_cpu(hdr->msg.get.src_offset));
                break;

        case LNET_MSG_ACK:
                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       le32_to_cpu(hdr->msg.ack.mlength));
                break;

        case LNET_MSG_REPLY:
                CERROR("    dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
        }

}                               /* end of print_hdr() */
#endif

int
kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
                    int nrfrag, EP_NMD *rfrag)
{
        int  i;

        if (nlfrag != nrfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       nlfrag, nrfrag);
                return (-EINVAL);
        }

        for (i = 0; i < nlfrag; i++)
                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
                        return (-EINVAL);
                }

        return (0);
}

kqswnal_remotemd_t *
kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
{
        /* Check that the RMD sent after the "raw" LNET header in a
         * portals-compatible QSWLND message is OK */
        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));

        /* Note RDMA addresses are sent in native endian-ness in the "old"
         * portals protocol so no swabbing... */

        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (NULL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (NULL);
        }

        return (rmd);
}

void
kqswnal_rdma_store_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_decref (krx);

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
}

void
kqswnal_rdma_fetch_complete (EP_RXD *rxd)
{
        /* Completed fetching the PUT/REPLY data */
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
        LASSERT (krx->krx_rxd == rxd);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (status == EP_SUCCESS) {
                krx->krx_rpc_reply.msg.status = 0;
                status = 0;
        } else {
                /* Abandon RPC since get failed */
                krx->krx_rpc_reply_needed = 0;
                status = -ECONNABORTED;
        }

        /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
        LASSERT (krx->krx_state == KRX_PARSE);
        krx->krx_state = KRX_COMPLETING;

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, status);
}

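/* Drive the data-moving half of an optimized GET or PUT/REPLY.  The
 * peer's RMD describes its pre-mapped buffers; we map our side with a
 * borrowed ktx (no message is sent on it), check the local and remote
 * frag lists match one-for-one, then either store data to the peer as
 * the RPC completion (KTX_RDMA_STORE, optimized GET) or fetch the
 * payload from it (KTX_RDMA_FETCH, optimized PUT/REPLY). */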
int
kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
              int type, kqswnal_remotemd_t *rmd,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
              unsigned int offset, unsigned int len)
{
        kqswnal_tx_t       *ktx;
        int                 eprc;
        int                 rc;

        /* Not both mapped and paged payload */
        LASSERT (iov == NULL || kiov == NULL);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (len == 0) {
                /* data got truncated to nothing. */
                lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
                /* Let kqswnal_rx_done() complete the RPC with success */
                krx->krx_rpc_reply.msg.status = 0;
                return (0);
        }

        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
           actually sending a portals message with it */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for RDMA with %s\n",
                        libcfs_nid2str(kqswnal_rx_nid(krx)));
                return (-ENOMEM);
        }

        ktx->ktx_state   = type;
        ktx->ktx_nid     = kqswnal_rx_nid(krx);
        ktx->ktx_args[0] = krx;
        ktx->ktx_args[1] = lntmsg;

        LASSERT (cfs_atomic_read(&krx->krx_refcount) > 0);
        /* Take an extra ref for the completion callback */
        cfs_atomic_inc(&krx->krx_refcount);

        /* Map on the rail the RPC prefers */
        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
                                         ep_rxd_railmask(krx->krx_rxd));

        /* Start mapping at offset 0 (we're not mapping any headers) */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;

        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
        else
                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);

        if (rc != 0) {
                CERROR ("Can't map local RDMA data: %d\n", rc);
                goto out;
        }

        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (rc != 0) {
                CERROR ("Incompatible RDMA descriptors\n");
                goto out;
        }

        switch (type) {
        default:
                LBUG();

        case KTX_RDMA_STORE:
                krx->krx_rpc_reply.msg.status    = 0;
                krx->krx_rpc_reply.msg.magic     = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version   = QSWLND_PROTO_VERSION;
                krx->krx_rpc_reply.msg.u.get.len = len;
#if KQSW_CKSUM
                krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
                            kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
                            kqswnal_csum_iov(~0, offset, len, niov, iov);
                if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
                        krx->krx_rpc_reply.msg.u.get.cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
                eprc = ep_complete_rpc(krx->krx_rxd,
                                       kqswnal_rdma_store_complete, ktx,
                                       &krx->krx_rpc_reply.ep_statusblk,
                                       ktx->ktx_frags, rmd->kqrmd_frag,
                                       rmd->kqrmd_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("can't complete RPC: %d\n", eprc);
                        /* don't re-attempt RPC completion */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;

        case KTX_RDMA_FETCH:
                eprc = ep_rpc_get (krx->krx_rxd,
                                   kqswnal_rdma_fetch_complete, ktx,
                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("ep_rpc_get failed: %d\n", eprc);
                        /* Don't attempt RPC completion:
                         * EKC nuked it when the get failed */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;
        }

 out:
        if (rc != 0) {
                kqswnal_rx_decref(krx);                 /* drop callback's ref */
                kqswnal_put_idle_tx (ktx);
        }

        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc);
}

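/* LND send entry point.  Three shapes of transmit are chosen here:
 *   1. QSWLND_MSG_RDMA: large GETs and PUT/REPLYs (at or over the
 *      kqn_optimized_gets/puts thresholds) send the LNET header plus
 *      an RMD naming the payload's Elan addresses in the pre-mapped
 *      buffer, and the peer moves the data by RDMA;
 *   2. QSWLND_MSG_IMMEDIATE, single frag: payloads no bigger than
 *      kqn_tx_maxcontig are copied whole into the pre-mapped buffer
 *      behind the header;
 *   3. QSWLND_MSG_IMMEDIATE, multi-frag: larger payloads are DVMA
 *      mapped in place as frags 1..n after the header frag. */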
int
kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        int               nob;
        kqswnal_tx_t     *ktx;
        int               rc;

        /* NB 1. hdr is in network byte order */
        /*    2. 'private' depends on the message type */

        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !cfs_in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (kqswnal_nid2elanid (target.nid) < 0) {
                CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
                return -EIO;
        }

        /* I may not block for a transmit descriptor if I might block the
         * router, receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for msg type %d for %s\n",
                        type, libcfs_nid2str(target.nid));
                return (-ENOMEM);
        }

        ktx->ktx_state   = KTX_SENDING;
        ktx->ktx_nid     = target.nid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = lntmsg;
        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */

        /* The first frag will be the pre-mapped buffer. */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;

        if ((!target_is_router &&               /* target.nid is final dest */
             !routing &&                        /* I'm the source */
             type == LNET_MSG_GET &&            /* optimize GET? */
             *kqswnal_tunables.kqn_optimized_gets != 0 &&
             lntmsg->msg_md->md_length >=
             *kqswnal_tunables.kqn_optimized_gets) ||
            ((type == LNET_MSG_PUT ||            /* optimize PUT? */
              type == LNET_MSG_REPLY) &&         /* optimize REPLY? */
             *kqswnal_tunables.kqn_optimized_puts != 0 &&
             payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
                lnet_libmd_t       *md = lntmsg->msg_md;
                kqswnal_msg_t      *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
                lnet_hdr_t         *mhdr;
                kqswnal_remotemd_t *rmd;

                /* Optimised path: I send over the Elan vaddrs of the local
                 * buffers, and my peer DMAs directly to/from them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the buffer frags. */

                /* Send an RDMA message */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_RDMA;

                mhdr = &msg->kqm_u.rdma.kqrm_hdr;
                rmd  = &msg->kqm_u.rdma.kqrm_rmd;

                *mhdr = *hdr;
                nob = (((char *)rmd) - ktx->ktx_buffer);

                if (type == LNET_MSG_GET) {
                        if ((md->md_options & LNET_MD_KIOV) != 0)
                                rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
                                                          md->md_niov, md->md_iov.kiov);
                        else
                                rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
                                                         md->md_niov, md->md_iov.iov);
                        ktx->ktx_state = KTX_GETTING;
                } else {
                        if (payload_kiov != NULL)
                                rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
                                                         payload_niov, payload_kiov);
                        else
                                rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
                                                        payload_niov, payload_iov);
                        ktx->ktx_state = KTX_PUTTING;
                }

                if (rc != 0)
                        goto out;

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
                nob += offsetof(kqswnal_remotemd_t,
                                kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (nob <= KQSW_TX_BUFFER_SIZE);

                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
#if KQSW_CKSUM
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
#endif
                if (type == LNET_MSG_GET) {
                        /* Allocate reply message now while I'm in thread context */
                        ktx->ktx_args[2] = lnet_create_reply_msg (
                                kqswnal_data.kqn_ni, lntmsg);
                        if (ktx->ktx_args[2] == NULL)
                                goto out;

                        /* NB finalizing the REPLY message is my
                         * responsibility now, whatever happens. */
#if KQSW_CKSUM
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  3) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }

                } else if (payload_kiov != NULL) {
                        /* must checksum payload after header so receiver can
                         * compute partial header cksum before swab.  Sadly
                         * this causes 2 rounds of kmap */
                        msg->kqm_cksum =
                                kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,
                                                  payload_niov, payload_kiov);
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
                } else {
                        msg->kqm_cksum =
                                kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,
                                                 payload_niov, payload_iov);
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
#endif
                }

        } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
                lnet_hdr_t    *mhdr;
                char          *payload;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* single frag copied into the pre-mapped buffer */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                mhdr = &msg->kqm_u.immediate.kqim_hdr;
                payload = msg->kqm_u.immediate.kqim_payload;

                *mhdr = *hdr;
                nob = (payload - ktx->ktx_buffer) + payload_nob;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                            payload_niov, payload_kiov,
                                            payload_offset, payload_nob);
                else
                        lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                           payload_niov, payload_iov,
                                           payload_offset, payload_nob);
#if KQSW_CKSUM
                msg->kqm_nob   = nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
        } else {
                lnet_hdr_t    *mhdr;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* multiple frags: first is hdr in pre-mapped buffer */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                mhdr = &msg->kqm_u.immediate.kqim_hdr;
                nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);

                *mhdr = *hdr;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);
                if (rc != 0)
                        goto out;

#if KQSW_CKSUM
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);

                msg->kqm_cksum = (payload_kiov != NULL) ?
                                 kqswnal_csum_kiov(msg->kqm_cksum,
                                                   payload_offset, payload_nob,
                                                   payload_niov, payload_kiov) :
                                 kqswnal_csum_iov(msg->kqm_cksum,
                                                  payload_offset, payload_nob,
                                                  payload_niov, payload_iov);

                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
                nob += payload_nob;
        }

        ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        rc = kqswnal_launch (ktx);

 out:
        CDEBUG_LIMIT(rc == 0? D_NET :D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
                     routing ? (rc == 0 ? "Routed" : "Failed to route") :
                               (rc == 0 ? "Sent" : "Failed to send"),
                     nob, libcfs_nid2str(target.nid),
                     target_is_router ? "(router)" : "", rc);

        if (rc != 0) {
                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
                int         state = ktx->ktx_state;

                kqswnal_put_idle_tx (ktx);

                if (state == KTX_GETTING && repmsg != NULL) {
                        /* We committed to reply, but there was a problem
                         * launching the GET.  We can't avoid delivering a
                         * REPLY event since we committed above, so we
                         * pretend the GET succeeded but the REPLY
                         * failed. */
                        rc = 0;
                        lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
                }

        }

        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc == 0 ? 0 : -EIO);
}

void
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);
        LASSERT (!krx->krx_rpc_reply_needed);

        krx->krx_state = KRX_POSTED;

        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd,
                                   kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }
}

void
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}

void
kqswnal_rx_done (kqswnal_rx_t *krx)
{
        int           rc;

        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {
                /* We've not completed the peer's RPC yet... */
                krx->krx_rpc_reply.msg.magic   = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;

                LASSERT (!cfs_in_interrupt());

                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,
                                     &krx->krx_rpc_reply.ep_statusblk,
                                     NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);
                krx->krx_rpc_reply_needed = 0;
        }

        kqswnal_requeue_rx(krx);
}

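/* Examine a freshly received message: validate length, magic, version
 * and (optionally) checksum, then dispatch on kqm_type.  IMMEDIATE
 * messages are handed straight to lnet_parse(); RDMA messages must
 * have arrived as EKC RPCs so the reply can carry the transfer status
 * back to the sender. */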
1325 void
1326 kqswnal_parse (kqswnal_rx_t *krx)
1327 {
1328         lnet_ni_t      *ni = kqswnal_data.kqn_ni;
1329         kqswnal_msg_t  *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
1330         lnet_nid_t      fromnid = kqswnal_rx_nid(krx);
1331         int             swab;
1332         int             n;
1333         int             i;
1334         int             nob;
1335         int             rc;
1336
1337         LASSERT (cfs_atomic_read(&krx->krx_refcount) == 1);
1338
1339         if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {
1340                 CERROR("Short message %d received from %s\n",
1341                        krx->krx_nob, libcfs_nid2str(fromnid));
1342                 goto done;
1343         }
1344
1345         swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);
1346
1347         if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {
1348 #if KQSW_CKSUM
1349                 __u32 csum0;
1350                 __u32 csum1;
1351
1352                 /* csum byte array before swab */
1353                 csum1 = msg->kqm_cksum;
1354                 msg->kqm_cksum = 0;
1355                 csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,
1356                                           krx->krx_npages, krx->krx_kiov);
1357                 msg->kqm_cksum = csum1;
1358 #endif
1359
1360                 if (swab) {
1361                         __swab16s(&msg->kqm_version);
1362                         __swab16s(&msg->kqm_type);
1363 #if KQSW_CKSUM
1364                         __swab32s(&msg->kqm_cksum);
1365                         __swab32s(&msg->kqm_nob);
1366 #endif
1367                 }
1368
1369                 if (msg->kqm_version != QSWLND_PROTO_VERSION) {
1370                         /* Future protocol version compatibility support!
1371                          * The next qswlnd-specific protocol rev will first
1372                          * send an RPC to check version.
1373                          * 1.4.6 and 1.4.7.early reply with a status
1374                          * block containing their current version.
1375                          * Later versions send a failure (-ve) status +
1376                          * magic/version */
1377
1378                         if (!krx->krx_rpc_reply_needed) {
1379                                 CERROR("Unexpected version %d from %s\n",
1380                                        msg->kqm_version, libcfs_nid2str(fromnid));
1381                                 goto done;
1382                         }
1383
1384                         LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
1385                         goto done;
1386                 }
1387
1388                 switch (msg->kqm_type) {
1389                 default:
1390                         CERROR("Bad request type %x from %s\n",
1391                                msg->kqm_type, libcfs_nid2str(fromnid));
1392                         goto done;
1393
1394                 case QSWLND_MSG_IMMEDIATE:
1395                         if (krx->krx_rpc_reply_needed) {
1396                                 /* Should have been a simple message */
1397                                 CERROR("IMMEDIATE sent as RPC from %s\n",
1398                                        libcfs_nid2str(fromnid));
1399                                 goto done;
1400                         }
1401
1402                         nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
1403                         if (krx->krx_nob < nob) {
1404                                 CERROR("Short IMMEDIATE %d(%d) from %s\n",
1405                                        krx->krx_nob, nob, libcfs_nid2str(fromnid));
1406                                 goto done;
1407                         }
1408
1409 #if KQSW_CKSUM
1410                         if (csum0 != msg->kqm_cksum) {
1411                                 CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",
1412                                        csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));
1413                                 CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);
1414                                 goto done;
1415                         }
1416 #endif
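                             /* On success lnet_parse() takes over this
                              * receive: it calls back into kqswnal_recv()
                              * with krx as 'private' and the matching decref
                              * happens there. */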
1417                         rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,
1418                                         fromnid, krx, 0);
1419                         if (rc < 0)
1420                                 goto done;
1421                         return;
1422
1423                 case QSWLND_MSG_RDMA:
1424                         if (!krx->krx_rpc_reply_needed) {
1425                                 /* Should have been sent as an RPC */
1426                                 CERROR("RDMA sent as simple message from %s\n",
1427                                        libcfs_nid2str(fromnid));
1428                                 goto done;
1429                         }
1430
1431                         nob = offsetof(kqswnal_msg_t,
1432                                        kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);
1433                         if (krx->krx_nob < nob) {
1434                                 CERROR("Short RDMA message %d(%d) from %s\n",
1435                                        krx->krx_nob, nob, libcfs_nid2str(fromnid));
1436                                 goto done;
1437                         }
1438
1439                         if (swab)
1440                                 __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);
1441
1442                         n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;
1443                         nob = offsetof(kqswnal_msg_t,
1444                                        kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);
1445
1446                         if (krx->krx_nob < nob) {
1447                                 CERROR("Short RDMA message %d(%d) from %s\n",
1448                                        krx->krx_nob, nob, libcfs_nid2str(fromnid));
1449                                 goto done;
1450                         }
1451
1452                         if (swab) {
1453                                 for (i = 0; i < n; i++) {
1454                                         EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];
1455
1456                                         __swab32s(&nmd->nmd_addr);
1457                                         __swab32s(&nmd->nmd_len);
1458                                         __swab32s(&nmd->nmd_attr);
1459                                 }
1460                         }
1461
1462 #if KQSW_CKSUM
1463                         krx->krx_cksum = csum0; /* stash checksum so far */
1464 #endif
1465                         rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,
1466                                         fromnid, krx, 1);
1467                         if (rc < 0)
1468                                 goto done;
1469                         return;
1470                 }
1471                 /* Not Reached */
1472         }
1473
1474         if (msg->kqm_magic == LNET_PROTO_MAGIC ||
1475             msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {
1476                 /* Future protocol version compatibility support!
1477                  * When LNET unifies protocols over all LNDs, the first thing a
1478                  * peer will send will be a version query RPC.  
1479                  * 1.4.6 and 1.4.7.early reply with a status block containing
1480                  * LNET_PROTO_QSW_MAGIC.
1481                  * Later versions send a failure (-ve) status +
1482                  * magic/version */
1483
1484                 if (!krx->krx_rpc_reply_needed) {
1485                         CERROR("Unexpected magic %08x from %s\n",
1486                                msg->kqm_magic, libcfs_nid2str(fromnid));
1487                         goto done;
1488                 }
1489
1490                 LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
1491                 goto done;
1492         }
1493
1494         CERROR("Unrecognised magic %08x from %s\n",
1495                msg->kqm_magic, libcfs_nid2str(fromnid));
1496  done:
1497         kqswnal_rx_decref(krx);
1498 }
1499
1500 /* Receive Interrupt Handler: posts to schedulers */
1501 void 
1502 kqswnal_rxhandler(EP_RXD *rxd)
1503 {
1504         unsigned long flags;
1505         int           nob    = ep_rxd_len (rxd);
1506         int           status = ep_rxd_status (rxd);
1507         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1508         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1509                rxd, krx, nob, status);
1510
1511         LASSERT (krx != NULL);
1512         LASSERT (krx->krx_state == KRX_POSTED);
1513         
1514         krx->krx_state = KRX_PARSE;
1515         krx->krx_rxd = rxd;
1516         krx->krx_nob = nob;
1517
1518         /* An RPC reply is needed iff the RPC request was received without error */
1519         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
1520                                     (status == EP_SUCCESS ||
1521                                      status == EP_MSG_TOO_BIG);
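         /* NB an RPC that arrived EP_MSG_TOO_BIG still has its reply
          * completed: the -EPROTO status set below is what the sender gets
          * when the last ref on this krx is dropped. */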
1522
1523         /* Default to failure if an RPC reply is requested but not handled */
1524         krx->krx_rpc_reply.msg.status = -EPROTO;
1525         cfs_atomic_set (&krx->krx_refcount, 1);
1526
1527         if (status != EP_SUCCESS) {
1528                 /* receives complete with failure when receiver is removed */
1529                 if (status == EP_SHUTDOWN)
1530                         LASSERT (kqswnal_data.kqn_shuttingdown);
1531                 else
1532                         CERROR("receive failed with status %d, nob %d\n",
1533                                status, nob);
1534                 kqswnal_rx_decref(krx);
1535                 return;
1536         }
1537
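         /* In thread context it is safe to parse (and, for RDMA requests,
          * map pages) right away; in interrupt context the descriptor is
          * handed to a scheduler thread instead. */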
1538         if (!cfs_in_interrupt()) {
1539                 kqswnal_parse(krx);
1540                 return;
1541         }
1542
1543         cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1544
1545         cfs_list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1546         cfs_waitq_signal (&kqswnal_data.kqn_sched_waitq);
1547
1548         cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1549 }
1550
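     /* lnd_recv() handler: deliver the payload of a received message into the
      * buffers LNet has matched for it.  An RPC (optimized RDMA) request is
      * serviced by RDMAing the payload between the peer's pre-mapped buffers
      * and the matched ones; an IMMEDIATE message is simply copied out of the
      * receive buffer. */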
1551 int
1552 kqswnal_recv (lnet_ni_t     *ni,
1553               void          *private,
1554               lnet_msg_t    *lntmsg,
1555               int            delayed,
1556               unsigned int   niov,
1557               struct iovec  *iov,
1558               lnet_kiov_t   *kiov,
1559               unsigned int   offset,
1560               unsigned int   mlen,
1561               unsigned int   rlen)
1562 {
1563         kqswnal_rx_t       *krx = (kqswnal_rx_t *)private;
1564         lnet_nid_t          fromnid;
1565         kqswnal_msg_t      *msg;
1566         lnet_hdr_t         *hdr;
1567         kqswnal_remotemd_t *rmd;
1568         int                 msg_offset;
1569         int                 rc;
1570
1571         LASSERT (!cfs_in_interrupt ());             /* OK to map */
1572         /* Either all pages or all vaddrs */
1573         LASSERT (!(kiov != NULL && iov != NULL));
1574
1575         fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
1576         msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
1577
1578         if (krx->krx_rpc_reply_needed) {
1579                 /* optimized (rdma) request sent as RPC */
1580
1581                 LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);
1582                 hdr = &msg->kqm_u.rdma.kqrm_hdr;
1583                 rmd = &msg->kqm_u.rdma.kqrm_rmd;
1584
1585                 /* NB header is still in wire byte order */
1586
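         /* PUT/REPLY: fetch the payload from the peer's pre-mapped buffer
          * into what LNet matched (KTX_RDMA_FETCH).  GET: store the reply
          * payload into the sink the peer described (KTX_RDMA_STORE). */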
1587         switch (le32_to_cpu(hdr->type)) {
1588         case LNET_MSG_PUT:
1589         case LNET_MSG_REPLY:
1590                 /* This is an optimized PUT/REPLY */
1591                 rc = kqswnal_rdma(krx, lntmsg,
1592                                   KTX_RDMA_FETCH, rmd,
1593                                   niov, iov, kiov, offset, mlen);
1594                 break;
1595
1596         case LNET_MSG_GET:
1597 #if KQSW_CKSUM
1598                 if (krx->krx_cksum != msg->kqm_cksum) {
1599                         CERROR("Bad GET checksum %08x(%08x) from %s\n",
1600                                krx->krx_cksum, msg->kqm_cksum,
1601                                libcfs_nid2str(fromnid));
1602                         rc = -EIO;
1603                         break;
1604                 }
1605 #endif
1606                 if (lntmsg == NULL) {
1607                         /* No buffer match: my decref will
1608                          * complete the RPC with failure */
1609                         rc = 0;
1610                 } else {
1611                         /* Matched something! */
1612                         rc = kqswnal_rdma(krx, lntmsg,
1613                                           KTX_RDMA_STORE, rmd,
1614                                           lntmsg->msg_niov,
1615                                           lntmsg->msg_iov,
1616                                           lntmsg->msg_kiov,
1617                                           lntmsg->msg_offset,
1618                                           lntmsg->msg_len);
1619                 }
1620                 break;
1621
1622         default:
1623                 CERROR("Bad RPC type %d\n",
1624                        le32_to_cpu(hdr->type));
1625                 rc = -EPROTO;
1626                 break;
1627         }
1628
1629                 kqswnal_rx_decref(krx);
1630                 return rc;
1631         }
1632
1633         LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);
1634         msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
1635         
1636         if (krx->krx_nob < msg_offset + rlen) {
1637                 CERROR("Bad message size from %s: have %d, need %d + %d\n",
1638                        libcfs_nid2str(fromnid), krx->krx_nob,
1639                        msg_offset, rlen);
1640                 kqswnal_rx_decref(krx);
1641                 return -EPROTO;
1642         }
1643
1644         if (kiov != NULL)
1645                 lnet_copy_kiov2kiov(niov, kiov, offset,
1646                                     krx->krx_npages, krx->krx_kiov, 
1647                                     msg_offset, mlen);
1648         else
1649                 lnet_copy_kiov2iov(niov, iov, offset,
1650                                    krx->krx_npages, krx->krx_kiov, 
1651                                    msg_offset, mlen);
1652
1653         lnet_finalize(ni, lntmsg, 0);
1654         kqswnal_rx_decref(krx);
1655         return 0;
1656 }
1657
1658 int
1659 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1660 {
1661         long    pid = cfs_create_thread (fn, arg, 0);
1662
1663         if (pid < 0)
1664                 return ((int)pid);
1665
1666         cfs_atomic_inc (&kqswnal_data.kqn_nthreads);
1667         return (0);
1668 }
1669
1670 void
1671 kqswnal_thread_fini (void)
1672 {
1673         cfs_atomic_dec (&kqswnal_data.kqn_nthreads);
1674 }
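     /* For illustration only (hypothetical caller): startup code is expected
      * to spawn scheduler threads with something like
      *
      *         for (i = 0; i < nthreads; i++)
      *                 if (kqswnal_thread_start(kqswnal_scheduler, NULL) != 0)
      *                         CERROR("Can't spawn scheduler %d\n", i);
      *
      * while kqswnal_thread_fini() is called by each thread as it exits, so
      * shutdown can wait for kqn_nthreads to fall to zero. */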
1675
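     /* Scheduler thread: drain the three work queues (ready receives, done
      * transmits, delayed transmits) under kqn_sched_lock, dropping the lock
      * while each item is serviced.  After KQSW_RESCHED consecutive items the
      * lock is dropped anyway so the thread can reschedule if it is hogging
      * the CPU.  The thread only exits in stage 2 of shutdown, when there is
      * nothing left to do. */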
1676 int
1677 kqswnal_scheduler (void *arg)
1678 {
1679         kqswnal_rx_t    *krx;
1680         kqswnal_tx_t    *ktx;
1681         unsigned long    flags;
1682         int              rc;
1683         int              counter = 0;
1684         int              did_something;
1685
1686         cfs_daemonize ("kqswnal_sched");
1687         cfs_block_allsigs ();
1688
1689         cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1690
1691         for (;;)
1692         {
1693                 did_something = 0;
1694
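                 /* Service at most one item from each queue per pass,
                  * dropping the scheduler lock while the item is handled so
                  * the receive interrupt handler and transmit completions can
                  * keep queueing work. */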
1695                 if (!cfs_list_empty (&kqswnal_data.kqn_readyrxds))
1696                 {
1697                         krx = cfs_list_entry(kqswnal_data.kqn_readyrxds.next,
1698                                              kqswnal_rx_t, krx_list);
1699                         cfs_list_del (&krx->krx_list);
1700                         cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1701                                                    flags);
1702
1703                         LASSERT (krx->krx_state == KRX_PARSE);
1704                         kqswnal_parse (krx);
1705
1706                         did_something = 1;
1707                         cfs_spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
1708                                               flags);
1709                 }
1710
1711                 if (!cfs_list_empty (&kqswnal_data.kqn_donetxds))
1712                 {
1713                         ktx = cfs_list_entry(kqswnal_data.kqn_donetxds.next,
1714                                              kqswnal_tx_t, ktx_schedlist);
1715                         cfs_list_del_init (&ktx->ktx_schedlist);
1716                         cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1717                                                    flags);
1718
1719                         kqswnal_tx_done_in_thread_context(ktx);
1720
1721                         did_something = 1;
1722                         cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock,
1723                                                flags);
1724                 }
1725
1726                 if (!cfs_list_empty (&kqswnal_data.kqn_delayedtxds))
1727                 {
1728                         ktx = cfs_list_entry(kqswnal_data.kqn_delayedtxds.next,
1729                                              kqswnal_tx_t, ktx_schedlist);
1730                         cfs_list_del_init (&ktx->ktx_schedlist);
1731                         cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1732                                                    flags);
1733
1734                         rc = kqswnal_launch (ktx);
1735                         if (rc != 0) {
1736                                 CERROR("Failed delayed transmit to %s: %d\n", 
1737                                        libcfs_nid2str(ktx->ktx_nid), rc);
1738                                 kqswnal_tx_done (ktx, rc);
1739                         }
1740                         cfs_atomic_dec (&kqswnal_data.kqn_pending_txs);
1741
1742                         did_something = 1;
1743                         cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock,
1744                                                flags);
1745                 }
1746
1747                 /* nothing to do or hogging CPU */
1748                 if (!did_something || counter++ == KQSW_RESCHED) {
1749                         cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1750                                                    flags);
1751
1752                         counter = 0;
1753
1754                         if (!did_something) {
1755                                 if (kqswnal_data.kqn_shuttingdown == 2) {
1756                                         /* We only exit in stage 2 of shutdown
1757                                          * when there's nothing left to do */
1758                                         break;
1759                                 }
1760                                 cfs_wait_event_interruptible_exclusive(
1761                                         kqswnal_data.kqn_sched_waitq,
1762                                         kqswnal_data.kqn_shuttingdown == 2 ||
1763                                         !cfs_list_empty(
1764                                                 &kqswnal_data.kqn_readyrxds) ||
1765                                         !cfs_list_empty(
1766                                                 &kqswnal_data.kqn_donetxds) ||
1767                                         !cfs_list_empty(
1768                                                 &kqswnal_data.kqn_delayedtxds), rc);
1769                                 LASSERT (rc == 0);
1770                         } else if (need_resched())
1771                                 cfs_schedule ();
1772
1773                         cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock,
1774                                                flags);
1775                 }
1776         }
1777
1778         kqswnal_thread_fini ();
1779         return (0);
1780 }