/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.lustre.org
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "qswlnd.h"

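/* Tell LNet the peer went down, back-dating the event to the time this tx
 * was launched (the jiffies-based launch time is converted to wall-clock
 * seconds). */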
void
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
        time_t             then;

        then = cfs_time_current_sec() -
                cfs_duration_sec(cfs_time_current() -
                                 ktx->ktx_launchtime);

        lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then);
}

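/* Unload any temporary DVMA mappings this tx made for its payload frags and
 * forget its preferred rail. */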
void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
        int      i;

        ktx->ktx_rail = -1;                     /* unset rail */

        if (ktx->ktx_nmappedpages == 0)
                return;

        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);

        ktx->ktx_nmappedpages = 0;
}

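/* Map 'nob' bytes of a paged (kiov) payload, starting at 'offset', into Elan
 * DVMA space one page at a time, merging adjacent mappings into a single
 * frag where possible.  Pages are kmapped since EKC only takes vaddrs. */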
int
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob,
                     unsigned int niov, lnet_kiov_t *kiov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        __u32     basepage  = ktx->ktx_basepage + nmapped;
        char     *ptr;

        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                nmapped++;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                        ktx, nfrags, ptr, fraglen, basepage, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage++;
                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

#if KQSW_CKSUM
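/* Fold 'nob' bytes of a paged (kiov) payload, starting at 'offset', into the
 * running checksum 'csum'. */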
__u32
kqswnal_csum_kiov (__u32 csum, int offset, int nob,
                   unsigned int niov, lnet_kiov_t *kiov)
{
        char     *ptr;

        if (nob == 0)
                return csum;

        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                csum = kqswnal_csum(csum, ptr, fraglen);

                kunmap (kiov->kiov_page);

                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        return csum;
}
#endif

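/* Map 'nob' bytes of a virtually-contiguous (iovec) payload, starting at
 * 'offset', into Elan DVMA space.  Unlike the kiov case, a single frag may
 * span several pages. */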
int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    unsigned int niov, struct iovec *iov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        __u32     basepage  = ktx->ktx_basepage + nmapped;

        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;
                long npages;

                if (fraglen > nob)
                        fraglen = nob;
                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);

                nmapped += npages;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base + offset, fraglen,
                       basepage, npages, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage += npages;
                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

#if KQSW_CKSUM
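/* iovec flavour of kqswnal_csum_kiov(): fold 'nob' bytes starting at
 * 'offset' into the running checksum. */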
__u32
kqswnal_csum_iov (__u32 csum, int offset, int nob,
                  unsigned int niov, struct iovec *iov)
{
        if (nob == 0)
                return csum;

        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;

                if (fraglen > nob)
                        fraglen = nob;

                csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen);

                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        return csum;
}
#endif

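/* Return a tx descriptor to the idle list, first releasing any temporary
 * DVMA mappings it holds. */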
void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        unsigned long     flags;

        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        cfs_spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        cfs_list_del (&ktx->ktx_list);              /* take off active list */
        cfs_list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
}

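/* Grab a tx descriptor from the idle list and move it to the active list.
 * Returns NULL if the LND is shutting down or all descriptors are busy. */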
kqswnal_tx_t *
kqswnal_get_idle_tx (void)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx;

        cfs_spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        if (kqswnal_data.kqn_shuttingdown ||
            cfs_list_empty (&kqswnal_data.kqn_idletxds)) {
                cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock,
                                            flags);

                return NULL;
        }

        ktx = cfs_list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t,
                              ktx_list);
        cfs_list_del (&ktx->ktx_list);

        cfs_list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
        ktx->ktx_launcher = current->pid;
        cfs_atomic_inc(&kqswnal_data.kqn_pending_txs);

        cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx->ktx_nmappedpages == 0);
        return (ktx);
}

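/* Complete a tx in thread context: verify RDMA checksums where applicable,
 * release the rx an optimized PUT/REPLY was fetching from, recycle the
 * descriptor and finalize the LNet message(s) it was carrying. */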
void
kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
{
        lnet_msg_t    *lnetmsg0 = NULL;
        lnet_msg_t    *lnetmsg1 = NULL;
        int            status0  = 0;
        int            status1  = 0;
        kqswnal_rx_t  *krx;

        LASSERT (!cfs_in_interrupt());

        if (ktx->ktx_status == -EHOSTDOWN)
                kqswnal_notify_peer_down(ktx);

        switch (ktx->ktx_state) {
        case KTX_RDMA_FETCH:                    /* optimized PUT/REPLY handled */
                krx      = (kqswnal_rx_t *)ktx->ktx_args[0];
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
#if KQSW_CKSUM
                if (status0 == 0) {             /* RDMA succeeded */
                        kqswnal_msg_t *msg;
                        __u32          csum;

                        msg = (kqswnal_msg_t *)
                              page_address(krx->krx_kiov[0].kiov_page);

                        csum = (lnetmsg0->msg_kiov != NULL) ?
                               kqswnal_csum_kiov(krx->krx_cksum,
                                                 lnetmsg0->msg_offset,
                                                 lnetmsg0->msg_wanted,
                                                 lnetmsg0->msg_niov,
                                                 lnetmsg0->msg_kiov) :
                               kqswnal_csum_iov(krx->krx_cksum,
                                                lnetmsg0->msg_offset,
                                                lnetmsg0->msg_wanted,
                                                lnetmsg0->msg_niov,
                                                lnetmsg0->msg_iov);

                        /* Can only check csum if I got it all */
                        if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
                            csum != msg->kqm_cksum) {
                                ktx->ktx_status = -EIO;
                                krx->krx_rpc_reply.msg.status = -EIO;
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, msg->kqm_cksum,
                                       libcfs_nid2str(kqswnal_rx_nid(krx)));
                        }
                }
#endif
                LASSERT (krx->krx_state == KRX_COMPLETING);
                kqswnal_rx_decref (krx);
                break;

        case KTX_RDMA_STORE:       /* optimized GET handled */
        case KTX_PUTTING:          /* optimized PUT sent */
        case KTX_SENDING:          /* normal send */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
                break;

        case KTX_GETTING:          /* optimized GET sent & payload received */
                /* Complete the GET with success since we can't avoid
                 * delivering a REPLY event; we committed to it when we
                 * launched the GET */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = 0;
                lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
                status1  = ktx->ktx_status;
#if KQSW_CKSUM
                if (status1 == 0) {             /* RDMA succeeded */
                        lnet_msg_t   *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                        lnet_libmd_t *md = lnetmsg0->msg_md;
                        __u32         csum;

                        csum = ((md->md_options & LNET_MD_KIOV) != 0) ?
                               kqswnal_csum_kiov(~0, 0,
                                                 md->md_length,
                                                 md->md_niov,
                                                 md->md_iov.kiov) :
                               kqswnal_csum_iov(~0, 0,
                                                md->md_length,
                                                md->md_niov,
                                                md->md_iov.iov);

                        if (csum != ktx->ktx_cksum) {
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, ktx->ktx_cksum,
                                       libcfs_nid2str(ktx->ktx_nid));
                                status1 = -EIO;
                        }
                }
#endif
                break;

        default:
                LASSERT (0);
        }

        kqswnal_put_idle_tx (ktx);

        lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
        if (lnetmsg1 != NULL)
                lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
}

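/* Record the tx's completion status.  In thread context the tx is completed
 * inline; in interrupt context it is queued for the scheduler thread. */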
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
{
        unsigned long      flags;

        ktx->ktx_status = status;

        if (!cfs_in_interrupt()) {
                kqswnal_tx_done_in_thread_context(ktx);
                return;
        }

        /* Complete the send in thread context */
        cfs_spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

        cfs_list_add_tail(&ktx->ktx_schedlist,
                          &kqswnal_data.kqn_donetxds);
        cfs_waitq_signal(&kqswnal_data.kqn_sched_waitq);

        cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
}

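/* EKC transmit completion callback: decode the RPC status block for GET/PUT
 * RPCs (byte-swapping replies from peers of opposite endianness) and
 * complete the tx with the resulting status. */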
static void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t         *ktx = (kqswnal_tx_t *)arg;
        kqswnal_rpc_reply_t  *reply;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CNETERR("Tx completion to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), status);

                status = -EHOSTDOWN;

        } else switch (ktx->ktx_state) {

        case KTX_GETTING:
        case KTX_PUTTING:
                /* RPC complete! */
                reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
                if (reply->msg.magic == 0) {    /* "old" peer */
                        status = reply->msg.status;
                        break;
                }

                if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
                        if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
                                CERROR("%s unexpected rpc reply magic %08x\n",
                                       libcfs_nid2str(ktx->ktx_nid),
                                       reply->msg.magic);
                                status = -EPROTO;
                                break;
                        }

                        __swab32s(&reply->msg.status);
                        __swab32s(&reply->msg.version);

                        if (ktx->ktx_state == KTX_GETTING) {
                                __swab32s(&reply->msg.u.get.len);
                                __swab32s(&reply->msg.u.get.cksum);
                        }
                }

                status = reply->msg.status;
                if (status != 0) {
                        CERROR("%s RPC status %08x\n",
                               libcfs_nid2str(ktx->ktx_nid), status);
                        break;
                }

                if (ktx->ktx_state == KTX_GETTING) {
                        lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
                                               (lnet_msg_t *)ktx->ktx_args[2],
                                               reply->msg.u.get.len);
#if KQSW_CKSUM
                        ktx->ktx_cksum = reply->msg.u.get.cksum;
#endif
                }
                break;

        case KTX_SENDING:
                status = 0;
                break;

        default:
                LBUG();
                break;
        }

        kqswnal_tx_done(ktx, status);
}

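/* Hand a tx to EKC: GET/PUT RPCs go via ep_transmit_rpc(), normal sends via
 * ep_transmit_message().  If EKC can't allocate a transmit descriptor the
 * tx is queued for the scheduler thread to retry later. */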
int
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = cfs_in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int   rc;

        ktx->ktx_launchtime = cfs_time_current();

        if (kqswnal_data.kqn_shuttingdown)
                return (-ESHUTDOWN);

        LASSERT (dest >= 0);                    /* must be a peer */

        if (ktx->ktx_nmappedpages != 0)
                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);

        switch (ktx->ktx_state) {
        case KTX_GETTING:
        case KTX_PUTTING:
                if (the_lnet.ln_testprotocompat != 0) {
                        kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                        /* single-shot proto test:
                         * Future version queries will use an RPC, so I'll
                         * co-opt one of the existing ones */
                        LNET_LOCK();
                        if ((the_lnet.ln_testprotocompat & 1) != 0) {
                                msg->kqm_version++;
                                the_lnet.ln_testprotocompat &= ~1;
                        }
                        if ((the_lnet.ln_testprotocompat & 2) != 0) {
                                msg->kqm_magic = LNET_PROTO_MAGIC;
                                the_lnet.ln_testprotocompat &= ~2;
                        }
                        LNET_UNLOCK();
                }

                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
                 * The other frags are the payload, awaiting RDMA */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
                break;

        case KTX_SENDING:
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
                break;

        default:
                LBUG();
                rc = -EINVAL;                   /* no compiler warning please */
                break;
        }

        switch (rc) {
        case EP_SUCCESS: /* success */
                return (0);

        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                cfs_list_add_tail (&ktx->ktx_schedlist,
                                   &kqswnal_data.kqn_delayedtxds);
                cfs_waitq_signal (&kqswnal_data.kqn_sched_waitq);

                cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock,
                                            flags);
                return (0);

        default: /* fatal error */
                CNETERR ("Tx to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
}

#if 0
static char *
hdr_type_string (lnet_hdr_t *hdr)
{
        switch (hdr->type) {
        case LNET_MSG_ACK:
                return ("ACK");
        case LNET_MSG_PUT:
                return ("PUT");
        case LNET_MSG_GET:
                return ("GET");
        case LNET_MSG_REPLY:
                return ("REPLY");
        default:
                return ("<UNKNOWN>");
        }
}

static void
kqswnal_cerror_hdr(lnet_hdr_t * hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               le32_to_cpu(hdr->payload_length));
        CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
               le32_to_cpu(hdr->src_pid));
        CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
               le32_to_cpu(hdr->dest_pid));

        switch (le32_to_cpu(hdr->type)) {
        case LNET_MSG_PUT:
                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.put.match_bits));
                CERROR("    offset %d, hdr data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case LNET_MSG_GET:
                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       hdr->msg.get.match_bits);
                CERROR("    Length %d, src offset %d\n",
                       le32_to_cpu(hdr->msg.get.sink_length),
                       le32_to_cpu(hdr->msg.get.src_offset));
                break;

        case LNET_MSG_ACK:
                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       le32_to_cpu(hdr->msg.ack.mlength));
                break;

        case LNET_MSG_REPLY:
                CERROR("    dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
        }

}                               /* end of kqswnal_cerror_hdr() */
#endif

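/* Verify that local and remote RDMA descriptors are compatible: the same
 * number of frags, with matching lengths frag-for-frag. */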
int
kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
                    int nrfrag, EP_NMD *rfrag)
{
        int  i;

        if (nlfrag != nrfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       nlfrag, nrfrag);
                return (-EINVAL);
        }

        for (i = 0; i < nlfrag; i++)
                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
                        return (-EINVAL);
                }

        return (0);
}

kqswnal_remotemd_t *
kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
{
        /* Check that the RMD sent after the "raw" LNET header in a
         * portals-compatible QSWLND message is OK */
        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));

        /* Note RDMA addresses are sent in native endian-ness in the "old"
         * portals protocol so no swabbing... */

        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (NULL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (NULL);
        }

        return (rmd);
}

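/* EKC callback when the RDMA that stores GET reply data out to the peer has
 * completed: the RPC is over, so drop the rx ref and complete the tx. */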
void
kqswnal_rdma_store_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_decref (krx);

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
}

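/* EKC callback when the RDMA fetching PUT/REPLY data from the peer has
 * completed: mark the prepared RPC reply good on success, or abandon the
 * RPC on failure, then complete the tx. */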
void
kqswnal_rdma_fetch_complete (EP_RXD *rxd)
{
        /* Completed fetching the PUT/REPLY data */
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
        LASSERT (krx->krx_rxd == rxd);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (status == EP_SUCCESS) {
                krx->krx_rpc_reply.msg.status = 0;
                status = 0;
        } else {
                /* Abandon RPC since get failed */
                krx->krx_rpc_reply_needed = 0;
                status = -ECONNABORTED;
        }

        /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
        LASSERT (krx->krx_state == KRX_PARSE);
        krx->krx_state = KRX_COMPLETING;

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, status);
}

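/* Perform the local end of an optimized PUT/REPLY (RDMA fetch) or GET (RDMA
 * store): grab a spare tx descriptor to map the local buffers, check them
 * against the peer's RMD, and hand the matched frag lists to EKC. */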
int
kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
              int type, kqswnal_remotemd_t *rmd,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
              unsigned int offset, unsigned int len)
{
        kqswnal_tx_t       *ktx;
        int                 eprc;
        int                 rc;

        /* Not both mapped and paged payload */
        LASSERT (iov == NULL || kiov == NULL);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (len == 0) {
                /* data got truncated to nothing. */
                lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
                /* Let kqswnal_rx_done() complete the RPC with success */
                krx->krx_rpc_reply.msg.status = 0;
                return (0);
        }

        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
           actually sending a portals message with it */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for RDMA with %s\n",
                        libcfs_nid2str(kqswnal_rx_nid(krx)));
                return (-ENOMEM);
        }

        ktx->ktx_state   = type;
        ktx->ktx_nid     = kqswnal_rx_nid(krx);
        ktx->ktx_args[0] = krx;
        ktx->ktx_args[1] = lntmsg;

        LASSERT (cfs_atomic_read(&krx->krx_refcount) > 0);
        /* Take an extra ref for the completion callback */
        cfs_atomic_inc(&krx->krx_refcount);

        /* Map on the rail the RPC prefers */
        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
                                         ep_rxd_railmask(krx->krx_rxd));

        /* Start mapping at offset 0 (we're not mapping any headers) */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;

        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
        else
                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);

        if (rc != 0) {
                CERROR ("Can't map local RDMA data: %d\n", rc);
                goto out;
        }

        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (rc != 0) {
                CERROR ("Incompatible RDMA descriptors\n");
                goto out;
        }

        switch (type) {
        default:
                LBUG();

        case KTX_RDMA_STORE:
                krx->krx_rpc_reply.msg.status    = 0;
                krx->krx_rpc_reply.msg.magic     = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version   = QSWLND_PROTO_VERSION;
                krx->krx_rpc_reply.msg.u.get.len = len;
#if KQSW_CKSUM
                krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
                            kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
                            kqswnal_csum_iov(~0, offset, len, niov, iov);
                if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
                        krx->krx_rpc_reply.msg.u.get.cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
                eprc = ep_complete_rpc(krx->krx_rxd,
                                       kqswnal_rdma_store_complete, ktx,
                                       &krx->krx_rpc_reply.ep_statusblk,
                                       ktx->ktx_frags, rmd->kqrmd_frag,
                                       rmd->kqrmd_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("can't complete RPC: %d\n", eprc);
                        /* don't re-attempt RPC completion */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;

        case KTX_RDMA_FETCH:
                eprc = ep_rpc_get (krx->krx_rxd,
                                   kqswnal_rdma_fetch_complete, ktx,
                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("ep_rpc_get failed: %d\n", eprc);
                        /* Don't attempt RPC completion:
                         * EKC nuked it when the get failed */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;
        }

 out:
        if (rc != 0) {
                kqswnal_rx_decref(krx);                 /* drop callback's ref */
                kqswnal_put_idle_tx (ktx);
        }

        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc);
}

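/* LNet send() method.  Chooses one of three paths: an optimized RDMA message
 * (large GET/PUT/REPLY direct to a peer in this cluster), a single-frag send
 * with the payload copied into the pre-mapped buffer, or a multi-frag send
 * with the header in the pre-mapped buffer and the payload mapped in place. */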
int
kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        int               nob;
        kqswnal_tx_t     *ktx;
        int               rc;

        /* NB 1. hdr is in network byte order */
        /*    2. 'private' depends on the message type */

        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !cfs_in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (kqswnal_nid2elanid (target.nid) < 0) {
                CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
                return -EIO;
        }

        /* I may not block for a transmit descriptor if I might block the
         * router, receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for msg type %d for %s\n",
                        type, libcfs_nid2str(target.nid));
                return (-ENOMEM);
        }

        ktx->ktx_state   = KTX_SENDING;
        ktx->ktx_nid     = target.nid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = lntmsg;
        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */

        /* The first frag will be the pre-mapped buffer. */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;

        if ((!target_is_router &&               /* target.nid is final dest */
             !routing &&                        /* I'm the source */
             type == LNET_MSG_GET &&            /* optimize GET? */
             *kqswnal_tunables.kqn_optimized_gets != 0 &&
             lntmsg->msg_md->md_length >=
             *kqswnal_tunables.kqn_optimized_gets) ||
            ((type == LNET_MSG_PUT ||            /* optimize PUT? */
              type == LNET_MSG_REPLY) &&         /* optimize REPLY? */
             *kqswnal_tunables.kqn_optimized_puts != 0 &&
             payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
                lnet_libmd_t       *md = lntmsg->msg_md;
                kqswnal_msg_t      *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
                lnet_hdr_t         *mhdr;
                kqswnal_remotemd_t *rmd;

                /* Optimised path: I send over the Elan vaddrs of the local
                 * buffers, and my peer DMAs directly to/from them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the buffer frags. */

                /* Send an RDMA message */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_RDMA;

                mhdr = &msg->kqm_u.rdma.kqrm_hdr;
                rmd  = &msg->kqm_u.rdma.kqrm_rmd;

                *mhdr = *hdr;
                nob = (((char *)rmd) - ktx->ktx_buffer);

                if (type == LNET_MSG_GET) {
                        if ((md->md_options & LNET_MD_KIOV) != 0)
                                rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
                                                          md->md_niov, md->md_iov.kiov);
                        else
                                rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
                                                         md->md_niov, md->md_iov.iov);
                        ktx->ktx_state = KTX_GETTING;
                } else {
                        if (payload_kiov != NULL)
                                rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
                                                         payload_niov, payload_kiov);
                        else
                                rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
                                                        payload_niov, payload_iov);
                        ktx->ktx_state = KTX_PUTTING;
                }

                if (rc != 0)
                        goto out;

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
                nob += offsetof(kqswnal_remotemd_t,
                                kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (nob <= KQSW_TX_BUFFER_SIZE);

                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
#if KQSW_CKSUM
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
#endif
                if (type == LNET_MSG_GET) {
                        /* Allocate reply message now while I'm in thread context */
                        ktx->ktx_args[2] = lnet_create_reply_msg (
                                kqswnal_data.kqn_ni, lntmsg);
                        if (ktx->ktx_args[2] == NULL) {
                                /* fail the send: without the REPLY msg we
                                 * can't commit to the GET */
                                rc = -ENOMEM;
                                goto out;
                        }

                        /* NB finalizing the REPLY message is my
                         * responsibility now, whatever happens. */
#if KQSW_CKSUM
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  3) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }

                } else if (payload_kiov != NULL) {
                        /* must checksum payload after header so receiver can
                         * compute partial header cksum before swab.  Sadly
                         * this causes 2 rounds of kmap */
                        msg->kqm_cksum =
                                kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,
                                                  payload_niov, payload_kiov);
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
                } else {
                        msg->kqm_cksum =
                                kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,
                                                 payload_niov, payload_iov);
                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
#endif
                }

        } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
                lnet_hdr_t    *mhdr;
                char          *payload;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* single frag copied into the pre-mapped buffer */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                mhdr = &msg->kqm_u.immediate.kqim_hdr;
                payload = msg->kqm_u.immediate.kqim_payload;

                *mhdr = *hdr;
                nob = (payload - ktx->ktx_buffer) + payload_nob;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                            payload_niov, payload_kiov,
                                            payload_offset, payload_nob);
                else
                        lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                           payload_niov, payload_iov,
                                           payload_offset, payload_nob);
#if KQSW_CKSUM
                msg->kqm_nob   = nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
        } else {
                lnet_hdr_t    *mhdr;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* multiple frags: first is hdr in pre-mapped buffer */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                mhdr = &msg->kqm_u.immediate.kqim_hdr;
                nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);

                *mhdr = *hdr;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);
                if (rc != 0)
                        goto out;

#if KQSW_CKSUM
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);

                msg->kqm_cksum = (payload_kiov != NULL) ?
                                 kqswnal_csum_kiov(msg->kqm_cksum,
                                                   payload_offset, payload_nob,
                                                   payload_niov, payload_kiov) :
                                 kqswnal_csum_iov(msg->kqm_cksum,
                                                  payload_offset, payload_nob,
                                                  payload_niov, payload_iov);

                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
                nob += payload_nob;
        }

        ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        rc = kqswnal_launch (ktx);

 out:
        CDEBUG_LIMIT(rc == 0? D_NET :D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
                     routing ? (rc == 0 ? "Routed" : "Failed to route") :
                               (rc == 0 ? "Sent" : "Failed to send"),
                     nob, libcfs_nid2str(target.nid),
                     target_is_router ? "(router)" : "", rc);

        if (rc != 0) {
                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
                int         state = ktx->ktx_state;

                kqswnal_put_idle_tx (ktx);

                if (state == KTX_GETTING && repmsg != NULL) {
                        /* We committed to reply, but there was a problem
                         * launching the GET.  We can't avoid delivering a
                         * REPLY event since we committed above, so we
                         * pretend the GET succeeded but the REPLY
                         * failed. */
                        rc = 0;
                        lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
                }

        }

        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc == 0 ? 0 : -EIO);
}

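/* Return a receive buffer to EKC: repost it, or release it if the LND is
 * shutting down. */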
void
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);
        LASSERT (!krx->krx_rpc_reply_needed);

        krx->krx_state = KRX_POSTED;

        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd,
                                   kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }
}

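/* EKC callback when the RPC reply posted by kqswnal_rx_done() has gone out;
 * just requeue the receive buffer. */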
void
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}

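/* Final rx cleanup once the last ref is dropped: complete the peer's RPC
 * with the prepared status block if one is still owed, then requeue the
 * receive buffer. */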
void
kqswnal_rx_done (kqswnal_rx_t *krx)
{
        int           rc;

        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {
                /* We've not completed the peer's RPC yet... */
                krx->krx_rpc_reply.msg.magic   = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;

                LASSERT (!cfs_in_interrupt());

                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,
                                     &krx->krx_rpc_reply.ep_statusblk,
                                     NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);
                krx->krx_rpc_reply_needed = 0;
        }

        kqswnal_requeue_rx(krx);
}

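/* Decode a newly-received message: check magic/version (byte-swapping a
 * cross-endian peer's header), verify the checksum if enabled, then hand
 * IMMEDIATE messages to lnet_parse() or set up the RDMA an RDMA message
 * describes. */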
1323 void
1324 kqswnal_parse (kqswnal_rx_t *krx)
1325 {
1326         lnet_ni_t      *ni = kqswnal_data.kqn_ni;
1327         kqswnal_msg_t  *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
1328         lnet_nid_t      fromnid = kqswnal_rx_nid(krx);
1329         int             swab;
1330         int             n;
1331         int             i;
1332         int             nob;
1333         int             rc;
1334
1335         LASSERT (cfs_atomic_read(&krx->krx_refcount) == 1);
1336
1337         if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {
1338                 CERROR("Short message %d received from %s\n",
1339                        krx->krx_nob, libcfs_nid2str(fromnid));
1340                 goto done;
1341         }
1342
1343         swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);
1344
1345         if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {
1346 #if KQSW_CKSUM
1347                 __u32 csum0;
1348                 __u32 csum1;
1349
1350                 /* csum byte array before swab */
1351                 csum1 = msg->kqm_cksum;
1352                 msg->kqm_cksum = 0;
1353                 csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,
1354                                           krx->krx_npages, krx->krx_kiov);
1355                 msg->kqm_cksum = csum1;
1356 #endif
1357
1358                 if (swab) {
1359                         __swab16s(&msg->kqm_version);
1360                         __swab16s(&msg->kqm_type);
1361 #if KQSW_CKSUM
1362                         __swab32s(&msg->kqm_cksum);
1363                         __swab32s(&msg->kqm_nob);
1364 #endif
1365                 }
1366
                if (msg->kqm_version != QSWLND_PROTO_VERSION) {
                        /* Future protocol version compatibility support!
                         * The next qswlnd-specific protocol rev will first
                         * send an RPC to check version.
                         * 1.4.6 and early 1.4.7 releases reply with a status
                         * block containing their current version.
                         * Later versions send a failure (-ve) status +
                         * magic/version */

                        if (!krx->krx_rpc_reply_needed) {
                                CERROR("Unexpected version %d from %s\n",
                                       msg->kqm_version, libcfs_nid2str(fromnid));
                                goto done;
                        }

                        LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
                        goto done;
                }

                switch (msg->kqm_type) {
                default:
                        CERROR("Bad request type %x from %s\n",
                               msg->kqm_type, libcfs_nid2str(fromnid));
                        goto done;

                case QSWLND_MSG_IMMEDIATE:
                        if (krx->krx_rpc_reply_needed) {
                                /* Should have been a simple message */
                                CERROR("IMMEDIATE sent as RPC from %s\n",
                                       libcfs_nid2str(fromnid));
                                goto done;
                        }

                        nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
                        if (krx->krx_nob < nob) {
                                CERROR("Short IMMEDIATE %d(%d) from %s\n",
                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
                                goto done;
                        }

#if KQSW_CKSUM
                        if (csum0 != msg->kqm_cksum) {
                                CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",
                                       csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));
                                CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);
                                goto done;
                        }
#endif
                        rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,
                                        fromnid, krx, 0);
                        if (rc < 0)
                                goto done;
                        return;

                case QSWLND_MSG_RDMA:
                        if (!krx->krx_rpc_reply_needed) {
                                /* Should have been an RPC */
                                CERROR("RDMA sent as simple message from %s\n",
                                       libcfs_nid2str(fromnid));
                                goto done;
                        }

                        nob = offsetof(kqswnal_msg_t,
                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);
                        if (krx->krx_nob < nob) {
                                CERROR("Short RDMA message %d(%d) from %s\n",
                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
                                goto done;
                        }

                        if (swab)
                                __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);

                        n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;
                        nob = offsetof(kqswnal_msg_t,
                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);

                        if (krx->krx_nob < nob) {
                                CERROR("Short RDMA message %d(%d), %d frags, from %s\n",
                                       krx->krx_nob, nob, n, libcfs_nid2str(fromnid));
                                goto done;
                        }

                        if (swab) {
                                for (i = 0; i < n; i++) {
                                        EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];

                                        __swab32s(&nmd->nmd_addr);
                                        __swab32s(&nmd->nmd_len);
                                        __swab32s(&nmd->nmd_attr);
                                }
                        }

#if KQSW_CKSUM
                        krx->krx_cksum = csum0; /* stash checksum so far */
#endif
                        rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,
                                        fromnid, krx, 1);
                        if (rc < 0)
                                goto done;
                        return;
                }
                /* Not Reached */
        }

        if (msg->kqm_magic == LNET_PROTO_MAGIC ||
            msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {
                /* Future protocol version compatibility support!
                 * When LNET unifies protocols over all LNDs, the first thing a
                 * peer will send will be a version query RPC.
                 * 1.4.6 and early 1.4.7 releases reply with a status block
                 * containing LNET_PROTO_QSW_MAGIC.
                 * Later versions send a failure (-ve) status +
                 * magic/version */

                if (!krx->krx_rpc_reply_needed) {
                        CERROR("Unexpected magic %08x from %s\n",
                               msg->kqm_magic, libcfs_nid2str(fromnid));
                        goto done;
                }

                LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
                goto done;
        }

        CERROR("Unrecognised magic %08x from %s\n",
               msg->kqm_magic, libcfs_nid2str(fromnid));
 done:
        kqswnal_rx_decref(krx);
}

/* Receive Interrupt Handler: parses inline when in thread context,
 * otherwise posts to the schedulers */
void
kqswnal_rxhandler(EP_RXD *rxd)
{
        unsigned long flags;
        int           nob    = ep_rxd_len (rxd);
        int           status = ep_rxd_status (rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);

        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
               rxd, krx, nob, status);

        LASSERT (krx != NULL);
        LASSERT (krx->krx_state == KRX_POSTED);

        krx->krx_state = KRX_PARSE;
        krx->krx_rxd = rxd;
        krx->krx_nob = nob;

        /* RPC reply iff RPC request received without error */
        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
                                    (status == EP_SUCCESS ||
                                     status == EP_MSG_TOO_BIG);

        /* Default to failure if an RPC reply is requested but not handled */
        krx->krx_rpc_reply.msg.status = -EPROTO;
        cfs_atomic_set (&krx->krx_refcount, 1);

        if (status != EP_SUCCESS) {
                /* receives complete with failure when receiver is removed */
                if (status == EP_SHUTDOWN)
                        LASSERT (kqswnal_data.kqn_shuttingdown);
                else
                        CERROR("receive failed with status %d, nob %d\n",
                               status, nob);
                kqswnal_rx_decref(krx);
                return;
        }

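        /* Parse inline when in thread context; parsing can end up mapping
         * buffers (see the !cfs_in_interrupt() LASSERT in kqswnal_recv()),
         * so from interrupt context hand the receive to a scheduler thread
         * instead */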
        if (!cfs_in_interrupt()) {
                kqswnal_parse(krx);
                return;
        }

        cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        cfs_list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
        cfs_waitq_signal (&kqswnal_data.kqn_sched_waitq);

        cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}

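/* LNet receive completion.  Two cases: an optimized PUT/REPLY/GET that
 * arrived as an RPC, where the payload moves by RDMA against the peer's
 * buffer descriptor (rmd), and an immediate message, whose payload is
 * simply copied out of the receive buffers */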
int
kqswnal_recv (lnet_ni_t     *ni,
              void          *private,
              lnet_msg_t    *lntmsg,
              int            delayed,
              unsigned int   niov,
              struct iovec  *iov,
              lnet_kiov_t   *kiov,
              unsigned int   offset,
              unsigned int   mlen,
              unsigned int   rlen)
{
        kqswnal_rx_t       *krx = (kqswnal_rx_t *)private;
        lnet_nid_t          fromnid;
        kqswnal_msg_t      *msg;
        lnet_hdr_t         *hdr;
        kqswnal_remotemd_t *rmd;
        int                 msg_offset;
        int                 rc;

        LASSERT (!cfs_in_interrupt ());             /* OK to map */
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
        msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);

        if (krx->krx_rpc_reply_needed) {
                /* optimized (rdma) request sent as RPC */

                LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);
                hdr = &msg->kqm_u.rdma.kqrm_hdr;
                rmd = &msg->kqm_u.rdma.kqrm_rmd;

                /* NB header is still in wire byte order */

                switch (le32_to_cpu(hdr->type)) {
                        case LNET_MSG_PUT:
                        case LNET_MSG_REPLY:
                                /* This is an optimized PUT/REPLY */
                                rc = kqswnal_rdma(krx, lntmsg,
                                                  KTX_RDMA_FETCH, rmd,
                                                  niov, iov, kiov, offset, mlen);
                                break;

                        case LNET_MSG_GET:
#if KQSW_CKSUM
                                if (krx->krx_cksum != msg->kqm_cksum) {
                                        CERROR("Bad GET checksum %08x(%08x) from %s\n",
                                               krx->krx_cksum, msg->kqm_cksum,
                                               libcfs_nid2str(fromnid));
                                        rc = -EIO;
                                        break;
                                }
#endif
                                if (lntmsg == NULL) {
                                        /* No buffer match: my decref will
                                         * complete the RPC with failure */
                                        rc = 0;
                                } else {
                                        /* Matched something! */
                                        rc = kqswnal_rdma(krx, lntmsg,
                                                          KTX_RDMA_STORE, rmd,
                                                          lntmsg->msg_niov,
                                                          lntmsg->msg_iov,
                                                          lntmsg->msg_kiov,
                                                          lntmsg->msg_offset,
                                                          lntmsg->msg_len);
                                }
                                break;

                        default:
                                CERROR("Bad RPC type %d\n",
                                       le32_to_cpu(hdr->type));
                                rc = -EPROTO;
                                break;
                }

                kqswnal_rx_decref(krx);
                return rc;
        }

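        /* Immediate message: the payload follows the header in the receive
         * buffers; copy out 'mlen' bytes starting at 'msg_offset' */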
        LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);
        msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);

        if (krx->krx_nob < msg_offset + rlen) {
                CERROR("Bad message size from %s: have %d, need %d + %d\n",
                       libcfs_nid2str(fromnid), krx->krx_nob,
                       msg_offset, rlen);
                kqswnal_rx_decref(krx);
                return -EPROTO;
        }

        if (kiov != NULL)
                lnet_copy_kiov2kiov(niov, kiov, offset,
                                    krx->krx_npages, krx->krx_kiov,
                                    msg_offset, mlen);
        else
                lnet_copy_kiov2iov(niov, iov, offset,
                                   krx->krx_npages, krx->krx_kiov,
                                   msg_offset, mlen);

        lnet_finalize(ni, lntmsg, 0);
        kqswnal_rx_decref(krx);
        return 0;
}

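/* Thread bookkeeping: kqn_nthreads counts live threads, presumably so
 * shutdown can wait for them all to call kqswnal_thread_fini().  A
 * minimal usage sketch (hypothetical caller; the real startup code
 * lives elsewhere in the LND):
 *
 *      if (kqswnal_thread_start (kqswnal_scheduler, NULL) != 0)
 *              CERROR ("Can't spawn scheduler thread\n");
 */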
int
kqswnal_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid = cfs_create_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        cfs_atomic_inc (&kqswnal_data.kqn_nthreads);
        return (0);
}

void
kqswnal_thread_fini (void)
{
        cfs_atomic_dec (&kqswnal_data.kqn_nthreads);
}

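/* Scheduler thread: drains three work queues (receives awaiting parse,
 * transmits needing thread-context completion, delayed transmits to
 * launch).  The lock is dropped every KQSW_RESCHED busy iterations to
 * avoid hogging the CPU, and the thread exits once stage 2 of shutdown
 * is reached with nothing left to do */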
int
kqswnal_scheduler (void *arg)
{
        kqswnal_rx_t    *krx;
        kqswnal_tx_t    *ktx;
        unsigned long    flags;
        int              rc;
        int              counter = 0;
        int              did_something;

        cfs_daemonize ("kqswnal_sched");
        cfs_block_allsigs ();

        cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        for (;;)
        {
                did_something = 0;

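                /* receives queued from interrupt context by
                 * kqswnal_rxhandler(), still awaiting parse */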
                if (!cfs_list_empty (&kqswnal_data.kqn_readyrxds))
                {
                        krx = cfs_list_entry(kqswnal_data.kqn_readyrxds.next,
                                             kqswnal_rx_t, krx_list);
                        cfs_list_del (&krx->krx_list);
                        cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                                   flags);

                        LASSERT (krx->krx_state == KRX_PARSE);
                        kqswnal_parse (krx);

                        did_something = 1;
                        cfs_spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
                                              flags);
                }

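                /* transmits whose completion must run in thread context */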
                if (!cfs_list_empty (&kqswnal_data.kqn_donetxds))
                {
                        ktx = cfs_list_entry(kqswnal_data.kqn_donetxds.next,
                                             kqswnal_tx_t, ktx_schedlist);
                        cfs_list_del_init (&ktx->ktx_schedlist);
                        cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                                   flags);

                        kqswnal_tx_done_in_thread_context(ktx);

                        did_something = 1;
                        cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock,
                                               flags);
                }

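                /* transmits that couldn't be launched earlier; retry the
                 * launch now and fail them if it still doesn't succeed */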
                if (!cfs_list_empty (&kqswnal_data.kqn_delayedtxds))
                {
                        ktx = cfs_list_entry(kqswnal_data.kqn_delayedtxds.next,
                                             kqswnal_tx_t, ktx_schedlist);
                        cfs_list_del_init (&ktx->ktx_schedlist);
                        cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                                   flags);

                        rc = kqswnal_launch (ktx);
                        if (rc != 0) {
                                CERROR("Failed delayed transmit to %s: %d\n",
                                       libcfs_nid2str(ktx->ktx_nid), rc);
                                kqswnal_tx_done (ktx, rc);
                        }
                        cfs_atomic_dec (&kqswnal_data.kqn_pending_txs);

                        did_something = 1;
                        cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock,
                                               flags);
                }

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == KQSW_RESCHED) {
                        cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                                   flags);

                        counter = 0;

                        if (!did_something) {
                                if (kqswnal_data.kqn_shuttingdown == 2) {
                                        /* We only exit in stage 2 of shutdown
                                         * when there's nothing left to do */
                                        break;
                                }
                                cfs_wait_event_interruptible_exclusive (
                                        kqswnal_data.kqn_sched_waitq,
                                        kqswnal_data.kqn_shuttingdown == 2 ||
                                        !cfs_list_empty(&kqswnal_data.kqn_readyrxds) ||
                                        !cfs_list_empty(&kqswnal_data.kqn_donetxds) ||
                                        !cfs_list_empty(&kqswnal_data.kqn_delayedtxds),
                                        rc);
                                LASSERT (rc == 0);
                        } else if (need_resched())
                                cfs_schedule ();

                        cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock,
                                               flags);
                }
        }


        kqswnal_thread_fini ();
        return (0);
}