/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2014, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ralnd/ralnd_cb.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include <asm/page.h>
#include "ralnd.h"

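/* Device callback, invoked by the RA driver when device 'devid' has
 * completions pending: mark the matching device ready and wake its
 * scheduler thread. */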
void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        CDEBUG(D_NET, "callback for device %d\n", devid);

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

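/* Queue 'conn' on its device's ready list (taking a reference for the
 * scheduler) and wake the scheduler thread.  A no-op if the conn is
 * already scheduled. */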
void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

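/* Remove and return a tx descriptor from the global idle list, or NULL
 * if the list is empty.  The returned tx gets a fresh completion cookie
 * while the tx lock is still held. */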
kra_tx_t *
kranal_get_idle_tx (void)
{
        unsigned long  flags;
        kra_tx_t      *tx;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (cfs_list_empty(&kranal_data.kra_idle_txs)) {
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
                return NULL;
        }

        tx = cfs_list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
        cfs_list_del(&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be needed, but we've
         * got a lock right now... */
        tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
        /* ram_connstamp gets set when FMA is sent */
}

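/* Convenience wrapper: grab an idle tx and initialise its embedded
 * message header with 'type'. */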
kra_tx_t *
kranal_new_tx_msg (int type)
{
        kra_tx_t *tx = kranal_get_idle_tx();

        if (tx != NULL)
                kranal_init_msg(&tx->tx_msg, type);

        return tx;
}

int
kranal_setup_immediate_buffer (kra_tx_t *tx,
                               unsigned int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (nob >= 0);

        if (nob == 0) {
                tx->tx_buffer = NULL;
        } else {
                LASSERT (niov > 0);

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;
                        niov--;
                        iov++;
                        LASSERT (niov > 0);
                }

                if (nob > iov->iov_len - offset) {
                        CERROR("Can't handle multiple vaddr fragments\n");
                        return -EMSGSIZE;
                }

                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx,
                          unsigned int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

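/* Describe a paged (kiov) payload as an array of physical page regions
 * in tx->tx_phys.  The payload must be virtually contiguous: only the
 * first page may start at a non-zero offset and only the last may be
 * short, otherwise -EINVAL is returned. */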
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "nkiov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = page_to_phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM: "
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = page_to_phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}

static inline int
kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

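/* Register the tx's buffer with the RA device so it can be the source
 * or sink of an RDMA.  Must be called by the device's scheduler thread;
 * a no-op for immediate and already-mapped buffers. */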
int
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                return 0;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d pages: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_phys_npages, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nphysmap++;
                dev->rad_nppphysmap += tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                return 0;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d bytes: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_nob, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nvirtmap++;
                dev->rad_nobvirtmap += tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                return 0;
        }
}

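/* Undo kranal_map_buffer(): deregister a mapped buffer with the RA
 * device and update the per-device mapping statistics.  A no-op for
 * buffers that were never mapped. */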
void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nphysmap--;
                dev->rad_nppphysmap -= tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nvirtmap--;
                dev->rad_nobvirtmap -= tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}

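/* Complete a tx: unmap its buffer, detach any LNet messages, return the
 * tx to the idle pool, and finalize the detached messages with
 * 'completion' status.  Must not be called in interrupt context. */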
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        lnet_msg_t      *lnetmsg[2];
        unsigned long    flags;
        int              i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        cfs_list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        /* finalize AFTER freeing lnet msgs */
        for (i = 0; i < 2; i++) {
                if (lnetmsg[i] == NULL)
                        continue;

                lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
        }
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        cfs_list_t *tmp;

        /* just return the first connection */
        cfs_list_for_each (tmp, &peer->rap_conns) {
                return cfs_list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

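/* Attach 'tx' to 'conn' and queue it on the connection's FMA queue,
 * then schedule the conn so its device's scheduler sends the message. */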
void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

void
kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        int              rc;
        int              retry;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */

        for (retry = 0; ; retry = 1) {

                read_lock(g_lock);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL) {
                        conn = kranal_find_conn_locked(peer);
                        if (conn != NULL) {
                                kranal_post_fma(conn, tx);
                                read_unlock(g_lock);
                                return;
                        }
                }

                /* Making connections; I'll need a write lock... */
                read_unlock(g_lock);
                write_lock_irqsave(g_lock, flags);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL)
                        break;

                write_unlock_irqrestore(g_lock, flags);

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        kranal_tx_done(tx, rc);
                        return;
                }
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (cfs_list_empty(&peer->rap_tx_queue));

                if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
                      cfs_time_aftereq(jiffies, peer->rap_reconnect_time))) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                cfs_list_add_tail(&peer->rap_connd_list,
                                  &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        cfs_list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

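/* Start an RDMA of 'nob' bytes between the (mapped) tx buffer and the
 * peer's sink descriptor, then queue the tx on the conn's rdmaq; the
 * completion message prepared in tx->tx_msg is sent once the RDMA
 * completes.  Only the device's scheduler thread may call this. */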
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        cfs_list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

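/* Copy the payload of the current immediate rx message out of the FMA
 * prefix into 'buffer' and release the rx slot.  Returns -EPROTO if
 * fewer than 'nob' bytes arrived. */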
int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid),
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

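/* lnd_send() method.  ACKs, router-bound GETs, GETs with small
 * virtually-addressed sinks, and small PUTs/REPLYs go as a single
 * IMMEDIATE FMA message; everything else negotiates an RDMA via a
 * GET_REQ or PUT_REQ handshake. */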
int
kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      niov = lntmsg->msg_niov;
        struct iovec     *iov = lntmsg->msg_iov;
        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        kra_tx_t         *tx;
        int               rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               nob, niov, libcfs_id2str(target));

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= LNET_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (routing) {
                CERROR ("Can't route\n");
                return -EIO;
        }

        switch (type) {
        default:
                LBUG();

        case LNET_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case LNET_MSG_GET:
                LASSERT (niov == 0);
                LASSERT (nob == 0);
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside lntmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
                    lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
                    lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.iov,
                                                      0, lntmsg->msg_md->md_length);
                else
                        rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.kiov,
                                                      0, lntmsg->msg_md->md_length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to %s\n",
                               libcfs_nid2str(target.nid));
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;
        }

        /* send IMMEDIATE */

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return -ENOMEM;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return -EIO;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_lntmsg[0] = lntmsg;
        kranal_launch_tx(tx, target.nid);
        return 0;
}

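/* Service a matched GET_REQ: map the reply payload and RDMA it straight
 * into the sink described in the request, completing with GET_DONE. */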
void
kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
{
        kra_msg_t     *rxmsg = conn->rac_rxmsg;
        unsigned int   niov = lntmsg->msg_niov;
        struct iovec  *iov = lntmsg->msg_iov;
        lnet_kiov_t   *kiov = lntmsg->msg_kiov;
        unsigned int   offset = lntmsg->msg_offset;
        unsigned int   nob = lntmsg->msg_len;
        kra_tx_t      *tx;
        int            rc;

        tx = kranal_get_idle_tx();
        if (tx == NULL)
                goto failed_0;

        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
        if (rc != 0)
                goto failed_1;

        tx->tx_conn = conn;

        rc = kranal_map_buffer(tx);
        if (rc != 0)
                goto failed_1;

        tx->tx_lntmsg[0] = lntmsg;

        kranal_rdma(tx, RANAL_MSG_GET_DONE,
                    &rxmsg->ram_u.get.ragm_desc, nob,
                    rxmsg->ram_u.get.ragm_cookie);
        return;

 failed_1:
        kranal_tx_done(tx, -EIO);
 failed_0:
        lnet_finalize(ni, lntmsg, -EIO);
}

int
kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                   void **new_private)
{
        kra_conn_t *conn = (kra_conn_t *)private;

        LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",
                           libcfs_nid2str(conn->rac_peer->rap_nid));

        return -EDEADLK;
}

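/* lnd_recv() method: deliver the rx message held in conn->rac_rxmsg.
 * IMMEDIATE payloads are copied out directly; a PUT_REQ maps the sink
 * buffer and answers with PUT_ACK; a GET_REQ is replied to, or NAKed if
 * unmatched. */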
int
kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);

        switch (rxmsg->ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return -EIO;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return -EIO;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
                return 0;

        case RANAL_MSG_PUT_REQ:
                tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
                if (tx == NULL) {
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -ENOMEM;
                }

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_conn = conn;
                rc = kranal_map_buffer(tx);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;

        case RANAL_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Matched! */
                        kranal_reply(ni, conn, lntmsg);
                } else {
                        /* No match */
                        tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
                        if (tx != NULL) {
                                tx->tx_msg.ram_u.completion.racm_cookie =
                                        rxmsg->ram_u.get.ragm_cookie;
                                kranal_post_fma(conn, tx);
                        }
                }
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;
        }
}

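/* Spawn a named kernel thread and account for it in kra_nthreads.
 * Returns 0 on success or the PTR_ERR() of the failed thread. */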
int
kranal_thread_start(int(*fn)(void *arg), void *arg, char *name)
{
        struct task_struct *task = cfs_thread_run(fn, arg, name);

        if (IS_ERR(task))
                return PTR_ERR(task);

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}

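/* Return non-zero if 'conn' has timed out: nothing received within the
 * conn's timeout interval, or a tx stuck on one of its queues for
 * longer than that.  Also schedules a keepalive if the conn has been
 * idle on the send side. */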
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        cfs_list_t        *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            cfs_time_aftereq(now, conn->rac_last_tx +
                             msecs_to_jiffies(conn->rac_keepalive *
                                              MSEC_PER_SEC))) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
                       conn, libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_schedule_conn(conn);
        }

        timeout = msecs_to_jiffies(conn->rac_timeout * MSEC_PER_SEC);

        if (!conn->rac_close_recvd &&
            cfs_time_aftereq(now, conn->rac_last_rx + timeout)) {
                CERROR("%s received from %s within %lu seconds\n",
                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
                       "Nothing" : "CLOSE not",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       jiffies_to_msecs(now - conn->rac_last_rx)/MSEC_PER_SEC);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        cfs_list_for_each (ttmp, &conn->rac_fmaq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
                        return -ETIMEDOUT;
                }
        }

        cfs_list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
                        return -ETIMEDOUT;
                }
        }

        cfs_list_for_each (ttmp, &conn->rac_replyq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

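/* Scan one connection hash bucket for timed-out conns, updating
 * *min_timeoutp with the smallest timeout/keepalive seen.  Timed-out
 * conns are closed (or terminated if already closing); the scan
 * restarts from the top after each one since the lock was dropped. */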
void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        cfs_list_t        *conns = &kranal_data.kra_conns[idx];
        cfs_list_t        *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        cfs_list_for_each (ctmp, conns) {
                conn = cfs_list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to %s, cqid %d timed out\n",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

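/* Connection daemon: handshakes incoming acceptor sockets and initiates
 * outgoing connections for peers queued on kra_connd_peers, sleeping
 * when there is nothing to do. */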
int
kranal_connd (void *arg)
{
        long               id = (long)arg;
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        cfs_block_allsigs();

        init_waitqueue_entry_current(&wait);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = cfs_list_entry(kranal_data.kra_connd_acceptq.next,
                                             kra_acceptsock_t, ras_list);
                        cfs_list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
                                               flags);

                        CDEBUG(D_NET,"About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET,"Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (!cfs_list_empty(&kranal_data.kra_connd_peers)) {
                        peer = cfs_list_entry(kranal_data.kra_connd_peers.next,
                                              kra_peer_t, rap_connd_list);

                        cfs_list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
                                               flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                waitq_wait(&wait, TASK_INTERRUPTIBLE);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

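/* Reaper thread: periodically walks the connection hash table checking
 * for timeouts, pacing itself so every conn is visited several times
 * within the smallest timeout/keepalive interval currently in use. */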
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        cfs_block_allsigs();

        init_waitqueue_entry_current(&wait);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* I wake up every 'p' seconds to check for timeouts on some
                 * more peers.  I try to check every connection 'n' times
                 * within the global minimum of all keepalive and timeout
                 * intervals, to ensure I attend to every connection within
                 * (n+1)/n times its timeout intervals. */
                const int     p = 1;
                const int     n = 3;
                unsigned long min_timeout;
                int           chunk;

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout > 0) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
                                               flags);

                        waitq_timedwait(&wait, TASK_INTERRUPTIBLE, timeout);

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        set_current_state(TASK_RUNNING);
                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
                        continue;
                }

                if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                        /* new min timeout set: restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;

                        if (kranal_data.kra_new_min_timeout <
                            current_min_timeout) {
                                current_min_timeout =
                                        kranal_data.kra_new_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        kranal_data.kra_new_min_timeout =
                                MAX_SCHEDULE_TIMEOUT;
                }
                min_timeout = current_min_timeout;

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                LASSERT (min_timeout > 0);

                /* Compute how many table entries to check now so I get round
                 * the whole table fast enough, given that I do this at fixed
                 * intervals of 'p' seconds */
                chunk = conn_entries;
                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
                if (chunk == 0)
                        chunk = 1;

                for (i = 0; i < chunk; i++) {
                        kranal_reaper_check(conn_index,
                                            &next_min_timeout);
                        conn_index = (conn_index + 1) % conn_entries;
                }

                next_check_time += msecs_to_jiffies(p * MSEC_PER_SEC);

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                if ((conn_index - chunk <= base_index &&
                     base_index < conn_index) ||
                    (conn_index - conn_entries - chunk <= base_index &&
                     base_index < conn_index - conn_entries)) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        /* ...and restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;
                }
        }

        kranal_thread_fini();
        return 0;
}

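/* Drain the device's RDMA completion queue.  Each completion moves the
 * tx at the head of its conn's rdmaq onto the fmaq so the scheduler can
 * send the corresponding PUT_DONE/GET_DONE message. */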
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                CDEBUG(D_NET, "Completed %p\n",
                       cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!cfs_list_empty(&conn->rac_rdmaq));
                tx = cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                cfs_list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}

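/* Drain the device's FMA completion queue, scheduling each conn with a
 * completion.  If the CQ has overrun, every conn on the device must be
 * scheduled since completions may have been lost. */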
void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t         *conn;
        RAP_RETURN          rrc;
        __u32               cqid;
        __u32               event_type;
        cfs_list_t         *conns;
        cfs_list_t         *tmp;
        int                 i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL) {
                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
                                       cqid);
                        } else {
                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
                                       conn, cqid);
                                kranal_schedule_conn(conn);
                        }

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n",
                      dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        cfs_list_for_each (tmp, conns) {
                                conn = cfs_list_entry(tmp, kra_conn_t,
                                                      rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}

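/* Push one message (with optional immediate payload) out on the conn's
 * FMA channel, stamping it with the connection stamp and tx sequence
 * number.  Returns -EAGAIN if the device can't send right now; the
 * caller will be rescheduled by an FMA completion event. */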
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
               conn, msg, msg->ram_type, sync ? "(sync)" : "",
               immediate, immediatenob);

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_rihandle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_rihandle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                if (cfs_time_aftereq(jiffies,
                                     conn->rac_last_tx +
                                     msecs_to_jiffies(conn->rac_keepalive *
                                                      MSEC_PER_SEC)))
                        CWARN("EAGAIN sending %02x (idle %lu secs)\n",
                              msg->ram_type,
                              jiffies_to_msecs(jiffies - conn->rac_last_tx) /
                              MSEC_PER_SEC);
                return -EAGAIN;
        }
}

1436 void
1437 kranal_process_fmaq (kra_conn_t *conn)
1438 {
1439         unsigned long flags;
1440         int           more_to_do;
1441         kra_tx_t     *tx;
1442         int           rc;
1443         int           expect_reply;
1444
1445         /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1446          *       However I will be rescheduled by an FMA completion event
1447          *       when I eventually get some.
1448          * NB 2. Sampling rac_state here races with setting it elsewhere.
1449          *       But it doesn't matter if I try to send a "real" message just
1450          *       as I start closing because I'll get scheduled to send the
1451          *       close anyway. */
1452
1453         /* Not racing with incoming message processing! */
1454         LASSERT (current == conn->rac_device->rad_scheduler);
1455
1456         if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
1457                 if (!cfs_list_empty(&conn->rac_rdmaq)) {
1458                         /* RDMAs in progress */
1459                         LASSERT (!conn->rac_close_sent);
1460
1461                         if (cfs_time_aftereq(jiffies,
1462                                              conn->rac_last_tx +
1463                                              msecs_to_jiffies(conn->rac_keepalive *
1464                                                               MSEC_PER_SEC))) {
1465                                 CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
1466                                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1467                                 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1468                         }
1469                         return;
1470                 }
1471
1472                 if (conn->rac_close_sent)
1473                         return;
1474
1475                 CWARN("sending CLOSE to %s\n",
1476                       libcfs_nid2str(conn->rac_peer->rap_nid));
1477                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1478                 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1479                 if (rc != 0)
1480                         return;
1481
1482                 conn->rac_close_sent = 1;
1483                 if (!conn->rac_close_recvd)
1484                         return;
1485
1486                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1487
1488                 if (conn->rac_state == RANAL_CONN_CLOSING)
1489                         kranal_terminate_conn_locked(conn);
1490
1491                 write_unlock_irqrestore(&kranal_data.kra_global_lock,
1492                                         flags);
1493                 return;
1494         }
1495
1496         spin_lock_irqsave(&conn->rac_lock, flags);
1497
1498         if (cfs_list_empty(&conn->rac_fmaq)) {
1499
1500                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1501
1502                 if (cfs_time_aftereq(jiffies,
1503                                      conn->rac_last_tx +
1504                                      msecs_to_jiffies(conn->rac_keepalive *
1505                                                       MSEC_PER_SEC))) {
1506                         CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
1507                                libcfs_nid2str(conn->rac_peer->rap_nid), conn,
1508                                jiffies_to_msecs(jiffies - conn->rac_last_tx) /
1509                                MSEC_PER_SEC,
1510                                conn->rac_keepalive);
1511                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1512                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1513                 }
1514                 return;
1515         }
1516
1517         tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1518         cfs_list_del(&tx->tx_list);
1519         more_to_do = !cfs_list_empty(&conn->rac_fmaq);
1520
1521         spin_unlock_irqrestore(&conn->rac_lock, flags);
1522
1523         expect_reply = 0;
1524         CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
1525                tx, tx->tx_msg.ram_type, tx->tx_cookie);
1526         switch (tx->tx_msg.ram_type) {
1527         default:
1528                 LBUG();
1529
1530         case RANAL_MSG_IMMEDIATE:
1531                 rc = kranal_sendmsg(conn, &tx->tx_msg,
1532                                     tx->tx_buffer, tx->tx_nob);
1533                 break;
1534
1535         case RANAL_MSG_PUT_NAK:
1536         case RANAL_MSG_PUT_DONE:
1537         case RANAL_MSG_GET_NAK:
1538         case RANAL_MSG_GET_DONE:
1539                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1540                 break;
1541
1542         case RANAL_MSG_PUT_REQ:
1543                 rc = kranal_map_buffer(tx);
1544                 LASSERT (rc != -EAGAIN);
1545                 if (rc != 0)
1546                         break;
1547
1548                 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1549                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1550                 expect_reply = 1;
1551                 break;
1552
1553         case RANAL_MSG_PUT_ACK:
1554                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1555                 expect_reply = 1;
1556                 break;
1557
1558         case RANAL_MSG_GET_REQ:
1559                 rc = kranal_map_buffer(tx);
1560                 LASSERT (rc != -EAGAIN);
1561                 if (rc != 0)
1562                         break;
1563
1564                 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1565                 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1566                 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
1567                         (__u64)((unsigned long)tx->tx_buffer);
1568                 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1569                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1570                 expect_reply = 1;
1571                 break;
1572         }
1573
1574         if (rc == -EAGAIN) {
1575                 /* I need credits to send this.  Replace tx at the head of the
1576                  * fmaq and I'll get rescheduled when credits appear */
1577                 CDEBUG(D_NET, "EAGAIN on %p\n", conn);
1578                 spin_lock_irqsave(&conn->rac_lock, flags);
1579                 cfs_list_add(&tx->tx_list, &conn->rac_fmaq);
1580                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1581                 return;
1582         }
1583
1584         if (!expect_reply || rc != 0) {
1585                 kranal_tx_done(tx, rc);
1586         } else {
1587                 /* LASSERT(current) above ensures this doesn't race with reply
1588                  * processing */
1589                 spin_lock_irqsave(&conn->rac_lock, flags);
1590                 cfs_list_add_tail(&tx->tx_list, &conn->rac_replyq);
1591                 tx->tx_qtime = jiffies;
1592                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1593         }
1594
1595         if (more_to_do) {
1596                 CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
1597                 kranal_schedule_conn(conn);
1598         }
1599 }
1600
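     /* Byte-swap an RDMA descriptor received from a peer with opposite
      * endianness (detected by kranal_check_fma_rx() via RANAL_MSG_MAGIC). */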
1601 static inline void
1602 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1603 {
1604         __swab64s(&d->rard_key.Key);
1605         __swab16s(&d->rard_key.Cookie);
1606         __swab16s(&d->rard_key.MdHandle);
1607         __swab32s(&d->rard_key.Flags);
1608         __swab64s(&d->rard_addr.AddressBits);
1609         __swab32s(&d->rard_nob);
1610 }
1611
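     /* Find and remove the tx on 'conn's reply queue whose cookie matches
      * 'cookie'; returns NULL (with a warning) on an unmatched cookie or an
      * unexpected message type. */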
1612 kra_tx_t *
1613 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1614 {
1615         cfs_list_t       *ttmp;
1616         kra_tx_t         *tx;
1617         unsigned long     flags;
1618
1619         spin_lock_irqsave(&conn->rac_lock, flags);
1620
1621         cfs_list_for_each(ttmp, &conn->rac_replyq) {
1622                 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
1623
1624                 CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
1625                        tx, tx->tx_msg.ram_type, tx->tx_cookie);
1626
1627                 if (tx->tx_cookie != cookie)
1628                         continue;
1629
1630                 if (tx->tx_msg.ram_type != type) {
1631                         spin_unlock_irqrestore(&conn->rac_lock, flags);
1632                         CWARN("Unexpected type %x (%x expected) "
1633                               "matched reply from %s\n",
1634                               tx->tx_msg.ram_type, type,
1635                               libcfs_nid2str(conn->rac_peer->rap_nid));
1636                         return NULL;
1637                 }
1638
1639                 cfs_list_del(&tx->tx_list);
1640                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1641                 return tx;
1642         }
1643
1644         spin_unlock_irqrestore(&conn->rac_lock, flags);
1645         CWARN("Unmatched reply %02x/"LPX64" from %s\n",
1646               type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
1647         return NULL;
1648 }
1649
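     /* Receive a single FMA message on 'conn' if one is pending: validate
      * the prefix (magic, version, source NID, connstamp, sequence number),
      * handle the CLOSE handshake, then dispatch on message type.  Any
      * protocol error closes the connection; the stashed rx message is
      * discarded at 'out' unless lnet_parse() took ownership of it. */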
1650 void
1651 kranal_check_fma_rx (kra_conn_t *conn)
1652 {
1653         unsigned long flags;
1654         __u32         seq;
1655         kra_tx_t     *tx;
1656         kra_msg_t    *msg;
1657         void         *prefix;
1658         RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1659         kra_peer_t   *peer = conn->rac_peer;
1660         int           rc = 0;
1661         int           repost = 1;
1662
1663         if (rrc == RAP_NOT_DONE)
1664                 return;
1665
1666         CDEBUG(D_NET, "RX on %p\n", conn);
1667
1668         LASSERT (rrc == RAP_SUCCESS);
1669         conn->rac_last_rx = jiffies;
1670         seq = conn->rac_rx_seq++;
1671         msg = (kra_msg_t *)prefix;
1672
1673         /* stash message for portals callbacks; they'll NULL
1674          * rac_rxmsg if they consume it */
1675         LASSERT (conn->rac_rxmsg == NULL);
1676         conn->rac_rxmsg = msg;
1677
1678         if (msg->ram_magic != RANAL_MSG_MAGIC) {
1679                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1680                         CERROR("Unexpected magic %08x from %s\n",
1681                                msg->ram_magic, libcfs_nid2str(peer->rap_nid));
1682                         rc = -EPROTO;
1683                         goto out;
1684                 }
1685
1686                 __swab32s(&msg->ram_magic);
1687                 __swab16s(&msg->ram_version);
1688                 __swab16s(&msg->ram_type);
1689                 __swab64s(&msg->ram_srcnid);
1690                 __swab64s(&msg->ram_connstamp);
1691                 __swab32s(&msg->ram_seq);
1692
1693                 /* NB message type checked below; NOT here... */
1694                 switch (msg->ram_type) {
1695                 case RANAL_MSG_PUT_ACK:
1696                         kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1697                         break;
1698
1699                 case RANAL_MSG_GET_REQ:
1700                         kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
1701                         break;
1702
1703                 default:
1704                         break;
1705                 }
1706         }
1707
1708         if (msg->ram_version != RANAL_MSG_VERSION) {
1709                 CERROR("Unexpected protocol version %d from %s\n",
1710                        msg->ram_version, libcfs_nid2str(peer->rap_nid));
1711                 rc = -EPROTO;
1712                 goto out;
1713         }
1714
1715         if (msg->ram_srcnid != peer->rap_nid) {
1716                 CERROR("Unexpected peer %s from %s\n",
1717                        libcfs_nid2str(msg->ram_srcnid),
1718                        libcfs_nid2str(peer->rap_nid));
1719                 rc = -EPROTO;
1720                 goto out;
1721         }
1722
1723         if (msg->ram_connstamp != conn->rac_peer_connstamp) {
1724                 CERROR("Unexpected connstamp "LPX64" ("LPX64
1725                        " expected) from %s\n",
1726                        msg->ram_connstamp, conn->rac_peer_connstamp,
1727                        libcfs_nid2str(peer->rap_nid));
1728                 rc = -EPROTO;
1729                 goto out;
1730         }
1731
1732         if (msg->ram_seq != seq) {
1733                 CERROR("Unexpected sequence number %u (%u expected) from %s\n",
1734                        msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
1735                 rc = -EPROTO;
1736                 goto out;
1737         }
1738
1739         if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1740                 /* This message signals RDMA completion... */
1741                 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1742                 if (rrc != RAP_SUCCESS) {
1743                         CERROR("RapkFmaSyncWait failed: %d\n", rrc);
1744                         rc = -ENETDOWN;
1745                         goto out;
1746                 }
1747         }
1748
1749         if (conn->rac_close_recvd) {
1750                 CERROR("Unexpected message %d after CLOSE from %s\n",
1751                        msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
1752                 rc = -EPROTO;
1753                 goto out;
1754         }
1755
1756         if (msg->ram_type == RANAL_MSG_CLOSE) {
1757                 CWARN("RX CLOSE from %s\n",
                           libcfs_nid2str(conn->rac_peer->rap_nid));
1758                 conn->rac_close_recvd = 1;
1759                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1760
1761                 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
1762                         kranal_close_conn_locked(conn, 0);
1763                 else if (conn->rac_state == RANAL_CONN_CLOSING &&
1764                          conn->rac_close_sent)
1765                         kranal_terminate_conn_locked(conn);
1766
1767                 write_unlock_irqrestore(&kranal_data.kra_global_lock,
1768                                         flags);
1769                 goto out;
1770         }
1771
1772         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
1773                 goto out;
1774
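         /* Established: dispatch on message type.  IMMEDIATE, PUT_REQ and
          * GET_REQ carry LNet headers and are handed to lnet_parse();
          * completions and NAKs are matched against txs on rac_replyq. */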
1775         switch (msg->ram_type) {
1776         case RANAL_MSG_NOOP:
1777                 /* Nothing to do; just a keepalive */
1778                 CDEBUG(D_NET, "RX NOOP on %p\n", conn);
1779                 break;
1780
1781         case RANAL_MSG_IMMEDIATE:
1782                 CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
1783                 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr,
1784                                 msg->ram_srcnid, conn, 0);
1785                 repost = rc < 0;
1786                 break;
1787
1788         case RANAL_MSG_PUT_REQ:
1789                 CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
1790                 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr,
1791                                 msg->ram_srcnid, conn, 1);
1792                 repost = rc < 0;
1793                 break;
1794
1795         case RANAL_MSG_PUT_NAK:
1796                 CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
1797                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1798                                         msg->ram_u.completion.racm_cookie);
1799                 if (tx == NULL)
1800                         break;
1801
1802                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1803                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1804                 kranal_tx_done(tx, -ENOENT);    /* no match */
1805                 break;
1806
1807         case RANAL_MSG_PUT_ACK:
1808                 CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
1809                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1810                                         msg->ram_u.putack.rapam_src_cookie);
1811                 if (tx == NULL)
1812                         break;
1813
1814                 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1815                             &msg->ram_u.putack.rapam_desc,
1816                             msg->ram_u.putack.rapam_desc.rard_nob,
1817                             msg->ram_u.putack.rapam_dst_cookie);
1818                 break;
1819
1820         case RANAL_MSG_PUT_DONE:
1821                 CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
1822                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1823                                         msg->ram_u.completion.racm_cookie);
1824                 if (tx == NULL)
1825                         break;
1826
1827                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1828                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1829                 kranal_tx_done(tx, 0);
1830                 break;
1831
1832         case RANAL_MSG_GET_REQ:
1833                 CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
1834                 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,
1835                                 msg->ram_srcnid, conn, 1);
1836                 repost = rc < 0;
1837                 break;
1838
1839         case RANAL_MSG_GET_NAK:
1840                 CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
1841                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1842                                         msg->ram_u.completion.racm_cookie);
1843                 if (tx == NULL)
1844                         break;
1845
1846                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1847                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1848                 kranal_tx_done(tx, -ENOENT);    /* no match */
1849                 break;
1850
1851         case RANAL_MSG_GET_DONE:
1852                 CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
1853                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1854                                         msg->ram_u.completion.racm_cookie);
1855                 if (tx == NULL)
1856                         break;
1857
1858                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1859                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1860 #if 0
1861                 /* completion message should send rdma length if we ever allow
1862                  * GET truncation */
1863                 lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
1864 #endif
1865                 kranal_tx_done(tx, 0);
1866                 break;
1867         }
1868
1869  out:
1870         if (rc < 0)                             /* protocol/comms error */
1871                 kranal_close_conn (conn, rc);
1872
1873         if (repost && conn->rac_rxmsg != NULL)
1874                 kranal_consume_rxmsg(conn, NULL, 0);
1875
1876         /* check again later */
1877         kranal_schedule_conn(conn);
1878 }
1879
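     /* Final teardown of a conn that has reached RANAL_CONN_CLOSED: nothing
      * queued on it can complete now, so fail every tx still on the FMA and
      * reply queues with -ECONNABORTED. */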
1880 void
1881 kranal_complete_closed_conn (kra_conn_t *conn)
1882 {
1883         kra_tx_t   *tx;
1884         int         nfma;
1885         int         nreplies;
1886
1887         LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1888         LASSERT (cfs_list_empty(&conn->rac_list));
1889         LASSERT (cfs_list_empty(&conn->rac_hashlist));
1890
1891         for (nfma = 0; !cfs_list_empty(&conn->rac_fmaq); nfma++) {
1892                 tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1893
1894                 cfs_list_del(&tx->tx_list);
1895                 kranal_tx_done(tx, -ECONNABORTED);
1896         }
1897
1898         LASSERT (cfs_list_empty(&conn->rac_rdmaq));
1899
1900         for (nreplies = 0; !cfs_list_empty(&conn->rac_replyq); nreplies++) {
1901                 tx = cfs_list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1902
1903                 cfs_list_del(&tx->tx_list);
1904                 kranal_tx_done(tx, -ECONNABORTED);
1905         }
1906
1907         CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
1908                conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
1909 }
1910
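     /* Drive the RapkCompleteSync() handshake on a new conn: returns 0 when
      * it completes, -EAGAIN if it should be retried later, or -ETIMEDOUT
      * (after abandoning the handshake) once rac_timeout seconds elapse. */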
1911 int kranal_process_new_conn (kra_conn_t *conn)
1912 {
1913         RAP_RETURN   rrc;
1914
1915         rrc = RapkCompleteSync(conn->rac_rihandle, 1);
1916         if (rrc == RAP_SUCCESS)
1917                 return 0;
1918
1919         LASSERT (rrc == RAP_NOT_DONE);
1920         if (!cfs_time_aftereq(jiffies, conn->rac_last_tx +
1921                               msecs_to_jiffies(conn->rac_timeout *
                                                    MSEC_PER_SEC)))
1922                 return -EAGAIN;
1923
1924         /* Too late */
1925         rrc = RapkCompleteSync(conn->rac_rihandle, 0);
1926         LASSERT (rrc == RAP_SUCCESS);
1927         return -ETIMEDOUT;
1928 }
1929
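     /* Per-device scheduler thread: reaps the RDMA and FMA completion queues
      * when the device callback fires, services scheduled connections, drives
      * new-conn handshakes, and sleeps until woken or until the earliest
      * handshake retry deadline. */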
1930 int
1931 kranal_scheduler (void *arg)
1932 {
1933         kra_device_t     *dev = (kra_device_t *)arg;
1934         wait_queue_t      wait;
1935         kra_conn_t       *conn;
1936         unsigned long     flags;
1937         unsigned long     deadline;
1938         unsigned long     soonest;
1939         int               nsoonest;
1940         long              timeout;
1941         cfs_list_t       *tmp;
1942         cfs_list_t       *nxt;
1943         int               rc;
1944         int               dropped_lock;
1945         int               busy_loops = 0;
1946
1947         cfs_block_allsigs();
1948
1949         dev->rad_scheduler = current;
1950         init_waitqueue_entry_current(&wait);
1951
1952         spin_lock_irqsave(&dev->rad_lock, flags);
1953
1954         while (!kranal_data.kra_shutdown) {
1955                 /* Safe: kra_shutdown only set when quiescent */
1956
1957                 if (busy_loops++ >= RANAL_RESCHED) {
1958                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1959
1960                         cond_resched();
1961                         busy_loops = 0;
1962
1963                         spin_lock_irqsave(&dev->rad_lock, flags);
1964                 }
1965
1966                 dropped_lock = 0;
1967
1968                 if (dev->rad_ready) {
1969                         /* Device callback fired since I last checked it */
1970                         dev->rad_ready = 0;
1971                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1972                         dropped_lock = 1;
1973
1974                         kranal_check_rdma_cq(dev);
1975                         kranal_check_fma_cq(dev);
1976
1977                         spin_lock_irqsave(&dev->rad_lock, flags);
1978                 }
1979
1980                 cfs_list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
1981                         conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
1982
1983                         cfs_list_del_init(&conn->rac_schedlist);
1984                         LASSERT (conn->rac_scheduled);
1985                         conn->rac_scheduled = 0;
1986                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1987                         dropped_lock = 1;
1988
1989                         kranal_check_fma_rx(conn);
1990                         kranal_process_fmaq(conn);
1991
1992                         if (conn->rac_state == RANAL_CONN_CLOSED)
1993                                 kranal_complete_closed_conn(conn);
1994
1995                         kranal_conn_decref(conn);
1996                         spin_lock_irqsave(&dev->rad_lock, flags);
1997                 }
1998
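                     /* Sweep conns still completing their handshake, tracking
                      * the earliest retry deadline so I know how long I may
                      * sleep below. */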
1999                 nsoonest = 0;
2000                 soonest = jiffies;
2001
2002                 cfs_list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
2003                         conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
2004
2005                         deadline = conn->rac_last_tx + conn->rac_keepalive;
2006                         if (cfs_time_aftereq(jiffies, deadline)) {
2007                                 /* Time to process this new conn */
2008                                 spin_unlock_irqrestore(&dev->rad_lock,
2009                                                        flags);
2010                                 dropped_lock = 1;
2011
2012                                 rc = kranal_process_new_conn(conn);
2013                                 if (rc != -EAGAIN) {
2014                                         /* All done with this conn */
2015                                         spin_lock_irqsave(&dev->rad_lock,
2016                                                           flags);
2017                                         cfs_list_del_init(&conn->rac_schedlist);
2018                                         spin_unlock_irqrestore(&dev->rad_lock,
2019                                                                flags);
2020
2021                                         kranal_conn_decref(conn);
2022                                         spin_lock_irqsave(&dev->rad_lock,
2023                                                           flags);
2025                                         continue;
2026                                 }
2027
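                                     /* NB rac_keepalive appears to be reused
                                      * here as the retry interval in jiffies
                                      * while the conn sits on rad_new_conns,
                                      * unlike the seconds-based use once
                                      * established. */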
2028                                 /* retry with exponential backoff until HZ */
2029                                 if (conn->rac_keepalive == 0)
2030                                         conn->rac_keepalive = 1;
2031                                 else if (conn->rac_keepalive <=
2032                                          msecs_to_jiffies(MSEC_PER_SEC))
2033                                         conn->rac_keepalive *= 2;
2034                                 else
2035                                         conn->rac_keepalive +=
2036                                                 msecs_to_jiffies(MSEC_PER_SEC);
2037
2038                                 deadline = conn->rac_last_tx + conn->rac_keepalive;
2039                                 spin_lock_irqsave(&dev->rad_lock, flags);
2040                         }
2041
2042                         /* Does this conn need attention soonest? */
2043                         if (nsoonest++ == 0 ||
2044                             !cfs_time_aftereq(deadline, soonest))
2045                                 soonest = deadline;
2046                 }
2047
2048                 if (dropped_lock)               /* may sleep iff I didn't drop the lock */
2049                         continue;
2050
2051                 set_current_state(TASK_INTERRUPTIBLE);
2052                 add_wait_queue_exclusive(&dev->rad_waitq, &wait);
2053                 spin_unlock_irqrestore(&dev->rad_lock, flags);
2054
2055                 if (nsoonest == 0) {
2056                         busy_loops = 0;
2057                         waitq_wait(&wait, TASK_INTERRUPTIBLE);
2058                 } else {
2059                         timeout = (long)(soonest - jiffies);
2060                         if (timeout > 0) {
2061                                 busy_loops = 0;
2062                                 waitq_timedwait(&wait, TASK_INTERRUPTIBLE,
2063                                                 timeout);
2065                         }
2066                 }
2067
2068                 remove_wait_queue(&dev->rad_waitq, &wait);
2069                 set_current_state(TASK_RUNNING);
2070                 spin_lock_irqsave(&dev->rad_lock, flags);
2071         }
2072
2073         spin_unlock_irqrestore(&dev->rad_lock, flags);
2074
2075         dev->rad_scheduler = NULL;
2076         kranal_thread_fini();
2077         return 0;
2078 }