/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ralnd/ralnd_cb.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "ralnd.h"

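/* Device event callback, presumably invoked by the RapidArray driver when
 * device 'devid' has events pending.  It only flags the matching device
 * ready and wakes its scheduler thread; all real work happens in thread
 * context. */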
void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        CDEBUG(D_NET, "callback for device %d\n", devid);

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                cfs_spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        cfs_waitq_signal(&dev->rad_waitq);
                }

                cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        cfs_spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
                cfs_waitq_signal(&dev->rad_waitq);
        }

        cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
}

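/* Grab a tx descriptor from the global idle pool.  Returns NULL when the
 * pool is empty; callers map that to -ENOMEM. */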
kra_tx_t *
kranal_get_idle_tx (void)
{
        unsigned long  flags;
        kra_tx_t      *tx;

        cfs_spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (cfs_list_empty(&kranal_data.kra_idle_txs)) {
                cfs_spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
                return NULL;
        }

        tx = cfs_list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
        cfs_list_del(&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be needed, but we've
         * got a lock right now... */
        tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

        cfs_spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
        /* ram_connstamp gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int type)
{
        kra_tx_t *tx = kranal_get_idle_tx();

        if (tx != NULL)
                kranal_init_msg(&tx->tx_msg, type);

        return tx;
}

int
kranal_setup_immediate_buffer (kra_tx_t *tx,
                               unsigned int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (nob >= 0);

        if (nob == 0) {
                tx->tx_buffer = NULL;
        } else {
                LASSERT (niov > 0);

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;
                        niov--;
                        iov++;
                        LASSERT (niov > 0);
                }

                if (nob > iov->iov_len - offset) {
                        CERROR("Can't handle multiple vaddr fragments\n");
                        return -EMSGSIZE;
                }

                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx,
                          unsigned int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

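/* Describe a paged (kiov) payload as an array of physical page regions for
 * RDMA.  The payload must be "I/O VM contiguous": apart from the first and
 * last pages, every fragment has to start at offset 0 and fill a whole
 * page, since only page addresses are handed to the device. */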
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = lnet_page2phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM: "
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = lnet_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}

static inline int
kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

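/* Register tx's buffer with the device so it can be the local end of an
 * RDMA.  Only the device's scheduler thread may call this, which serialises
 * all mapping calls on the device.  A mapping failure is assumed to mean
 * the device is out of mapping resources, hence -ENOMEM. */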
int
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                return 0;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d pages: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_phys_npages, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nphysmap++;
                dev->rad_nppphysmap += tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                return 0;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d bytes: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_nob, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nvirtmap++;
                dev->rad_nobvirtmap += tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                return 0;
        }
}

void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nphysmap--;
                dev->rad_nppphysmap -= tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nvirtmap--;
                dev->rad_nobvirtmap -= tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}

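/* Retire a tx: unmap its buffer, reset it and return it to the idle pool,
 * and only then finalize any attached LNet messages, so a recycled
 * descriptor can never be seen still holding a finalized lnet_msg_t. */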
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        lnet_msg_t      *lnetmsg[2];
        unsigned long    flags;
        int              i;

        LASSERT (!cfs_in_interrupt());

        kranal_unmap_buffer(tx);

        lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        cfs_spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        cfs_list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);

        cfs_spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        /* finalize AFTER freeing lnet msgs */
        for (i = 0; i < 2; i++) {
                if (lnetmsg[i] == NULL)
                        continue;

                lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
        }
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        cfs_list_t *tmp;

        /* just return the first connection */
        cfs_list_for_each (tmp, &peer->rap_conns) {
                return cfs_list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        cfs_spin_lock_irqsave(&conn->rac_lock, flags);
        cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

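/* Queue a tx for 'nid', creating the peer and kicking off a connection
 * attempt if necessary.  The fast path takes only the global read lock; the
 * lock is upgraded (dropped and re-taken for writing) to add a peer, and the
 * peer/conn lookup is repeated after every upgrade since it may have raced.
 * Once here we are committed: any failure completes the tx with an error. */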
void
kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        int              rc;
        int              retry;
        cfs_rwlock_t    *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */

        for (retry = 0; ; retry = 1) {

                cfs_read_lock(g_lock);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL) {
                        conn = kranal_find_conn_locked(peer);
                        if (conn != NULL) {
                                kranal_post_fma(conn, tx);
                                cfs_read_unlock(g_lock);
                                return;
                        }
                }

                /* Making connections; I'll need a write lock... */
                cfs_read_unlock(g_lock);
                cfs_write_lock_irqsave(g_lock, flags);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL)
                        break;

                cfs_write_unlock_irqrestore(g_lock, flags);

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        kranal_tx_done(tx, rc);
                        return;
                }
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                cfs_write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (cfs_list_empty(&peer->rap_tx_queue));

                if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
                      cfs_time_aftereq(jiffies, peer->rap_reconnect_time))) {
                        cfs_write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                cfs_spin_lock(&kranal_data.kra_connd_lock);

                cfs_list_add_tail(&peer->rap_connd_list,
                                  &kranal_data.kra_connd_peers);
                cfs_waitq_signal(&kranal_data.kra_connd_waitq);

                cfs_spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        cfs_list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        cfs_write_unlock_irqrestore(g_lock, flags);
}

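/* Post the RDMA that moves the payload for a PUT or GET, then queue tx on
 * rac_rdmaq to await the completion event.  The matching PUT_DONE/GET_DONE
 * completion message is prepared now but only goes out (via the fmaq) after
 * the RDMA completes; a zero-byte transfer skips the RDMA and completes
 * immediately. */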
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        cfs_spin_lock_irqsave(&conn->rac_lock, flags);
        cfs_list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
}

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid),
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

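/* LND send entry point.  PUT/REPLY payloads that are unpaged and small
 * enough travel as IMMEDIATE FMA messages, as do GETs whose reply sink is
 * unpaged and small; larger or paged transfers are negotiated as RDMAs via
 * PUT_REQ/GET_REQ, with the data moved directly between mapped buffers. */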
int
kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      niov = lntmsg->msg_niov;
        struct iovec     *iov = lntmsg->msg_iov;
        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        kra_tx_t         *tx;
        int               rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               nob, niov, libcfs_id2str(target));

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= LNET_MAX_IOV);

        LASSERT (!cfs_in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (routing) {
                CERROR ("Can't route\n");
                return -EIO;
        }

        switch(type) {
        default:
                LBUG();

        case LNET_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case LNET_MSG_GET:
                LASSERT (niov == 0);
                LASSERT (nob == 0);
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside lntmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
                    lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
                    lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.iov,
                                                      0, lntmsg->msg_md->md_length);
                else
                        rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.kiov,
                                                      0, lntmsg->msg_md->md_length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to %s\n",
                               libcfs_nid2str(target.nid));
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;
        }

        /* send IMMEDIATE */

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return -ENOMEM;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return -EIO;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_lntmsg[0] = lntmsg;
        kranal_launch_tx(tx, target.nid);
        return 0;
}

void
kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
{
        kra_msg_t     *rxmsg = conn->rac_rxmsg;
        unsigned int   niov = lntmsg->msg_niov;
        struct iovec  *iov = lntmsg->msg_iov;
        lnet_kiov_t   *kiov = lntmsg->msg_kiov;
        unsigned int   offset = lntmsg->msg_offset;
        unsigned int   nob = lntmsg->msg_len;
        kra_tx_t      *tx;
        int            rc;

        tx = kranal_get_idle_tx();
        if (tx == NULL)
                goto failed_0;

        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
        if (rc != 0)
                goto failed_1;

        tx->tx_conn = conn;

        rc = kranal_map_buffer(tx);
        if (rc != 0)
                goto failed_1;

        tx->tx_lntmsg[0] = lntmsg;

        kranal_rdma(tx, RANAL_MSG_GET_DONE,
                    &rxmsg->ram_u.get.ragm_desc, nob,
                    rxmsg->ram_u.get.ragm_cookie);
        return;

 failed_1:
        kranal_tx_done(tx, -EIO);
 failed_0:
        lnet_finalize(ni, lntmsg, -EIO);
}

int
kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                   void **new_private)
{
        kra_conn_t *conn = (kra_conn_t *)private;

        LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",
                           libcfs_nid2str(conn->rac_peer->rap_nid));

        return -EDEADLK;
}

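/* LND receive entry point.  The incoming FMA message is still sitting in
 * the connection's FMA buffer (conn->rac_rxmsg), and every path below must
 * consume it exactly once via kranal_consume_rxmsg(). */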
int
kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!cfs_in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);

        switch(rxmsg->ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return -EIO;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return -EIO;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
                return 0;

        case RANAL_MSG_PUT_REQ:
                tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
                if (tx == NULL) {
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -ENOMEM;
                }

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_conn = conn;
                rc = kranal_map_buffer(tx);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;

        case RANAL_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Matched! */
                        kranal_reply(ni, conn, lntmsg);
                } else {
                        /* No match */
                        tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
                        if (tx != NULL) {
                                tx->tx_msg.ram_u.completion.racm_cookie =
                                        rxmsg->ram_u.get.ragm_cookie;
                                kranal_post_fma(conn, tx);
                        }
                }
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;
        }
}

int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long    pid = cfs_kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        cfs_atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        cfs_atomic_dec(&kranal_data.kra_nthreads);
}

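/* Return non-zero if 'conn' has timed out: nothing received within the
 * timeout, or a tx stuck on one of its queues for longer than that.  Also
 * nudges the scheduler to send a keepalive NOOP if the conn has been idle
 * on the send side. */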
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        cfs_list_t        *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            cfs_time_aftereq(now, conn->rac_last_tx + conn->rac_keepalive *
                             CFS_HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
                       conn, libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * CFS_HZ;

        if (!conn->rac_close_recvd &&
            cfs_time_aftereq(now, conn->rac_last_rx + timeout)) {
                CERROR("%s received from %s within %lu seconds\n",
                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
                       "Nothing" : "CLOSE not",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       (now - conn->rac_last_rx)/CFS_HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        cfs_spin_lock_irqsave(&conn->rac_lock, flags);

        cfs_list_for_each (ttmp, &conn->rac_fmaq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/CFS_HZ);
                        return -ETIMEDOUT;
                }
        }

        cfs_list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/CFS_HZ);
                        return -ETIMEDOUT;
                }
        }

        cfs_list_for_each (ttmp, &conn->rac_replyq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/CFS_HZ);
                        return -ETIMEDOUT;
                }
        }

        cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        cfs_list_t        *conns = &kranal_data.kra_conns[idx];
        cfs_list_t        *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        cfs_read_lock(&kranal_data.kra_global_lock);

        cfs_list_for_each (ctmp, conns) {
                conn = cfs_list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                cfs_read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to %s, cqid %d timed out\n",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       conn->rac_cqid);

                cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
                                            flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        cfs_read_unlock(&kranal_data.kra_global_lock);
}

int
kranal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        cfs_waitlink_t     wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        cfs_waitlink_init(&wait);

        cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = cfs_list_entry(kranal_data.kra_connd_acceptq.next,
                                             kra_acceptsock_t, ras_list);
                        cfs_list_del(&ras->ras_list);

                        cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
                                                   flags);

                        CDEBUG(D_NET,"About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET,"Finished handshaking someone\n");

                        cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock,
                                              flags);
                        did_something = 1;
                }

                if (!cfs_list_empty(&kranal_data.kra_connd_peers)) {
                        peer = cfs_list_entry(kranal_data.kra_connd_peers.next,
                                              kra_peer_t, rap_connd_list);

                        cfs_list_del_init(&peer->rap_connd_list);
                        cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
                                                   flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock,
                                              flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
                cfs_waitq_add_exclusive(&kranal_data.kra_connd_waitq, &wait);

                cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                cfs_waitq_wait(&wait, CFS_TASK_INTERRUPTIBLE);

                cfs_set_current_state(CFS_TASK_RUNNING);
                cfs_waitq_del(&kranal_data.kra_connd_waitq, &wait);

                cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        cfs_spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

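/* Reaper thread.  Wakes every 'p' seconds and scans a chunk of the conn
 * hash table sized so that the whole table is covered 'n' times per minimum
 * timeout interval, i.e. every conn is checked at least once within
 * (n+1)/n of its timeout. */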
int
kranal_reaper (void *arg)
{
        cfs_waitlink_t     wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        cfs_daemonize("kranal_reaper");
        cfs_block_allsigs();

        cfs_waitlink_init(&wait);

        cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* I wake up every 'p' seconds to check for timeouts on some
                 * more peers.  I try to check every connection 'n' times
                 * within the global minimum of all keepalive and timeout
                 * intervals, to ensure I attend to every connection within
                 * (n+1)/n times its timeout intervals. */
                const int     p = 1;
                const int     n = 3;
                unsigned long min_timeout;
                int           chunk;

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout > 0) {
                        cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
                        cfs_waitq_add(&kranal_data.kra_reaper_waitq, &wait);

                        cfs_spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
                                                   flags);

                        cfs_waitq_timedwait(&wait, CFS_TASK_INTERRUPTIBLE,
                                            timeout);

                        cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock,
                                              flags);

                        cfs_set_current_state(CFS_TASK_RUNNING);
                        cfs_waitq_del(&kranal_data.kra_reaper_waitq, &wait);
                        continue;
                }

                if (kranal_data.kra_new_min_timeout !=
                    CFS_MAX_SCHEDULE_TIMEOUT) {
                        /* new min timeout set: restart min timeout scan */
                        next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;

                        if (kranal_data.kra_new_min_timeout <
                            current_min_timeout) {
                                current_min_timeout =
                                        kranal_data.kra_new_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        kranal_data.kra_new_min_timeout =
                                CFS_MAX_SCHEDULE_TIMEOUT;
                }
                min_timeout = current_min_timeout;

                cfs_spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                LASSERT (min_timeout > 0);

                /* Compute how many table entries to check now so I get round
                 * the whole table fast enough, given that I do this at fixed
                 * intervals of 'p' seconds */
                chunk = conn_entries;
                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
                if (chunk == 0)
                        chunk = 1;

                for (i = 0; i < chunk; i++) {
                        kranal_reaper_check(conn_index,
                                            &next_min_timeout);
                        conn_index = (conn_index + 1) % conn_entries;
                }

                next_check_time += p * CFS_HZ;

                cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                if (((conn_index - chunk <= base_index &&
                      base_index < conn_index) ||
                     (conn_index - conn_entries - chunk <= base_index &&
                      base_index < conn_index - conn_entries))) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        /* ...and restart min timeout scan */
                        next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;
                }
        }

        kranal_thread_fini();
        return 0;
}

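/* Drain the device's RDMA completion queue.  Each completion identifies a
 * conn by cqid; the oldest tx on that conn's rac_rdmaq is taken to be the
 * one that completed (RDMAs on a conn apparently finish in order, which the
 * AppPtr assertion below checks), so it moves to the fmaq to get its
 * PUT_DONE/GET_DONE sent. */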
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                cfs_read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                        cfs_read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                CDEBUG(D_NET, "Completed %p\n",
                       cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));

                cfs_spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!cfs_list_empty(&conn->rac_rdmaq));
                tx = cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                cfs_list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                cfs_read_unlock(&kranal_data.kra_global_lock);
        }
}

void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t         *conn;
        RAP_RETURN          rrc;
        __u32               cqid;
        __u32               event_type;
        cfs_list_t         *conns;
        cfs_list_t         *tmp;
        int                 i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        cfs_read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL) {
                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
                                       cqid);
                        } else {
                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
                                       conn, cqid);
                                kranal_schedule_conn(conn);
                        }

                        cfs_read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n",
                      dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        cfs_read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        cfs_list_for_each (tmp, conns) {
                                conn = cfs_list_entry(tmp, kra_conn_t,
                                                      rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        cfs_read_unlock(&kranal_data.kra_global_lock);
                }
        }
}

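/* Push a single FMA message, stamping it with the connection stamp and tx
 * sequence number.  Returns -EAGAIN (without consuming the sequence number)
 * when the device can't accept the send right now, apparently for lack of
 * send credits; the FMA completion event will reschedule the conn to
 * retry. */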
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
               conn, msg, msg->ram_type, sync ? "(sync)" : "",
               immediate, immediatenob);

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_rihandle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_rihandle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                if (cfs_time_aftereq(jiffies,
                                     conn->rac_last_tx + conn->rac_keepalive *
                                     CFS_HZ))
                        CWARN("EAGAIN sending %02x (idle %lu secs)\n",
                              msg->ram_type,
                              (jiffies - conn->rac_last_tx)/CFS_HZ);
                return -EAGAIN;
        }
}

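/* Send the next message on the conn's fmaq, or a NOOP/CLOSE as the conn's
 * state requires.  Runs only on the device scheduler thread, so it never
 * races with incoming message processing on the same conn. */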
void
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;

        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
         *       However I will be rescheduled by an FMA completion event
         *       when I eventually get some.
         * NB 2. Sampling rac_state here races with setting it elsewhere.
         *       But it doesn't matter if I try to send a "real" message just
         *       as I start closing because I'll get scheduled to send the
         *       close anyway. */

        /* Not racing with incoming message processing! */
        LASSERT (current == conn->rac_device->rad_scheduler);

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!cfs_list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (cfs_time_aftereq(jiffies,
                                             conn->rac_last_tx +
                                             conn->rac_keepalive * CFS_HZ)) {
                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                CWARN("sending CLOSE to %s\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
                                            flags);
                return;
        }

        cfs_spin_lock_irqsave(&conn->rac_lock, flags);

        if (cfs_list_empty(&conn->rac_fmaq)) {

                cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (cfs_time_aftereq(jiffies,
                                     conn->rac_last_tx + conn->rac_keepalive *
                                     CFS_HZ)) {
                        CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid), conn,
                               (jiffies - conn->rac_last_tx)/CFS_HZ,
                               conn->rac_keepalive);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return;
        }

        tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        cfs_list_del(&tx->tx_list);
        more_to_do = !cfs_list_empty(&conn->rac_fmaq);

        cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
               tx, tx->tx_msg.ram_type, tx->tx_cookie);
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                break;

        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                break;

        case RANAL_MSG_PUT_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1548                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1549                 expect_reply = 1;
1550                 break;
1551
1552         case RANAL_MSG_PUT_ACK:
1553                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1554                 expect_reply = 1;
1555                 break;
1556
1557         case RANAL_MSG_GET_REQ:
1558                 rc = kranal_map_buffer(tx);
1559                 LASSERT (rc != -EAGAIN);
1560                 if (rc != 0)
1561                         break;
1562
1563                 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1564                 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1565                 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
1566                         (__u64)((unsigned long)tx->tx_buffer);
1567                 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1568                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1569                 expect_reply = 1;
1570                 break;
1571         }
1572
1573         if (rc == -EAGAIN) {
1574                 /* Out of credits: put tx back at the head of the fmaq;
1575                  * an FMA completion event will reschedule me */
1576                 CDEBUG(D_NET, "EAGAIN on %p\n", conn);
1577                 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1578                 cfs_list_add(&tx->tx_list, &conn->rac_fmaq);
1579                 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1580                 return;
1581         }
1582
1583         if (!expect_reply || rc != 0) {
1584                 kranal_tx_done(tx, rc);
1585         } else {
1586                 /* LASSERT(current) above ensures this doesn't race with reply
1587                  * processing */
1588                 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1589                 cfs_list_add_tail(&tx->tx_list, &conn->rac_replyq);
1590                 tx->tx_qtime = jiffies;
1591                 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1592         }
1593
1594         if (more_to_do) {
1595                 CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
1596                 kranal_schedule_conn(conn);
1597         }
1598 }
1599
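     /* Byte-swab an RDMA descriptor in place; used below when a peer's
      * messages arrive with the opposite endianness. */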
1600 static inline void
1601 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1602 {
1603         __swab64s(&d->rard_key.Key);
1604         __swab16s(&d->rard_key.Cookie);
1605         __swab16s(&d->rard_key.MdHandle);
1606         __swab32s(&d->rard_key.Flags);
1607         __swab64s(&d->rard_addr.AddressBits);
1608         __swab32s(&d->rard_nob);
1609 }
1610
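     /* Find the tx on rac_replyq whose cookie matches an incoming
      * completion.  A cookie match with the wrong message type, like no
      * match at all, is warned about and treated as unmatched (NULL). */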
1611 kra_tx_t *
1612 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1613 {
1614         cfs_list_t       *ttmp;
1615         kra_tx_t         *tx;
1616         unsigned long     flags;
1617
1618         cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1619
1620         cfs_list_for_each(ttmp, &conn->rac_replyq) {
1621                 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
1622
1623                 CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
1624                        tx, tx->tx_msg.ram_type, tx->tx_cookie);
1625
1626                 if (tx->tx_cookie != cookie)
1627                         continue;
1628
1629                 if (tx->tx_msg.ram_type != type) {
1630                         cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1631                         CWARN("Unexpected type %x (%x expected) "
1632                               "matched reply from %s\n",
1633                               tx->tx_msg.ram_type, type,
1634                               libcfs_nid2str(conn->rac_peer->rap_nid));
1635                         return NULL;
1636                 }
1637
1638                 cfs_list_del(&tx->tx_list);
1639                 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1640                 return tx;
1641         }
1642
1643         cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1644         CWARN("Unmatched reply %02x/"LPX64" from %s\n",
1645               type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
1646         return NULL;
1647 }
1648
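     /* kranal_check_fma_rx: the receive side, also run only on the device
      * scheduler thread.  Peek the next FMA message prefix and validate it
      * step by step: magic (swabbing the common header, plus any RDMA
      * descriptor, if the peer is opposite-endian), protocol version,
      * source NID, connstamp and sequence number.  FENCEd types first wait
      * for the RDMA they complete.  CLOSE advances the shutdown handshake;
      * everything else dispatches by type, and any message lnet_parse()
      * doesn't consume is released via kranal_consume_rxmsg() at 'out:'. */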
1649 void
1650 kranal_check_fma_rx (kra_conn_t *conn)
1651 {
1652         unsigned long flags;
1653         __u32         seq;
1654         kra_tx_t     *tx;
1655         kra_msg_t    *msg;
1656         void         *prefix;
1657         RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1658         kra_peer_t   *peer = conn->rac_peer;
1659         int           rc = 0;
1660         int           repost = 1;
1661
1662         if (rrc == RAP_NOT_DONE)
1663                 return;
1664
1665         CDEBUG(D_NET, "RX on %p\n", conn);
1666
1667         LASSERT (rrc == RAP_SUCCESS);
1668         conn->rac_last_rx = jiffies;
1669         seq = conn->rac_rx_seq++;
1670         msg = (kra_msg_t *)prefix;
1671
1672         /* stash message for portals callbacks; they'll NULL
1673          * rac_rxmsg if they consume it */
1674         LASSERT (conn->rac_rxmsg == NULL);
1675         conn->rac_rxmsg = msg;
1676
1677         if (msg->ram_magic != RANAL_MSG_MAGIC) {
1678                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1679                         CERROR("Unexpected magic %08x from %s\n",
1680                                msg->ram_magic, libcfs_nid2str(peer->rap_nid));
1681                         rc = -EPROTO;
1682                         goto out;
1683                 }
1684
1685                 __swab32s(&msg->ram_magic);
1686                 __swab16s(&msg->ram_version);
1687                 __swab16s(&msg->ram_type);
1688                 __swab64s(&msg->ram_srcnid);
1689                 __swab64s(&msg->ram_connstamp);
1690                 __swab32s(&msg->ram_seq);
1691
1692                 /* NB message type checked below; NOT here... */
1693                 switch (msg->ram_type) {
1694                 case RANAL_MSG_PUT_ACK:
1695                         kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1696                         break;
1697
1698                 case RANAL_MSG_GET_REQ:
1699                         kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
1700                         break;
1701
1702                 default:
1703                         break;
1704                 }
1705         }
1706
1707         if (msg->ram_version != RANAL_MSG_VERSION) {
1708                 CERROR("Unexpected protocol version %d from %s\n",
1709                        msg->ram_version, libcfs_nid2str(peer->rap_nid));
1710                 rc = -EPROTO;
1711                 goto out;
1712         }
1713
1714         if (msg->ram_srcnid != peer->rap_nid) {
1715                 CERROR("Unexpected peer %s from %s\n",
1716                        libcfs_nid2str(msg->ram_srcnid), 
1717                        libcfs_nid2str(peer->rap_nid));
1718                 rc = -EPROTO;
1719                 goto out;
1720         }
1721
1722         if (msg->ram_connstamp != conn->rac_peer_connstamp) {
1723                 CERROR("Unexpected connstamp "LPX64" ("LPX64
1724                        " expected) from %s\n",
1725                        msg->ram_connstamp, conn->rac_peer_connstamp,
1726                        libcfs_nid2str(peer->rap_nid));
1727                 rc = -EPROTO;
1728                 goto out;
1729         }
1730
1731         if (msg->ram_seq != seq) {
1732                 CERROR("Unexpected sequence number %d (%d expected) from %s\n",
1733                        msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
1734                 rc = -EPROTO;
1735                 goto out;
1736         }
1737
1738         if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1739                 /* This message signals RDMA completion... */
1740                 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1741                 if (rrc != RAP_SUCCESS) {
1742                         CERROR("RapkFmaSyncWait failed: %d\n", rrc);
1743                         rc = -ENETDOWN;
1744                         goto out;
1745                 }
1746         }
1747
1748         if (conn->rac_close_recvd) {
1749                 CERROR("Unexpected message %d after CLOSE from %s\n",
1750                        msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
1751                 rc = -EPROTO;
1752                 goto out;
1753         }
1754
1755         if (msg->ram_type == RANAL_MSG_CLOSE) {
1756                 CWARN("RX CLOSE from %s\n", libcfs_nid2str(peer->rap_nid));
1757                 conn->rac_close_recvd = 1;
1758                 cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1759
1760                 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
1761                         kranal_close_conn_locked(conn, 0);
1762                 else if (conn->rac_state == RANAL_CONN_CLOSING &&
1763                          conn->rac_close_sent)
1764                         kranal_terminate_conn_locked(conn);
1765
1766                 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
1767                                             flags);
1768                 goto out;
1769         }
1770
1771         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
1772                 goto out;
1773
1774         switch (msg->ram_type) {
1775         case RANAL_MSG_NOOP:
1776                 /* Nothing to do; just a keepalive */
1777                 CDEBUG(D_NET, "RX NOOP on %p\n", conn);
1778                 break;
1779
1780         case RANAL_MSG_IMMEDIATE:
1781                 CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
1782                 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr, 
1783                                 msg->ram_srcnid, conn, 0);
1784                 repost = rc < 0;
1785                 break;
1786
1787         case RANAL_MSG_PUT_REQ:
1788                 CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
1789                 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr, 
1790                                 msg->ram_srcnid, conn, 1);
1791                 repost = rc < 0;
1792                 break;
1793
1794         case RANAL_MSG_PUT_NAK:
1795                 CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
1796                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1797                                         msg->ram_u.completion.racm_cookie);
1798                 if (tx == NULL)
1799                         break;
1800
1801                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1802                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1803                 kranal_tx_done(tx, -ENOENT);    /* no match */
1804                 break;
1805
1806         case RANAL_MSG_PUT_ACK:
1807                 CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
1808                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1809                                         msg->ram_u.putack.rapam_src_cookie);
1810                 if (tx == NULL)
1811                         break;
1812
1813                 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1814                             &msg->ram_u.putack.rapam_desc,
1815                             msg->ram_u.putack.rapam_desc.rard_nob,
1816                             msg->ram_u.putack.rapam_dst_cookie);
1817                 break;
1818
1819         case RANAL_MSG_PUT_DONE:
1820                 CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
1821                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1822                                         msg->ram_u.completion.racm_cookie);
1823                 if (tx == NULL)
1824                         break;
1825
1826                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1827                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1828                 kranal_tx_done(tx, 0);
1829                 break;
1830
1831         case RANAL_MSG_GET_REQ:
1832                 CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
1833                 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr, 
1834                                 msg->ram_srcnid, conn, 1);
1835                 repost = rc < 0;
1836                 break;
1837
1838         case RANAL_MSG_GET_NAK:
1839                 CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
1840                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1841                                         msg->ram_u.completion.racm_cookie);
1842                 if (tx == NULL)
1843                         break;
1844
1845                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1846                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1847                 kranal_tx_done(tx, -ENOENT);    /* no match */
1848                 break;
1849
1850         case RANAL_MSG_GET_DONE:
1851                 CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
1852                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1853                                         msg->ram_u.completion.racm_cookie);
1854                 if (tx == NULL)
1855                         break;
1856
1857                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1858                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1859 #if 0
1860                 /* completion message should send rdma length if we ever allow
1861                  * GET truncation */
1862                 lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
1863 #endif
1864                 kranal_tx_done(tx, 0);
1865                 break;
1866         }
1867
1868  out:
1869         if (rc < 0)                             /* protocol/comms error */
1870                 kranal_close_conn (conn, rc);
1871
1872         if (repost && conn->rac_rxmsg != NULL)
1873                 kranal_consume_rxmsg(conn, NULL, 0);
1874
1875         /* check again later */
1876         kranal_schedule_conn(conn);
1877 }
1878
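     /* Run once a conn has reached RANAL_CONN_CLOSED and been unhooked
      * from the peer and conn-hash lists: every tx still queued for FMA
      * or awaiting a reply completes with -ECONNABORTED. */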
1879 void
1880 kranal_complete_closed_conn (kra_conn_t *conn)
1881 {
1882         kra_tx_t   *tx;
1883         int         nfma;
1884         int         nreplies;
1885
1886         LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1887         LASSERT (cfs_list_empty(&conn->rac_list));
1888         LASSERT (cfs_list_empty(&conn->rac_hashlist));
1889
1890         for (nfma = 0; !cfs_list_empty(&conn->rac_fmaq); nfma++) {
1891                 tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1892
1893                 cfs_list_del(&tx->tx_list);
1894                 kranal_tx_done(tx, -ECONNABORTED);
1895         }
1896
1897         LASSERT (cfs_list_empty(&conn->rac_rdmaq));
1898
1899         for (nreplies = 0; !cfs_list_empty(&conn->rac_replyq); nreplies++) {
1900                 tx = cfs_list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1901
1902                 cfs_list_del(&tx->tx_list);
1903                 kranal_tx_done(tx, -ECONNABORTED);
1904         }
1905
1906         CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
1907                conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
1908 }
1909
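     /* Poll connection establishment: returns 0 once the handshake
      * completes, -EAGAIN while it is still pending (the scheduler
      * retries with backoff), and -ETIMEDOUT when rac_timeout seconds
      * have passed since the last tx, at which point the attempt is
      * abandoned. */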
1910 int
1911 kranal_process_new_conn (kra_conn_t *conn)
1912 {
1913         RAP_RETURN   rrc;
1914
1915         rrc = RapkCompleteSync(conn->rac_rihandle, 1);
1916         if (rrc == RAP_SUCCESS)
1917                 return 0;
1918
1919         LASSERT (rrc == RAP_NOT_DONE);
1920         if (!cfs_time_aftereq(jiffies, conn->rac_last_tx +
1921                               conn->rac_timeout * CFS_HZ))
1922                 return -EAGAIN;
1923
1924         /* Too late */
1925         rrc = RapkCompleteSync(conn->rac_rihandle, 0);
1926         LASSERT (rrc == RAP_SUCCESS);
1927         return -ETIMEDOUT;
1928 }
1929
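     /* kranal_scheduler: one instance runs per RapidArray device.  Each
      * pass it (1) reaps the device's RDMA and FMA completion queues when
      * kranal_device_callback() has flagged rad_ready, (2) services conns
      * queued on rad_ready_conns (RX, then TX, then cleanup if closed),
      * and (3) polls rad_new_conns whose deadlines have expired, backing
      * off between attempts (see kranal_process_new_conn() above).  A pass
      * that did no work (never dropped rad_lock) sleeps on rad_waitq until
      * signalled, or until the soonest new-conn deadline.  Every
      * RANAL_RESCHED busy loops the thread yields the CPU. */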
1930 int
1931 kranal_scheduler (void *arg)
1932 {
1933         kra_device_t     *dev = (kra_device_t *)arg;
1934         cfs_waitlink_t    wait;
1935         char              name[16];
1936         kra_conn_t       *conn;
1937         unsigned long     flags;
1938         unsigned long     deadline;
1939         unsigned long     soonest;
1940         int               nsoonest;
1941         long              timeout;
1942         cfs_list_t       *tmp;
1943         cfs_list_t       *nxt;
1944         int               rc;
1945         int               dropped_lock;
1946         int               busy_loops = 0;
1947
1948         snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
1949         cfs_daemonize(name);
1950         cfs_block_allsigs();
1951
1952         dev->rad_scheduler = current;
1953         cfs_waitlink_init(&wait);
1954
1955         cfs_spin_lock_irqsave(&dev->rad_lock, flags);
1956
1957         while (!kranal_data.kra_shutdown) {
1958                 /* Safe: kra_shutdown only set when quiescent */
1959
1960                 if (busy_loops++ >= RANAL_RESCHED) {
1961                         cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
1962
1963                         cfs_cond_resched();
1964                         busy_loops = 0;
1965
1966                         cfs_spin_lock_irqsave(&dev->rad_lock, flags);
1967                 }
1968
1969                 dropped_lock = 0;
1970
1971                 if (dev->rad_ready) {
1972                         /* Device callback fired since I last checked it */
1973                         dev->rad_ready = 0;
1974                         cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
1975                         dropped_lock = 1;
1976
1977                         kranal_check_rdma_cq(dev);
1978                         kranal_check_fma_cq(dev);
1979
1980                         cfs_spin_lock_irqsave(&dev->rad_lock, flags);
1981                 }
1982
1983                 cfs_list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
1984                         conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
1985
1986                         cfs_list_del_init(&conn->rac_schedlist);
1987                         LASSERT (conn->rac_scheduled);
1988                         conn->rac_scheduled = 0;
1989                         cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
1990                         dropped_lock = 1;
1991
1992                         kranal_check_fma_rx(conn);
1993                         kranal_process_fmaq(conn);
1994
1995                         if (conn->rac_state == RANAL_CONN_CLOSED)
1996                                 kranal_complete_closed_conn(conn);
1997
1998                         kranal_conn_decref(conn);
1999                         cfs_spin_lock_irqsave(&dev->rad_lock, flags);
2000                 }
2001
2002                 nsoonest = 0;
2003                 soonest = jiffies;
2004
2005                 cfs_list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
2006                         conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
2007
2008                         deadline = conn->rac_last_tx + conn->rac_keepalive;
2009                         if (cfs_time_aftereq(jiffies, deadline)) {
2010                                 /* Time to process this new conn */
2011                                 cfs_spin_unlock_irqrestore(&dev->rad_lock,
2012                                                            flags);
2013                                 dropped_lock = 1;
2014
2015                                 rc = kranal_process_new_conn(conn);
2016                                 if (rc != -EAGAIN) {
2017                                         /* All done with this conn */
2018                                         cfs_spin_lock_irqsave(&dev->rad_lock,
2019                                                               flags);
2020                                         cfs_list_del_init(&conn->rac_schedlist);
2021                                         cfs_spin_unlock_irqrestore(
2022                                                 &dev->rad_lock,
2023                                                 flags);
2024
2025                                         kranal_conn_decref(conn);
2026                                         cfs_spin_lock_irqsave(&dev->rad_lock,
2027                                                               flags);
2028                                         continue;
2029                                 }
2030
2031                                 /* retry with backoff: double up to HZ, then step by HZ */
2032                                 if (conn->rac_keepalive == 0)
2033                                         conn->rac_keepalive = 1;
2034                                 else if (conn->rac_keepalive <= CFS_HZ)
2035                                         conn->rac_keepalive *= 2;
2036                                 else
2037                                         conn->rac_keepalive += CFS_HZ;
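                                      /* e.g. starting from rac_keepalive == 0
                                       * with CFS_HZ == 100, the retry interval
                                       * runs 1, 2, 4, ... 128, 228, 328, ...
                                       * jiffies */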
2038
2039                                 deadline = conn->rac_last_tx + conn->rac_keepalive;
2040                                 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
2041                         }
2042
2043                         /* Does this conn need attention soonest? */
2044                         if (nsoonest++ == 0 ||
2045                             !cfs_time_aftereq(deadline, soonest))
2046                                 soonest = deadline;
2047                 }
2048
2049                 if (dropped_lock)               /* may sleep only if I never dropped the lock */
2050                         continue;
2051
2052                 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
2053                 cfs_waitq_add_exclusive(&dev->rad_waitq, &wait);
2054                 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
2055
2056                 if (nsoonest == 0) {
2057                         busy_loops = 0;
2058                         cfs_waitq_wait(&wait, CFS_TASK_INTERRUPTIBLE);
2059                 } else {
2060                         timeout = (long)(soonest - jiffies);
2061                         if (timeout > 0) {
2062                                 busy_loops = 0;
2063                                 cfs_waitq_timedwait(&wait,
2064                                                     CFS_TASK_INTERRUPTIBLE,
2065                                                     timeout);
2066                         }
2067                 }
2068
2069                 cfs_waitq_del(&dev->rad_waitq, &wait);
2070                 cfs_set_current_state(CFS_TASK_RUNNING);
2071                 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
2072         }
2073
2074         cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
2075
2076         dev->rad_scheduler = NULL;
2077         kranal_thread_fini();
2078         return 0;
2079 }