Whamcloud - gitweb
d4bacdf0a1b628c3a5f3580563ce4128610eade5
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "ranal.h"
25
26 int
27 kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
28 {
29         /* I would guess that if kranal_get_peer (nid) == NULL,
30            and we're not routing, then 'nid' is very distant :) */
31         if ( nal->libnal_ni.ni_pid.nid == nid ) {
32                 *dist = 0;
33         } else {
34                 *dist = 1;
35         }
36
37         return 0;
38 }
39
40 void
41 kranal_device_callback(RAP_INT32 devid)
42 {
43         kra_device_t *dev;
44         int           i;
45         unsigned long flags;
46         
47         for (i = 0; i < kranal_data.kra_ndevs; i++) {
48
49                 dev = &kranal_data.kra_devices[i];
50                 if (dev->rad_id != devid)
51                         continue;
52
53                 spin_lock_irqsave(&dev->rad_lock, flags);
54
55                 if (!dev->rad_ready) {
56                         dev->rad_ready = 1;
57                         wake_up(&dev->rad_waitq);
58                 }
59
60                 spin_unlock_irqrestore(&dev->rad_lock, flags);
61                 return;
62         }
63         
64         CWARN("callback for unknown device %d\n", devid);
65 }
66
void
kranal_schedule_conn(kra_conn_t *conn)
{
        /* Queue 'conn' for attention from its device's scheduler thread.
         * Idempotent: a conn that is already scheduled is left alone, so
         * it holds at most one scheduler ref at a time. */
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;
        
        spin_lock_irqsave(&dev->rad_lock, flags);
        
        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}
84
kra_tx_t *
kranal_get_idle_tx (int may_block) 
{
        /* Allocate an idle tx descriptor.  If 'may_block', sleep until a
         * "normal" descriptor becomes free; otherwise fall back on the
         * reserved non-blocking pool and return NULL if that too is
         * exhausted. */
        unsigned long  flags;
        kra_tx_t      *tx = NULL;
        
        for (;;) {
                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                                CERROR("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                /* NB the wakeup may race with another allocator, so the
                 * list is re-checked under the lock at the loop top */
                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));
        }

        /* NB still holding kra_tx_lock here (loop breaks out with it held) */
        if (tx != NULL) {
                list_del(&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be
                 * needed, but we've got a lock right now... */
                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

                /* descriptor must have been returned in a pristine state
                 * (see the scrubbing in kranal_tx_done) */
                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
        
        return tx;
}
138
139 void
140 kranal_init_msg(kra_msg_t *msg, int type)
141 {
142         msg->ram_magic = RANAL_MSG_MAGIC;
143         msg->ram_version = RANAL_MSG_VERSION;
144         msg->ram_type = type;
145         msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
146         /* ram_connstamp gets set when FMA is sent */
147 }
148
149 kra_tx_t *
150 kranal_new_tx_msg (int may_block, int type)
151 {
152         kra_tx_t *tx = kranal_get_idle_tx(may_block);
153
154         if (tx == NULL)
155                 return NULL;
156
157         kranal_init_msg(&tx->tx_msg, type);
158         return tx;
159 }
160
161 int
162 kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
163                                int offset, int nob)
164                  
165 {
166         /* For now this is almost identical to kranal_setup_virt_buffer, but we
167          * could "flatten" the payload into a single contiguous buffer ready
168          * for sending direct over an FMA if we ever needed to. */
169
170         LASSERT (nob > 0);
171         LASSERT (niov > 0);
172         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
173
174         while (offset >= iov->iov_len) {
175                 offset -= iov->iov_len;
176                 niov--;
177                 iov++;
178                 LASSERT (niov > 0);
179         }
180
181         if (nob > iov->iov_len - offset) {
182                 CERROR("Can't handle multiple vaddr fragments\n");
183                 return -EMSGSIZE;
184         }
185
186         tx->tx_buftype = RANAL_BUF_IMMEDIATE;
187         tx->tx_nob = nob;
188         tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
189         return 0;
190 }
191
192 int
193 kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov, 
194                           int offset, int nob)
195                  
196 {
197         LASSERT (nob > 0);
198         LASSERT (niov > 0);
199         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
200
201         while (offset >= iov->iov_len) {
202                 offset -= iov->iov_len;
203                 niov--;
204                 iov++;
205                 LASSERT (niov > 0);
206         }
207
208         if (nob > iov->iov_len - offset) {
209                 CERROR("Can't handle multiple vaddr fragments\n");
210                 return -EMSGSIZE;
211         }
212
213         tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
214         tx->tx_nob = nob;
215         tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
216         return 0;
217 }
218
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        /* Describe a page-based (kiov) payload as an array of physical
         * page addresses in tx->tx_phys, ready for RDMA registration.
         * The payload must map contiguously into I/O VM: every page after
         * the first must start at kiov_offset 0, and all but the last must
         * be a full page.  Returns 0 on success, -EINVAL if there is a gap
         * in the mapping, or -EMSGSIZE if there are too many pages.
         * NB tx_buftype is set before validation; the error paths rely on
         * PHYS_UNMAPPED being a no-op in kranal_unmap_buffer. */
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        /* skip over fragments wholly consumed by 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        /* tx_buffer records the payload's starting offset within the
         * first page */
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
        
        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys++;

        /* bytes remaining after the first (possibly partial) page */
        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                /* subsequent fragments must tile pages exactly (a short
                 * final fragment is only OK on the last page) */
                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) && 
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM:"
                               "page %d, offset %d, len %d \n", 
                               phys - tx->tx_phys, 
                               kiov->kiov_offset, kiov->kiov_len);                        
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", phys - tx->tx_phys);
                        return -EMSGSIZE;
                }

                phys->Address = kranal_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}
277
278 static inline int
279 kranal_setup_rdma_buffer (kra_tx_t *tx, int niov, 
280                           struct iovec *iov, ptl_kiov_t *kiov,
281                           int offset, int nob)
282 {
283         LASSERT ((iov == NULL) != (kiov == NULL));
284         
285         if (kiov != NULL)
286                 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
287         
288         return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
289 }
290
void
kranal_map_buffer (kra_tx_t *tx)
{
        /* Register tx's buffer with the RapidArray device so it can be the
         * source/sink of an RDMA.  Caller must be the device's scheduler
         * thread (see LASSERT).  A no-op for immediate/unset buffers and
         * for buffers already mapped. */
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();
                
        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                break;                          /* nothing to do */
                
        case RANAL_BUF_PHYS_UNMAPPED:
                /* register the physical page array built by
                 * kranal_setup_phys_buffer() */
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                break;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                break;
        }
}
327
void
kranal_unmap_buffer (kra_tx_t *tx)
{
        /* Undo kranal_map_buffer(): deregister tx's RDMA mapping.  Safe to
         * call on unmapped/immediate buffers (no-op); for mapped buffers
         * the caller must be the owning device's scheduler (see LASSERTs). */
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();
                
        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;                          /* nothing mapped */
                
        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                /* phys mappings pass NULL rather than the buffer address */
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}
365
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        /* Complete 'tx': unmap its buffer, finalise any attached portals
         * messages (with PTL_OK iff completion == 0), scrub the descriptor
         * and return it to its idle pool.  Process context only (see
         * LASSERT). */
        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        /* scrub so kranal_get_idle_tx()'s sanity LASSERTs pass */
        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
        } else {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
                /* only blocking allocators sleep, on the normal pool */
                wake_up(&kranal_data.kra_idle_tx_waitq);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}
401
402 kra_conn_t *
403 kranal_find_conn_locked (kra_peer_t *peer)
404 {
405         struct list_head *tmp;
406
407         /* just return the first connection */
408         list_for_each (tmp, &peer->rap_conns) {
409                 return list_entry(tmp, kra_conn_t, rac_list);
410         }
411
412         return NULL;
413 }
414
415 void
416 kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
417 {
418         unsigned long    flags;
419
420         tx->tx_conn = conn;
421
422         spin_lock_irqsave(&conn->rac_lock, flags);
423         list_add_tail(&tx->tx_list, &conn->rac_fmaq);
424         tx->tx_qtime = jiffies;
425         spin_unlock_irqrestore(&conn->rac_lock, flags);
426
427         kranal_schedule_conn(conn);
428 }
429
void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
        /* Send 'tx' to 'nid': post it on an existing connection if there
         * is one, otherwise queue it on the peer and (if necessary) hand
         * the peer to the connection daemon. */
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        unsigned long    now;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */
        
        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */

        /* fast path: look for an existing conn under the read lock */
        read_lock(g_lock);
        
        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                read_unlock(g_lock);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                kranal_post_fma(conn, tx);
                read_unlock(g_lock);
                return;
        }
        
        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock_irqsave(g_lock, flags);

        /* NB state may have changed while the lock was dropped, so both
         * lookups are repeated under the write lock */
        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));
                
                /* respect the reconnection backoff interval */
                now = CURRENT_TIME;
                if (now < peer->rap_reconnect_time) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }
        
                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */
        
                /* hand the peer to the connection daemon */
                spin_lock(&kranal_data.kra_connd_lock);
        
                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);
        
                spin_unlock(&kranal_data.kra_connd_lock);
        }
        
        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}
508
void
kranal_rdma(kra_tx_t *tx, int type, 
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        /* Start an RDMA of 'nob' bytes from tx's (mapped) buffer into the
         * peer's 'sink' descriptor, then queue tx on rac_rdmaq so the
         * completion message 'type' (carrying 'cookie') goes out when the
         * RDMA finishes.  A zero-byte transfer skips the RDMA and posts
         * the completion message immediately. */
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);
        
        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;           /* so completion finds tx */

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        /* park tx until the RDMA completes */
        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}
551
552 int
553 kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
554 {
555         __u32      nob_received = nob;
556         RAP_RETURN rrc;
557
558         LASSERT (conn->rac_rxmsg != NULL);
559
560         rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
561                              &nob_received, sizeof(kra_msg_t));
562         LASSERT (rrc == RAP_SUCCESS);
563
564         conn->rac_rxmsg = NULL;
565
566         if (nob_received != nob) {
567                 CWARN("Expected %d immediate bytes but got %d\n",
568                       nob, nob_received);
569                 return -EPROTO;
570         }
571         
572         return 0;
573 }
574
575 ptl_err_t
576 kranal_do_send (lib_nal_t    *nal, 
577                 void         *private,
578                 lib_msg_t    *libmsg,
579                 ptl_hdr_t    *hdr, 
580                 int           type, 
581                 ptl_nid_t     nid, 
582                 ptl_pid_t     pid,
583                 unsigned int  niov, 
584                 struct iovec *iov, 
585                 ptl_kiov_t   *kiov,
586                 size_t        offset,
587                 size_t        nob)
588 {
589         kra_conn_t *conn;
590         kra_tx_t   *tx;
591         int         rc;
592
593         /* NB 'private' is different depending on what we're sending.... */
594
595         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
596                " pid %d\n", nob, niov, nid , pid);
597
598         LASSERT (nob == 0 || niov > 0);
599         LASSERT (niov <= PTL_MD_MAX_IOV);
600
601         LASSERT (!in_interrupt());
602         /* payload is either all vaddrs or all pages */
603         LASSERT (!(kiov != NULL && iov != NULL));
604
605         switch(type) {
606         default:
607                 LBUG();
608                 
609         case PTL_MSG_REPLY: {
610                 /* reply's 'private' is the conn that received the GET_REQ */
611                 conn = private;
612                 LASSERT (conn->rac_rxmsg != NULL);
613
614                 if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
615                         if (nob > RANAL_FMA_MAX_DATA) {
616                                 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
617                                        nob, nid);
618                                 return PTL_FAIL;
619                         }
620                         break;                  /* RDMA not expected */
621                 }
622                 
623                 /* Incoming message consistent with immediate reply? */
624                 if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
625                         CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
626                                nid, conn->rac_rxmsg->ram_type);
627                         return PTL_FAIL;
628                 }
629
630                 tx = kranal_get_idle_tx(0);
631                 if (tx == NULL)
632                         return PTL_FAIL;
633
634                 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
635                 if (rc != 0) {
636                         kranal_tx_done(tx, rc);
637                         return PTL_FAIL;
638                 }
639
640                 tx->tx_conn = conn;
641                 tx->tx_libmsg[0] = libmsg;
642
643                 kranal_map_buffer(tx);
644                 kranal_rdma(tx, RANAL_MSG_GET_DONE,
645                             &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
646                             conn->rac_rxmsg->ram_u.get.ragm_cookie);
647                 return PTL_OK;
648         }
649
650         case PTL_MSG_GET:
651                 LASSERT (niov == 0);
652                 LASSERT (nob == 0);
653                 /* We have to consider the eventual sink buffer rather than any
654                  * payload passed here (there isn't any, and strictly, looking
655                  * inside libmsg is a layering violation).  We send a simple
656                  * IMMEDIATE GET if the sink buffer is mapped already and small
657                  * enough for FMA */
658
659                 if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
660                     libmsg->md->length <= RANAL_FMA_MAX_DATA &&
661                     libmsg->md->length <= kranal_tunables.kra_max_immediate)
662                         break;
663
664                 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
665                 if (tx == NULL)
666                         return PTL_NO_SPACE;
667
668                 if ((libmsg->md->options & PTL_MD_KIOV) == 0)
669                         rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
670                                                       libmsg->md->md_iov.iov,
671                                                       0, libmsg->md->length);
672                 else
673                         rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
674                                                       libmsg->md->md_iov.kiov,
675                                                       0, libmsg->md->length);
676                 if (rc != 0) {
677                         kranal_tx_done(tx, rc);
678                         return PTL_FAIL;
679                 }
680
681                 tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
682                 if (tx->tx_libmsg[1] == NULL) {
683                         CERROR("Can't create reply for GET to "LPX64"\n", nid);
684                         kranal_tx_done(tx, rc);
685                         return PTL_FAIL;
686                 }
687
688                 tx->tx_libmsg[0] = libmsg;
689                 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
690                 /* rest of tx_msg is setup just before it is sent */
691                 kranal_launch_tx(tx, nid);
692                 return PTL_OK;
693
694         case PTL_MSG_ACK:
695                 LASSERT (nob == 0);
696                 break;
697
698         case PTL_MSG_PUT:
699                 if (kiov == NULL &&             /* not paged */
700                     nob <= RANAL_MAX_IMMEDIATE && /* small enough */
701                     nob <= kranal_tunables.kra_max_immediate)
702                         break;                  /* send IMMEDIATE */
703                 
704                 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
705                 if (tx == NULL)
706                         return PTL_NO_SPACE;
707
708                 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
709                 if (rc != 0) {
710                         kranal_tx_done(tx, rc);
711                         return PTL_FAIL;
712                 }
713
714                 tx->tx_libmsg[0] = libmsg;
715                 tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
716                 /* rest of tx_msg is setup just before it is sent */
717                 kranal_launch_tx(tx, nid);
718                 return PTL_OK;
719         }
720
721         LASSERT (kiov == NULL);
722         LASSERT (nob <= RANAL_MAX_IMMEDIATE);
723
724         tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
725                                  type == PTL_MSG_REPLY ||
726                                  in_interrupt()), 
727                                RANAL_MSG_IMMEDIATE);
728         if (tx == NULL)
729                 return PTL_NO_SPACE;
730
731         rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
732         if (rc != 0) {
733                 kranal_tx_done(tx, rc);
734                 return PTL_FAIL;
735         }
736                 
737         tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
738         tx->tx_libmsg[0] = libmsg;
739         kranal_launch_tx(tx, nid);
740         return PTL_OK;
741 }
742
743 ptl_err_t
744 kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
745              ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
746              unsigned int niov, struct iovec *iov,
747              size_t offset, size_t len)
748 {
749         return kranal_do_send(nal, private, cookie,
750                               hdr, type, nid, pid,
751                               niov, iov, NULL,
752                               offset, len);
753 }
754
755 ptl_err_t
756 kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
757                    ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
758                    unsigned int niov, ptl_kiov_t *kiov, 
759                    size_t offset, size_t len)
760 {
761         return kranal_do_send(nal, private, cookie,
762                               hdr, type, nid, pid,
763                               niov, NULL, kiov,
764                               offset, len);
765 }
766
ptl_err_t
kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                size_t offset, size_t mlen, size_t rlen)
{
        /* Common receive path: 'private' is the conn whose current rx
         * message (conn->rac_rxmsg) is being delivered into iov OR kiov;
         * mlen of the message's rlen bytes are wanted. */
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;
        
        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(rxmsg->ram_type) {
        default:
                LBUG();
                return PTL_FAIL;
                
        case RANAL_MSG_IMMEDIATE:
                /* payload arrived inline in the FMA message; copy it out */
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return PTL_FAIL;
                } else {
                        /* locate the fragment 'offset' falls in */
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return PTL_FAIL;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                return PTL_OK;

        case RANAL_MSG_GET_REQ:
                /* If the GET matched, we've already handled it in
                 * kranal_do_send which is called to send the REPLY.  We're
                 * only called here to complete the GET receive (if we needed
                 * it which we don't, but I digress...) */
                LASSERT (libmsg == NULL);
                lib_finalize(nal, NULL, libmsg, PTL_OK);
                return PTL_OK;

        case RANAL_MSG_PUT_REQ:
                if (libmsg == NULL) {           /* PUT didn't match... */
                        lib_finalize(nal, NULL, libmsg, PTL_OK);
                        return PTL_OK;
                }
                
                /* PUT matched: map the sink buffer and send a PUT_ACK
                 * describing it so the sender can RDMA the payload over */
                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                kranal_map_buffer(tx);
                
                /* src cookie identifies the sender's PUT_REQ; dst cookie
                 * identifies this tx so RDMA_DONE can find it */
                tx->tx_msg.ram_u.putack.rapam_src_cookie = 
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits = 
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);
                
                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }
}
857
858 ptl_err_t
859 kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
860              unsigned int niov, struct iovec *iov, 
861              size_t offset, size_t mlen, size_t rlen)
862 {
863         return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
864                               offset, mlen, rlen);
865 }
866
867 ptl_err_t
868 kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
869                    unsigned int niov, ptl_kiov_t *kiov, 
870                    size_t offset, size_t mlen, size_t rlen)
871 {
872         return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
873                               offset, mlen, rlen);
874 }
875
876 int
877 kranal_thread_start (int(*fn)(void *arg), void *arg)
878 {
879         long    pid = kernel_thread(fn, arg, 0);
880
881         if (pid < 0)
882                 return(int)pid;
883
884         atomic_inc(&kranal_data.kra_nthreads);
885         return 0;
886 }
887
888 void
889 kranal_thread_fini (void)
890 {
891         atomic_dec(&kranal_data.kra_nthreads);
892 }
893
894 int
895 kranal_check_conn_timeouts (kra_conn_t *conn)
896 {
897         kra_tx_t          *tx;
898         struct list_head  *ttmp;
899         unsigned long      flags;
900         long               timeout;
901         unsigned long      now = jiffies;
902
903         LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
904                  conn->rac_state == RANAL_CONN_CLOSING);
905
906         if (!conn->rac_close_sent &&
907             time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
908                 /* not sent in a while; schedule conn so scheduler sends a keepalive */
909                 kranal_schedule_conn(conn);
910         }
911
912         timeout = conn->rac_timeout * HZ;
913
914         if (!conn->rac_close_recvd &&
915             time_after_eq(now, conn->rac_last_rx + timeout)) {
916                 CERROR("Nothing received from "LPX64" within %lu seconds\n",
917                        conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
918                 return -ETIMEDOUT;
919         }
920
921         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
922                 return 0;
923         
924         /* Check the conn's queues are moving.  These are "belt+braces" checks,
925          * in case of hardware/software errors that make this conn seem
926          * responsive even though it isn't progressing its message queues. */
927
928         spin_lock_irqsave(&conn->rac_lock, flags);
929
930         list_for_each (ttmp, &conn->rac_fmaq) {
931                 tx = list_entry(ttmp, kra_tx_t, tx_list);
932                 
933                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
934                         spin_unlock_irqrestore(&conn->rac_lock, flags);
935                         CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
936                                conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
937                         return -ETIMEDOUT;
938                 }
939         }
940         
941         list_for_each (ttmp, &conn->rac_rdmaq) {
942                 tx = list_entry(ttmp, kra_tx_t, tx_list);
943                 
944                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
945                         spin_unlock_irqrestore(&conn->rac_lock, flags);
946                         CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
947                                conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
948                         return -ETIMEDOUT;
949                 }
950         }
951         
952         list_for_each (ttmp, &conn->rac_replyq) {
953                 tx = list_entry(ttmp, kra_tx_t, tx_list);
954                 
955                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
956                         spin_unlock_irqrestore(&conn->rac_lock, flags);
957                         CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
958                                conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
959                         return -ETIMEDOUT;
960                 }
961         }
962         
963         spin_unlock_irqrestore(&conn->rac_lock, flags);
964         return 0;
965 }
966
/* Scan conn hash bucket 'idx' for timed-out connections, and fold each
 * conn's timeout and keepalive into *min_timeoutp (used by the reaper to
 * size its scan interval).  Timed-out conns are closed (ESTABLISHED) or
 * terminated (CLOSING); after handling one, the scan of this bucket
 * restarts since the global lock was dropped. */
void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                /* track the smallest interval seen over the whole scan */
                if (conn->rac_timeout < *min_timeoutp )
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp )
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                /* conn timed out: take a ref so it can't vanish while we
                 * swap the shared lock for the exclusive one */
                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to "LPX64", cqid %d timed out\n",
                       conn->rac_peer->rap_nid, conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                /* state may have changed while the lock was dropped */
                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;
                        
                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }
                
                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}
1024
/* Connection daemon.  Services two work queues under kra_connd_lock:
 * sockets queued by the acceptor (passive handshakes) and peers queued
 * for active connection attempts.  'arg' is this daemon's ordinal, used
 * only to name the thread.  Runs until kra_shutdown is set. */
int
kranal_connd (void *arg)
{
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;
                
                /* passive side: accepted socket awaiting handshake */
                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                         kra_acceptsock_t, ras_list);
                        list_del(&ras->ras_list);
                        /* drop the lock: the handshake blocks on the socket */
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }
                
                /* active side: peer queued for a connection attempt */
                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);
                        
                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);   /* queue's ref */

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                /* keep draining while work keeps arriving */
                if (did_something)
                        continue;

                /* queue before unlocking so a wakeup can't be missed */
                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
                
                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule ();
                
                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}
1094
1095 void
1096 kranal_update_reaper_timeout(long timeout) 
1097 {
1098         unsigned long   flags;
1099
1100         LASSERT (timeout > 0);
1101         
1102         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1103         
1104         if (timeout < kranal_data.kra_new_min_timeout)
1105                 kranal_data.kra_new_min_timeout = timeout;
1106
1107         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1108 }
1109
/* Reaper daemon: periodically walks the conn hash table looking for
 * timed-out connections.  It checks every connection 'n' times within
 * the global minimum of all conn timeout/keepalive intervals, waking
 * every 'p' seconds to scan a chunk of the table.  Runs until
 * kra_shutdown is set. */
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        /* base_index marks where the current full-table min-timeout scan
         * started; when the scan wraps past it, current_min_timeout is
         * refreshed from next_min_timeout */
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;
        
        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout <= 0) {
                
                        /* I wake up every 'p' seconds to check for
                         * timeouts on some more peers.  I try to check
                         * every connection 'n' times within the global
                         * minimum of all keepalive and timeout intervals,
                         * to ensure I attend to every connection within
                         * (n+1)/n times its timeout intervals. */
                
                        const int     p = 1;
                        const int     n = 3;
                        unsigned long min_timeout;
                        int           chunk;

                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                                /* new min timeout set: restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;

                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                        current_min_timeout = kranal_data.kra_new_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                /* consumed: reset the mailbox */
                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        }
                        min_timeout = current_min_timeout;

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
                                               flags);

                        LASSERT (min_timeout > 0);

                        /* Compute how many table entries to check now so I
                         * get round the whole table fast enough (NB I do
                         * this at fixed intervals of 'p' seconds) */
                        chunk = conn_entries;
                        if (min_timeout > n * p)
                                chunk = (chunk * n * p) / min_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kranal_reaper_check(conn_index, 
                                                    &next_min_timeout);
                                conn_index = (conn_index + 1) % conn_entries;
                        }

                        next_check_time += p * HZ;

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        /* did this chunk's scan step over base_index
                         * (directly, or after wrapping round the table)? */
                        if (((conn_index - chunk <= base_index &&
                              base_index < conn_index) ||
                             (conn_index - conn_entries - chunk <= base_index &&
                              base_index < conn_index - conn_entries))) {

                                /* Scanned all conns: set current_min_timeout... */
                                if (current_min_timeout != next_min_timeout) {
                                        current_min_timeout = next_min_timeout;                                        
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                /* ...and restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;
                        }
                }

                /* NOTE(review): 'timeout' is not recomputed after
                 * next_check_time is advanced above, so schedule_timeout()
                 * may be handed a stale (<= 0) value on passes that did a
                 * scan — verify against schedule_timeout() semantics */
                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                schedule_timeout(timeout);

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
        }

        kranal_thread_fini();
        return 0;
}
1226
/* Drain the device's RDMA completion queue.  Each completion names a
 * conn (by cqid); the finished tx is moved from that conn's rdmaq to its
 * fmaq so the matching PUT_DONE/GET_DONE message gets sent, and the conn
 * is scheduled to process it. */
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE)        /* CQ drained: all done */
                        return;

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CWARN("RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                spin_lock_irqsave(&conn->rac_lock, flags);

                /* the completed RDMA is expected at the head of rdmaq
                 * (the AppPtr LASSERT below cross-checks this) */
                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                /* queue the completion message for FMA transmission */
                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;
        
                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}
1281
1282 void
1283 kranal_check_fma_cq (kra_device_t *dev)
1284 {
1285         kra_conn_t         *conn;
1286         RAP_RETURN          rrc;
1287         __u32               cqid;
1288         __u32               event_type;
1289         struct list_head   *conns;
1290         struct list_head   *tmp;
1291         int                 i;
1292
1293         for (;;) {
1294                 rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
1295                 if (rrc != RAP_NOT_DONE)
1296                         return;
1297                 
1298                 LASSERT (rrc == RAP_SUCCESS);
1299
1300                 if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
1301
1302                         read_lock(&kranal_data.kra_global_lock);
1303                 
1304                         conn = kranal_cqid2conn_locked(cqid);
1305                         if (conn == NULL)
1306                                 CWARN("FMA CQID lookup %d failed\n", cqid);
1307                         else
1308                                 kranal_schedule_conn(conn);
1309
1310                         read_unlock(&kranal_data.kra_global_lock);
1311                         continue;
1312                 }
1313
1314                 /* FMA CQ has overflowed: check ALL conns */
1315                 CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
1316
1317                 for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
1318                         
1319                         read_lock(&kranal_data.kra_global_lock);
1320                 
1321                         conns = &kranal_data.kra_conns[i];
1322
1323                         list_for_each (tmp, conns) {
1324                                 conn = list_entry(tmp, kra_conn_t, 
1325                                                   rac_hashlist);
1326                 
1327                                 if (conn->rac_device == dev)
1328                                         kranal_schedule_conn(conn);
1329                         }
1330
1331                         /* don't block write lockers for too long... */
1332                         read_unlock(&kranal_data.kra_global_lock);
1333                 }
1334         }
1335 }
1336
1337 int
1338 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1339                void *immediate, int immediatenob)
1340 {
1341         int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1342         RAP_RETURN rrc;
1343         
1344         LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
1345         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1346                  immediatenob <= RANAL_FMA_MAX_DATA :
1347                  immediatenob == 0);
1348
1349         msg->ram_connstamp = conn->rac_my_connstamp;
1350         msg->ram_seq = conn->rac_tx_seq;
1351
1352         if (sync)
1353                 rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
1354                                       immediate, immediatenob,
1355                                       msg, sizeof(*msg));
1356         else
1357                 rrc = RapkFmaSend(conn->rac_device->rad_handle,
1358                                   immediate, immediatenob,
1359                                   msg, sizeof(*msg));
1360
1361         switch (rrc) {
1362         default:
1363                 LBUG();
1364
1365         case RAP_SUCCESS:
1366                 conn->rac_last_tx = jiffies;
1367                 conn->rac_tx_seq++;
1368                 return 0;
1369                 
1370         case RAP_NOT_DONE:
1371                 return -EAGAIN;
1372         }
1373 }
1374
1375 void
1376 kranal_process_fmaq (kra_conn_t *conn) 
1377 {
1378         unsigned long flags;
1379         int           more_to_do;
1380         kra_tx_t     *tx;
1381         int           rc;
1382         int           expect_reply;
1383
1384         /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1385          *       However I will be rescheduled some by an FMA completion event
1386          *       when I eventually get some.
1387          * NB 2. Sampling rac_state here, races with setting it elsewhere
1388          *       kranal_close_conn_locked.  But it doesn't matter if I try to
1389          *       send a "real" message just as I start closing because I'll get
1390          *       scheduled to send the close anyway. */
1391
1392         if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
1393                 if (!list_empty(&conn->rac_rdmaq)) {
1394                         /* RDMAs in progress */
1395                         LASSERT (!conn->rac_close_sent);
1396                         
1397                         if (time_after_eq(jiffies, 
1398                                           conn->rac_last_tx + 
1399                                           conn->rac_keepalive)) {
1400                                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1401                                 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1402                         }
1403                         return;
1404                 }
1405                 
1406                 if (conn->rac_close_sent)
1407                         return;
1408
1409                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1410                 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1411                 if (rc != 0)
1412                         return;
1413
1414                 conn->rac_close_sent = 1;
1415                 if (!conn->rac_close_recvd)
1416                         return;
1417                         
1418                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1419
1420                 if (conn->rac_state == RANAL_CONN_CLOSING)
1421                         kranal_terminate_conn_locked(conn);
1422
1423                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1424                 return;
1425         }
1426
1427         spin_lock_irqsave(&conn->rac_lock, flags);
1428
1429         if (list_empty(&conn->rac_fmaq)) {
1430
1431                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1432
1433                 if (time_after_eq(jiffies, 
1434                                   conn->rac_last_tx + conn->rac_keepalive)) {
1435                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1436                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1437                 }
1438                 return;
1439         }
1440         
1441         tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1442         list_del(&tx->tx_list);
1443         more_to_do = !list_empty(&conn->rac_fmaq);
1444
1445         spin_unlock_irqrestore(&conn->rac_lock, flags);
1446
1447         expect_reply = 0;
1448         switch (tx->tx_msg.ram_type) {
1449         default:
1450                 LBUG();
1451                 
1452         case RANAL_MSG_IMMEDIATE:
1453         case RANAL_MSG_PUT_NAK:
1454         case RANAL_MSG_PUT_DONE:
1455         case RANAL_MSG_GET_NAK:
1456         case RANAL_MSG_GET_DONE:
1457                 rc = kranal_sendmsg(conn, &tx->tx_msg,
1458                                     tx->tx_buffer, tx->tx_nob);
1459                 expect_reply = 0;
1460                 break;
1461                 
1462         case RANAL_MSG_PUT_REQ:
1463                 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1464                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1465                 kranal_map_buffer(tx);
1466                 expect_reply = 1;
1467                 break;
1468
1469         case RANAL_MSG_PUT_ACK:
1470                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1471                 expect_reply = 1;
1472                 break;
1473
1474         case RANAL_MSG_GET_REQ:
1475                 kranal_map_buffer(tx);
1476                 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1477                 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1478                 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = 
1479                         (__u64)((unsigned long)tx->tx_buffer);
1480                 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1481                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1482                 expect_reply = 1;
1483                 break;
1484         }
1485
1486         if (rc == -EAGAIN) {
1487                 /* I need credits to send this.  Replace tx at the head of the
1488                  * fmaq and I'll get rescheduled when credits appear */
1489                 spin_lock_irqsave(&conn->rac_lock, flags);
1490                 list_add(&tx->tx_list, &conn->rac_fmaq);
1491                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1492                 return;
1493         }
1494
1495         LASSERT (rc == 0);
1496         
1497         if (!expect_reply) {
1498                 kranal_tx_done(tx, 0);
1499         } else {
1500                 spin_lock_irqsave(&conn->rac_lock, flags);
1501                 list_add_tail(&tx->tx_list, &conn->rac_replyq);
1502                 tx->tx_qtime = jiffies;
1503                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1504         }
1505
1506         if (more_to_do)
1507                 kranal_schedule_conn(conn);
1508 }
1509
1510 static inline void
1511 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1512 {
1513         __swab64s(&d->rard_key.Key);
1514         __swab16s(&d->rard_key.Cookie);
1515         __swab16s(&d->rard_key.MdHandle);
1516         __swab32s(&d->rard_key.Flags);
1517         __swab64s(&d->rard_addr.AddressBits);
1518         __swab32s(&d->rard_nob);
1519 }
1520
1521 kra_tx_t *
1522 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1523 {
1524         struct list_head *ttmp;
1525         kra_tx_t         *tx;
1526         
1527         list_for_each(ttmp, &conn->rac_replyq) {
1528                 tx = list_entry(ttmp, kra_tx_t, tx_list);
1529                 
1530                 if (tx->tx_cookie != cookie)
1531                         continue;
1532                 
1533                 if (tx->tx_msg.ram_type != type) {
1534                         CWARN("Unexpected type %x (%x expected) "
1535                               "matched reply from "LPX64"\n",
1536                               tx->tx_msg.ram_type, type,
1537                               conn->rac_peer->rap_nid);
1538                         return NULL;
1539                 }
1540         }
1541         
1542         CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
1543         return NULL;
1544 }
1545
1546 void
1547 kranal_check_fma_rx (kra_conn_t *conn)
1548 {
1549         unsigned long flags;
1550         __u32         seq;
1551         kra_tx_t     *tx;
1552         kra_msg_t    *msg;
1553         void         *prefix;
1554         RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1555         kra_peer_t   *peer = conn->rac_peer;
1556
1557         if (rrc == RAP_NOT_DONE)
1558                 return;
1559         
1560         LASSERT (rrc == RAP_SUCCESS);
1561         conn->rac_last_rx = jiffies;
1562         seq = conn->rac_rx_seq++;
1563         msg = (kra_msg_t *)prefix;
1564
1565         if (msg->ram_magic != RANAL_MSG_MAGIC) {
1566                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1567                         CERROR("Unexpected magic %08x from "LPX64"\n",
1568                                msg->ram_magic, peer->rap_nid);
1569                         goto out;
1570                 }
1571
1572                 __swab32s(&msg->ram_magic);
1573                 __swab16s(&msg->ram_version);
1574                 __swab16s(&msg->ram_type);
1575                 __swab64s(&msg->ram_srcnid);
1576                 __swab64s(&msg->ram_connstamp);
1577                 __swab32s(&msg->ram_seq);
1578
1579                 /* NB message type checked below; NOT here... */
1580                 switch (msg->ram_type) {
1581                 case RANAL_MSG_PUT_ACK:
1582                         kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1583                         break;
1584
1585                 case RANAL_MSG_GET_REQ:
1586                         kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
1587                         break;
1588                         
1589                 default:
1590                         break;
1591                 }
1592         }
1593
1594         if (msg->ram_version != RANAL_MSG_VERSION) {
1595                 CERROR("Unexpected protocol version %d from "LPX64"\n",
1596                        msg->ram_version, peer->rap_nid);
1597                 goto out;
1598         }
1599
1600         if (msg->ram_srcnid != peer->rap_nid) {
1601                 CERROR("Unexpected peer "LPX64" from "LPX64"\n",
1602                        msg->ram_srcnid, peer->rap_nid);
1603                 goto out;
1604         }
1605         
1606         if (msg->ram_connstamp != conn->rac_peer_connstamp) {
1607                 CERROR("Unexpected connstamp "LPX64"("LPX64
1608                        " expected) from "LPX64"\n",
1609                        msg->ram_connstamp, conn->rac_peer_connstamp,
1610                        peer->rap_nid);
1611                 goto out;
1612         }
1613         
1614         if (msg->ram_seq != seq) {
1615                 CERROR("Unexpected sequence number %d(%d expected) from "
1616                        LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
1617                 goto out;
1618         }
1619
1620         if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1621                 /* This message signals RDMA completion... */
1622                 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1623                 LASSERT (rrc == RAP_SUCCESS);
1624         }
1625
1626         if (conn->rac_close_recvd) {
1627                 CERROR("Unexpected message %d after CLOSE from "LPX64"\n", 
1628                        msg->ram_type, conn->rac_peer->rap_nid);
1629                 goto out;
1630         }
1631
1632         if (msg->ram_type == RANAL_MSG_CLOSE) {
1633                 conn->rac_close_recvd = 1;
1634                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1635
1636                 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
1637                         kranal_close_conn_locked(conn, 0);
1638                 else if (conn->rac_close_sent)
1639                         kranal_terminate_conn_locked(conn);
1640
1641                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1642                 goto out;
1643         }
1644
1645         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
1646                 goto out;
1647         
1648         conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
1649                                                 /* they'll NULL rac_rxmsg if they consume it */
1650         switch (msg->ram_type) {
1651         case RANAL_MSG_NOOP:
1652                 /* Nothing to do; just a keepalive */
1653                 break;
1654                 
1655         case RANAL_MSG_IMMEDIATE:
1656                 lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
1657                 break;
1658                 
1659         case RANAL_MSG_PUT_REQ:
1660                 lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
1661
1662                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
1663                         break;
1664
1665                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
1666                 if (tx == NULL)
1667                         break;
1668                 
1669                 tx->tx_msg.ram_u.completion.racm_cookie = 
1670                         msg->ram_u.putreq.raprm_cookie;
1671                 kranal_post_fma(conn, tx);
1672                 break;
1673
1674         case RANAL_MSG_PUT_NAK:
1675                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1676                                         msg->ram_u.completion.racm_cookie);
1677                 if (tx == NULL)
1678                         break;
1679                 
1680                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1681                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1682                 kranal_tx_done(tx, -ENOENT);    /* no match */
1683                 break;
1684                 
1685         case RANAL_MSG_PUT_ACK:
1686                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1687                                         msg->ram_u.putack.rapam_src_cookie);
1688                 if (tx == NULL)
1689                         break;
1690
1691                 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1692                             &msg->ram_u.putack.rapam_desc, 
1693                             msg->ram_u.putack.rapam_desc.rard_nob,
1694                             msg->ram_u.putack.rapam_dst_cookie);
1695                 break;
1696
1697         case RANAL_MSG_PUT_DONE:
1698                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1699                                         msg->ram_u.completion.racm_cookie);
1700                 if (tx == NULL)
1701                         break;
1702
1703                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1704                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1705                 kranal_tx_done(tx, 0);
1706                 break;
1707
1708         case RANAL_MSG_GET_REQ:
1709                 lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
1710                 
1711                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
1712                         break;
1713
1714                 tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
1715                 if (tx == NULL)
1716                         break;
1717
1718                 tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
1719                 kranal_post_fma(conn, tx);
1720                 break;
1721                 
1722         case RANAL_MSG_GET_NAK:
1723                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1724                                         msg->ram_u.completion.racm_cookie);
1725                 if (tx == NULL)
1726                         break;
1727                 
1728                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1729                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1730                 kranal_tx_done(tx, -ENOENT);    /* no match */
1731                 break;
1732                 
1733         case RANAL_MSG_GET_DONE:
1734                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1735                                         msg->ram_u.completion.racm_cookie);
1736                 if (tx == NULL)
1737                         break;
1738                 
1739                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1740                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1741                 kranal_tx_done(tx, 0);
1742                 break;
1743         }
1744
1745  out:
1746         if (conn->rac_rxmsg != NULL)
1747                 kranal_consume_rxmsg(conn, NULL, 0);
1748
1749         /* check again later */
1750         kranal_schedule_conn(conn);
1751 }
1752
1753 void
1754 kranal_complete_closed_conn (kra_conn_t *conn) 
1755 {
1756         kra_tx_t   *tx;
1757
1758         LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1759
1760         while (!list_empty(&conn->rac_fmaq)) {
1761                 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1762                 
1763                 list_del(&tx->tx_list);
1764                 kranal_tx_done(tx, -ECONNABORTED);
1765         }
1766         
1767         LASSERT (list_empty(&conn->rac_rdmaq));
1768
1769         while (!list_empty(&conn->rac_replyq)) {
1770                 tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1771                 
1772                 list_del(&tx->tx_list);
1773                 kranal_tx_done(tx, -ECONNABORTED);
1774         }
1775 }
1776
/* Per-device scheduler thread.  Services completion-queue events flagged
 * by kranal_device_callback() (via dev->rad_ready) and connections queued
 * on dev->rad_connq, sleeping on dev->rad_waitq when there is nothing to
 * do.  Runs until kranal_data.kra_shutdown is set.  Returns 0. */
int
kranal_scheduler (void *arg)
{
        kra_device_t   *dev = (kra_device_t *)arg;
        wait_queue_t    wait;
        char            name[16];
        kra_conn_t     *conn;
        unsigned long   flags;
        int             busy_loops = 0;

        /* Detach into a named kernel daemon with signals blocked */
        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        kportal_daemonize(name);
        kportal_blockallsigs();

        dev->rad_scheduler = current;
        init_waitqueue_entry(&wait, current);

        /* rad_lock is held at the top of every loop iteration; it is
         * dropped around any work that may block or take other locks */
        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */
                
                /* Yield the CPU periodically so a long busy streak can't
                 * starve other threads */
                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        /* Clear the flag BEFORE polling so a callback that
                         * arrives during the poll isn't lost */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }
                
                if (!list_empty(&dev->rad_connq)) {
                        /* Connection needs attention */
                        conn = list_entry(dev->rad_connq.next,
                                          kra_conn_t, rac_schedlist);
                        list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        /* Clearing rac_scheduled allows the conn to be
                         * re-queued while we work on it */
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        /* Drop the ref taken when the conn was scheduled */
                        kranal_conn_decref(conn);
                        
                        spin_lock_irqsave(&dev->rad_lock, flags);
                        continue;
                }

                /* Nothing to do: sleep.  Register on the waitqueue and set
                 * TASK_INTERRUPTIBLE while still holding rad_lock, so a
                 * wakeup that races with us can't be missed between the
                 * final check and schedule() */
                add_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);

                spin_unlock_irqrestore(&dev->rad_lock, flags);

                busy_loops = 0;
                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&dev->rad_waitq, &wait);

                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        /* Tell shutdown path this scheduler has exited */
        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}
1860
1861
1862 lib_nal_t kranal_lib = {
1863         libnal_data:        &kranal_data,      /* NAL private data */
1864         libnal_send:         kranal_send,
1865         libnal_send_pages:   kranal_send_pages,
1866         libnal_recv:         kranal_recv,
1867         libnal_recv_pages:   kranal_recv_pages,
1868         libnal_dist:         kranal_dist
1869 };