/* lnet/klnds/ralnd/ralnd_cb.c */
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "ralnd.h"

void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        CDEBUG(D_NET, "callback for device %d\n", devid);

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

kra_tx_t *
kranal_get_idle_tx (void)
{
        unsigned long  flags;
        kra_tx_t      *tx;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (list_empty(&kranal_data.kra_idle_txs)) {
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
                return NULL;
        }

        tx = list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be needed, but we've
         * got a lock right now... */
        tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
        /* ram_connstamp gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int type)
{
        kra_tx_t *tx = kranal_get_idle_tx();

        if (tx != NULL)
                kranal_init_msg(&tx->tx_msg, type);

        return tx;
}

int
kranal_setup_immediate_buffer (kra_tx_t *tx,
                               unsigned int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (nob >= 0);

        if (nob == 0) {
                tx->tx_buffer = NULL;
        } else {
                LASSERT (niov > 0);

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;
                        niov--;
                        iov++;
                        LASSERT (niov > 0);
                }

                if (nob > iov->iov_len - offset) {
                        CERROR("Can't handle multiple vaddr fragments\n");
                        return -EMSGSIZE;
                }

                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx,
                          unsigned int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = lnet_page2phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM: "
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = lnet_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}
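
/* Illustrative example (assuming 4KiB pages; not from the original source):
 * a kiov of [off 2048, len 2048][off 0, len 4096][off 0, len 1024] is
 * accepted (only the first fragment may start mid-page and only the last
 * may end short) and maps to three RAP_PHYS_REGION entries.  A middle
 * fragment such as [off 1024, len 3072] would leave a gap in I/O virtual
 * memory and is rejected above with -EINVAL. */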

static inline int
kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

int
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                return 0;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d pages: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_phys_npages, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nphysmap++;
                dev->rad_nppphysmap += tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                return 0;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d bytes: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_nob, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nvirtmap++;
                dev->rad_nobvirtmap += tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                return 0;
        }
}

void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nphysmap--;
                dev->rad_nppphysmap -= tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nvirtmap--;
                dev->rad_nobvirtmap -= tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}

void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        lnet_msg_t      *lnetmsg[2];
        unsigned long    flags;
        int              i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        /* finalize AFTER freeing lnet msgs */
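        /* (Presumably defensive ordering: returning the tx to the pool first
         * means that if lnet_finalize() re-enters the LND, e.g. to send an
         * ACK, the re-entrant send can find an idle tx.  This rationale is
         * an assumption; the original comment does not spell it out.) */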
        for (i = 0; i < 2; i++) {
                if (lnetmsg[i] == NULL)
                        continue;

                lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
        }
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

void
kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        int              rc;
        int              retry;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */

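        /* Locking protocol: try the common case under the shared read lock
         * first; on a miss, retake the lock in write mode so a persistent
         * peer can be created, and loop at most once more ('retry') before
         * failing the tx with -EHOSTUNREACH. */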
        for (retry = 0; ; retry = 1) {

                read_lock(g_lock);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL) {
                        conn = kranal_find_conn_locked(peer);
                        if (conn != NULL) {
                                kranal_post_fma(conn, tx);
                                read_unlock(g_lock);
                                return;
                        }
                }

                /* Making connections; I'll need a write lock... */
                read_unlock(g_lock);
                write_lock_irqsave(g_lock, flags);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL)
                        break;

                write_unlock_irqrestore(g_lock, flags);

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        kranal_tx_done(tx, rc);
                        return;
                }
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));

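                /* Throttle reconnection: between a failed attempt and the
                 * next scheduled retry time, new transmits fail immediately
                 * rather than queueing behind a connection attempt that is
                 * not going to be initiated yet. */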
                if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->rap_reconnect_time))) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid),
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

int
kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      niov = lntmsg->msg_niov;
        struct iovec     *iov = lntmsg->msg_iov;
        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        kra_tx_t         *tx;
        int               rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               nob, niov, libcfs_id2str(target));

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= LNET_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (routing) {
                CERROR ("Can't route\n");
                return -EIO;
        }

        switch (type) {
        default:
                LBUG();

        case LNET_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case LNET_MSG_GET:
                LASSERT (niov == 0);
                LASSERT (nob == 0);
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside lntmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
                    lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
                    lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.iov,
                                                      0, lntmsg->msg_md->md_length);
                else
                        rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.kiov,
                                                      0, lntmsg->msg_md->md_length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to %s\n",
                               libcfs_nid2str(target.nid));
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;
        }

        /* send IMMEDIATE */

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return -ENOMEM;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return -EIO;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_lntmsg[0] = lntmsg;
        kranal_launch_tx(tx, target.nid);
        return 0;
}

void
kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
{
        kra_msg_t     *rxmsg = conn->rac_rxmsg;
        unsigned int   niov = lntmsg->msg_niov;
        struct iovec  *iov = lntmsg->msg_iov;
        lnet_kiov_t   *kiov = lntmsg->msg_kiov;
        unsigned int   offset = lntmsg->msg_offset;
        unsigned int   nob = lntmsg->msg_len;
        kra_tx_t      *tx;
        int            rc;

        tx = kranal_get_idle_tx();
        if (tx == NULL)
                goto failed_0;

        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
        if (rc != 0)
                goto failed_1;

        tx->tx_conn = conn;

        rc = kranal_map_buffer(tx);
        if (rc != 0)
                goto failed_1;

        tx->tx_lntmsg[0] = lntmsg;

        kranal_rdma(tx, RANAL_MSG_GET_DONE,
                    &rxmsg->ram_u.get.ragm_desc, nob,
                    rxmsg->ram_u.get.ragm_cookie);
        return;

 failed_1:
        kranal_tx_done(tx, -EIO);
 failed_0:
        lnet_finalize(ni, lntmsg, -EIO);
}

int
kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                   void **new_private)
{
        kra_conn_t *conn = (kra_conn_t *)private;

        LCONSOLE_ERROR("Dropping message from %s: no buffers free.\n",
                       libcfs_nid2str(conn->rac_peer->rap_nid));

        return -EDEADLK;
}

int
kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);

        switch (rxmsg->ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return -EIO;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return -EIO;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
                return 0;

        case RANAL_MSG_PUT_REQ:
                tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
                if (tx == NULL) {
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -ENOMEM;
                }

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_conn = conn;
                rc = kranal_map_buffer(tx);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

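                /* The PUT_ACK returns the sink buffer's RDMA descriptor
                 * (map key, address and length) to the sender, which RDMAs
                 * the payload straight into it and completes with PUT_DONE. */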
                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;

        case RANAL_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Matched! */
                        kranal_reply(ni, conn, lntmsg);
                } else {
                        /* No match */
                        tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
                        if (tx != NULL) {
                                tx->tx_msg.ram_u.completion.racm_cookie =
                                        rxmsg->ram_u.get.ragm_cookie;
                                kranal_post_fma(conn, tx);
                        }
                }
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;
        }
}

int
kranal_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}

int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
                       conn, libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("%s received from %s within %lu seconds\n",
                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
                       "Nothing" : "CLOSE not",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to %s, cqid %d timed out\n",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

int
kranal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                         kra_acceptsock_t, ras_list);
                        list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        CDEBUG(D_NET, "About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET, "Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

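                /* Classic sleep/wakeup sequence: set TASK_INTERRUPTIBLE and
                 * join the waitqueue BEFORE dropping kra_connd_lock and
                 * calling schedule(), so a wakeup arriving between the final
                 * queue checks above and the sleep cannot be lost. */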
                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        cfs_daemonize("kranal_reaper");
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* I wake up every 'p' seconds to check for timeouts on some
                 * more peers.  I try to check every connection 'n' times
                 * within the global minimum of all keepalive and timeout
                 * intervals, to ensure I attend to every connection within
                 * (n+1)/n times its timeout intervals. */
                const int     p = 1;
                const int     n = 3;
                unsigned long min_timeout;
                int           chunk;

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
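                /* The signed difference is well defined even when jiffies
                 * wraps: if next_check_time is just past zero and jiffies is
                 * just below ULONG_MAX, the subtraction still yields a small
                 * positive timeout.  "timeout > 0" below is equivalent to
                 * time_before(jiffies, next_check_time). */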
                if (timeout > 0) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                        schedule_timeout(timeout);

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        set_current_state(TASK_RUNNING);
                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
                        continue;
                }

                if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                        /* new min timeout set: restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;

                        if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                current_min_timeout = kranal_data.kra_new_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                }
                min_timeout = current_min_timeout;

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                LASSERT (min_timeout > 0);

                /* Compute how many table entries to check now so I get round
                 * the whole table fast enough given that I do this at fixed
                 * intervals of 'p' seconds. */
                chunk = conn_entries;
                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
                if (chunk == 0)
                        chunk = 1;

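                /* Worked example (illustrative numbers only): with a 256
                 * entry conn hash, min_timeout 60s, n = 3 and p = 1, chunk
                 * is 256 * 3 * 1 / 60 = 12 entries per wakeup, so the whole
                 * table is covered about every 22s, i.e. roughly n times
                 * per timeout interval as intended. */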
                for (i = 0; i < chunk; i++) {
                        kranal_reaper_check(conn_index,
                                            &next_min_timeout);
                        conn_index = (conn_index + 1) % conn_entries;
                }

                next_check_time += p * HZ;

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                if (((conn_index - chunk <= base_index &&
                      base_index < conn_index) ||
                     (conn_index - conn_entries - chunk <= base_index &&
                      base_index < conn_index - conn_entries))) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        /* ...and restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;
                }
        }

        kranal_thread_fini();
        return 0;
}

void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                CDEBUG(D_NET, "Completed %p\n",
                       list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

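                /* RDMAs on a connection are presumed to complete in posting
                 * order, so the tx just taken from the head of rac_rdmaq must
                 * be the one this completion describes; the LASSERTs below
                 * verify it. */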
                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}

void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t         *conn;
        RAP_RETURN          rrc;
        __u32               cqid;
        __u32               event_type;
        struct list_head   *conns;
        struct list_head   *tmp;
        int                 i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL) {
                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
                                       cqid);
                        } else {
                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
                                       conn, cqid);
                                kranal_schedule_conn(conn);
                        }

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n",
                      dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        list_for_each (tmp, conns) {
                                conn = list_entry(tmp, kra_conn_t,
                                                  rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}

int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        CDEBUG(D_NET, "%p sending msg %p %02x%s [%p for %d]\n",
               conn, msg, msg->ram_type, sync ? "(sync)" : "",
               immediate, immediatenob);

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_rihandle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_rihandle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

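        /* RAP_NOT_DONE is taken to mean "no FMA send credits right now";
         * callers treat the resulting -EAGAIN as "requeue and retry when a
         * completion event frees credits" (see kranal_process_fmaq below). */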
1397         case RAP_NOT_DONE:
1398                 if (time_after_eq(jiffies,
1399                                   conn->rac_last_tx + conn->rac_keepalive*HZ))
1400                         CWARN("EAGAIN sending %02x (idle %lu secs)\n",
1401                                msg->ram_type, (jiffies - conn->rac_last_tx)/HZ);
1402                 return -EAGAIN;
1403         }
1404 }
1405
1406 void
1407 kranal_process_fmaq (kra_conn_t *conn)
1408 {
1409         unsigned long flags;
1410         int           more_to_do;
1411         kra_tx_t     *tx;
1412         int           rc;
1413         int           expect_reply;
1414
1415         /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1416          *       However I will be rescheduled by an FMA completion event
1417          *       when I eventually get some.
1418          * NB 2. Sampling rac_state here races with setting it elsewhere.
1419          *       But it doesn't matter if I try to send a "real" message just
1420          *       as I start closing because I'll get scheduled to send the
1421          *       close anyway. */
1422
1423         /* Not racing with incoming message processing! */
1424         LASSERT (current == conn->rac_device->rad_scheduler);
1425
1426         if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
1427                 if (!list_empty(&conn->rac_rdmaq)) {
1428                         /* RDMAs in progress */
1429                         LASSERT (!conn->rac_close_sent);
1430
1431                         if (time_after_eq(jiffies,
1432                                           conn->rac_last_tx +
1433                                           conn->rac_keepalive * HZ)) {
1434                                 CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
1435                                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1436                                 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1437                         }
1438                         return;
1439                 }
1440
1441                 if (conn->rac_close_sent)
1442                         return;
1443
1444                 CWARN("sending CLOSE to %s\n", 
1445                       libcfs_nid2str(conn->rac_peer->rap_nid));
1446                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1447                 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1448                 if (rc != 0)
1449                         return;
1450
1451                 conn->rac_close_sent = 1;
1452                 if (!conn->rac_close_recvd)
1453                         return;
1454
1455                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1456
1457                 if (conn->rac_state == RANAL_CONN_CLOSING)
1458                         kranal_terminate_conn_locked(conn);
1459
1460                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1461                 return;
1462         }
1463
        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                        CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid), conn,
                               (jiffies - conn->rac_last_tx)/HZ, conn->rac_keepalive);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return;
        }

        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);
        more_to_do = !list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
               tx, tx->tx_msg.ram_type, tx->tx_cookie);
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                break;

        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                break;

        case RANAL_MSG_PUT_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* I need credits to send this.  Replace tx at the head of the
                 * fmaq and I'll get rescheduled when credits appear */
                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return;
        }

        if (!expect_reply || rc != 0) {
                kranal_tx_done(tx, rc);
        } else {
                /* LASSERT(current) above ensures this doesn't race with reply
                 * processing */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        if (more_to_do) {
                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
                kranal_schedule_conn(conn);
        }
}

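/* Fix the byte order of an RDMA descriptor received from a peer of
 * opposite endianness (detected below via a byte-swapped message
 * magic). */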
static inline void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}

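/* Find the tx on rac_replyq whose cookie matches an incoming reply.
 * A cookie match with an unexpected message type is a protocol error:
 * warn and drop the reply rather than complete the wrong tx. */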
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        struct list_head *ttmp;
        kra_tx_t         *tx;
        unsigned long     flags;

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
                       tx, tx->tx_msg.ram_type, tx->tx_cookie);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from %s\n",
                              tx->tx_msg.ram_type, type,
                              libcfs_nid2str(conn->rac_peer->rap_nid));
                        return NULL;
                }

                list_del(&tx->tx_list);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return tx;
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        CWARN("Unmatched reply %02x/"LPX64" from %s\n",
              type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
        return NULL;
}

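/* Receive path: poll this connection's FMA prefix and handle at most
 * one incoming message.  Every sanity-check failure (magic, version,
 * source NID, connstamp, sequence number) closes the connection with
 * -EPROTO; a message with the FENCE bit set additionally waits for the
 * RDMA it signals to complete before being dispatched. */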
void
kranal_check_fma_rx (kra_conn_t *conn)
{
        unsigned long flags;
        __u32         seq;
        kra_tx_t     *tx;
        kra_msg_t    *msg;
        void         *prefix;
        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t   *peer = conn->rac_peer;
        int           rc = 0;
        int           repost = 1;

        if (rrc == RAP_NOT_DONE)
                return;

        CDEBUG(D_NET, "RX on %p\n", conn);

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;
        seq = conn->rac_rx_seq++;
        msg = (kra_msg_t *)prefix;

        /* Stash the message for the portals callbacks; they'll NULL
         * rac_rxmsg if they consume it */
        LASSERT (conn->rac_rxmsg == NULL);
        conn->rac_rxmsg = msg;

        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from %s\n",
                               msg->ram_magic, libcfs_nid2str(peer->rap_nid));
                        rc = -EPROTO;
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;

                default:
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from %s\n",
                       msg->ram_version, libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer %s from %s\n",
                       libcfs_nid2str(msg->ram_srcnid),
                       libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from %s\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,
                       libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from %s\n",
                       msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                if (rrc != RAP_SUCCESS) {
                        CERROR("RapkFmaSyncWait failed: %d\n", rrc);
                        rc = -ENETDOWN;
                        goto out;
                }
        }

        if (conn->rac_close_recvd) {
                CERROR("Unexpected message %d after CLOSE from %s\n",
                       msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid));
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_state == RANAL_CONN_CLOSING &&
                         conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                goto out;

        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                break;

        case RANAL_MSG_IMMEDIATE:
                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr,
                                msg->ram_srcnid, conn, 0);
                repost = rc < 0;
                break;

        case RANAL_MSG_PUT_REQ:
                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr,
                                msg->ram_srcnid, conn, 1);
                repost = rc < 0;
                break;

        case RANAL_MSG_PUT_NAK:
                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,
                                msg->ram_srcnid, conn, 1);
                repost = rc < 0;
                break;

        case RANAL_MSG_GET_NAK:
                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
#if 0
                /* completion message should send rdma length if we ever allow
                 * GET truncation */
                lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
#endif
                kranal_tx_done(tx, 0);
                break;
        }

 out:
        if (rc < 0)                             /* protocol/comms error */
                kranal_close_conn (conn, rc);

        if (repost && conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
}

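/* Final cleanup once a connection has reached RANAL_CONN_CLOSED and
 * been unlinked from its peer and the conn hash: complete every queued
 * and reply-expecting tx with -ECONNABORTED.  The rdmaq must already
 * be empty, since CLOSE is only sent after all RDMAs have completed. */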
void
kranal_complete_closed_conn (kra_conn_t *conn)
{
        kra_tx_t   *tx;
        int         nfma;
        int         nreplies;

        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
        LASSERT (list_empty(&conn->rac_list));
        LASSERT (list_empty(&conn->rac_hashlist));

        for (nfma = 0; !list_empty(&conn->rac_fmaq); nfma++) {
                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        LASSERT (list_empty(&conn->rac_rdmaq));

        for (nreplies = 0; !list_empty(&conn->rac_replyq); nreplies++) {
                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
               conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
}

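/* Complete the RapidArray handshake on a new connection: 0 on success,
 * -EAGAIN while the handshake is still in progress, or -ETIMEDOUT once
 * rac_timeout seconds have elapsed (the second RapkCompleteSync call,
 * with 0, apparently abandoning the attempt). */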
int
kranal_process_new_conn (kra_conn_t *conn)
{
        RAP_RETURN   rrc;

        rrc = RapkCompleteSync(conn->rac_rihandle, 1);
        if (rrc == RAP_SUCCESS)
                return 0;

        LASSERT (rrc == RAP_NOT_DONE);
        if (!time_after_eq(jiffies, conn->rac_last_tx +
                           conn->rac_timeout * HZ))
                return -EAGAIN;

        /* Too late */
        rrc = RapkCompleteSync(conn->rac_rihandle, 0);
        LASSERT (rrc == RAP_SUCCESS);
        return -ETIMEDOUT;
}

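/* Per-device scheduler thread.  Each pass around the main loop:
 *   1. drains the RDMA and FMA completion queues if the device
 *      callback has fired;
 *   2. services every connection on rad_ready_conns (RX then TX);
 *   3. polls rad_new_conns, backing handshake retries off
 *      exponentially;
 *   4. sleeps on rad_waitq until the earliest deadline or a wakeup,
 *      but only if it never dropped rad_lock this pass. */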
int
kranal_scheduler (void *arg)
{
        kra_device_t     *dev = (kra_device_t *)arg;
        wait_queue_t      wait;
        char              name[16];
        kra_conn_t       *conn;
        unsigned long     flags;
        unsigned long     deadline;
        unsigned long     soonest;
        int               nsoonest;
        long              timeout;
        struct list_head *tmp;
        struct list_head *nxt;
        int               rc;
        int               dropped_lock;
        int               busy_loops = 0;

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        cfs_daemonize(name);
        cfs_block_allsigs();

        dev->rad_scheduler = current;
        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

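                /* Drop the lock and yield every RANAL_RESCHED
                 * iterations so this thread can't monopolise the CPU */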
                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                dropped_lock = 0;

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);
                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                nsoonest = 0;
                soonest = jiffies;

                list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        deadline = conn->rac_last_tx + conn->rac_keepalive;
                        if (time_after_eq(jiffies, deadline)) {
                                /* Time to process this new conn */
                                spin_unlock_irqrestore(&dev->rad_lock, flags);
                                dropped_lock = 1;

                                rc = kranal_process_new_conn(conn);
                                if (rc != -EAGAIN) {
                                        /* All done with this conn */
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        list_del_init(&conn->rac_schedlist);
                                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                                        kranal_conn_decref(conn);
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        continue;
                                }

                                /* retry with exponential backoff until HZ */
                                if (conn->rac_keepalive == 0)
                                        conn->rac_keepalive = 1;
                                else if (conn->rac_keepalive <= HZ)
                                        conn->rac_keepalive *= 2;
                                else
                                        conn->rac_keepalive += HZ;
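                                /* i.e. rac_keepalive (in jiffies here)
                                 * grows 1, 2, 4, ... until it passes
                                 * HZ, then by a whole HZ per retry */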

                                deadline = conn->rac_last_tx + conn->rac_keepalive;
                                spin_lock_irqsave(&dev->rad_lock, flags);
                        }

                        /* Does this conn need attention soonest? */
                        if (nsoonest++ == 0 ||
                            !time_after_eq(deadline, soonest))
                                soonest = deadline;
                }

                if (dropped_lock)               /* may sleep iff I didn't drop the lock */
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&dev->rad_waitq, &wait);
                spin_unlock_irqrestore(&dev->rad_lock, flags);

                if (nsoonest == 0) {
                        busy_loops = 0;
                        schedule();
                } else {
                        timeout = (long)(soonest - jiffies);
                        if (timeout > 0) {
                                busy_loops = 0;
                                schedule_timeout(timeout);
                        }
                }

                remove_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}