/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ralnd/ralnd_cb.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "ralnd.h"

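/* Device callback from the RapidArray driver: mark the matching device
 * ready and wake its scheduler thread. */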
void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        CDEBUG(D_NET, "callback for device %d\n", devid);

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        cfs_waitq_signal(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

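/* Queue 'conn' on its device's ready-conn list (with a ref for the
 * scheduler) and wake the scheduler; no-op if already queued. */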
void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
                cfs_waitq_signal(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

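/* Pop a tx descriptor off the global idle list (NULL if none) and stamp
 * it with the next completion cookie while the tx lock is held. */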
kra_tx_t *
kranal_get_idle_tx (void)
{
        unsigned long  flags;
        kra_tx_t      *tx;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (cfs_list_empty(&kranal_data.kra_idle_txs)) {
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
                return NULL;
        }

        tx = cfs_list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
        cfs_list_del(&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be needed, but we've
         * got a lock right now... */
        tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
        /* ram_connstamp gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int type)
{
        kra_tx_t *tx = kranal_get_idle_tx();

        if (tx != NULL)
                kranal_init_msg(&tx->tx_msg, type);

        return tx;
}

int
kranal_setup_immediate_buffer (kra_tx_t *tx,
                               unsigned int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (nob >= 0);

        if (nob == 0) {
                tx->tx_buffer = NULL;
        } else {
                LASSERT (niov > 0);

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;
                        niov--;
                        iov++;
                        LASSERT (niov > 0);
                }

                if (nob > iov->iov_len - offset) {
                        CERROR("Can't handle multiple vaddr fragments\n");
                        return -EMSGSIZE;
                }

                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        return 0;
}

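/* Point the tx at a single contiguous fragment of virtual memory;
 * payloads spanning iov fragments can't be handled (-EMSGSIZE). */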
int
kranal_setup_virt_buffer (kra_tx_t *tx,
                          unsigned int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

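/* Build the tx_phys array of page physical addresses for a paged (kiov)
 * payload.  Every fragment after the first must start at page offset 0
 * and, unless it is the last, cover a whole page, so the payload is
 * contiguous in I/O virtual memory. */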
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = lnet_page2phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM: "
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = lnet_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}

static inline int
kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

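/* Register the tx buffer (physical pages or a virtual range) with the
 * RapidArray device so it can be used for RDMA; a no-op if the buffer
 * is immediate or already mapped.  Only the device scheduler may call
 * this. */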
int
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                return 0;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d pages: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_phys_npages, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nphysmap++;
                dev->rad_nppphysmap += tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                return 0;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d bytes: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_nob, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nvirtmap++;
                dev->rad_nobvirtmap += tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                return 0;
        }
}

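/* Undo kranal_map_buffer(): deregister mapped memory and revert the
 * buffer type to its unmapped state. */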
void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nphysmap--;
                dev->rad_nppphysmap -= tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nvirtmap--;
                dev->rad_nobvirtmap -= tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}

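/* Return a completed tx to the idle pool and finalize any attached lnet
 * messages with 'completion' status (only after the tx is recycled). */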
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        lnet_msg_t      *lnetmsg[2];
        unsigned long    flags;
        int              i;

        LASSERT (!cfs_in_interrupt());

        kranal_unmap_buffer(tx);

        lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        cfs_list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        /* finalize AFTER freeing lnet msgs */
        for (i = 0; i < 2; i++) {
                if (lnetmsg[i] == NULL)
                        continue;

                lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
        }
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        cfs_list_t *tmp;

        /* just return the first connection */
        cfs_list_for_each (tmp, &peer->rap_conns) {
                return cfs_list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

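/* Bind the tx to 'conn', queue it on the connection's FMA queue and
 * schedule the connection so the device scheduler sends it. */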
void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

void
kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        int              rc;
        int              retry;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */

        for (retry = 0; ; retry = 1) {

                read_lock(g_lock);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL) {
                        conn = kranal_find_conn_locked(peer);
                        if (conn != NULL) {
                                kranal_post_fma(conn, tx);
                                read_unlock(g_lock);
                                return;
                        }
                }

                /* Making connections; I'll need a write lock... */
                read_unlock(g_lock);
                write_lock_irqsave(g_lock, flags);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL)
                        break;

                write_unlock_irqrestore(g_lock, flags);

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        kranal_tx_done(tx, rc);
                        return;
                }
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (cfs_list_empty(&peer->rap_tx_queue));

                if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
                      cfs_time_aftereq(jiffies, peer->rap_reconnect_time))) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                cfs_list_add_tail(&peer->rap_connd_list,
                                  &kranal_data.kra_connd_peers);
                cfs_waitq_signal(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        cfs_list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

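/* Start RDMA of 'nob' bytes from the (mapped) tx buffer into the peer's
 * sink buffer and prime the tx with the completion message to send when
 * the transfer finishes; a zero-length transfer completes immediately
 * via FMA. */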
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        cfs_list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

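/* Copy the immediate payload of the message being received out of the
 * FMA buffer and release it; -EPROTO if fewer bytes arrived than
 * expected. */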
int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid),
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

int
kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      niov = lntmsg->msg_niov;
        struct iovec     *iov = lntmsg->msg_iov;
        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        kra_tx_t         *tx;
        int               rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               nob, niov, libcfs_id2str(target));

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= LNET_MAX_IOV);

        LASSERT (!cfs_in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (routing) {
                CERROR ("Can't route\n");
                return -EIO;
        }

        switch(type) {
        default:
                LBUG();

        case LNET_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case LNET_MSG_GET:
                LASSERT (niov == 0);
                LASSERT (nob == 0);
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside lntmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
                    lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
                    lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.iov,
                                                      0, lntmsg->msg_md->md_length);
                else
                        rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.kiov,
                                                      0, lntmsg->msg_md->md_length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to %s\n",
                               libcfs_nid2str(target.nid));
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;
        }

        /* send IMMEDIATE */

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return -ENOMEM;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return -EIO;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_lntmsg[0] = lntmsg;
        kranal_launch_tx(tx, target.nid);
        return 0;
}

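/* Reply to a matched GET_REQ: map the reply payload and RDMA it into
 * the sink buffer described in the request. */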
void
kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
{
        kra_msg_t     *rxmsg = conn->rac_rxmsg;
        unsigned int   niov = lntmsg->msg_niov;
        struct iovec  *iov = lntmsg->msg_iov;
        lnet_kiov_t   *kiov = lntmsg->msg_kiov;
        unsigned int   offset = lntmsg->msg_offset;
        unsigned int   nob = lntmsg->msg_len;
        kra_tx_t      *tx;
        int            rc;

        tx = kranal_get_idle_tx();
        if (tx == NULL)
                goto failed_0;

        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
        if (rc != 0)
                goto failed_1;

        tx->tx_conn = conn;

        rc = kranal_map_buffer(tx);
        if (rc != 0)
                goto failed_1;

        tx->tx_lntmsg[0] = lntmsg;

        kranal_rdma(tx, RANAL_MSG_GET_DONE,
                    &rxmsg->ram_u.get.ragm_desc, nob,
                    rxmsg->ram_u.get.ragm_cookie);
        return;

 failed_1:
        kranal_tx_done(tx, -EIO);
 failed_0:
        lnet_finalize(ni, lntmsg, -EIO);
}

int
kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                   void **new_private)
{
        kra_conn_t *conn = (kra_conn_t *)private;

        LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",
                           libcfs_nid2str(conn->rac_peer->rap_nid));

        return -EDEADLK;
}

int
kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!cfs_in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);

        switch(rxmsg->ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return -EIO;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return -EIO;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
                return 0;

        case RANAL_MSG_PUT_REQ:
                tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
                if (tx == NULL) {
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -ENOMEM;
                }

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_conn = conn;
                rc = kranal_map_buffer(tx);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;

        case RANAL_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Matched! */
                        kranal_reply(ni, conn, lntmsg);
                } else {
                        /* No match */
                        tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
                        if (tx != NULL) {
                                tx->tx_msg.ram_u.completion.racm_cookie =
                                        rxmsg->ram_u.get.ragm_cookie;
                                kranal_post_fma(conn, tx);
                        }
                }
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;
        }
}

int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long    pid = cfs_create_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        cfs_atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        cfs_atomic_dec(&kranal_data.kra_nthreads);
}

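/* Check one connection for liveness: schedule a keepalive if it has
 * been idle too long, and return -ETIMEDOUT if nothing (or no CLOSE)
 * has been received within the timeout or a queued tx is stuck. */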
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        cfs_list_t        *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            cfs_time_aftereq(now, conn->rac_last_tx + conn->rac_keepalive *
                             CFS_HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
                       conn, libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * CFS_HZ;

        if (!conn->rac_close_recvd &&
            cfs_time_aftereq(now, conn->rac_last_rx + timeout)) {
                CERROR("%s received from %s within %lu seconds\n",
                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
                       "Nothing" : "CLOSE not",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       (now - conn->rac_last_rx)/CFS_HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        cfs_list_for_each (ttmp, &conn->rac_fmaq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/CFS_HZ);
                        return -ETIMEDOUT;
                }
        }

        cfs_list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/CFS_HZ);
                        return -ETIMEDOUT;
                }
        }

        cfs_list_for_each (ttmp, &conn->rac_replyq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/CFS_HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

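/* Scan one connection hash bucket under the shared global lock,
 * closing/terminating any connection that has timed out and recording
 * the minimum timeout/keepalive seen for the reaper's pacing. */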
void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        cfs_list_t        *conns = &kranal_data.kra_conns[idx];
        cfs_list_t        *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        cfs_list_for_each (ctmp, conns) {
                conn = cfs_list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to %s, cqid %d timed out\n",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

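/* Connection daemon: handshakes queued acceptor sockets and initiates
 * connections to peers queued by kranal_launch_tx(), sleeping when
 * there is nothing to do. */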
int
kranal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        cfs_waitlink_t     wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        cfs_waitlink_init(&wait);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = cfs_list_entry(kranal_data.kra_connd_acceptq.next,
                                             kra_acceptsock_t, ras_list);
                        cfs_list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
                                               flags);

                        CDEBUG(D_NET, "About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET, "Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (!cfs_list_empty(&kranal_data.kra_connd_peers)) {
                        peer = cfs_list_entry(kranal_data.kra_connd_peers.next,
                                              kra_peer_t, rap_connd_list);

                        cfs_list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
                                               flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
                cfs_waitq_add_exclusive(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                cfs_waitq_wait(&wait, CFS_TASK_INTERRUPTIBLE);

                cfs_set_current_state(CFS_TASK_RUNNING);
                cfs_waitq_del(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

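/* Lower the reaper's minimum-timeout watermark so a new (shorter)
 * connection timeout is noticed on its next pass. */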
void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

int
kranal_reaper (void *arg)
{
        cfs_waitlink_t     wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        cfs_daemonize("kranal_reaper");
        cfs_block_allsigs();

        cfs_waitlink_init(&wait);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* I wake up every 'p' seconds to check for timeouts on some
                 * more peers.  I try to check every connection 'n' times
                 * within the global minimum of all keepalive and timeout
                 * intervals, to ensure I attend to every connection within
                 * (n+1)/n times its timeout intervals. */
                const int     p = 1;
                const int     n = 3;
                unsigned long min_timeout;
                int           chunk;

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout > 0) {
                        cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
                        cfs_waitq_add(&kranal_data.kra_reaper_waitq, &wait);

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
                                               flags);

                        cfs_waitq_timedwait(&wait, CFS_TASK_INTERRUPTIBLE,
                                            timeout);

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        cfs_set_current_state(CFS_TASK_RUNNING);
                        cfs_waitq_del(&kranal_data.kra_reaper_waitq, &wait);
                        continue;
                }

                if (kranal_data.kra_new_min_timeout !=
                    CFS_MAX_SCHEDULE_TIMEOUT) {
                        /* new min timeout set: restart min timeout scan */
                        next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;

                        if (kranal_data.kra_new_min_timeout <
                            current_min_timeout) {
                                current_min_timeout =
                                        kranal_data.kra_new_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        kranal_data.kra_new_min_timeout =
                                CFS_MAX_SCHEDULE_TIMEOUT;
                }
                min_timeout = current_min_timeout;

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                LASSERT (min_timeout > 0);

                /* Compute how many table entries to check now so I get round
                 * the whole table fast enough given that I do this at fixed
                 * intervals of 'p' seconds */
                chunk = conn_entries;
                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
                if (chunk == 0)
                        chunk = 1;

                for (i = 0; i < chunk; i++) {
                        kranal_reaper_check(conn_index,
                                            &next_min_timeout);
                        conn_index = (conn_index + 1) % conn_entries;
                }

                next_check_time += p * CFS_HZ;

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                if (((conn_index - chunk <= base_index &&
                      base_index < conn_index) ||
                     (conn_index - conn_entries - chunk <= base_index &&
                      base_index < conn_index - conn_entries))) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        /* ...and restart min timeout scan */
                        next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;
                }
        }

        kranal_thread_fini();
        return 0;
}

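/* Drain the device's RDMA completion queue: move each completed tx from
 * its conn's rdmaq to the fmaq so the scheduler sends the matching
 * PUT_DONE/GET_DONE completion message. */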
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                CDEBUG(D_NET, "Completed %p\n",
                       cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!cfs_list_empty(&conn->rac_rdmaq));
                tx = cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                cfs_list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}

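/* Drain the device's FMA completion queue, scheduling each conn with
 * activity; if the CQ overflowed, schedule ALL conns on this device in
 * case notifications were lost. */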
void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t         *conn;
        RAP_RETURN          rrc;
        __u32               cqid;
        __u32               event_type;
        cfs_list_t         *conns;
        cfs_list_t         *tmp;
        int                 i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL) {
                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
                                       cqid);
                        } else {
                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
                                       conn, cqid);
                                kranal_schedule_conn(conn);
                        }

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n",
                      dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        cfs_list_for_each (tmp, conns) {
                                conn = cfs_list_entry(tmp, kra_conn_t,
                                                      rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}

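/* Stamp and send a single FMA message (synchronously if the message
 * type has RANAL_MSG_FENCE set); returns -EAGAIN if the send couldn't
 * complete right now (RAP_NOT_DONE). */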
1388 int
1389 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1390                void *immediate, int immediatenob)
1391 {
1392         int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1393         RAP_RETURN rrc;
1394
1395         CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
1396                conn, msg, msg->ram_type, sync ? "(sync)" : "",
1397                immediate, immediatenob);
1398
1399         LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
1400         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1401                  immediatenob <= RANAL_FMA_MAX_DATA :
1402                  immediatenob == 0);
1403
1404         msg->ram_connstamp = conn->rac_my_connstamp;
1405         msg->ram_seq = conn->rac_tx_seq;
1406
1407         if (sync)
1408                 rrc = RapkFmaSyncSend(conn->rac_rihandle,
1409                                       immediate, immediatenob,
1410                                       msg, sizeof(*msg));
1411         else
1412                 rrc = RapkFmaSend(conn->rac_rihandle,
1413                                   immediate, immediatenob,
1414                                   msg, sizeof(*msg));
1415
1416         switch (rrc) {
1417         default:
1418                 LBUG();
1419
1420         case RAP_SUCCESS:
1421                 conn->rac_last_tx = jiffies;
1422                 conn->rac_tx_seq++;
1423                 return 0;
1424
1425         case RAP_NOT_DONE:
1426                 if (cfs_time_aftereq(jiffies,
1427                                      conn->rac_last_tx + conn->rac_keepalive *
1428                                      CFS_HZ))
1429                         CWARN("EAGAIN sending %02x (idle %lu secs)\n",
1430                               msg->ram_type,
1431                               (jiffies - conn->rac_last_tx)/CFS_HZ);
1432                 return -EAGAIN;
1433         }
1434 }
1435
void
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;

        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
         *       However I will be rescheduled by an FMA completion event
         *       when I eventually get some.
         * NB 2. Sampling rac_state here races with setting it elsewhere.
         *       But it doesn't matter if I try to send a "real" message just
         *       as I start closing because I'll get scheduled to send the
         *       close anyway. */

        /* Not racing with incoming message processing! */
        LASSERT (current == conn->rac_device->rad_scheduler);

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!cfs_list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (cfs_time_aftereq(jiffies,
                                             conn->rac_last_tx +
                                             conn->rac_keepalive * CFS_HZ)) {
                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                CWARN("sending CLOSE to %s\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (cfs_list_empty(&conn->rac_fmaq)) {
                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (cfs_time_aftereq(jiffies,
                                     conn->rac_last_tx + conn->rac_keepalive *
                                     CFS_HZ)) {
                        CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid), conn,
                               (jiffies - conn->rac_last_tx)/CFS_HZ,
                               conn->rac_keepalive);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return;
        }

        tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        cfs_list_del(&tx->tx_list);
        more_to_do = !cfs_list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
               tx, tx->tx_msg.ram_type, tx->tx_cookie);
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                break;

        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                break;

        case RANAL_MSG_PUT_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* I need credits to send this.  Replace tx at the head of the
                 * fmaq and I'll get rescheduled when credits appear */
                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                spin_lock_irqsave(&conn->rac_lock, flags);
                cfs_list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return;
        }

        if (!expect_reply || rc != 0) {
                kranal_tx_done(tx, rc);
        } else {
                /* LASSERT(current) above ensures this doesn't race with reply
                 * processing */
                spin_lock_irqsave(&conn->rac_lock, flags);
                cfs_list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        if (more_to_do) {
                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
                kranal_schedule_conn(conn);
        }
}

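/* Fix the endianness of an RDMA descriptor from a byte-flipped peer; only
 * reached after the message magic has identified such a sender. */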
static inline void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}

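/* Unlink and return the tx on conn's replyq whose cookie matches this
 * reply.  A cookie match with the wrong message type, or no match at all,
 * is logged and yields NULL. */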
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        cfs_list_t       *ttmp;
        kra_tx_t         *tx;
        unsigned long     flags;

        spin_lock_irqsave(&conn->rac_lock, flags);

        cfs_list_for_each(ttmp, &conn->rac_replyq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                CDEBUG(D_NET, "Checking %p %02x/"LPX64"\n",
                       tx, tx->tx_msg.ram_type, tx->tx_cookie);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from %s\n",
                              tx->tx_msg.ram_type, type,
                              libcfs_nid2str(conn->rac_peer->rap_nid));
                        return NULL;
                }

                cfs_list_del(&tx->tx_list);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return tx;
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        CWARN("Unmatched reply %02x/"LPX64" from %s\n",
              type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
        return NULL;
}

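/* Receive the next FMA message on conn, if one has arrived.  The prefix is
 * validated (magic, version, source NID, connection stamp and sequence
 * number) before dispatch; any mismatch is a protocol error that closes
 * the connection via the 'out' path below. */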
void
kranal_check_fma_rx (kra_conn_t *conn)
{
        unsigned long flags;
        __u32         seq;
        kra_tx_t     *tx;
        kra_msg_t    *msg;
        void         *prefix;
        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t   *peer = conn->rac_peer;
        int           rc = 0;
        int           repost = 1;

        if (rrc == RAP_NOT_DONE)
                return;

        CDEBUG(D_NET, "RX on %p\n", conn);

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;
        seq = conn->rac_rx_seq++;
        msg = (kra_msg_t *)prefix;
        /* stash message for portals callbacks; they'll NULL
         * rac_rxmsg if they consume it */
        LASSERT (conn->rac_rxmsg == NULL);
        conn->rac_rxmsg = msg;

        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from %s\n",
                               msg->ram_magic, libcfs_nid2str(peer->rap_nid));
                        rc = -EPROTO;
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;

                default:
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from %s\n",
                       msg->ram_version, libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer %s from %s\n",
                       libcfs_nid2str(msg->ram_srcnid),
                       libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from %s\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,
                       libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from %s\n",
                       msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                if (rrc != RAP_SUCCESS) {
                        CERROR("RapkFmaSyncWait failed: %d\n", rrc);
                        rc = -ENETDOWN;
                        goto out;
                }
        }

        if (conn->rac_close_recvd) {
                CERROR("Unexpected message %d after CLOSE from %s\n",
                       msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                CWARN("RX CLOSE from %s\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid));
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_state == RANAL_CONN_CLOSING &&
                         conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                goto out;

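        /* Dispatch on message type: inbound requests (IMMEDIATE, PUT_REQ,
         * GET_REQ) are handed up to LNet via lnet_parse(), while
         * NAK/ACK/DONE completions are matched by cookie against the tx
         * parked on the replyq. */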
        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                break;

        case RANAL_MSG_IMMEDIATE:
                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni,
                                &msg->ram_u.immediate.raim_hdr,
                                msg->ram_srcnid, conn, 0);
                repost = rc < 0;
                break;

        case RANAL_MSG_PUT_REQ:
                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni,
                                &msg->ram_u.putreq.raprm_hdr,
                                msg->ram_srcnid, conn, 1);
                repost = rc < 0;
                break;

        case RANAL_MSG_PUT_NAK:
                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,
                                msg->ram_srcnid, conn, 1);
                repost = rc < 0;
                break;

        case RANAL_MSG_GET_NAK:
                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
#if 0
                /* completion message should send rdma length if we ever allow
                 * GET truncation */
                lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
#endif
                kranal_tx_done(tx, 0);
                break;
        }

 out:
        if (rc < 0)                             /* protocol/comms error */
                kranal_close_conn (conn, rc);

        if (repost && conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
}

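/* Finalise a conn that has reached RANAL_CONN_CLOSED: every tx still queued
 * for FMA or awaiting a reply completes with -ECONNABORTED.  By now the
 * conn is off the peer list and conn hash, and its RDMA queue has drained. */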
void
kranal_complete_closed_conn (kra_conn_t *conn)
{
        kra_tx_t   *tx;
        int         nfma;
        int         nreplies;

        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
        LASSERT (cfs_list_empty(&conn->rac_list));
        LASSERT (cfs_list_empty(&conn->rac_hashlist));

        for (nfma = 0; !cfs_list_empty(&conn->rac_fmaq); nfma++) {
                tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);

                cfs_list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        LASSERT (cfs_list_empty(&conn->rac_rdmaq));

        for (nreplies = 0; !cfs_list_empty(&conn->rac_replyq); nreplies++) {
                tx = cfs_list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);

                cfs_list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
              conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
}

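/* Poll the RAP library for completion of a new conn's handshake (the second
 * RapkCompleteSync() call below appears to abandon it once rac_timeout
 * expires): returns 0 on success, -EAGAIN while still pending, and
 * -ETIMEDOUT when we give up. */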
int
kranal_process_new_conn (kra_conn_t *conn)
{
        RAP_RETURN   rrc;

        rrc = RapkCompleteSync(conn->rac_rihandle, 1);
        if (rrc == RAP_SUCCESS)
                return 0;

        LASSERT (rrc == RAP_NOT_DONE);
        if (!cfs_time_aftereq(jiffies, conn->rac_last_tx +
                              conn->rac_timeout * CFS_HZ))
                return -EAGAIN;

        /* Too late */
        rrc = RapkCompleteSync(conn->rac_rihandle, 0);
        LASSERT (rrc == RAP_SUCCESS);
        return -ETIMEDOUT;
}

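/* Per-device scheduler thread.  Each pass it (a) drains the RDMA and FMA
 * completion queues when the device callback has fired, (b) services every
 * conn on the ready list (RX then FMA queue), and (c) retries conns still
 * completing their handshake, with backoff.  It only sleeps on rad_waitq
 * after a pass in which it never dropped rad_lock, i.e. found no work. */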
int
kranal_scheduler (void *arg)
{
        kra_device_t     *dev = (kra_device_t *)arg;
        cfs_waitlink_t    wait;
        char              name[16];
        kra_conn_t       *conn;
        unsigned long     flags;
        unsigned long     deadline;
        unsigned long     soonest;
        int               nsoonest;
        long              timeout;
        cfs_list_t       *tmp;
        cfs_list_t       *nxt;
        int               rc;
        int               dropped_lock;
        int               busy_loops = 0;

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        cfs_daemonize(name);
        cfs_block_allsigs();

        dev->rad_scheduler = current;
        cfs_waitlink_init(&wait);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        cfs_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                dropped_lock = 0;

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                cfs_list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
                        conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);

                        cfs_list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);
                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                nsoonest = 0;
                soonest = jiffies;

                cfs_list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
                        conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);

                        deadline = conn->rac_last_tx + conn->rac_keepalive;
                        if (cfs_time_aftereq(jiffies, deadline)) {
                                /* Time to process this new conn */
                                spin_unlock_irqrestore(&dev->rad_lock, flags);
                                dropped_lock = 1;

                                rc = kranal_process_new_conn(conn);
                                if (rc != -EAGAIN) {
                                        /* All done with this conn */
                                        spin_lock_irqsave(&dev->rad_lock,
                                                          flags);
                                        cfs_list_del_init(&conn->rac_schedlist);
                                        spin_unlock_irqrestore(&dev->rad_lock,
                                                               flags);

                                        kranal_conn_decref(conn);
                                        spin_lock_irqsave(&dev->rad_lock,
                                                          flags);
                                        continue;
                                }

                                /* retry with exponential backoff until HZ */
                                if (conn->rac_keepalive == 0)
                                        conn->rac_keepalive = 1;
                                else if (conn->rac_keepalive <= CFS_HZ)
                                        conn->rac_keepalive *= 2;
                                else
                                        conn->rac_keepalive += CFS_HZ;

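                                /* NB on this list rac_keepalive counts
                                 * jiffies directly (no CFS_HZ scaling as
                                 * for established conns), so retries go out
                                 * after 1, 2, 4... jiffies up to about a
                                 * second, then once a second thereafter;
                                 * recompute the deadline with the enlarged
                                 * interval. */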
                                deadline = conn->rac_last_tx +
                                           conn->rac_keepalive;
                                spin_lock_irqsave(&dev->rad_lock, flags);
                        }

                        /* Does this conn need attention soonest? */
                        if (nsoonest++ == 0 ||
                            !cfs_time_aftereq(deadline, soonest))
                                soonest = deadline;
                }

                /* may sleep iff I didn't drop the lock */
                if (dropped_lock)
                        continue;

                cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
                cfs_waitq_add_exclusive(&dev->rad_waitq, &wait);
                spin_unlock_irqrestore(&dev->rad_lock, flags);

                if (nsoonest == 0) {
                        busy_loops = 0;
                        cfs_waitq_wait(&wait, CFS_TASK_INTERRUPTIBLE);
                } else {
                        timeout = (long)(soonest - jiffies);
                        if (timeout > 0) {
                                busy_loops = 0;
                                cfs_waitq_timedwait(&wait,
                                                    CFS_TASK_INTERRUPTIBLE,
                                                    timeout);
                        }
                }

                cfs_waitq_del(&dev->rad_waitq, &wait);
                cfs_set_current_state(CFS_TASK_RUNNING);
                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}