lnet/klnds/qswlnd/qswlnd_cb.c  [fs/lustre-release.git]
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2008 Sun Microsystems, Inc. All rights reserved
5  *
6  * Author: Eric Barton <eric@bartonsoftware.com>
7  *
8  * This file is part of Portals, http://www.lustre.org
9  *
10  * Portals is free software; you can redistribute it and/or
11  * modify it under the terms of version 2 of the GNU General Public
12  * License as published by the Free Software Foundation.
13  *
14  * Portals is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with Portals; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include "qswlnd.h"
25
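/* Tell LNet the peer is dead: work out when this tx was launched (wall-clock
 * seconds, derived from the jiffies elapsed since ktx_launchtime) and report
 * that as the time of failure, with alive == 0. */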
26 void
27 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
28 {
29         time_t             then;
30
31         then = cfs_time_current_sec() -
32                 cfs_duration_sec(cfs_time_current() -
33                                  ktx->ktx_launchtime);
34
35         lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then);
36 }
37
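/* Undo kqswnal_map_tx_{iov,kiov}(): forget the preferred rail and unload any
 * temporarily mapped frags (from ktx_firsttmpfrag onwards) from Elan DVMA. */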
38 void
39 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
40 {
41         int      i;
42
43         ktx->ktx_rail = -1;                     /* unset rail */
44
45         if (ktx->ktx_nmappedpages == 0)
46                 return;
47         
48         CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
49                ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
50
51         for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
52                 ep_dvma_unload(kqswnal_data.kqn_ep,
53                                kqswnal_data.kqn_ep_tx_nmh,
54                                &ktx->ktx_frags[i]);
55
56         ktx->ktx_nmappedpages = 0;
57 }
58
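/* Map 'nob' bytes of a paged (kiov) payload, starting at 'offset', into Elan
 * DVMA space as temporary frags appended from ktx_firsttmpfrag onwards.  A
 * preferred rail is chosen on first use; each page is kmap()ed, loaded with
 * ep_dvma_load() and merged into the previous NMD when ep_nmd_merge() allows,
 * so the frag count only grows where fragments are discontiguous. */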
59 int
60 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, 
61                      unsigned int niov, lnet_kiov_t *kiov)
62 {
63         int       nfrags    = ktx->ktx_nfrag;
64         int       nmapped   = ktx->ktx_nmappedpages;
65         int       maxmapped = ktx->ktx_npages;
66         __u32     basepage  = ktx->ktx_basepage + nmapped;
67         char     *ptr;
68
69         EP_RAILMASK railmask;
70         int         rail;
71
72         if (ktx->ktx_rail < 0)
73                 ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
74                                                  EP_RAILMASK_ALL,
75                                                  kqswnal_nid2elanid(ktx->ktx_nid));
76         rail = ktx->ktx_rail;
77         if (rail < 0) {
78                 CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
79                 return (-ENETDOWN);
80         }
81         railmask = 1 << rail;
82
83         LASSERT (nmapped <= maxmapped);
84         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
85         LASSERT (nfrags <= EP_MAXFRAG);
86         LASSERT (niov > 0);
87         LASSERT (nob > 0);
88
89         /* skip complete frags before 'offset' */
90         while (offset >= kiov->kiov_len) {
91                 offset -= kiov->kiov_len;
92                 kiov++;
93                 niov--;
94                 LASSERT (niov > 0);
95         }
96
97         do {
98                 int  fraglen = kiov->kiov_len - offset;
99
100                 /* each page frag is contained in one page */
101                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
102
103                 if (fraglen > nob)
104                         fraglen = nob;
105
106                 nmapped++;
107                 if (nmapped > maxmapped) {
108                         CERROR("Can't map message in %d pages (max %d)\n",
109                                nmapped, maxmapped);
110                         return (-EMSGSIZE);
111                 }
112
113                 if (nfrags == EP_MAXFRAG) {
114                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
115                                EP_MAXFRAG);
116                         return (-EMSGSIZE);
117                 }
118
119                 /* XXX this is really crap, but we'll have to kmap until
120                  * EKC has a page (rather than vaddr) mapping interface */
121
122                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
123
124                 CDEBUG(D_NET,
125                        "%p[%d] loading %p for %d, page %d, %d total\n",
126                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
127
128                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
129                              ptr, fraglen,
130                              kqswnal_data.kqn_ep_tx_nmh, basepage,
131                              &railmask, &ktx->ktx_frags[nfrags]);
132
133                 if (nfrags == ktx->ktx_firsttmpfrag ||
134                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
135                                   &ktx->ktx_frags[nfrags - 1],
136                                   &ktx->ktx_frags[nfrags])) {
137                         /* new frag if this is the first or can't merge */
138                         nfrags++;
139                 }
140
141                 kunmap (kiov->kiov_page);
142                 
143                 /* keep in loop for failure case */
144                 ktx->ktx_nmappedpages = nmapped;
145
146                 basepage++;
147                 kiov++;
148                 niov--;
149                 nob -= fraglen;
150                 offset = 0;
151
152                 /* iov must not run out before end of data */
153                 LASSERT (nob == 0 || niov > 0);
154
155         } while (nob > 0);
156
157         ktx->ktx_nfrag = nfrags;
158         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
159                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
160
161         return (0);
162 }
163
164 #if KQSW_CKSUM
165 __u32
166 kqswnal_csum_kiov (__u32 csum, int offset, int nob, 
167                    unsigned int niov, lnet_kiov_t *kiov)
168 {
169         char     *ptr;
170
171         if (nob == 0)
172                 return csum;
173
174         LASSERT (niov > 0);
175         LASSERT (nob > 0);
176
177         /* skip complete frags before 'offset' */
178         while (offset >= kiov->kiov_len) {
179                 offset -= kiov->kiov_len;
180                 kiov++;
181                 niov--;
182                 LASSERT (niov > 0);
183         }
184
185         do {
186                 int  fraglen = kiov->kiov_len - offset;
187
188                 /* each page frag is contained in one page */
189                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
190
191                 if (fraglen > nob)
192                         fraglen = nob;
193
194                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
195
196                 csum = kqswnal_csum(csum, ptr, fraglen);
197
198                 kunmap (kiov->kiov_page);
199                 
200                 kiov++;
201                 niov--;
202                 nob -= fraglen;
203                 offset = 0;
204
205                 /* iov must not run out before end of data */
206                 LASSERT (nob == 0 || niov > 0);
207
208         } while (nob > 0);
209
210         return csum;
211 }
212 #endif
213
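/* As kqswnal_map_tx_kiov(), but for a virtually addressed (iovec) payload:
 * each fragment may span several pages (kqswnal_pages_spanned()), so the
 * mapped-page accounting advances by 'npages' per fragment rather than 1. */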
214 int
215 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
216                     unsigned int niov, struct iovec *iov)
217 {
218         int       nfrags    = ktx->ktx_nfrag;
219         int       nmapped   = ktx->ktx_nmappedpages;
220         int       maxmapped = ktx->ktx_npages;
221         __u32     basepage  = ktx->ktx_basepage + nmapped;
222
223         EP_RAILMASK railmask;
224         int         rail;
225         
226         if (ktx->ktx_rail < 0)
227                 ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
228                                                  EP_RAILMASK_ALL,
229                                                  kqswnal_nid2elanid(ktx->ktx_nid));
230         rail = ktx->ktx_rail;
231         if (rail < 0) {
232                 CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
233                 return (-ENETDOWN);
234         }
235         railmask = 1 << rail;
236
237         LASSERT (nmapped <= maxmapped);
238         LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
239         LASSERT (nfrags <= EP_MAXFRAG);
240         LASSERT (niov > 0);
241         LASSERT (nob > 0);
242
243         /* skip complete frags before offset */
244         while (offset >= iov->iov_len) {
245                 offset -= iov->iov_len;
246                 iov++;
247                 niov--;
248                 LASSERT (niov > 0);
249         }
250         
251         do {
252                 int  fraglen = iov->iov_len - offset;
253                 long npages;
254                 
255                 if (fraglen > nob)
256                         fraglen = nob;
257                 npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
258
259                 nmapped += npages;
260                 if (nmapped > maxmapped) {
261                         CERROR("Can't map message in %d pages (max %d)\n",
262                                nmapped, maxmapped);
263                         return (-EMSGSIZE);
264                 }
265
266                 if (nfrags == EP_MAXFRAG) {
267                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
268                                EP_MAXFRAG);
269                         return (-EMSGSIZE);
270                 }
271
272                 CDEBUG(D_NET,
273                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
274                        ktx, nfrags, iov->iov_base + offset, fraglen, 
275                        basepage, npages, nmapped);
276
277                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
278                              iov->iov_base + offset, fraglen,
279                              kqswnal_data.kqn_ep_tx_nmh, basepage,
280                              &railmask, &ktx->ktx_frags[nfrags]);
281
282                 if (nfrags == ktx->ktx_firsttmpfrag ||
283                     !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
284                                   &ktx->ktx_frags[nfrags - 1],
285                                   &ktx->ktx_frags[nfrags])) {
286                         /* new frag if this is the first or can't merge */
287                         nfrags++;
288                 }
289
290                 /* keep in loop for failure case */
291                 ktx->ktx_nmappedpages = nmapped;
292
293                 basepage += npages;
294                 iov++;
295                 niov--;
296                 nob -= fraglen;
297                 offset = 0;
298
299                 /* iov must not run out before end of data */
300                 LASSERT (nob == 0 || niov > 0);
301
302         } while (nob > 0);
303
304         ktx->ktx_nfrag = nfrags;
305         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
306                 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
307
308         return (0);
309 }
310
311 #if KQSW_CKSUM
312 __u32
313 kqswnal_csum_iov (__u32 csum, int offset, int nob, 
314                   unsigned int niov, struct iovec *iov)
315 {
316         if (nob == 0)
317                 return csum;
318         
319         LASSERT (niov > 0);
320         LASSERT (nob > 0);
321
322         /* skip complete frags before offset */
323         while (offset >= iov->iov_len) {
324                 offset -= iov->iov_len;
325                 iov++;
326                 niov--;
327                 LASSERT (niov > 0);
328         }
329         
330         do {
331                 int  fraglen = iov->iov_len - offset;
332                 
333                 if (fraglen > nob)
334                         fraglen = nob;
335
336                 csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen);
337
338                 iov++;
339                 niov--;
340                 nob -= fraglen;
341                 offset = 0;
342
343                 /* iov must not run out before end of data */
344                 LASSERT (nob == 0 || niov > 0);
345
346         } while (nob > 0);
347
348         return csum;
349 }
350 #endif
351
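/* Release a tx descriptor: drop any temporary DVMA mappings, mark it idle and
 * move it from the active list back onto kqn_idletxds under the idle-tx lock. */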
352 void
353 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
354 {
355         unsigned long     flags;
356
357         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
358         ktx->ktx_state = KTX_IDLE;
359
360         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
361
362         list_del (&ktx->ktx_list);              /* take off active list */
363         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
364
365         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
366 }
367
368 kqswnal_tx_t *
369 kqswnal_get_idle_tx (void)
370 {
371         unsigned long  flags;
372         kqswnal_tx_t  *ktx;
373
374         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
375
376         if (kqswnal_data.kqn_shuttingdown ||
377             list_empty (&kqswnal_data.kqn_idletxds)) {
378                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
379
380                 return NULL;
381         }
382
383         ktx = list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t, ktx_list);
384         list_del (&ktx->ktx_list);
385
386         list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
387         ktx->ktx_launcher = current->pid;
388         atomic_inc(&kqswnal_data.kqn_pending_txs);
389
390         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
391
392         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
393         LASSERT (ktx->ktx_nmappedpages == 0);
394         return (ktx);
395 }
396
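/* Final tx completion, always in thread context (see the LASSERT below): an
 * -EHOSTDOWN status first tells LNet the peer is down, then one or two
 * lnet_msg_t's are finalized according to ktx_state (a GET that committed to
 * a REPLY finalizes both, with the GET itself forced to success). */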
397 void
398 kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
399 {
400         lnet_msg_t    *lnetmsg0 = NULL;
401         lnet_msg_t    *lnetmsg1 = NULL;
402         int            status0  = 0;
403         int            status1  = 0;
404         kqswnal_rx_t  *krx;
405         
406         LASSERT (!in_interrupt());
407         
408         if (ktx->ktx_status == -EHOSTDOWN)
409                 kqswnal_notify_peer_down(ktx);
410
411         switch (ktx->ktx_state) {
412         case KTX_RDMA_FETCH:                    /* optimized PUT/REPLY handled */
413                 krx      = (kqswnal_rx_t *)ktx->ktx_args[0];
414                 lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
415                 status0  = ktx->ktx_status;
416 #if KQSW_CKSUM
417                 if (status0 == 0) {             /* RDMA succeeded */
418                         kqswnal_msg_t *msg;
419                         __u32          csum;
420
421                         msg = (kqswnal_msg_t *)
422                               page_address(krx->krx_kiov[0].kiov_page);
423
424                         csum = (lnetmsg0->msg_kiov != NULL) ?
425                                kqswnal_csum_kiov(krx->krx_cksum,
426                                                  lnetmsg0->msg_offset,
427                                                  lnetmsg0->msg_wanted,
428                                                  lnetmsg0->msg_niov,
429                                                  lnetmsg0->msg_kiov) :
430                                kqswnal_csum_iov(krx->krx_cksum,
431                                                 lnetmsg0->msg_offset,
432                                                 lnetmsg0->msg_wanted,
433                                                 lnetmsg0->msg_niov,
434                                                 lnetmsg0->msg_iov);
435
436                         /* Can only check csum if I got it all */
437                         if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
438                             csum != msg->kqm_cksum) {
439                                 ktx->ktx_status = -EIO;
440                                 krx->krx_rpc_reply.msg.status = -EIO;
441                                 CERROR("RDMA checksum failed %u(%u) from %s\n",
442                                        csum, msg->kqm_cksum,
443                                        libcfs_nid2str(kqswnal_rx_nid(krx)));
444                         }
445                 }
446 #endif       
447                 LASSERT (krx->krx_state == KRX_COMPLETING);
448                 kqswnal_rx_decref (krx);
449                 break;
450
451         case KTX_RDMA_STORE:       /* optimized GET handled */
452         case KTX_PUTTING:          /* optimized PUT sent */
453         case KTX_SENDING:          /* normal send */
454                 lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
455                 status0  = ktx->ktx_status;
456                 break;
457
458         case KTX_GETTING:          /* optimized GET sent & payload received */
459                 /* Complete the GET with success since we can't avoid
460                  * delivering a REPLY event; we committed to it when we
461                  * launched the GET */
462                 lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
463                 status0  = 0;
464                 lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
465                 status1  = ktx->ktx_status;
466 #if KQSW_CKSUM
467                 if (status1 == 0) {             /* RDMA succeeded */
468                         lnet_msg_t   *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
469                         lnet_libmd_t *md = lnetmsg0->msg_md;
470                         __u32         csum;
471                 
472                         csum = ((md->md_options & LNET_MD_KIOV) != 0) ? 
473                                kqswnal_csum_kiov(~0, 0,
474                                                  md->md_length,
475                                                  md->md_niov, 
476                                                  md->md_iov.kiov) :
477                                kqswnal_csum_iov(~0, 0,
478                                                 md->md_length,
479                                                 md->md_niov,
480                                                 md->md_iov.iov);
481
482                         if (csum != ktx->ktx_cksum) {
483                                 CERROR("RDMA checksum failed %u(%u) from %s\n",
484                                        csum, ktx->ktx_cksum,
485                                        libcfs_nid2str(ktx->ktx_nid));
486                                 status1 = -EIO;
487                         }
488                 }
489 #endif                
490                 break;
491
492         default:
493                 LASSERT (0);
494         }
495
496         kqswnal_put_idle_tx (ktx);
497
498         lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
499         if (lnetmsg1 != NULL)
500                 lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
501 }
502
503 void
504 kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
505 {
506         unsigned long      flags;
507
508         ktx->ktx_status = status;
509
510         if (!in_interrupt()) {
511                 kqswnal_tx_done_in_thread_context(ktx);
512                 return;
513         }
514
515         /* Complete the send in thread context */
516         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
517         
518         list_add_tail(&ktx->ktx_schedlist, 
519                       &kqswnal_data.kqn_donetxds);
520         wake_up(&kqswnal_data.kqn_sched_waitq);
521         
522         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
523 }
524
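/* EKC transmit completion callback.  A failed transmit completes the tx with
 * -EHOSTDOWN.  For RPCs (GET/PUT) the peer's reply arrives in the txd status
 * block: decode it, byte-swapping if the peer's magic shows the opposite
 * endianness, and for a GET record the reply length (and checksum when
 * KQSW_CKSUM) before completing the tx. */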
525 static void
526 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
527 {
528         kqswnal_tx_t         *ktx = (kqswnal_tx_t *)arg;
529         kqswnal_rpc_reply_t  *reply;
530
531         LASSERT (txd != NULL);
532         LASSERT (ktx != NULL);
533
534         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
535
536         if (status != EP_SUCCESS) {
537
538                 CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n", 
539                         libcfs_nid2str(ktx->ktx_nid), status);
540
541                 status = -EHOSTDOWN;
542
543         } else switch (ktx->ktx_state) {
544
545         case KTX_GETTING:
546         case KTX_PUTTING:
547                 /* RPC complete! */
548                 reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
549                 if (reply->msg.magic == 0) {    /* "old" peer */
550                         status = reply->msg.status;
551                         break;
552                 }
553                 
554                 if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
555                         if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
556                                 CERROR("%s unexpected rpc reply magic %08x\n",
557                                        libcfs_nid2str(ktx->ktx_nid),
558                                        reply->msg.magic);
559                                 status = -EPROTO;
560                                 break;
561                         }
562
563                         __swab32s(&reply->msg.status);
564                         __swab32s(&reply->msg.version);
565                         
566                         if (ktx->ktx_state == KTX_GETTING) {
567                                 __swab32s(&reply->msg.u.get.len);
568                                 __swab32s(&reply->msg.u.get.cksum);
569                         }
570                 }
571                         
572                 status = reply->msg.status;
573                 if (status != 0) {
574                         CERROR("%s RPC status %08x\n",
575                                libcfs_nid2str(ktx->ktx_nid), status);
576                         break;
577                 }
578
579                 if (ktx->ktx_state == KTX_GETTING) {
580                         lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
581                                                (lnet_msg_t *)ktx->ktx_args[2],
582                                                reply->msg.u.get.len);
583 #if KQSW_CKSUM
584                         ktx->ktx_cksum = reply->msg.u.get.cksum;
585 #endif
586                 }
587                 break;
588                 
589         case KTX_SENDING:
590                 status = 0;
591                 break;
592                 
593         default:
594                 LBUG();
595                 break;
596         }
597
598         kqswnal_tx_done(ktx, status);
599 }
600
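/* Hand a prepared tx to EKC.  GET/PUT descriptors go out as RPCs (frag 0
 * only; the payload is RDMAed later), plain sends transmit every frag.
 * EP_ENOMEM (no EKC txd available) queues the tx on kqn_delayedtxds for the
 * scheduler to retry; any other failure is treated as the peer being
 * unreachable. */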
601 int
602 kqswnal_launch (kqswnal_tx_t *ktx)
603 {
604         /* Don't block for transmit descriptor if we're in interrupt context */
605         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
606         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
607         unsigned long flags;
608         int   rc;
609
610         ktx->ktx_launchtime = cfs_time_current();
611
612         if (kqswnal_data.kqn_shuttingdown)
613                 return (-ESHUTDOWN);
614
615         LASSERT (dest >= 0);                    /* must be a peer */
616
617         if (ktx->ktx_nmappedpages != 0)
618                 attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);
619
620         switch (ktx->ktx_state) {
621         case KTX_GETTING:
622         case KTX_PUTTING:
623                 if (the_lnet.ln_testprotocompat != 0) {
624                         kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
625
626                         /* single-shot proto test:
627                          * Future version queries will use an RPC, so I'll
628                          * co-opt one of the existing ones */
629                         LNET_LOCK();
630                         if ((the_lnet.ln_testprotocompat & 1) != 0) {
631                                 msg->kqm_version++;
632                                 the_lnet.ln_testprotocompat &= ~1;
633                         }
634                         if ((the_lnet.ln_testprotocompat & 2) != 0) {
635                                 msg->kqm_magic = LNET_PROTO_MAGIC;
636                                 the_lnet.ln_testprotocompat &= ~2;
637                         }
638                         LNET_UNLOCK();
639                 }
640
641                 /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
642                  * The other frags are the payload, awaiting RDMA */
643                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
644                                      ktx->ktx_port, attr,
645                                      kqswnal_txhandler, ktx,
646                                      NULL, ktx->ktx_frags, 1);
647                 break;
648
649         case KTX_SENDING:
650                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
651                                          ktx->ktx_port, attr,
652                                          kqswnal_txhandler, ktx,
653                                          NULL, ktx->ktx_frags, ktx->ktx_nfrag);
654                 break;
655                 
656         default:
657                 LBUG();
658                 rc = -EINVAL;                   /* no compiler warning please */
659                 break;
660         }
661
662         switch (rc) {
663         case EP_SUCCESS: /* success */
664                 return (0);
665
666         case EP_ENOMEM: /* can't allocate ep txd => queue for later */
667                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
668
669                 list_add_tail (&ktx->ktx_schedlist, &kqswnal_data.kqn_delayedtxds);
670                 wake_up (&kqswnal_data.kqn_sched_waitq);
671
672                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
673                 return (0);
674
675         default: /* fatal error */
676                 CDEBUG (D_NETERROR, "Tx to %s failed: %d\n", libcfs_nid2str(ktx->ktx_nid), rc);
677                 kqswnal_notify_peer_down(ktx);
678                 return (-EHOSTUNREACH);
679         }
680 }
681
682 #if 0
683 static char *
684 hdr_type_string (lnet_hdr_t *hdr)
685 {
686         switch (hdr->type) {
687         case LNET_MSG_ACK:
688                 return ("ACK");
689         case LNET_MSG_PUT:
690                 return ("PUT");
691         case LNET_MSG_GET:
692                 return ("GET");
693         case LNET_MSG_REPLY:
694                 return ("REPLY");
695         default:
696                 return ("<UNKNOWN>");
697         }
698 }
699
700 static void
701 kqswnal_cerror_hdr(lnet_hdr_t * hdr)
702 {
703         char *type_str = hdr_type_string (hdr);
704
705         CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
706                le32_to_cpu(hdr->payload_length));
707         CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
708                le32_to_cpu(hdr->src_pid));
709         CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
710                le32_to_cpu(hdr->dest_pid));
711
712         switch (le32_to_cpu(hdr->type)) {
713         case LNET_MSG_PUT:
714                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
715                        "match bits "LPX64"\n",
716                        le32_to_cpu(hdr->msg.put.ptl_index),
717                        hdr->msg.put.ack_wmd.wh_interface_cookie,
718                        hdr->msg.put.ack_wmd.wh_object_cookie,
719                        le64_to_cpu(hdr->msg.put.match_bits));
720                 CERROR("    offset %d, hdr data "LPX64"\n",
721                        le32_to_cpu(hdr->msg.put.offset),
722                        hdr->msg.put.hdr_data);
723                 break;
724
725         case LNET_MSG_GET:
726                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
727                        "match bits "LPX64"\n",
728                        le32_to_cpu(hdr->msg.get.ptl_index),
729                        hdr->msg.get.return_wmd.wh_interface_cookie,
730                        hdr->msg.get.return_wmd.wh_object_cookie,
731                        hdr->msg.get.match_bits);
732                 CERROR("    Length %d, src offset %d\n",
733                        le32_to_cpu(hdr->msg.get.sink_length),
734                        le32_to_cpu(hdr->msg.get.src_offset));
735                 break;
736
737         case LNET_MSG_ACK:
738                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
739                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
740                        hdr->msg.ack.dst_wmd.wh_object_cookie,
741                        le32_to_cpu(hdr->msg.ack.mlength));
742                 break;
743
744         case LNET_MSG_REPLY:
745                 CERROR("    dst md "LPX64"."LPX64"\n",
746                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
747                        hdr->msg.reply.dst_wmd.wh_object_cookie);
748         }
749
750 }                               /* end of kqswnal_cerror_hdr() */
751 #endif
752
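/* Verify that the local and remote RDMA fragment lists match one-for-one in
 * count and length before any ep_complete_rpc()/ep_rpc_get() is attempted. */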
753 int
754 kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
755                     int nrfrag, EP_NMD *rfrag)
756 {
757         int  i;
758
759         if (nlfrag != nrfrag) {
760                 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
761                        nlfrag, nrfrag);
762                 return (-EINVAL);
763         }
764         
765         for (i = 0; i < nlfrag; i++)
766                 if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
767                         CERROR("Can't cope with unequal frags %d(%d):"
768                                " %d local %d remote\n",
769                                i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
770                         return (-EINVAL);
771                 }
772         
773         return (0);
774 }
775
776 kqswnal_remotemd_t *
777 kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
778 {
779         /* Check that the RMD sent after the "raw" LNET header in a
780          * portals-compatible QSWLND message is OK */
781         char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
782         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));
783
784         /* Note RDMA addresses are sent in native endian-ness in the "old"
785          * portals protocol so no swabbing... */
786
787         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
788                 /* msg too small to discover rmd size */
789                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
790                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
791                 return (NULL);
792         }
793
794         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
795                 /* rmd doesn't fit in the incoming message */
796                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
797                         krx->krx_nob, rmd->kqrmd_nfrag,
798                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
799                 return (NULL);
800         }
801
802         return (rmd);
803 }
804
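/* EKC callback: the RDMA for an optimized GET (KTX_RDMA_STORE) has finished
 * pushing the reply data into the peer's buffers.  Drop the rx ref taken for
 * the callback and complete the tx, which finalizes its lnet_msg_t. */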
805 void
806 kqswnal_rdma_store_complete (EP_RXD *rxd) 
807 {
808         int           status = ep_rxd_status(rxd);
809         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
810         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
811         
812         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
813                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
814
815         LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
816         LASSERT (krx->krx_rxd == rxd);
817         LASSERT (krx->krx_rpc_reply_needed);
818
819         krx->krx_rpc_reply_needed = 0;
820         kqswnal_rx_decref (krx);
821
822         /* free ktx & finalize() its lnet_msg_t */
823         kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
824 }
825
826 void
827 kqswnal_rdma_fetch_complete (EP_RXD *rxd) 
828 {
829         /* Completed fetching the PUT/REPLY data */
830         int           status = ep_rxd_status(rxd);
831         kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
832         kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
833         
834         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
835                "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
836
837         LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
838         LASSERT (krx->krx_rxd == rxd);
839         /* RPC completes with failure by default */
840         LASSERT (krx->krx_rpc_reply_needed);
841         LASSERT (krx->krx_rpc_reply.msg.status != 0);
842
843         if (status == EP_SUCCESS) {
844                 krx->krx_rpc_reply.msg.status = 0;
845                 status = 0;
846         } else {
847                 /* Abandon RPC since get failed */
848                 krx->krx_rpc_reply_needed = 0;
849                 status = -ECONNABORTED;
850         }
851
852         /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
853         LASSERT (krx->krx_state == KRX_PARSE);
854         krx->krx_state = KRX_COMPLETING;
855
856         /* free ktx & finalize() its lnet_msg_t */
857         kqswnal_tx_done(ktx, status);
858 }
859
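/* Do the RDMA half of an optimized PUT/REPLY or GET.  A tx descriptor is
 * borrowed purely to map the local buffers; after checking them against the
 * peer's RMD, data is either pushed with ep_complete_rpc() (KTX_RDMA_STORE:
 * serving an optimized GET) or pulled with ep_rpc_get() (KTX_RDMA_FETCH:
 * receiving an optimized PUT/REPLY payload). */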
860 int
861 kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
862               int type, kqswnal_remotemd_t *rmd,
863               unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
864               unsigned int offset, unsigned int len)
865 {
866         kqswnal_tx_t       *ktx;
867         int                 eprc;
868         int                 rc;
869
870         /* Not both mapped and paged payload */
871         LASSERT (iov == NULL || kiov == NULL);
872         /* RPC completes with failure by default */
873         LASSERT (krx->krx_rpc_reply_needed);
874         LASSERT (krx->krx_rpc_reply.msg.status != 0);
875
876         if (len == 0) {
877                 /* data got truncated to nothing. */
878                 lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
879                 /* Let kqswnal_rx_done() complete the RPC with success */
880                 krx->krx_rpc_reply.msg.status = 0;
881                 return (0);
882         }
883         
884         /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
885            actually sending a portals message with it */
886         ktx = kqswnal_get_idle_tx();
887         if (ktx == NULL) {
888                 CERROR ("Can't get txd for RDMA with %s\n",
889                         libcfs_nid2str(kqswnal_rx_nid(krx)));
890                 return (-ENOMEM);
891         }
892
893         ktx->ktx_state   = type;
894         ktx->ktx_nid     = kqswnal_rx_nid(krx);
895         ktx->ktx_args[0] = krx;
896         ktx->ktx_args[1] = lntmsg;
897
898         LASSERT (atomic_read(&krx->krx_refcount) > 0);
899         /* Take an extra ref for the completion callback */
900         atomic_inc(&krx->krx_refcount);
901
902         /* Map on the rail the RPC prefers */
903         ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
904                                          ep_rxd_railmask(krx->krx_rxd));
905
906         /* Start mapping at offset 0 (we're not mapping any headers) */
907         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
908         
909         if (kiov != NULL)
910                 rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
911         else
912                 rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
913
914         if (rc != 0) {
915                 CERROR ("Can't map local RDMA data: %d\n", rc);
916                 goto out;
917         }
918
919         rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
920                                  rmd->kqrmd_nfrag, rmd->kqrmd_frag);
921         if (rc != 0) {
922                 CERROR ("Incompatible RDMA descriptors\n");
923                 goto out;
924         }
925
926         switch (type) {
927         default:
928                 LBUG();
929                 
930         case KTX_RDMA_STORE:
931                 krx->krx_rpc_reply.msg.status    = 0;
932                 krx->krx_rpc_reply.msg.magic     = LNET_PROTO_QSW_MAGIC;
933                 krx->krx_rpc_reply.msg.version   = QSWLND_PROTO_VERSION;
934                 krx->krx_rpc_reply.msg.u.get.len = len;
935 #if KQSW_CKSUM
936                 krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
937                             kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
938                             kqswnal_csum_iov(~0, offset, len, niov, iov);
939                 if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
940                         krx->krx_rpc_reply.msg.u.get.cksum++;
941                         *kqswnal_tunables.kqn_inject_csum_error = 0;
942                 }
943 #endif
944                 eprc = ep_complete_rpc(krx->krx_rxd, 
945                                        kqswnal_rdma_store_complete, ktx, 
946                                        &krx->krx_rpc_reply.ep_statusblk, 
947                                        ktx->ktx_frags, rmd->kqrmd_frag, 
948                                        rmd->kqrmd_nfrag);
949                 if (eprc != EP_SUCCESS) {
950                         CERROR("can't complete RPC: %d\n", eprc);
951                         /* don't re-attempt RPC completion */
952                         krx->krx_rpc_reply_needed = 0;
953                         rc = -ECONNABORTED;
954                 }
955                 break;
956                 
957         case KTX_RDMA_FETCH:
958                 eprc = ep_rpc_get (krx->krx_rxd, 
959                                    kqswnal_rdma_fetch_complete, ktx,
960                                    rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
961                 if (eprc != EP_SUCCESS) {
962                         CERROR("ep_rpc_get failed: %d\n", eprc);
963                         /* Don't attempt RPC completion: 
964                          * EKC nuked it when the get failed */
965                         krx->krx_rpc_reply_needed = 0;
966                         rc = -ECONNABORTED;
967                 }
968                 break;
969         }
970
971  out:
972         if (rc != 0) {
973                 kqswnal_rx_decref(krx);                 /* drop callback's ref */
974                 kqswnal_put_idle_tx (ktx);
975         }
976
977         atomic_dec(&kqswnal_data.kqn_pending_txs);
978         return (rc);
979 }
980
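/* Send an lnet message.  Three cases, chosen below:
 *  1. "Optimized" GET/PUT/REPLY over the tunable size thresholds: send only a
 *     small RDMA request carrying the header plus an RMD describing the
 *     locally mapped buffers, and let the peer move the bulk data itself.
 *  2. Payload no larger than kqn_tx_maxcontig: copy it into the pre-mapped
 *     tx buffer and send a single IMMEDIATE fragment.
 *  3. Otherwise: an IMMEDIATE message with the header in frag 0 and the
 *     payload mapped as additional fragments. */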
981 int
982 kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
983 {
984         lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
985         int               type = lntmsg->msg_type;
986         lnet_process_id_t target = lntmsg->msg_target;
987         int               target_is_router = lntmsg->msg_target_is_router;
988         int               routing = lntmsg->msg_routing;
989         unsigned int      payload_niov = lntmsg->msg_niov;
990         struct iovec     *payload_iov = lntmsg->msg_iov;
991         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
992         unsigned int      payload_offset = lntmsg->msg_offset;
993         unsigned int      payload_nob = lntmsg->msg_len;
994         int               nob;
995         kqswnal_tx_t     *ktx;
996         int               rc;
997
998         /* NB 1. hdr is in network byte order */
999         /*    2. 'private' depends on the message type */
1000         
1001         CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
1002                payload_nob, payload_niov, libcfs_id2str(target));
1003
1004         LASSERT (payload_nob == 0 || payload_niov > 0);
1005         LASSERT (payload_niov <= LNET_MAX_IOV);
1006
1007         /* It must be OK to kmap() if required */
1008         LASSERT (payload_kiov == NULL || !in_interrupt ());
1009         /* payload is either all vaddrs or all pages */
1010         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1011
1012         if (kqswnal_nid2elanid (target.nid) < 0) {
1013                 CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
1014                 return -EIO;
1015         }
1016
1017         /* I may not block for a transmit descriptor if I might block the
1018          * router, receiver, or an interrupt handler. */
1019         ktx = kqswnal_get_idle_tx();
1020         if (ktx == NULL) {
1021                 CERROR ("Can't get txd for msg type %d for %s\n",
1022                         type, libcfs_nid2str(target.nid));
1023                 return (-ENOMEM);
1024         }
1025
1026         ktx->ktx_state   = KTX_SENDING;
1027         ktx->ktx_nid     = target.nid;
1028         ktx->ktx_args[0] = private;
1029         ktx->ktx_args[1] = lntmsg;
1030         ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */
1031
1032         /* The first frag will be the pre-mapped buffer. */
1033         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1034
1035         if ((!target_is_router &&               /* target.nid is final dest */
1036              !routing &&                        /* I'm the source */
1037              type == LNET_MSG_GET &&            /* optimize GET? */
1038              *kqswnal_tunables.kqn_optimized_gets != 0 &&
1039              lntmsg->msg_md->md_length >= 
1040              *kqswnal_tunables.kqn_optimized_gets) ||
1041             ((type == LNET_MSG_PUT ||            /* optimize PUT? */
1042               type == LNET_MSG_REPLY) &&         /* optimize REPLY? */
1043              *kqswnal_tunables.kqn_optimized_puts != 0 &&
1044              payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
1045                 lnet_libmd_t       *md = lntmsg->msg_md;
1046                 kqswnal_msg_t      *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
1047                 lnet_hdr_t         *mhdr;
1048                 kqswnal_remotemd_t *rmd;
1049
1050                 /* Optimised path: I send over the Elan vaddrs of the local
1051                  * buffers, and my peer DMAs directly to/from them.
1052                  *
1053                  * First I set up ktx as if it was going to send this
1054                  * payload, (it needs to map it anyway).  This fills
1055                  * ktx_frags[1] and onward with the network addresses
1056                  * of the buffer frags. */
1057
1058                 /* Send an RDMA message */
1059                 msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
1060                 msg->kqm_version = QSWLND_PROTO_VERSION;
1061                 msg->kqm_type = QSWLND_MSG_RDMA;
1062
1063                 mhdr = &msg->kqm_u.rdma.kqrm_hdr;
1064                 rmd  = &msg->kqm_u.rdma.kqrm_rmd;
1065
1066                 *mhdr = *hdr;
1067                 nob = (((char *)rmd) - ktx->ktx_buffer);
1068
1069                 if (type == LNET_MSG_GET) {
1070                         if ((md->md_options & LNET_MD_KIOV) != 0) 
1071                                 rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
1072                                                           md->md_niov, md->md_iov.kiov);
1073                         else
1074                                 rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
1075                                                          md->md_niov, md->md_iov.iov);
1076                         ktx->ktx_state = KTX_GETTING;
1077                 } else {
1078                         if (payload_kiov != NULL)
1079                                 rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
1080                                                          payload_niov, payload_kiov);
1081                         else
1082                                 rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
1083                                                         payload_niov, payload_iov);
1084                         ktx->ktx_state = KTX_PUTTING;
1085                 }
1086
1087                 if (rc != 0)
1088                         goto out;
1089
1090                 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1091                 nob += offsetof(kqswnal_remotemd_t,
1092                                 kqrmd_frag[rmd->kqrmd_nfrag]);
1093                 LASSERT (nob <= KQSW_TX_BUFFER_SIZE);
1094
1095                 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1096                        rmd->kqrmd_nfrag * sizeof(EP_NMD));
1097
1098                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
1099 #if KQSW_CKSUM
1100                 msg->kqm_nob   = nob + payload_nob;
1101                 msg->kqm_cksum = 0;
1102                 msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
1103 #endif
1104                 if (type == LNET_MSG_GET) {
1105                         /* Allocate reply message now while I'm in thread context */
1106                         ktx->ktx_args[2] = lnet_create_reply_msg (
1107                                 kqswnal_data.kqn_ni, lntmsg);
1108                         if (ktx->ktx_args[2] == NULL)
1109                                 goto out;
1110
1111                         /* NB finalizing the REPLY message is my
1112                          * responsibility now, whatever happens. */
1113 #if KQSW_CKSUM
1114                         if (*kqswnal_tunables.kqn_inject_csum_error ==  3) {
1115                                 msg->kqm_cksum++;
1116                                 *kqswnal_tunables.kqn_inject_csum_error = 0;
1117                         }
1118
1119                 } else if (payload_kiov != NULL) {
1120                         /* must checksum payload after header so receiver can
1121                          * compute partial header cksum before swab.  Sadly
1122                          * this causes 2 rounds of kmap */
1123                         msg->kqm_cksum =
1124                                 kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,
1125                                                   payload_niov, payload_kiov);
1126                         if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
1127                                 msg->kqm_cksum++;
1128                                 *kqswnal_tunables.kqn_inject_csum_error = 0;
1129                         }
1130                 } else {
1131                         msg->kqm_cksum =
1132                                 kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,
1133                                                  payload_niov, payload_iov);
1134                         if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
1135                                 msg->kqm_cksum++;
1136                                 *kqswnal_tunables.kqn_inject_csum_error = 0;
1137                         }
1138 #endif
1139                 }
1140                 
1141         } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
1142                 lnet_hdr_t    *mhdr;
1143                 char          *payload;
1144                 kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
1145
1146                 /* single frag copied into the pre-mapped buffer */
1147                 msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
1148                 msg->kqm_version = QSWLND_PROTO_VERSION;
1149                 msg->kqm_type = QSWLND_MSG_IMMEDIATE;
1150
1151                 mhdr = &msg->kqm_u.immediate.kqim_hdr;
1152                 payload = msg->kqm_u.immediate.kqim_payload;
1153
1154                 *mhdr = *hdr;
1155                 nob = (payload - ktx->ktx_buffer) + payload_nob;
1156
1157                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
1158
1159                 if (payload_kiov != NULL)
1160                         lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
1161                                             payload_niov, payload_kiov, 
1162                                             payload_offset, payload_nob);
1163                 else
1164                         lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
1165                                            payload_niov, payload_iov, 
1166                                            payload_offset, payload_nob);
1167 #if KQSW_CKSUM
1168                 msg->kqm_nob   = nob;
1169                 msg->kqm_cksum = 0;
1170                 msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
1171                 if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
1172                         msg->kqm_cksum++;
1173                         *kqswnal_tunables.kqn_inject_csum_error = 0;
1174                 }
1175 #endif
1176         } else {
1177                 lnet_hdr_t    *mhdr;
1178                 kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
1179
1180                 /* multiple frags: first is hdr in pre-mapped buffer */
1181                 msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
1182                 msg->kqm_version = QSWLND_PROTO_VERSION;
1183                 msg->kqm_type = QSWLND_MSG_IMMEDIATE;
1184
1185                 mhdr = &msg->kqm_u.immediate.kqim_hdr;
1186                 nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
1187
1188                 *mhdr = *hdr;
1189
1190                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
1191
1192                 if (payload_kiov != NULL)
1193                         rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
1194                                                   payload_niov, payload_kiov);
1195                 else
1196                         rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
1197                                                  payload_niov, payload_iov);
1198                 if (rc != 0)
1199                         goto out;
1200
1201 #if KQSW_CKSUM
1202                 msg->kqm_nob   = nob + payload_nob;
1203                 msg->kqm_cksum = 0;
1204                 msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
1205
1206                 msg->kqm_cksum = (payload_kiov != NULL) ?
1207                                  kqswnal_csum_kiov(msg->kqm_cksum,
1208                                                    payload_offset, payload_nob,
1209                                                    payload_niov, payload_kiov) :
1210                                  kqswnal_csum_iov(msg->kqm_cksum,
1211                                                   payload_offset, payload_nob,
1212                                                   payload_niov, payload_iov);
1213
1214                 if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
1215                         msg->kqm_cksum++;
1216                         *kqswnal_tunables.kqn_inject_csum_error = 0;
1217                 }
1218 #endif
1219                 nob += payload_nob;
1220         }
1221         
1222         ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
1223                         EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1224
1225         rc = kqswnal_launch (ktx);
1226
1227  out:
1228         CDEBUG(rc == 0 ? D_NET : D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
1229                routing ? (rc == 0 ? "Routed" : "Failed to route") :
1230                          (rc == 0 ? "Sent" : "Failed to send"),
1231                nob, libcfs_nid2str(target.nid),
1232                target_is_router ? "(router)" : "", rc);
1233
1234         if (rc != 0) {
1235                 lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
1236                 int         state = ktx->ktx_state;
1237                 
1238                 kqswnal_put_idle_tx (ktx);
1239
1240                 if (state == KTX_GETTING && repmsg != NULL) {
1241                         /* We committed to reply, but there was a problem
1242                          * launching the GET.  We can't avoid delivering a
1243                          * REPLY event since we committed above, so we
1244                          * pretend the GET succeeded but the REPLY
1245                          * failed. */
1246                         rc = 0;
1247                         lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
1248                         lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
1249                 }
1250                 
1251         }
1252         
1253         atomic_dec(&kqswnal_data.kqn_pending_txs);
1254         return (rc == 0 ? 0 : -EIO);
1255 }
1256
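/* Return a receive descriptor to EKC once all refs and any pending RPC reply
 * are gone: repost it normally, or complete it outright during shutdown. */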
1257 void
1258 kqswnal_requeue_rx (kqswnal_rx_t *krx)
1259 {
1260         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1261         LASSERT (!krx->krx_rpc_reply_needed);
1262
1263         krx->krx_state = KRX_POSTED;
1264
1265         if (kqswnal_data.kqn_shuttingdown) {
1266                 /* free EKC rxd on shutdown */
1267                 ep_complete_receive(krx->krx_rxd);
1268         } else {
1269                 /* repost receive */
1270                 ep_requeue_receive(krx->krx_rxd, 
1271                                    kqswnal_rxhandler, krx,
1272                                    &krx->krx_elanbuffer, 0);
1273         }
1274 }
1275
1276 void
1277 kqswnal_rpc_complete (EP_RXD *rxd)
1278 {
1279         int           status = ep_rxd_status(rxd);
1280         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1281         
1282         CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1283                "rxd %p, krx %p, status %d\n", rxd, krx, status);
1284
1285         LASSERT (krx->krx_rxd == rxd);
1286         LASSERT (krx->krx_rpc_reply_needed);
1287         
1288         krx->krx_rpc_reply_needed = 0;
1289         kqswnal_requeue_rx (krx);
1290 }
1291
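/* Called once the last reference on a receive has gone.  If the peer is still
 * waiting for an RPC reply, complete the RPC with the prepared status block
 * (the rx is then requeued from the completion callback); otherwise requeue
 * it immediately. */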
1292 void
1293 kqswnal_rx_done (kqswnal_rx_t *krx) 
1294 {
1295         int           rc;
1296
1297         LASSERT (atomic_read(&krx->krx_refcount) == 0);
1298
1299         if (krx->krx_rpc_reply_needed) {
1300                 /* We've not completed the peer's RPC yet... */
1301                 krx->krx_rpc_reply.msg.magic   = LNET_PROTO_QSW_MAGIC;
1302                 krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;
1303
1304                 LASSERT (!in_interrupt());
1305
1306                 rc = ep_complete_rpc(krx->krx_rxd, 
1307                                      kqswnal_rpc_complete, krx,
1308                                      &krx->krx_rpc_reply.ep_statusblk, 
1309                                      NULL, NULL, 0);
1310                 if (rc == EP_SUCCESS)
1311                         return;
1312
1313                 CERROR("can't complete RPC: %d\n", rc);
1314                 krx->krx_rpc_reply_needed = 0;
1315         }
1316
1317         kqswnal_requeue_rx(krx);
1318 }
1319         
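/* Parse a newly received message: check its length, protocol magic and
 * version (swabbing if the sender's endianness differs) and its checksum when
 * KQSW_CKSUM is set, then hand IMMEDIATE messages to lnet_parse() or start
 * validating RDMA requests. */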
1320 void
1321 kqswnal_parse (kqswnal_rx_t *krx)
1322 {
1323         lnet_ni_t      *ni = kqswnal_data.kqn_ni;
1324         kqswnal_msg_t  *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
1325         lnet_nid_t      fromnid = kqswnal_rx_nid(krx);
1326         int             swab;
1327         int             n;
1328         int             i;
1329         int             nob;
1330         int             rc;
1331
1332         LASSERT (atomic_read(&krx->krx_refcount) == 1);
1333
1334         if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {
1335                 CERROR("Short message %d received from %s\n",
1336                        krx->krx_nob, libcfs_nid2str(fromnid));
1337                 goto done;
1338         }
1339
1340         swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);
1341
1342         if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {
1343 #if KQSW_CKSUM
1344                 __u32 csum0;
1345                 __u32 csum1;
1346
1347                 /* csum byte array before swab */
1348                 csum1 = msg->kqm_cksum;
1349                 msg->kqm_cksum = 0;
1350                 csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,
1351                                           krx->krx_npages, krx->krx_kiov);
1352                 msg->kqm_cksum = csum1;
1353 #endif
1354
1355                 if (swab) {
1356                         __swab16s(&msg->kqm_version);
1357                         __swab16s(&msg->kqm_type);
1358 #if KQSW_CKSUM
1359                         __swab32s(&msg->kqm_cksum);
1360                         __swab32s(&msg->kqm_nob);
1361 #endif
1362                 }
1363
1364                 if (msg->kqm_version != QSWLND_PROTO_VERSION) {
1365                         /* Future protocol version compatibility support!
1366                          * The next qswlnd-specific protocol rev will first
1367                          * send an RPC to check version.
1368                          * 1.4.6 and 1.4.7.early reply with a status
1369                          * block containing their current version.
1370                          * Later versions send a failure (-ve) status +
1371                          * magic/version */
1372
1373                         if (!krx->krx_rpc_reply_needed) {
1374                                 CERROR("Unexpected version %d from %s\n",
1375                                        msg->kqm_version, libcfs_nid2str(fromnid));
1376                                 goto done;
1377                         }
1378
1379                         LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
1380                         goto done;
1381                 }
1382
1383                 switch (msg->kqm_type) {
1384                 default:
1385                         CERROR("Bad request type %x from %s\n",
1386                                msg->kqm_type, libcfs_nid2str(fromnid));
1387                         goto done;
1388
1389                 case QSWLND_MSG_IMMEDIATE:
1390                         if (krx->krx_rpc_reply_needed) {
1391                                 /* Should have been a simple message */
1392                                 CERROR("IMMEDIATE sent as RPC from %s\n",
1393                                        libcfs_nid2str(fromnid));
1394                                 goto done;
1395                         }
1396
1397                         nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
1398                         if (krx->krx_nob < nob) {
1399                                 CERROR("Short IMMEDIATE %d(%d) from %s\n",
1400                                        krx->krx_nob, nob, libcfs_nid2str(fromnid));
1401                                 goto done;
1402                         }
1403
1404 #if KQSW_CKSUM
1405                         if (csum0 != msg->kqm_cksum) {
1406                                 CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",
1407                                        csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));
1408                                 CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);
1409                                 goto done;
1410                         }
1411 #endif
1412                         rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,
1413                                         fromnid, krx, 0);
1414                         if (rc < 0)
1415                                 goto done;
1416                         return;
1417
1418                 case QSWLND_MSG_RDMA:
1419                         if (!krx->krx_rpc_reply_needed) {
1420                                 /* Should have been a simple message */
1421                                 CERROR("RDMA sent as simple message from %s\n",
1422                                        libcfs_nid2str(fromnid));
1423                                 goto done;
1424                         }
1425
1426                         nob = offsetof(kqswnal_msg_t,
1427                                        kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);
1428                         if (krx->krx_nob < nob) {
1429                                 CERROR("Short RDMA message %d(%d) from %s\n",
1430                                        krx->krx_nob, nob, libcfs_nid2str(fromnid));
1431                                 goto done;
1432                         }
1433
1434                         if (swab)
1435                                 __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);
1436
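                        /* With kqrmd_nfrag known (and swabbed if necessary),
                         * re-check that the message is long enough to hold
                         * all the fragment descriptors. */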
1437                         n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;
1438                         nob = offsetof(kqswnal_msg_t,
1439                                        kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);
1440
1441                         if (krx->krx_nob < nob) {
1442                                 CERROR("Short RDMA message %d(%d) from %s\n",
1443                                        krx->krx_nob, nob, libcfs_nid2str(fromnid));
1444                                 goto done;
1445                         }
1446
1447                         if (swab) {
1448                                 for (i = 0; i < n; i++) {
1449                                         EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];
1450
1451                                         __swab32s(&nmd->nmd_addr);
1452                                         __swab32s(&nmd->nmd_len);
1453                                         __swab32s(&nmd->nmd_attr);
1454                                 }
1455                         }
1456
1457 #if KQSW_CKSUM
1458                         krx->krx_cksum = csum0; /* stash checksum so far */
1459 #endif
1460                         rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,
1461                                         fromnid, krx, 1);
1462                         if (rc < 0)
1463                                 goto done;
1464                         return;
1465                 }
1466                 /* Not Reached */
1467         }
1468
1469         if (msg->kqm_magic == LNET_PROTO_MAGIC ||
1470             msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {
1471                 /* Future protocol version compatibility support!
1472                  * When LNET unifies protocols over all LNDs, the first thing a
1473                  * peer will send will be a version query RPC.  
1474                  * 1.4.6 and 1.4.7.early reply with a status block containing
1475                  * LNET_PROTO_QSW_MAGIC.
1476                  * Later versions send a failure (-ve) status +
1477                  * magic/version */
1478
1479                 if (!krx->krx_rpc_reply_needed) {
1480                         CERROR("Unexpected magic %08x from %s\n",
1481                                msg->kqm_magic, libcfs_nid2str(fromnid));
1482                         goto done;
1483                 }
1484
1485                 LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
1486                 goto done;
1487         }
1488
1489         CERROR("Unrecognised magic %08x from %s\n",
1490                msg->kqm_magic, libcfs_nid2str(fromnid));
1491  done:
1492         kqswnal_rx_decref(krx);
1493 }
1494
1495 /* Receive Interrupt Handler: posts to schedulers */
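/* May be called in interrupt context.  Failed receives are dropped straight
 * away; otherwise the message is parsed inline when in thread context, or
 * queued on kqn_readyrxds for a scheduler thread to parse. */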
1496 void 
1497 kqswnal_rxhandler(EP_RXD *rxd)
1498 {
1499         unsigned long flags;
1500         int           nob    = ep_rxd_len (rxd);
1501         int           status = ep_rxd_status (rxd);
1502         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1503         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1504                rxd, krx, nob, status);
1505
1506         LASSERT (krx != NULL);
1507         LASSERT (krx->krx_state == KRX_POSTED);
1508         
1509         krx->krx_state = KRX_PARSE;
1510         krx->krx_rxd = rxd;
1511         krx->krx_nob = nob;
1512
1513         /* RPC reply iff RPC request received without error */
1514         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
1515                                     (status == EP_SUCCESS ||
1516                                      status == EP_MSG_TOO_BIG);
1517
1518         /* Default to failure if an RPC reply is requested but not handled */
1519         krx->krx_rpc_reply.msg.status = -EPROTO;
1520         atomic_set (&krx->krx_refcount, 1);
1521
1522         if (status != EP_SUCCESS) {
1523                 /* receives complete with failure when receiver is removed */
1524                 if (status == EP_SHUTDOWN)
1525                         LASSERT (kqswnal_data.kqn_shuttingdown);
1526                 else
1527                         CERROR("receive failed with status %d nob %d\n",
1528                                ep_rxd_status(rxd), nob);
1529                 kqswnal_rx_decref(krx);
1530                 return;
1531         }
1532
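        /* Parsing can call back into kqswnal_recv(), which must be able to
         * map buffers, so parse inline only in thread context; from an
         * interrupt, hand the krx to a scheduler thread instead. */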
1533         if (!in_interrupt()) {
1534                 kqswnal_parse(krx);
1535                 return;
1536         }
1537
1538         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1539
1540         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1541         wake_up (&kqswnal_data.kqn_sched_waitq);
1542
1543         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1544 }
1545
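/* LND receive callback.  For an RDMA message (received as an RPC) start the
 * GET/PUT/REPLY RDMA and let the final decref complete the RPC; for an
 * IMMEDIATE message copy the payload out of the receive buffer and finalize
 * the LNet message directly. */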
1546 int
1547 kqswnal_recv (lnet_ni_t     *ni,
1548               void          *private,
1549               lnet_msg_t    *lntmsg,
1550               int            delayed,
1551               unsigned int   niov,
1552               struct iovec  *iov,
1553               lnet_kiov_t   *kiov,
1554               unsigned int   offset,
1555               unsigned int   mlen,
1556               unsigned int   rlen)
1557 {
1558         kqswnal_rx_t       *krx = (kqswnal_rx_t *)private;
1559         lnet_nid_t          fromnid;
1560         kqswnal_msg_t      *msg;
1561         lnet_hdr_t         *hdr;
1562         kqswnal_remotemd_t *rmd;
1563         int                 msg_offset;
1564         int                 rc;
1565
1566         LASSERT (!in_interrupt ());             /* OK to map */
1567         /* Either all pages or all vaddrs */
1568         LASSERT (!(kiov != NULL && iov != NULL));
1569
1570         fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
1571         msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
1572
1573         if (krx->krx_rpc_reply_needed) {
1574                 /* optimized (rdma) request sent as RPC */
1575
1576                 LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);
1577                 hdr = &msg->kqm_u.rdma.kqrm_hdr;
1578                 rmd = &msg->kqm_u.rdma.kqrm_rmd;
1579
1580                 /* NB header is still in wire byte order */
1581
1582                 switch (le32_to_cpu(hdr->type)) {
1583                         case LNET_MSG_PUT:
1584                         case LNET_MSG_REPLY:
1585                                 /* This is an optimized PUT/REPLY */
1586                                 rc = kqswnal_rdma(krx, lntmsg, 
1587                                                   KTX_RDMA_FETCH, rmd,
1588                                                   niov, iov, kiov, offset, mlen);
1589                                 break;
1590
1591                         case LNET_MSG_GET:
1592 #if KQSW_CKSUM
1593                                 if (krx->krx_cksum != msg->kqm_cksum) {
1594                                         CERROR("Bad GET checksum %08x(%08x) from %s\n",
1595                                                krx->krx_cksum, msg->kqm_cksum,
1596                                                libcfs_nid2str(fromnid));
1597                                         rc = -EIO;
1598                                         break;
1599                                 }
1600 #endif                                
1601                                 if (lntmsg == NULL) {
1602                                         /* No buffer match: my decref will
1603                                          * complete the RPC with failure */
1604                                         rc = 0;
1605                                 } else {
1606                                         /* Matched something! */
1607                                         rc = kqswnal_rdma(krx, lntmsg,
1608                                                           KTX_RDMA_STORE, rmd,
1609                                                           lntmsg->msg_niov,
1610                                                           lntmsg->msg_iov,
1611                                                           lntmsg->msg_kiov,
1612                                                           lntmsg->msg_offset,
1613                                                           lntmsg->msg_len);
1614                                 }
1615                                 break;
1616
1617                         default:
1618                                 CERROR("Bad RPC type %d\n",
1619                                        le32_to_cpu(hdr->type));
1620                                 rc = -EPROTO;
1621                                 break;
1622                 }
1623
1624                 kqswnal_rx_decref(krx);
1625                 return rc;
1626         }
1627
1628         LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);
1629         msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
1630         
1631         if (krx->krx_nob < msg_offset + rlen) {
1632                 CERROR("Bad message size from %s: have %d, need %d + %d\n",
1633                        libcfs_nid2str(fromnid), krx->krx_nob,
1634                        msg_offset, rlen);
1635                 kqswnal_rx_decref(krx);
1636                 return -EPROTO;
1637         }
1638
1639         if (kiov != NULL)
1640                 lnet_copy_kiov2kiov(niov, kiov, offset,
1641                                     krx->krx_npages, krx->krx_kiov, 
1642                                     msg_offset, mlen);
1643         else
1644                 lnet_copy_kiov2iov(niov, iov, offset,
1645                                    krx->krx_npages, krx->krx_kiov, 
1646                                    msg_offset, mlen);
1647
1648         lnet_finalize(ni, lntmsg, 0);
1649         kqswnal_rx_decref(krx);
1650         return 0;
1651 }
1652
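/* Spawn a kernel thread and count it in kqn_nthreads; the thread is expected
 * to call kqswnal_thread_fini() when it exits, as kqswnal_scheduler() does
 * below. */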
1653 int
1654 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1655 {
1656         long    pid = kernel_thread (fn, arg, 0);
1657
1658         if (pid < 0)
1659                 return ((int)pid);
1660
1661         atomic_inc (&kqswnal_data.kqn_nthreads);
1662         return (0);
1663 }
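/* Illustrative only: module startup (in qswlnd.c, not shown here) is
 * expected to spawn scheduler threads roughly as
 *
 *         rc = kqswnal_thread_start(kqswnal_scheduler, NULL);
 *
 * with each such thread calling kqswnal_thread_fini() as it exits. */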
1664
1665 void
1666 kqswnal_thread_fini (void)
1667 {
1668         atomic_dec (&kqswnal_data.kqn_nthreads);
1669 }
1670
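/* Scheduler thread.  Holding kqn_sched_lock, drain three queues and drop the
 * lock around each unit of work: ready receives are parsed, completed
 * transmits are finished in thread context, and delayed transmits are
 * relaunched (or failed).  After KQSW_RESCHED consecutive items the thread
 * checks need_resched(); when idle it sleeps on kqn_sched_waitq and exits
 * only in stage 2 of shutdown, once there is nothing left to do. */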
1671 int
1672 kqswnal_scheduler (void *arg)
1673 {
1674         kqswnal_rx_t    *krx;
1675         kqswnal_tx_t    *ktx;
1676         unsigned long    flags;
1677         int              rc;
1678         int              counter = 0;
1679         int              did_something;
1680
1681         cfs_daemonize ("kqswnal_sched");
1682         cfs_block_allsigs ();
1683         
1684         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1685
1686         for (;;)
1687         {
1688                 did_something = 0;
1689
1690                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1691                 {
1692                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1693                                          kqswnal_rx_t, krx_list);
1694                         list_del (&krx->krx_list);
1695                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1696                                                flags);
1697
1698                         LASSERT (krx->krx_state == KRX_PARSE);
1699                         kqswnal_parse (krx);
1700
1701                         did_something = 1;
1702                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1703                 }
1704
1705                 if (!list_empty (&kqswnal_data.kqn_donetxds))
1706                 {
1707                         ktx = list_entry(kqswnal_data.kqn_donetxds.next,
1708                                          kqswnal_tx_t, ktx_schedlist);
1709                         list_del_init (&ktx->ktx_schedlist);
1710                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1711                                                flags);
1712
1713                         kqswnal_tx_done_in_thread_context(ktx);
1714
1715                         did_something = 1;
1716                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1717                 }
1718
1719                 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1720                 {
1721                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1722                                          kqswnal_tx_t, ktx_schedlist);
1723                         list_del_init (&ktx->ktx_schedlist);
1724                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1725                                                flags);
1726
1727                         rc = kqswnal_launch (ktx);
1728                         if (rc != 0) {
1729                                 CERROR("Failed delayed transmit to %s: %d\n", 
1730                                        libcfs_nid2str(ktx->ktx_nid), rc);
1731                                 kqswnal_tx_done (ktx, rc);
1732                         }
1733                         atomic_dec (&kqswnal_data.kqn_pending_txs);
1734
1735                         did_something = 1;
1736                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1737                 }
1738
1739                 /* nothing to do or hogging CPU */
1740                 if (!did_something || counter++ == KQSW_RESCHED) {
1741                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1742                                                flags);
1743
1744                         counter = 0;
1745
1746                         if (!did_something) {
1747                                 if (kqswnal_data.kqn_shuttingdown == 2) {
1748                                         /* We only exit in stage 2 of shutdown when 
1749                                          * there's nothing left to do */
1750                                         break;
1751                                 }
1752                                 rc = wait_event_interruptible_exclusive (
1753                                         kqswnal_data.kqn_sched_waitq,
1754                                         kqswnal_data.kqn_shuttingdown == 2 ||
1755                                         !list_empty(&kqswnal_data.kqn_readyrxds) ||
1756                                         !list_empty(&kqswnal_data.kqn_donetxds) ||
1757                                         !list_empty(&kqswnal_data.kqn_delayedtxds));
1758                                 LASSERT (rc == 0);
1759                         } else if (need_resched())
1760                                 schedule ();
1761
1762                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1763                 }
1764         }
1765
1766         kqswnal_thread_fini ();
1767         return (0);
1768 }