/* lnet/klnds/ralnd/ralnd_cb.c */
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "ranal.h"

int
kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kranal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if ( nal->libnal_ni.ni_pid.nid == nid ) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

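/* Device callback, presumably registered with the RapidArray (RapK) driver:
 * wakes the scheduler of the device whose id matches 'devid'.  It only takes
 * dev->rad_lock, so it appears safe to call from interrupt context. */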
void
kranal_device_callback(RAP_INT32 devid)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

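/* Queue 'conn' on its device's scheduler queue (idempotent).  The reference
 * taken here is owned by the scheduler, which presumably drops it once it
 * has dequeued and serviced the conn. */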
void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

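/* Get an idle tx descriptor.  Blocking callers wait on the normal pool;
 * non-blocking callers (may_block == 0) may instead dip into the finite
 * reserve pool of non-blocking descriptors and can fail with NULL. */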
kra_tx_t *
kranal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kra_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                                CERROR("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));
        }

        if (tx != NULL) {
                list_del(&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be
                 * needed, but we've got a lock right now... */
                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
        /* ram_connstamp gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int may_block, int type)
{
        kra_tx_t *tx = kranal_get_idle_tx(may_block);

        if (tx == NULL)
                return NULL;

        kranal_init_msg(&tx->tx_msg, type);
        return tx;
}

int
kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

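/* Translate a paged (kiov) payload into the tx's RAP_PHYS_REGION array.
 * The fragments must tile a contiguous region of I/O virtual memory: only
 * the first may start at a non-zero offset and only the last may end short
 * of a page boundary. */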
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "nkiov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM:"
                               "page %d, offset %d, len %d\n",
                               phys - tx->tx_phys,
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", phys - tx->tx_phys);
                        return -EMSGSIZE;
                }

                phys->Address = kranal_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}

static inline int
kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
                          struct iovec *iov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

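/* Register tx's buffer with the device for RDMA; a no-op for buffers that
 * are already mapped or need no mapping.  Must run on the device's
 * scheduler thread, as asserted. */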
void
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                break;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                break;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                break;
        }
}

void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}

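/* Complete 'tx': unmap its buffer, finalise up to two library messages with
 * PTL_OK on success or PTL_FAIL otherwise, scrub it and return it to the
 * pool it was allocated from, waking any waiter on the normal pool. */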
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
        } else {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
                wake_up(&kranal_data.kra_idle_tx_waitq);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

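/* Commit 'tx' to 'nid'.  Fast path: find a conn under the shared lock and
 * queue on it.  Slow path: retake everything under the exclusive lock and,
 * if there is still no conn, queue the tx on the peer and pass the peer to
 * the connection daemon (unless a reconnect attempt is not yet due). */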
void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        unsigned long    now;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */

        read_lock(g_lock);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                read_unlock(g_lock);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                kranal_post_fma(conn, tx);
                read_unlock(g_lock);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock_irqsave(g_lock, flags);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));

                now = CURRENT_TIME;
                if (now < peer->rap_reconnect_time) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

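/* Start the RDMA described by 'sink', with the completion message of the
 * given type pre-built so it can be sent as soon as the RDMA completes.
 * A zero-length RDMA skips straight to posting the completion. */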
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received != nob) {
                CWARN("Expected %d immediate bytes but got %d\n",
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

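/* Common send path for both iovec and kiov payloads.  NB 'private' is only
 * meaningful for PTL_MSG_REPLY, where it is the conn that received the
 * GET_REQ.  Big GETs and PUTs turn into RDMA request messages; everything
 * else drops through to a single IMMEDIATE FMA message. */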
ptl_err_t
kranal_do_send (lib_nal_t    *nal,
                void         *private,
                lib_msg_t    *libmsg,
                ptl_hdr_t    *hdr,
                int           type,
                ptl_nid_t     nid,
                ptl_pid_t     pid,
                unsigned int  niov,
                struct iovec *iov,
                ptl_kiov_t   *kiov,
                size_t        offset,
                size_t        nob)
{
        kra_conn_t *conn;
        kra_tx_t   *tx;
        int         rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
               " pid %d\n", nob, niov, nid, pid);

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(type) {
        default:
                LBUG();

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the conn that received the GET_REQ */
                conn = private;
                LASSERT (conn->rac_rxmsg != NULL);

                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
                        if (nob > RANAL_FMA_MAX_DATA) {
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                       nob, nid);
                                return PTL_FAIL;
                        }
                        break;                  /* RDMA not expected */
                }

                /* Incoming message consistent with immediate reply? */
                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, conn->rac_rxmsg->ram_type);
                        return PTL_FAIL;
                }

                tx = kranal_get_idle_tx(0);
                if (tx == NULL)
                        return PTL_FAIL;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                tx->tx_libmsg[0] = libmsg;

                kranal_map_buffer(tx);
                kranal_rdma(tx, RANAL_MSG_GET_DONE,
                            &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
                            conn->rac_rxmsg->ram_u.get.ragm_cookie);
                return PTL_OK;
        }

        case PTL_MSG_GET:
                LASSERT (niov == 0);
                LASSERT (nob == 0);
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside libmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
                    libmsg->md->length <= RANAL_FMA_MAX_DATA &&
                    libmsg->md->length <= kranal_tunables.kra_max_immediate)
                        break;

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.iov,
                                                      0, libmsg->md->length);
                else
                        rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.kiov,
                                                      0, libmsg->md->length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
                        kranal_tx_done(tx, -ENOMEM);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;

        case PTL_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case PTL_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;
        }

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_MAX_IMMEDIATE);

        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                 type == PTL_MSG_REPLY ||
                                 in_interrupt()),
                               RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return PTL_NO_SPACE;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return PTL_FAIL;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_libmsg[0] = libmsg;
        kranal_launch_tx(tx, nid);
        return PTL_OK;
}

ptl_err_t
kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, iov, NULL,
                              offset, len);
}

ptl_err_t
kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, NULL, kiov,
                              offset, len);
}

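/* Common receive path.  'private' is the conn whose current rxmsg is being
 * consumed: IMMEDIATE payloads are copied straight out of the FMA prefix,
 * GET_REQs were fully handled on the send side, and matched PUT_REQs are
 * answered with a PUT_ACK describing the freshly mapped sink buffer. */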
ptl_err_t
kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                size_t offset, size_t mlen, size_t rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(rxmsg->ram_type) {
        default:
                LBUG();
                return PTL_FAIL;

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return PTL_FAIL;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return PTL_FAIL;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                return PTL_OK;

        case RANAL_MSG_GET_REQ:
                /* If the GET matched, we've already handled it in
                 * kranal_do_send which is called to send the REPLY.  We're
                 * only called here to complete the GET receive (if we needed
                 * it which we don't, but I digress...) */
                LASSERT (libmsg == NULL);
                lib_finalize(nal, NULL, libmsg, PTL_OK);
                return PTL_OK;

        case RANAL_MSG_PUT_REQ:
                if (libmsg == NULL) {           /* PUT didn't match... */
                        lib_finalize(nal, NULL, libmsg, PTL_OK);
                        return PTL_OK;
                }

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                kranal_map_buffer(tx);

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }
}

ptl_err_t
kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t mlen, size_t rlen)
{
        return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
                              offset, mlen, rlen);
}

ptl_err_t
kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t mlen, size_t rlen)
{
        return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
                              offset, mlen, rlen);
}

int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}

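/* Check one conn for keepalive and timeout: schedule it if it owes the peer
 * a keepalive NOOP, and return -ETIMEDOUT if nothing has been received, or
 * any queued tx has been stuck, for longer than the conn's timeout. */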
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("Nothing received from "LPX64" within %lu seconds\n",
                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

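/* Reaper helper: scan one conn hash chain, tracking the smallest timeout or
 * keepalive seen in *min_timeoutp, and close/terminate any conn that has
 * timed out.  The scan restarts from 'again' whenever the shared lock has
 * to be dropped to handle a casualty. */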
void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to "LPX64", cqid %d timed out\n",
                       conn->rac_peer->rap_nid, conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

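/* Connection daemon: performs the blocking parts of connection setup, i.e.
 * handshaking passively accepted sockets and actively connecting peers
 * queued by kranal_launch_tx(). */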
int
kranal_connd (void *arg)
{
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                         kra_acceptsock_t, ras_list);
                        list_del(&ras->ras_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        sock_release(ras->ras_sock);
                        PORTAL_FREE(ras, sizeof(*ras));

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

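/* Reaper daemon: wakes every 'p' seconds and walks a chunk of the conn hash
 * table, sized so that every conn is checked about 'n' times per minimum
 * timeout interval (see the comment in the loop below). */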
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout <= 0) {

                        /* I wake up every 'p' seconds to check for
                         * timeouts on some more peers.  I try to check
                         * every connection 'n' times within the global
                         * minimum of all keepalive and timeout intervals,
                         * to ensure I attend to every connection within
                         * (n+1)/n times its timeout intervals. */

                        const int     p = 1;
                        const int     n = 3;
                        unsigned long min_timeout;
                        int           chunk;

                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                                /* new min timeout set: restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;

                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                        current_min_timeout = kranal_data.kra_new_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        }
                        min_timeout = current_min_timeout;

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
                                               flags);

                        LASSERT (min_timeout > 0);

                        /* Compute how many table entries to check now so I
                         * get round the whole table fast enough (NB I do
                         * this at fixed intervals of 'p' seconds) */
                        chunk = conn_entries;
                        if (min_timeout > n * p)
                                chunk = (chunk * n * p) / min_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kranal_reaper_check(conn_index,
                                                    &next_min_timeout);
                                conn_index = (conn_index + 1) % conn_entries;
                        }

                        next_check_time += p * HZ;

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        if (((conn_index - chunk <= base_index &&
                              base_index < conn_index) ||
                             (conn_index - conn_entries - chunk <= base_index &&
                              base_index < conn_index - conn_entries))) {

                                /* Scanned all conns: set current_min_timeout... */
                                if (current_min_timeout != next_min_timeout) {
                                        current_min_timeout = next_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                /* ...and restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;
                        }
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                schedule_timeout(timeout);

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
        }

        kranal_thread_fini();
        return 0;
}

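/* Drain the device's RDMA completion queue.  Each completion moves the
 * oldest tx on the conn's rdmaq to its fmaq, and the conn is scheduled so
 * the matching PUT_DONE/GET_DONE completion message gets sent. */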
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE)
                        return;

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CWARN("RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}

void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t         *conn;
        RAP_RETURN          rrc;
        __u32               cqid;
        __u32               event_type;
        struct list_head   *conns;
        struct list_head   *tmp;
        int                 i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE)
                        return;

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL)
                                CWARN("FMA CQID lookup %d failed\n", cqid);
                        else
                                kranal_schedule_conn(conn);

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        list_for_each (tmp, conns) {
                                conn = list_entry(tmp, kra_conn_t,
                                                  rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}

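/* Send one FMA message with an optional immediate payload.  Fenced message
 * types use the synchronous send.  Returns 0 on success or -EAGAIN if the
 * FMA has no send credits right now. */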
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_device->rad_handle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                return -EAGAIN;
        }
}

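/* Scheduler: make progress on 'conn''s FMA queue, sending the next queued
 * message, or a NOOP/CLOSE as the conn's state and keepalive require. */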
1376 void
1377 kranal_process_fmaq (kra_conn_t *conn) 
1378 {
1379         unsigned long flags;
1380         int           more_to_do;
1381         kra_tx_t     *tx;
1382         int           rc;
1383         int           expect_reply;
1384
1385         /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1386          *       However I will be rescheduled some by an FMA completion event
1387          *       when I eventually get some.
1388          * NB 2. Sampling rac_state here, races with setting it elsewhere
1389          *       kranal_close_conn_locked.  But it doesn't matter if I try to
1390          *       send a "real" message just as I start closing because I'll get
1391          *       scheduled to send the close anyway. */

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (time_after_eq(jiffies,
                                          conn->rac_last_tx +
                                          conn->rac_keepalive)) {
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive)) {
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return;
        }

        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);
        more_to_do = !list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                expect_reply = 0;
                break;

        case RANAL_MSG_PUT_REQ:
                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                kranal_map_buffer(tx);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                kranal_map_buffer(tx);
                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* I need credits to send this.  Replace tx at the head of the
                 * fmaq and I'll get rescheduled when credits appear */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return;
        }

        LASSERT (rc == 0);

        if (!expect_reply) {
                kranal_tx_done(tx, 0);
        } else {
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        if (more_to_do)
                kranal_schedule_conn(conn);
}
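
/* Transmit flow summary: a tx queued on rac_fmaq is sent by
 * kranal_process_fmaq().  Messages that expect a reply (PUT_REQ, PUT_ACK,
 * GET_REQ) move to rac_replyq until kranal_match_reply() pairs them with
 * the peer's completion; everything else completes as soon as the send
 * succeeds. */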

static inline void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}
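
/* RDMA descriptors travel inside FMA messages, so they are only byte
 * swapped when the enclosing message's magic arrives byte reversed (see
 * kranal_check_fma_rx() below).  A minimal sketch of that detection,
 * illustrative only and not part of the build: */
#if 0
static int
example_fixup_endianness(kra_msg_t *msg)
{
        if (msg->ram_magic == RANAL_MSG_MAGIC)
                return 0;                       /* sender matches my endianness */

        if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC)
                return -EPROTO;                 /* not a ranal message at all */

        /* opposite-endian sender: swab the common header... */
        __swab32s(&msg->ram_magic);
        __swab16s(&msg->ram_version);
        __swab16s(&msg->ram_type);
        /* ...and any RDMA descriptor carried in the payload */
        if (msg->ram_type == RANAL_MSG_PUT_ACK)
                kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
        return 0;
}
#endif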

kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        struct list_head *ttmp;
        kra_tx_t         *tx;

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from "LPX64"\n",
                              tx->tx_msg.ram_type, type,
                              conn->rac_peer->rap_nid);
                        return NULL;
                }

                return tx;
        }

        CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
        return NULL;
}
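
/* Typical usage (taken from the completion cases in kranal_check_fma_rx()
 * below): look up the tx awaiting this peer's reply and bail out if the
 * cookie no longer matches anything on rac_replyq:
 *
 *         tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
 *                                 msg->ram_u.completion.racm_cookie);
 *         if (tx == NULL)
 *                 break;
 */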

void
kranal_check_fma_rx (kra_conn_t *conn)
{
        unsigned long flags;
        __u32         seq;
        kra_tx_t     *tx;
        kra_msg_t    *msg;
        void         *prefix;
        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t   *peer = conn->rac_peer;

        if (rrc == RAP_NOT_DONE)
                return;

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;
        seq = conn->rac_rx_seq++;
        msg = (kra_msg_t *)prefix;

        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from "LPX64"\n",
                               msg->ram_magic, peer->rap_nid);
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;

                default:
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from "LPX64"\n",
                       msg->ram_version, peer->rap_nid);
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer "LPX64" from "LPX64"\n",
                       msg->ram_srcnid, peer->rap_nid);
                goto out;
        }

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from "LPX64"\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,
                       peer->rap_nid);
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from "
                       LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
                goto out;
        }
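
        /* Header validated: magic, version, source NID, connection stamp
         * and sequence number all match this connection's expectations. */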

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                LASSERT (rrc == RAP_SUCCESS);
        }

        if (conn->rac_close_recvd) {
                CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
                       msg->ram_type, conn->rac_peer->rap_nid);
                goto out;
        }

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                goto out;

        conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
                                                /* they'll NULL rac_rxmsg if they consume it */
        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                break;

        case RANAL_MSG_IMMEDIATE:
                lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
                break;

        case RANAL_MSG_PUT_REQ:
                lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie =
                        msg->ram_u.putreq.raprm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_PUT_NAK:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_GET_NAK:
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;
        }

 out:
        if (conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
}
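
/* RDMA handshakes, as handled above: a PUT initiator sends PUT_REQ; the
 * target answers PUT_ACK carrying an RDMA descriptor for its sink buffer;
 * on receiving PUT_ACK the initiator RDMAs the data and fences it with
 * PUT_DONE.  A GET initiator sends GET_REQ carrying the descriptor of its
 * own (pre-mapped) buffer, and the peer completes with GET_DONE once the
 * data has been moved.  NAKs complete the matched tx with -ENOENT in
 * either direction. */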

void
kranal_complete_closed_conn (kra_conn_t *conn)
{
        kra_tx_t   *tx;

        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);

        while (!list_empty(&conn->rac_fmaq)) {
                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        LASSERT (list_empty(&conn->rac_rdmaq));

        while (!list_empty(&conn->rac_replyq)) {
                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }
}
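
/* Both loops above use the standard Linux "pop head until empty" list
 * idiom.  Consolidated into a helper it would look like the sketch below
 * (hypothetical, not part of the build): */
#if 0
static void
example_drain_txq(struct list_head *q, int error)
{
        kra_tx_t *tx;

        while (!list_empty(q)) {
                tx = list_entry(q->next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);
                kranal_tx_done(tx, error);      /* e.g. -ECONNABORTED */
        }
}
#endif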

int
kranal_scheduler (void *arg)
{
        kra_device_t   *dev = (kra_device_t *)arg;
        wait_queue_t    wait;
        char            name[16];
        kra_conn_t     *conn;
        unsigned long   flags;
        int             busy_loops = 0;

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        kportal_daemonize(name);
        kportal_blockallsigs();

        dev->rad_scheduler = current;
        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                if (!list_empty(&dev->rad_connq)) {
                        /* Connection needs attention */
                        conn = list_entry(dev->rad_connq.next,
                                          kra_conn_t, rac_schedlist);
                        list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                        continue;
                }

                /* Nothing to do: queue on rad_waitq *before* dropping
                 * rad_lock so a wakeup between the lock drop and
                 * schedule() can't be lost */
                add_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);

                spin_unlock_irqrestore(&dev->rad_lock, flags);

                busy_loops = 0;
                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&dev->rad_waitq, &wait);

                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}
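
/* The scheduler's sleep pairs with the wake_up() calls in
 * kranal_device_callback() and kranal_schedule_conn(): both fire while
 * holding rad_lock, and the scheduler only drops rad_lock after it is on
 * rad_waitq in TASK_INTERRUPTIBLE, so no wakeup can slip between its last
 * queue check and schedule().  Stripped to its bones the pattern is
 * (illustrative only, not part of the build; work_available()/do_work()
 * are hypothetical stand-ins for the checks above): */
#if 0
        for (;;) {
                spin_lock_irqsave(&dev->rad_lock, flags);
                if (work_available(dev)) {      /* hypothetical predicate */
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        do_work(dev);           /* hypothetical worker */
                        continue;
                }
                add_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);
                spin_unlock_irqrestore(&dev->rad_lock, flags);
                schedule();                     /* woken by wake_up(&dev->rad_waitq) */
                set_current_state(TASK_RUNNING);
                remove_wait_queue(&dev->rad_waitq, &wait);
        }
#endif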


lib_nal_t kranal_lib = {
        libnal_data:        &kranal_data,      /* NAL private data */
        libnal_send:         kranal_send,
        libnal_send_pages:   kranal_send_pages,
        libnal_recv:         kranal_recv,
        libnal_recv_pages:   kranal_recv_pages,
        libnal_dist:         kranal_dist
};
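
/* NB the `field: value' labels above are the old GNU C designated
 * initializer syntax used throughout this codebase.  The standard C99
 * spelling of the same dispatch table would be (equivalent, shown for
 * reference only): */
#if 0
lib_nal_t kranal_lib = {
        .libnal_data       = &kranal_data,      /* NAL private data */
        .libnal_send       = kranal_send,
        .libnal_send_pages = kranal_send_pages,
        .libnal_recv       = kranal_recv,
        .libnal_recv_pages = kranal_recv_pages,
        .libnal_dist       = kranal_dist,
};
#endif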