lnet/klnds/qswlnd/qswlnd_cb.c
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 *
 * Copyright (c) 2012, Intel Corporation.
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.lustre.org
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "qswlnd.h"

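/* Tell LNet the peer this tx was destined for is down (alive == 0).  The
 * "when" passed to lnet_notify() is the wall-clock time the tx was
 * launched, reconstructed from the jiffies elapsed since ktx_launchtime. */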
void
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
        time_t             then;

        then = cfs_time_current_sec() -
                cfs_duration_sec(cfs_time_current() -
                                 ktx->ktx_launchtime);

        lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then);
}

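/* Unload the temporary Elan DVMA mappings (frags from ktx_firsttmpfrag
 * onward) set up for this tx's payload and forget its preferred rail;
 * pre-mapped frags are left alone. */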
void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
        int      i;

        ktx->ktx_rail = -1;                     /* unset rail */

        if (ktx->ktx_nmappedpages == 0)
                return;

        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);

        ktx->ktx_nmappedpages = 0;
}

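/* Map 'nob' bytes of a page-based (kiov) payload, starting 'offset' bytes
 * in, into Elan DVMA space on the tx's preferred rail.  Each page is
 * kmap()ed, loaded with ep_dvma_load() and merged into the previous frag
 * where possible; ktx_nmappedpages is updated inside the loop so a
 * failure can still be unmapped cleanly. */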
int
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob,
                     unsigned int niov, lnet_kiov_t *kiov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        __u32     basepage  = ktx->ktx_basepage + nmapped;
        char     *ptr;

        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                nmapped++;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                        ktx, nfrags, ptr, fraglen, basepage, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage++;
                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

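/* Fold 'nob' bytes of a kiov payload, starting at 'offset', into 'csum'.
 * Only built when KQSW_CKSUM message checksumming is enabled. */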
#if KQSW_CKSUM
__u32
kqswnal_csum_kiov (__u32 csum, int offset, int nob,
                   unsigned int niov, lnet_kiov_t *kiov)
{
        char     *ptr;

        if (nob == 0)
                return csum;

        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                csum = kqswnal_csum(csum, ptr, fraglen);

                kunmap (kiov->kiov_page);

                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        return csum;
}
#endif

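/* As kqswnal_map_tx_kiov(), but for a virtual-address (struct iovec)
 * payload; each iovec fragment may span several pages, so the page
 * budget is charged via kqswnal_pages_spanned(). */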
int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    unsigned int niov, struct iovec *iov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        __u32     basepage  = ktx->ktx_basepage + nmapped;

        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;
                long npages;

                if (fraglen > nob)
                        fraglen = nob;
                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);

                nmapped += npages;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base + offset, fraglen,
                       basepage, npages, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage += npages;
                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

#if KQSW_CKSUM
__u32
kqswnal_csum_iov (__u32 csum, int offset, int nob,
                  unsigned int niov, struct iovec *iov)
{
        if (nob == 0)
                return csum;

        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;

                if (fraglen > nob)
                        fraglen = nob;

                csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen);

                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        return csum;
}
#endif

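/* Return a tx descriptor to the idle list: release its temporary
 * mappings and move it from the active to the idle list under
 * kqn_idletxd_lock. */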
void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        unsigned long     flags;

        kqswnal_unmap_tx(ktx);                  /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);

        cfs_list_del(&ktx->ktx_list);           /* take off active list */
        cfs_list_add(&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
}

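/* Grab an idle tx descriptor, move it onto the active list and bump the
 * pending-tx count.  Returns NULL if the LND is shutting down or the
 * idle list is empty; never blocks. */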
kqswnal_tx_t *
kqswnal_get_idle_tx (void)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx;

        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);

        if (kqswnal_data.kqn_shuttingdown ||
            cfs_list_empty(&kqswnal_data.kqn_idletxds)) {
                spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

                return NULL;
        }

        ktx = cfs_list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t,
                              ktx_list);
        cfs_list_del (&ktx->ktx_list);

        cfs_list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
        ktx->ktx_launcher = current->pid;
        cfs_atomic_inc(&kqswnal_data.kqn_pending_txs);

        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx->ktx_nmappedpages == 0);
        return (ktx);
}

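/* Final completion of a tx, always run in thread context: verify RDMA
 * checksums where enabled, drop the rx ref held by an optimized
 * PUT/REPLY, recycle the tx descriptor and finalize the lnet_msg_t(s)
 * it was carrying. */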
void
kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
{
        lnet_msg_t    *lnetmsg0 = NULL;
        lnet_msg_t    *lnetmsg1 = NULL;
        int            status0  = 0;
        int            status1  = 0;
        kqswnal_rx_t  *krx;

        LASSERT (!cfs_in_interrupt());

        if (ktx->ktx_status == -EHOSTDOWN)
                kqswnal_notify_peer_down(ktx);

        switch (ktx->ktx_state) {
        case KTX_RDMA_FETCH:                    /* optimized PUT/REPLY handled */
                krx      = (kqswnal_rx_t *)ktx->ktx_args[0];
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
#if KQSW_CKSUM
                if (status0 == 0) {             /* RDMA succeeded */
                        kqswnal_msg_t *msg;
                        __u32          csum;

                        msg = (kqswnal_msg_t *)
                              page_address(krx->krx_kiov[0].kiov_page);

                        csum = (lnetmsg0->msg_kiov != NULL) ?
                               kqswnal_csum_kiov(krx->krx_cksum,
                                                 lnetmsg0->msg_offset,
                                                 lnetmsg0->msg_wanted,
                                                 lnetmsg0->msg_niov,
                                                 lnetmsg0->msg_kiov) :
                               kqswnal_csum_iov(krx->krx_cksum,
                                                lnetmsg0->msg_offset,
                                                lnetmsg0->msg_wanted,
                                                lnetmsg0->msg_niov,
                                                lnetmsg0->msg_iov);

                        /* Can only check csum if I got it all */
                        if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
                            csum != msg->kqm_cksum) {
                                ktx->ktx_status = -EIO;
                                krx->krx_rpc_reply.msg.status = -EIO;
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, msg->kqm_cksum,
                                       libcfs_nid2str(kqswnal_rx_nid(krx)));
                        }
                }
#endif
                LASSERT (krx->krx_state == KRX_COMPLETING);
                kqswnal_rx_decref (krx);
                break;

        case KTX_RDMA_STORE:       /* optimized GET handled */
        case KTX_PUTTING:          /* optimized PUT sent */
        case KTX_SENDING:          /* normal send */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
                break;

        case KTX_GETTING:          /* optimized GET sent & payload received */
                /* Complete the GET with success since we can't avoid
                 * delivering a REPLY event; we committed to it when we
                 * launched the GET */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = 0;
                lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
                status1  = ktx->ktx_status;
#if KQSW_CKSUM
                if (status1 == 0) {             /* RDMA succeeded */
                        lnet_msg_t   *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                        lnet_libmd_t *md = lnetmsg0->msg_md;
                        __u32         csum;

                        csum = ((md->md_options & LNET_MD_KIOV) != 0) ?
                               kqswnal_csum_kiov(~0, 0,
                                                 md->md_length,
                                                 md->md_niov,
                                                 md->md_iov.kiov) :
                               kqswnal_csum_iov(~0, 0,
                                                md->md_length,
                                                md->md_niov,
                                                md->md_iov.iov);

                        if (csum != ktx->ktx_cksum) {
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, ktx->ktx_cksum,
                                       libcfs_nid2str(ktx->ktx_nid));
                                status1 = -EIO;
                        }
                }
#endif
                break;

        default:
                LASSERT (0);
        }

        kqswnal_put_idle_tx (ktx);

        lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
        if (lnetmsg1 != NULL)
                lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
}

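/* Record the tx completion status.  In thread context the tx is finished
 * immediately; from interrupt context it is queued on kqn_donetxds and
 * the scheduler is woken to finish it later. */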
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
{
        unsigned long      flags;

        ktx->ktx_status = status;

        if (!cfs_in_interrupt()) {
                kqswnal_tx_done_in_thread_context(ktx);
                return;
        }

        /* Complete the send in thread context */
        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

        cfs_list_add_tail(&ktx->ktx_schedlist,
                          &kqswnal_data.kqn_donetxds);
        wake_up(&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
}

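/* EKC transmit completion callback: translate the EP status (and, for
 * GET/PUT RPCs, the peer's status block, swabbing it when the peer has
 * opposite endian-ness) into an errno and hand the tx to
 * kqswnal_tx_done(). */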
static void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t         *ktx = (kqswnal_tx_t *)arg;
        kqswnal_rpc_reply_t  *reply;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CNETERR("Tx completion to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), status);

                status = -EHOSTDOWN;

        } else switch (ktx->ktx_state) {

        case KTX_GETTING:
        case KTX_PUTTING:
                /* RPC complete! */
                reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
                if (reply->msg.magic == 0) {    /* "old" peer */
                        status = reply->msg.status;
                        break;
                }

                if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
                        if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
                                CERROR("%s unexpected rpc reply magic %08x\n",
                                       libcfs_nid2str(ktx->ktx_nid),
                                       reply->msg.magic);
                                status = -EPROTO;
                                break;
                        }

                        __swab32s(&reply->msg.status);
                        __swab32s(&reply->msg.version);

                        if (ktx->ktx_state == KTX_GETTING) {
                                __swab32s(&reply->msg.u.get.len);
                                __swab32s(&reply->msg.u.get.cksum);
                        }
                }

                status = reply->msg.status;
                if (status != 0) {
                        CERROR("%s RPC status %08x\n",
                               libcfs_nid2str(ktx->ktx_nid), status);
                        break;
                }

                if (ktx->ktx_state == KTX_GETTING) {
                        lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
                                               (lnet_msg_t *)ktx->ktx_args[2],
                                               reply->msg.u.get.len);
#if KQSW_CKSUM
                        ktx->ktx_cksum = reply->msg.u.get.cksum;
#endif
                }
                break;

        case KTX_SENDING:
                status = 0;
                break;

        default:
                LBUG();
                break;
        }

        kqswnal_tx_done(ktx, status);
}

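/* Hand a prepared tx to EKC: an RPC transmit for optimized GET/PUT, a
 * plain message transmit otherwise.  EP_ENOMEM is not fatal; the tx is
 * queued on kqn_delayedtxds for the scheduler to retry later. */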
int
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = cfs_in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int   rc;

        ktx->ktx_launchtime = cfs_time_current();

        if (kqswnal_data.kqn_shuttingdown)
                return (-ESHUTDOWN);

        LASSERT (dest >= 0);                    /* must be a peer */

        if (ktx->ktx_nmappedpages != 0)
                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);

        switch (ktx->ktx_state) {
        case KTX_GETTING:
        case KTX_PUTTING:
                if (the_lnet.ln_testprotocompat != 0) {
                        kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                        /* single-shot proto test:
                         * Future version queries will use an RPC, so I'll
                         * co-opt one of the existing ones */
                        LNET_LOCK();
                        if ((the_lnet.ln_testprotocompat & 1) != 0) {
                                msg->kqm_version++;
                                the_lnet.ln_testprotocompat &= ~1;
                        }
                        if ((the_lnet.ln_testprotocompat & 2) != 0) {
                                msg->kqm_magic = LNET_PROTO_MAGIC;
                                the_lnet.ln_testprotocompat &= ~2;
                        }
                        LNET_UNLOCK();
                }

                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
                 * The other frags are the payload, awaiting RDMA */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
                break;

        case KTX_SENDING:
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
                break;

        default:
                LBUG();
                rc = -EINVAL;                   /* no compiler warning please */
                break;
        }

        switch (rc) {
        case EP_SUCCESS: /* success */
                return (0);

        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

                cfs_list_add_tail(&ktx->ktx_schedlist,
                                  &kqswnal_data.kqn_delayedtxds);
                wake_up(&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
                return (0);

        default: /* fatal error */
                CNETERR ("Tx to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
}

#if 0
static char *
hdr_type_string (lnet_hdr_t *hdr)
{
        switch (hdr->type) {
        case LNET_MSG_ACK:
                return ("ACK");
        case LNET_MSG_PUT:
                return ("PUT");
        case LNET_MSG_GET:
                return ("GET");
        case LNET_MSG_REPLY:
                return ("REPLY");
        default:
                return ("<UNKNOWN>");
        }
}

static void
kqswnal_cerror_hdr(lnet_hdr_t * hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               le32_to_cpu(hdr->payload_length));
        CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
               le32_to_cpu(hdr->src_pid));
        CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
               le32_to_cpu(hdr->dest_pid));

        switch (le32_to_cpu(hdr->type)) {
        case LNET_MSG_PUT:
                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.put.match_bits));
                CERROR("    offset %d, hdr data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case LNET_MSG_GET:
                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       hdr->msg.get.match_bits);
                CERROR("    Length %d, src offset %d\n",
                       le32_to_cpu(hdr->msg.get.sink_length),
                       le32_to_cpu(hdr->msg.get.src_offset));
                break;

        case LNET_MSG_ACK:
                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       le32_to_cpu(hdr->msg.ack.mlength));
                break;

        case LNET_MSG_REPLY:
                CERROR("    dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
        }

}                               /* end of kqswnal_cerror_hdr() */
#endif

int
kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
                    int nrfrag, EP_NMD *rfrag)
{
        int  i;

        if (nlfrag != nrfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       nlfrag, nrfrag);
                return (-EINVAL);
        }

        for (i = 0; i < nlfrag; i++)
                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
                        return (-EINVAL);
                }

        return (0);
}

kqswnal_remotemd_t *
kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
{
        /* Check that the RMD sent after the "raw" LNET header in a
         * portals-compatible QSWLND message is OK */
        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));

        /* Note RDMA addresses are sent in native endian-ness in the "old"
         * portals protocol so no swabbing... */

        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (NULL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (NULL);
        }

        return (rmd);
}

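/* EKC callback when the RDMA data for an optimized GET (KTX_RDMA_STORE)
 * has been pushed to the peer: drop the rx ref taken for the callback
 * and complete the tx, which finalizes its lnet_msg_t. */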
void
kqswnal_rdma_store_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_decref (krx);

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
}

void
kqswnal_rdma_fetch_complete (EP_RXD *rxd)
{
        /* Completed fetching the PUT/REPLY data */
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
        LASSERT (krx->krx_rxd == rxd);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (status == EP_SUCCESS) {
                krx->krx_rpc_reply.msg.status = 0;
                status = 0;
        } else {
                /* Abandon RPC since get failed */
                krx->krx_rpc_reply_needed = 0;
                status = -ECONNABORTED;
        }

        /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
        LASSERT (krx->krx_state == KRX_PARSE);
        krx->krx_state = KRX_COMPLETING;

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, status);
}

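/* Service the RDMA phase of an optimized GET (KTX_RDMA_STORE: push the
 * reply data into the peer's buffers) or PUT/REPLY (KTX_RDMA_FETCH: pull
 * the payload from the peer's buffers).  A tx descriptor is borrowed
 * purely to map the local side of the transfer; no message is sent with
 * it. */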
int
kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
              int type, kqswnal_remotemd_t *rmd,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
              unsigned int offset, unsigned int len)
{
        kqswnal_tx_t       *ktx;
        int                 eprc;
        int                 rc;

        /* Not both mapped and paged payload */
        LASSERT (iov == NULL || kiov == NULL);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (len == 0) {
                /* data got truncated to nothing. */
                lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
                /* Let kqswnal_rx_done() complete the RPC with success */
                krx->krx_rpc_reply.msg.status = 0;
                return (0);
        }

        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
           actually sending a portals message with it */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for RDMA with %s\n",
                        libcfs_nid2str(kqswnal_rx_nid(krx)));
                return (-ENOMEM);
        }

        ktx->ktx_state   = type;
        ktx->ktx_nid     = kqswnal_rx_nid(krx);
        ktx->ktx_args[0] = krx;
        ktx->ktx_args[1] = lntmsg;

        LASSERT (cfs_atomic_read(&krx->krx_refcount) > 0);
        /* Take an extra ref for the completion callback */
        cfs_atomic_inc(&krx->krx_refcount);

        /* Map on the rail the RPC prefers */
        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
                                         ep_rxd_railmask(krx->krx_rxd));

        /* Start mapping at offset 0 (we're not mapping any headers) */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;

        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
        else
                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);

        if (rc != 0) {
                CERROR ("Can't map local RDMA data: %d\n", rc);
                goto out;
        }

        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (rc != 0) {
                CERROR ("Incompatible RDMA descriptors\n");
                goto out;
        }

        switch (type) {
        default:
                LBUG();

        case KTX_RDMA_STORE:
                krx->krx_rpc_reply.msg.status    = 0;
                krx->krx_rpc_reply.msg.magic     = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version   = QSWLND_PROTO_VERSION;
                krx->krx_rpc_reply.msg.u.get.len = len;
#if KQSW_CKSUM
                krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
                            kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
                            kqswnal_csum_iov(~0, offset, len, niov, iov);
                if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
                        krx->krx_rpc_reply.msg.u.get.cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
                eprc = ep_complete_rpc(krx->krx_rxd,
                                       kqswnal_rdma_store_complete, ktx,
                                       &krx->krx_rpc_reply.ep_statusblk,
                                       ktx->ktx_frags, rmd->kqrmd_frag,
                                       rmd->kqrmd_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("can't complete RPC: %d\n", eprc);
                        /* don't re-attempt RPC completion */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;

        case KTX_RDMA_FETCH:
                eprc = ep_rpc_get (krx->krx_rxd,
                                   kqswnal_rdma_fetch_complete, ktx,
                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("ep_rpc_get failed: %d\n", eprc);
                        /* Don't attempt RPC completion:
                         * EKC nuked it when the get failed */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;
        }

 out:
        if (rc != 0) {
                kqswnal_rx_decref(krx);                 /* drop callback's ref */
                kqswnal_put_idle_tx (ktx);
        }

        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc);
}

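/* Send an LNet message over the Quadrics network.  GETs and PUT/REPLYs
 * above the optimization thresholds go out as QSWLND_MSG_RDMA RPCs
 * carrying the local buffer descriptors so the peer can RDMA the payload
 * directly; payloads up to kqn_tx_maxcontig are copied into the
 * pre-mapped tx buffer as a single-frag immediate message; anything else
 * is sent immediate with the payload mapped as extra fragments. */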
int
kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        int               nob;
        kqswnal_tx_t     *ktx;
        int               rc;

        /* NB 1. hdr is in network byte order */
        /*    2. 'private' depends on the message type */

        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !cfs_in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (kqswnal_nid2elanid (target.nid) < 0) {
                CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
                return -EIO;
        }

        /* I may not block for a transmit descriptor if I might block the
         * router, receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for msg type %d for %s\n",
                        type, libcfs_nid2str(target.nid));
                return (-ENOMEM);
        }

        ktx->ktx_state   = KTX_SENDING;
        ktx->ktx_nid     = target.nid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = lntmsg;
        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */

        /* The first frag will be the pre-mapped buffer. */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;

        if ((!target_is_router &&               /* target.nid is final dest */
             !routing &&                        /* I'm the source */
             type == LNET_MSG_GET &&            /* optimize GET? */
             *kqswnal_tunables.kqn_optimized_gets != 0 &&
             lntmsg->msg_md->md_length >=
             *kqswnal_tunables.kqn_optimized_gets) ||
            ((type == LNET_MSG_PUT ||            /* optimize PUT? */
              type == LNET_MSG_REPLY) &&         /* optimize REPLY? */
             *kqswnal_tunables.kqn_optimized_puts != 0 &&
             payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
                lnet_libmd_t       *md = lntmsg->msg_md;
                kqswnal_msg_t      *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
                lnet_hdr_t         *mhdr;
                kqswnal_remotemd_t *rmd;

                /* Optimised path: I send over the Elan vaddrs of the local
                 * buffers, and my peer DMAs directly to/from them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the buffer frags. */

                /* Send an RDMA message */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_RDMA;

                mhdr = &msg->kqm_u.rdma.kqrm_hdr;
                rmd  = &msg->kqm_u.rdma.kqrm_rmd;

                *mhdr = *hdr;
                nob = (((char *)rmd) - ktx->ktx_buffer);

                if (type == LNET_MSG_GET) {
                        if ((md->md_options & LNET_MD_KIOV) != 0)
                                rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
                                                          md->md_niov, md->md_iov.kiov);
                        else
                                rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
                                                         md->md_niov, md->md_iov.iov);
                        ktx->ktx_state = KTX_GETTING;
                } else {
                        if (payload_kiov != NULL)
                                rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
                                                         payload_niov, payload_kiov);
                        else
                                rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
                                                        payload_niov, payload_iov);
                        ktx->ktx_state = KTX_PUTTING;
                }

                if (rc != 0)
                        goto out;

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
                nob += offsetof(kqswnal_remotemd_t,
                                kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (nob <= KQSW_TX_BUFFER_SIZE);

                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
#if KQSW_CKSUM
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
#endif
                if (type == LNET_MSG_GET) {
                        /* Allocate reply message now while I'm in thread context */
                        ktx->ktx_args[2] = lnet_create_reply_msg (
                                kqswnal_data.kqn_ni, lntmsg);
                        if (ktx->ktx_args[2] == NULL)
                                goto out;

                        /* NB finalizing the REPLY message is my
                         * responsibility now, whatever happens. */
#if KQSW_CKSUM
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  3) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }

                } else if (payload_kiov != NULL) {
                        /* must checksum payload after header so receiver can
                         * compute partial header cksum before swab.  Sadly
                         * this causes 2 rounds of kmap */
                        msg->kqm_cksum =
                                kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,
                                                  payload_niov, payload_kiov);
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
                } else {
                        msg->kqm_cksum =
                                kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,
                                                 payload_niov, payload_iov);
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
#endif
                }

        } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
                lnet_hdr_t    *mhdr;
                char          *payload;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* single frag copied into the pre-mapped buffer */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                mhdr = &msg->kqm_u.immediate.kqim_hdr;
                payload = msg->kqm_u.immediate.kqim_payload;

                *mhdr = *hdr;
                nob = (payload - ktx->ktx_buffer) + payload_nob;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                            payload_niov, payload_kiov,
                                            payload_offset, payload_nob);
                else
                        lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                           payload_niov, payload_iov,
                                           payload_offset, payload_nob);
#if KQSW_CKSUM
                msg->kqm_nob   = nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
        } else {
                lnet_hdr_t    *mhdr;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* multiple frags: first is hdr in pre-mapped buffer */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                mhdr = &msg->kqm_u.immediate.kqim_hdr;
                nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);

                *mhdr = *hdr;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);
                if (rc != 0)
                        goto out;

#if KQSW_CKSUM
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);

                msg->kqm_cksum = (payload_kiov != NULL) ?
                                 kqswnal_csum_kiov(msg->kqm_cksum,
                                                   payload_offset, payload_nob,
                                                   payload_niov, payload_kiov) :
                                 kqswnal_csum_iov(msg->kqm_cksum,
                                                  payload_offset, payload_nob,
                                                  payload_niov, payload_iov);

                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
                nob += payload_nob;
        }

        ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        rc = kqswnal_launch (ktx);

 out:
        CDEBUG_LIMIT(rc == 0 ? D_NET : D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
                     routing ? (rc == 0 ? "Routed" : "Failed to route") :
                               (rc == 0 ? "Sent" : "Failed to send"),
                     nob, libcfs_nid2str(target.nid),
                     target_is_router ? "(router)" : "", rc);

        if (rc != 0) {
                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
                int         state = ktx->ktx_state;

                kqswnal_put_idle_tx (ktx);

                if (state == KTX_GETTING && repmsg != NULL) {
                        /* We committed to reply, but there was a problem
                         * launching the GET.  We can't avoid delivering a
                         * REPLY event since we committed above, so we
                         * pretend the GET succeeded but the REPLY
                         * failed. */
                        rc = 0;
                        lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
                }

        }

        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc == 0 ? 0 : -EIO);
}

void
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);
        LASSERT (!krx->krx_rpc_reply_needed);

        krx->krx_state = KRX_POSTED;

        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd,
                                   kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }
}

void
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}

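/* Called when the last ref on an rx is dropped: if the peer's RPC still
 * needs completing, send the reply status block now (kqswnal_rpc_complete()
 * requeues the rx afterwards); otherwise just repost the receive. */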
void
kqswnal_rx_done (kqswnal_rx_t *krx)
{
        int           rc;

        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {
                /* We've not completed the peer's RPC yet... */
                krx->krx_rpc_reply.msg.magic   = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;

                LASSERT (!cfs_in_interrupt());

                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,
                                     &krx->krx_rpc_reply.ep_statusblk,
                                     NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);
                krx->krx_rpc_reply_needed = 0;
        }

        kqswnal_requeue_rx(krx);
}

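/* Parse a newly received message: validate the magic, version and (when
 * enabled) checksum, swabbing if the sender has opposite endian-ness,
 * before handing the contained LNet header to lnet_parse(). */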
1324 void
1325 kqswnal_parse (kqswnal_rx_t *krx)
1326 {
1327         lnet_ni_t      *ni = kqswnal_data.kqn_ni;
1328         kqswnal_msg_t  *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
1329         lnet_nid_t      fromnid = kqswnal_rx_nid(krx);
1330         int             swab;
1331         int             n;
1332         int             i;
1333         int             nob;
1334         int             rc;
1335
1336         LASSERT (cfs_atomic_read(&krx->krx_refcount) == 1);
1337
1338         if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {
1339                 CERROR("Short message %d received from %s\n",
1340                        krx->krx_nob, libcfs_nid2str(fromnid));
1341                 goto done;
1342         }
1343
1344         swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);
1345
1346         if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {
1347 #if KQSW_CKSUM
1348                 __u32 csum0;
1349                 __u32 csum1;
1350
1351                 /* csum byte array before swab */
1352                 csum1 = msg->kqm_cksum;
1353                 msg->kqm_cksum = 0;
1354                 csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,
1355                                           krx->krx_npages, krx->krx_kiov);
1356                 msg->kqm_cksum = csum1;
1357 #endif
1358
1359                 if (swab) {
1360                         __swab16s(&msg->kqm_version);
1361                         __swab16s(&msg->kqm_type);
1362 #if KQSW_CKSUM
1363                         __swab32s(&msg->kqm_cksum);
1364                         __swab32s(&msg->kqm_nob);
1365 #endif
1366                 }
1367
1368                 if (msg->kqm_version != QSWLND_PROTO_VERSION) {
1369                         /* Future protocol version compatibility support!
1370                          * The next qswlnd-specific protocol rev will first
1371                          * send an RPC to check version.
1372                          * 1.4.6 and 1.4.7.early reply with a status
1373                          * block containing their current version.
1374                          * Later versions send a failure (-ve) status +
1375                          * magic/version */
1376
1377                         if (!krx->krx_rpc_reply_needed) {
1378                                 CERROR("Unexpected version %d from %s\n",
1379                                        msg->kqm_version, libcfs_nid2str(fromnid));
1380                                 goto done;
1381                         }
1382
1383                         LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
1384                         goto done;
1385                 }
1386
1387                 switch (msg->kqm_type) {
1388                 default:
1389                         CERROR("Bad request type %x from %s\n",
1390                                msg->kqm_type, libcfs_nid2str(fromnid));
1391                         goto done;
1392
1393                 case QSWLND_MSG_IMMEDIATE:
1394                         if (krx->krx_rpc_reply_needed) {
1395                                 /* Should have been a simple message */
1396                                 CERROR("IMMEDIATE sent as RPC from %s\n",
1397                                        libcfs_nid2str(fromnid));
1398                                 goto done;
1399                         }
1400
1401                         nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
1402                         if (krx->krx_nob < nob) {
1403                                 CERROR("Short IMMEDIATE %d(%d) from %s\n",
1404                                        krx->krx_nob, nob, libcfs_nid2str(fromnid));
1405                                 goto done;
1406                         }
1407
1408 #if KQSW_CKSUM
1409                         if (csum0 != msg->kqm_cksum) {
1410                                 CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",
1411                                        csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));
1412                                 CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);
1413                                 goto done;
1414                         }
1415 #endif
1416                         rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,
1417                                         fromnid, krx, 0);
1418                         if (rc < 0)
1419                                 goto done;
1420                         return;
1421
1422                 case QSWLND_MSG_RDMA:
1423                         if (!krx->krx_rpc_reply_needed) {
1424                                 /* Should have been a simple message */
1425                                 /* Should have been sent as an RPC */
1426                                        libcfs_nid2str(fromnid));
1427                                 goto done;
1428                         }
1429
1430                         nob = offsetof(kqswnal_msg_t,
1431                                        kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);
1432                         if (krx->krx_nob < nob) {
1433                                 CERROR("Short RDMA message %d(%d) from %s\n",
1434                                        krx->krx_nob, nob, libcfs_nid2str(fromnid));
1435                                 goto done;
1436                         }
1437
1438                         if (swab)
1439                                 __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);
1440
1441                         n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;
1442                         nob = offsetof(kqswnal_msg_t,
1443                                        kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);
1444
1445                         if (krx->krx_nob < nob) {
1446                                 CERROR("short RDMA message %d(%d) from %s\n",
1447                                        krx->krx_nob, nob, libcfs_nid2str(fromnid));
1448                                 goto done;
1449                         }
1450
1451                         if (swab) {
1452                                 for (i = 0; i < n; i++) {
1453                                         EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];
1454
1455                                         __swab32s(&nmd->nmd_addr);
1456                                         __swab32s(&nmd->nmd_len);
1457                                         __swab32s(&nmd->nmd_attr);
1458                                 }
1459                         }
1460
1461 #if KQSW_CKSUM
1462                         krx->krx_cksum = csum0; /* stash checksum so far */
1463 #endif
1464                         rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,
1465                                         fromnid, krx, 1);
1466                         if (rc < 0)
1467                                 goto done;
1468                         return;
1469                 }
1470                 /* Not Reached */
1471         }
1472
1473         if (msg->kqm_magic == LNET_PROTO_MAGIC ||
1474             msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {
1475                 /* Future protocol version compatibility support!
1476                  * When LNET unifies protocols over all LNDs, the first thing a
1477                  * peer will send will be a version query RPC.  
1478                  * 1.4.6 and 1.4.7.early reply with a status block containing
1479                  * LNET_PROTO_QSW_MAGIC.
1480                  * Later versions send a failure (-ve) status +
1481                  * magic/version */
1482
1483                 if (!krx->krx_rpc_reply_needed) {
1484                         CERROR("Unexpected magic %08x from %s\n",
1485                                msg->kqm_magic, libcfs_nid2str(fromnid));
1486                         goto done;
1487                 }
1488
1489                 LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
1490                 goto done;
1491         }
1492
1493         CERROR("Unrecognised magic %08x from %s\n",
1494                msg->kqm_magic, libcfs_nid2str(fromnid));
1495  done:
1496         kqswnal_rx_decref(krx);
1497 }
1498
1499 /* Receive Interrupt Handler: posts to schedulers */
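/* May be called in interrupt context by the EKC: stash the receive size and
 * status and decide whether the sender is owed an RPC reply before handing
 * the descriptor on. */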
1500 void 
1501 kqswnal_rxhandler(EP_RXD *rxd)
1502 {
1503         unsigned long flags;
1504         int           nob    = ep_rxd_len (rxd);
1505         int           status = ep_rxd_status (rxd);
1506         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1507         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1508                rxd, krx, nob, status);
1509
1510         LASSERT (krx != NULL);
1511         LASSERT (krx->krx_state == KRX_POSTED);
1512         
1513         krx->krx_state = KRX_PARSE;
1514         krx->krx_rxd = rxd;
1515         krx->krx_nob = nob;
1516
1517         /* RPC reply iff rpc request received without error */
1518         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
1519                                     (status == EP_SUCCESS ||
1520                                      status == EP_MSG_TOO_BIG);
1521
1522         /* Default to failure if an RPC reply is requested but not handled */
1523         krx->krx_rpc_reply.msg.status = -EPROTO;
1524         cfs_atomic_set (&krx->krx_refcount, 1);
1525
1526         if (status != EP_SUCCESS) {
1527                 /* receives complete with failure when receiver is removed */
1528                 if (status == EP_SHUTDOWN)
1529                         LASSERT (kqswnal_data.kqn_shuttingdown);
1530                 else
1531                         CERROR("receive failed with status %d nob %d\n",
1532                                status, nob);
1533                 kqswnal_rx_decref(krx);
1534                 return;
1535         }
1536
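        /* kqswnal_parse() may block (kqswnal_rx_done() and kqswnal_recv()
         * both assert thread context), so only parse inline here when not
         * in interrupt context; otherwise queue for a scheduler thread. */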
1537         if (!cfs_in_interrupt()) {
1538                 kqswnal_parse(krx);
1539                 return;
1540         }
1541
1542         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1543
1544         cfs_list_add_tail(&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1545         wake_up(&kqswnal_data.kqn_sched_waitq);
1546
1547         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
1548 }
1549
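/* LNet's receive callback: 'private' is the kqswnal_rx_t holding the wire
 * message.  Optimized PUT/REPLY/GET requests arrive as RPCs and move their
 * payload by RDMA (kqswnal_rdma()); immediate messages are simply copied out
 * of the receive buffer.  In either case the descriptor's reference is
 * dropped before returning. */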
1550 int
1551 kqswnal_recv (lnet_ni_t     *ni,
1552               void          *private,
1553               lnet_msg_t    *lntmsg,
1554               int            delayed,
1555               unsigned int   niov,
1556               struct iovec  *iov,
1557               lnet_kiov_t   *kiov,
1558               unsigned int   offset,
1559               unsigned int   mlen,
1560               unsigned int   rlen)
1561 {
1562         kqswnal_rx_t       *krx = (kqswnal_rx_t *)private;
1563         lnet_nid_t          fromnid;
1564         kqswnal_msg_t      *msg;
1565         lnet_hdr_t         *hdr;
1566         kqswnal_remotemd_t *rmd;
1567         int                 msg_offset;
1568         int                 rc;
1569
1570         LASSERT (!cfs_in_interrupt ());             /* OK to map */
1571         /* Either all pages or all vaddrs */
1572         LASSERT (!(kiov != NULL && iov != NULL));
1573
1574         fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
1575         msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
1576
1577         if (krx->krx_rpc_reply_needed) {
1578                 /* optimized (rdma) request sent as RPC */
1579
1580                 LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);
1581                 hdr = &msg->kqm_u.rdma.kqrm_hdr;
1582                 rmd = &msg->kqm_u.rdma.kqrm_rmd;
1583
1584                 /* NB header is still in wire byte order */
1585
1586                 switch (le32_to_cpu(hdr->type)) {
1587                         case LNET_MSG_PUT:
1588                         case LNET_MSG_REPLY:
1589                                 /* This is an optimized PUT/REPLY */
1590                                 rc = kqswnal_rdma(krx, lntmsg, 
1591                                                   KTX_RDMA_FETCH, rmd,
1592                                                   niov, iov, kiov, offset, mlen);
1593                                 break;
1594
1595                         case LNET_MSG_GET:
1596 #if KQSW_CKSUM
1597                                 if (krx->krx_cksum != msg->kqm_cksum) {
1598                                         CERROR("Bad GET checksum %08x(%08x) from %s\n",
1599                                                krx->krx_cksum, msg->kqm_cksum,
1600                                                libcfs_nid2str(fromnid));
1601                                         rc = -EIO;
1602                                         break;
1603                                 }
1604 #endif                                
1605                                 if (lntmsg == NULL) {
1606                                         /* No buffer match: my decref will
1607                                          * complete the RPC with failure */
1608                                         rc = 0;
1609                                 } else {
1610                                         /* Matched something! */
1611                                         rc = kqswnal_rdma(krx, lntmsg,
1612                                                           KTX_RDMA_STORE, rmd,
1613                                                           lntmsg->msg_niov,
1614                                                           lntmsg->msg_iov,
1615                                                           lntmsg->msg_kiov,
1616                                                           lntmsg->msg_offset,
1617                                                           lntmsg->msg_len);
1618                                 }
1619                                 break;
1620
1621                         default:
1622                                 CERROR("Bad RPC type %d\n",
1623                                        le32_to_cpu(hdr->type));
1624                                 rc = -EPROTO;
1625                                 break;
1626                 }
1627
1628                 kqswnal_rx_decref(krx);
1629                 return rc;
1630         }
1631
1632         LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);
1633         msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
1634         
1635         if (krx->krx_nob < msg_offset + rlen) {
1636                 CERROR("Bad message size from %s: have %d, need %d + %d\n",
1637                        libcfs_nid2str(fromnid), krx->krx_nob,
1638                        msg_offset, rlen);
1639                 kqswnal_rx_decref(krx);
1640                 return -EPROTO;
1641         }
1642
1643         if (kiov != NULL)
1644                 lnet_copy_kiov2kiov(niov, kiov, offset,
1645                                     krx->krx_npages, krx->krx_kiov, 
1646                                     msg_offset, mlen);
1647         else
1648                 lnet_copy_kiov2iov(niov, iov, offset,
1649                                    krx->krx_npages, krx->krx_kiov, 
1650                                    msg_offset, mlen);
1651
1652         lnet_finalize(ni, lntmsg, 0);
1653         kqswnal_rx_decref(krx);
1654         return 0;
1655 }
1656
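/* Spawn an LND thread and account for it in kqn_nthreads; each thread calls
 * kqswnal_thread_fini() on exit (see kqswnal_scheduler() below) to drop the
 * count again. */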
1657 int
1658 kqswnal_thread_start(int (*fn)(void *arg), void *arg, char *name)
1659 {
1660         struct task_struct *task = cfs_thread_run(fn, arg, name);
1661
1662         if (IS_ERR(task))
1663                 return PTR_ERR(task);
1664
1665         cfs_atomic_inc(&kqswnal_data.kqn_nthreads);
1666         return 0;
1667 }
1668
1669 void
1670 kqswnal_thread_fini (void)
1671 {
1672         cfs_atomic_dec (&kqswnal_data.kqn_nthreads);
1673 }
1674
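/* Scheduler thread: drain the three work queues (received messages awaiting
 * parsing, completed transmits, and delayed transmits awaiting launch),
 * dropping the scheduler lock around each unit of work.  Every KQSW_RESCHED
 * consecutive busy passes it drops the lock and reschedules if needed; when
 * idle it sleeps, and it exits once stage 2 of shutdown leaves nothing left
 * to do. */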
1675 int
1676 kqswnal_scheduler (void *arg)
1677 {
1678         kqswnal_rx_t    *krx;
1679         kqswnal_tx_t    *ktx;
1680         unsigned long    flags;
1681         int              rc;
1682         int              counter = 0;
1683         int              did_something;
1684
1685         cfs_block_allsigs ();
1686
1687         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1688
1689         for (;;)
1690         {
1691                 did_something = 0;
1692
1693                 if (!cfs_list_empty (&kqswnal_data.kqn_readyrxds))
1694                 {
1695                         krx = cfs_list_entry(kqswnal_data.kqn_readyrxds.next,
1696                                              kqswnal_rx_t, krx_list);
1697                         cfs_list_del (&krx->krx_list);
1698                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1699                                                    flags);
1700
1701                         LASSERT (krx->krx_state == KRX_PARSE);
1702                         kqswnal_parse (krx);
1703
1704                         did_something = 1;
1705                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
1706                                               flags);
1707                 }
1708
1709                 if (!cfs_list_empty (&kqswnal_data.kqn_donetxds))
1710                 {
1711                         ktx = cfs_list_entry(kqswnal_data.kqn_donetxds.next,
1712                                              kqswnal_tx_t, ktx_schedlist);
1713                         cfs_list_del_init (&ktx->ktx_schedlist);
1714                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1715                                                    flags);
1716
1717                         kqswnal_tx_done_in_thread_context(ktx);
1718
1719                         did_something = 1;
1720                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
1721                                                flags);
1722                 }
1723
1724                 if (!cfs_list_empty (&kqswnal_data.kqn_delayedtxds))
1725                 {
1726                         ktx = cfs_list_entry(kqswnal_data.kqn_delayedtxds.next,
1727                                              kqswnal_tx_t, ktx_schedlist);
1728                         cfs_list_del_init (&ktx->ktx_schedlist);
1729                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1730                                                    flags);
1731
1732                         rc = kqswnal_launch (ktx);
1733                         if (rc != 0) {
1734                                 CERROR("Failed delayed transmit to %s: %d\n", 
1735                                        libcfs_nid2str(ktx->ktx_nid), rc);
1736                                 kqswnal_tx_done (ktx, rc);
1737                         }
1738                         cfs_atomic_dec (&kqswnal_data.kqn_pending_txs);
1739
1740                         did_something = 1;
1741                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
1742                                                flags);
1743                 }
1744
1745                 /* nothing to do or hogging CPU */
1746                 if (!did_something || counter++ == KQSW_RESCHED) {
1747                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1748                                                    flags);
1749
1750                         counter = 0;
1751
1752                         if (!did_something) {
1753                                 if (kqswnal_data.kqn_shuttingdown == 2) {
1754                                         /* We only exit in stage 2 of shutdown
1755                                          * when there's nothing left to do */
1756                                         break;
1757                                 }
1758                                 rc = wait_event_interruptible_exclusive (
1759                                         kqswnal_data.kqn_sched_waitq,
1760                                         kqswnal_data.kqn_shuttingdown == 2 ||
1761                                         !cfs_list_empty(&kqswnal_data. \
1762                                                         kqn_readyrxds) ||
1763                                         !cfs_list_empty(&kqswnal_data. \
1764                                                         kqn_donetxds) ||
1765                                         !cfs_list_empty(&kqswnal_data. \
1766                                                         kqn_delayedtxds));
1767                                 LASSERT (rc == 0);
1768                         } else if (need_resched())
1769                                 schedule ();
1770
1771                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
1772                                                flags);
1773                 }
1774         }
1775
1776         kqswnal_thread_fini ();
1777         return 0;
1778 }