lnet/klnds/ralnd/ralnd_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "ranal.h"

int
kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kranal_get_peer (nid) == NULL,
         * and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

void
kranal_device_callback(RAP_INT32 devid)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

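/* NB: tx descriptors come from two pools.  Callers that may sleep wait on
 * the "normal" pool (kra_idle_txs); callers that must not sleep pass
 * may_block == 0 and can dip into the reserve pool (kra_idle_nblk_txs)
 * instead, getting NULL only when that reserve is exhausted.  A typical
 * non-blocking caller therefore looks like:
 *
 *         tx = kranal_get_idle_tx(0);
 *         if (tx == NULL)
 *                 return PTL_NO_SPACE;
 *
 * kranal_tx_done() below returns each descriptor to the pool named by
 * tx_isnblk. */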
kra_tx_t *
kranal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kra_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                                CERROR("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));
        }

        if (tx != NULL) {
                list_del(&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be
                 * needed, but we've got a lock right now... */
                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
        /* ram_connstamp gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int may_block, int type)
{
        kra_tx_t *tx = kranal_get_idle_tx(may_block);

        if (tx == NULL)
                return NULL;

        kranal_init_msg(&tx->tx_msg, type);
        return tx;
}

int
kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

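/* Build the RAP_PHYS_REGION array (tx_phys) describing a paged payload so
 * it can be registered for RDMA.  After the first fragment, every kiov
 * must start at page offset 0, and only the last may be shorter than a
 * page: the payload has to look contiguous in I/O space, and anything
 * else fails with -EINVAL below. */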
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys->Length  = PAGE_SIZE;
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM:"
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                        CERROR("payload too big (%d)\n",
                               (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = kranal_page2phys(kiov->kiov_page);
                phys->Length  = PAGE_SIZE;
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}

static inline int
kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
                          struct iovec *iov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

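/* Register/deregister a tx buffer with the RapK library so the device can
 * DMA it: physical page arrays via RapkRegisterPhys(), virtual extents
 * via RapkRegisterMemory().  Note the rad_scheduler assertions; mapping
 * is confined to the device's scheduler thread, presumably because the
 * RapK device handle is not safe for concurrent use. */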
void
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                break;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                break;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                break;
        }
}

void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}

void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
        } else {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
                wake_up(&kranal_data.kra_idle_tx_waitq);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

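/* Find (or initiate) a connection for 'tx' and queue it.  The common case
 * (peer and conn both exist) is handled under the shared read lock; only
 * if a connection must be created do we drop it and retake the global
 * lock exclusively.  The peer/conn lookups are then repeated under the
 * write lock, since either may have changed in the unlocked window. */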
void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        unsigned long    now;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */

        read_lock(g_lock);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                read_unlock(g_lock);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                kranal_post_fma(conn, tx);
                read_unlock(g_lock);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock_irqsave(g_lock, flags);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));

                now = CURRENT_TIME;
                if (now < peer->rap_reconnect_time) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

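/* Start the RDMA described by the peer's sink descriptor and park the tx
 * on rac_rdmaq; when the RDMA completion queue fires (see
 * kranal_check_rdma_cq() below) the tx moves to rac_fmaq to send the
 * PUT_DONE/GET_DONE message named by 'type' and 'cookie'.  A zero-length
 * transfer skips the RDMA and posts the completion message directly. */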
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);

        rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
                                &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received != nob) {
                CWARN("Expected %d immediate bytes but got %d\n",
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

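/* Common send path for kranal_send() and kranal_send_pages(): exactly one
 * of 'iov' (virtual fragments) or 'kiov' (page fragments) is non-NULL.
 * Payloads small enough for FMA go as RANAL_MSG_IMMEDIATE, carried with
 * the message in a single FMA send; larger PUTs and GETs just send a
 * request and let the peer's descriptor drive a later RDMA. */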
ptl_err_t
kranal_do_send (lib_nal_t    *nal,
                void         *private,
                lib_msg_t    *libmsg,
                ptl_hdr_t    *hdr,
                int           type,
                ptl_nid_t     nid,
                ptl_pid_t     pid,
                unsigned int  niov,
                struct iovec *iov,
                ptl_kiov_t   *kiov,
                size_t        offset,
                size_t        nob)
{
        kra_conn_t *conn;
        kra_tx_t   *tx;
        int         rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
               " pid %d\n", nob, niov, nid, pid);

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (type) {
        default:
                LBUG();

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the conn that received the GET_REQ */
                conn = private;
                LASSERT (conn->rac_rxmsg != NULL);

                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
                        if (nob > RANAL_FMA_MAX_DATA) {
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                       nob, nid);
                                return PTL_FAIL;
                        }
                        break;                  /* RDMA not expected */
                }

                /* Incoming message consistent with immediate reply? */
                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, conn->rac_rxmsg->ram_type);
                        return PTL_FAIL;
                }

                tx = kranal_get_idle_tx(0);
                if (tx == NULL)
                        return PTL_FAIL;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                tx->tx_libmsg[0] = libmsg;

                kranal_map_buffer(tx);
                kranal_rdma(tx, RANAL_MSG_GET_DONE,
                            &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
                            conn->rac_rxmsg->ram_u.get.ragm_cookie);
                return PTL_OK;
        }

        case PTL_MSG_GET:
                LASSERT (niov == 0);
                LASSERT (nob == 0);
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside libmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
                    libmsg->md->length <= RANAL_FMA_MAX_DATA &&
                    libmsg->md->length <= kranal_tunables.kra_max_immediate)
                        break;

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.iov,
                                                      0, libmsg->md->length);
                else
                        rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.kiov,
                                                      0, libmsg->md->length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
                        kranal_tx_done(tx, -ENOMEM); /* NB rc is 0 here */
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;

        case PTL_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case PTL_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;
        }

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_MAX_IMMEDIATE);

        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                 type == PTL_MSG_REPLY ||
                                 in_interrupt()),
                               RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return PTL_NO_SPACE;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return PTL_FAIL;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_libmsg[0] = libmsg;
        kranal_launch_tx(tx, nid);
        return PTL_OK;
}

ptl_err_t
kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, iov, NULL,
                              offset, len);
}

ptl_err_t
kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, NULL, kiov,
                              offset, len);
}

ptl_err_t
kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                size_t offset, size_t mlen, size_t rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->ram_type) {
        default:
                LBUG();
                return PTL_FAIL;

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return PTL_FAIL;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return PTL_FAIL;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                return PTL_OK;

        case RANAL_MSG_GET_REQ:
                /* If the GET matched, we've already handled it in
                 * kranal_do_send which is called to send the REPLY.  We're
                 * only called here to complete the GET receive (if we needed
                 * it which we don't, but I digress...) */
                LASSERT (libmsg == NULL);
                lib_finalize(nal, NULL, libmsg, PTL_OK);
                return PTL_OK;

        case RANAL_MSG_PUT_REQ:
                if (libmsg == NULL) {           /* PUT didn't match... */
                        lib_finalize(nal, NULL, libmsg, PTL_OK);
                        return PTL_OK;
                }

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                kranal_map_buffer(tx);

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }
}

ptl_err_t
kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t mlen, size_t rlen)
{
        return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
                              offset, mlen, rlen);
}

ptl_err_t
kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t mlen, size_t rlen)
{
        return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
                              offset, mlen, rlen);
}

int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}

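/* Health check for a single conn: -ETIMEDOUT if nothing has been received
 * within rac_timeout seconds, or if any tx has been stuck on the fmaq,
 * rdmaq or replyq for that long.  A conn that merely hasn't transmitted
 * recently just gets scheduled so the scheduler sends a NOOP keepalive. */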
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("Nothing received from "LPX64" within %lu seconds\n",
                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to "LPX64", cqid %d timed out\n",
                       conn->rac_peer->rap_nid, conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

int
kranal_connd (void *arg)
{
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        continue;
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

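/* Reaper scan arithmetic (see the comment in the main loop): with p == 1
 * and n == 3, a conn hash of e.g. 256 entries and a minimum timeout of
 * 60s give
 *
 *         chunk = (256 * 3 * 1) / 60 = 12 entries per wakeup,
 *
 * so the whole table is covered in ceil(256/12) = 22 one-second wakeups,
 * close to the min_timeout/n = 20s target (integer division rounds the
 * chunk down).  When the minimum timeout is no more than n*p seconds the
 * whole table is scanned on every wakeup. */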
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout <= 0) {

                        /* I wake up every 'p' seconds to check for
                         * timeouts on some more peers.  I try to check
                         * every connection 'n' times within the global
                         * minimum of all keepalive and timeout intervals,
                         * to ensure I attend to every connection within
                         * (n+1)/n times its timeout intervals. */

                        const int     p = 1;
                        const int     n = 3;
                        unsigned long min_timeout;
                        int           chunk;

                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                                /* new min timeout set: restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;

                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                        current_min_timeout = kranal_data.kra_new_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        }
                        min_timeout = current_min_timeout;

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
                                               flags);

                        LASSERT (min_timeout > 0);

                        /* Compute how many table entries to check now so I
                         * get round the whole table fast enough (NB I do
                         * this at fixed intervals of 'p' seconds) */
                        chunk = conn_entries;
                        if (min_timeout > n * p)
                                chunk = (chunk * n * p) / min_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kranal_reaper_check(conn_index,
                                                    &next_min_timeout);
                                conn_index = (conn_index + 1) % conn_entries;
                        }

                        next_check_time += p * HZ;

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        if (((conn_index - chunk <= base_index &&
                              base_index < conn_index) ||
                             (conn_index - conn_entries - chunk <= base_index &&
                              base_index < conn_index - conn_entries))) {

                                /* Scanned all conns: set current_min_timeout... */
                                if (current_min_timeout != next_min_timeout) {
                                        current_min_timeout = next_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                /* ...and restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;
                        }
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                schedule_timeout(timeout);

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
        }

        /* drop the reaper lock on the way out, as kranal_connd() does */
        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

        kranal_thread_fini();
        return 0;
}

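/* Drain the device's RDMA completion queue.  Each completion names its
 * conn by cqid; the code assumes RDMAs on a conn complete in posting
 * order (note the AppPtr assertion below), so the finished tx is popped
 * from the head of rac_rdmaq and moved to rac_fmaq for its DONE message
 * to be sent. */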
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE)
                        return;

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CWARN("RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}

void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t         *conn;
        RAP_RETURN          rrc;
        __u32               cqid;
        __u32               event_type;
        struct list_head   *conns;
        struct list_head   *tmp;
        int                 i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE)
                        return;

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL)
                                CWARN("FMA CQID lookup %d failed\n", cqid);
                        else
                                kranal_schedule_conn(conn);

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        list_for_each (tmp, conns) {
                                conn = list_entry(tmp, kra_conn_t,
                                                  rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}

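/* Push one message (plus optional immediate payload) out on a conn's FMA.
 * Returns 0 on success or -EAGAIN when no send credits are available
 * right now; callers then leave the tx at the head of the fmaq and rely
 * on a rad_fma_cq event to reschedule the conn.  Messages flagged
 * RANAL_MSG_FENCE use the synchronous send, presumably so they are
 * fenced behind any RDMA in progress. */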
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_device->rad_handle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                return -EAGAIN;
        }
}

void
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;

        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
         *       However I will be rescheduled by a rad_fma_cq event when I
         *       eventually get some credits.
         * NB 2. Sampling rac_state here races with setting it elsewhere in
         *       kranal_close_conn_locked.  But it doesn't matter if I try to
         *       send a "real" message just as I start closing because I'll get
         *       scheduled to send the close anyway. */

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (time_after_eq(jiffies,
                                          conn->rac_last_tx +
                                          conn->rac_keepalive * HZ)) {
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
1422                 return;
1423         }
1424         
1425         tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1426         list_del(&tx->tx_list);
1427         more_to_do = !list_empty(&conn->rac_fmaq);
1428
1429         spin_unlock_irqrestore(&conn->rac_lock, flags);
1430
        expect_reply = 0;
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                expect_reply = 0;
                break;

        case RANAL_MSG_PUT_REQ:
                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                kranal_map_buffer(tx);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                kranal_map_buffer(tx);
                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* I need credits to send this.  Replace tx at the head of the
                 * fmaq and I'll get rescheduled when credits appear */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return;
        }

        LASSERT (rc == 0);

        if (!expect_reply) {
                kranal_tx_done(tx, 0);
        } else {
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        if (more_to_do)
                kranal_schedule_conn(conn);
}

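/* Byte-swap an RDMA descriptor arriving from a peer of opposite endianness;
 * called from kranal_check_fma_rx() once the message header itself has been
 * swabbed. */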
static inline void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}

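/* Find the tx on conn's replyq whose cookie matches 'cookie', remove it from
 * the queue and return it.  Returns NULL, with a warning, if the cookie is
 * unknown or the matched tx is of an unexpected type; the caller then simply
 * drops the reply. */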
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        struct list_head *ttmp;
        kra_tx_t         *tx;

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from "LPX64"\n",
                              tx->tx_msg.ram_type, type,
                              conn->rac_peer->rap_nid);
                        return NULL;
                }

                /* take it off the replyq and hand it back */
                list_del(&tx->tx_list);
                return tx;
        }

        CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
        return NULL;
}

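/* Pull the next incoming FMA message off 'conn', if any: swab it when the
 * peer's endianness differs, sanity-check magic, version, source NID,
 * connection stamp and sequence number, then dispatch on message type.  Any
 * failed check just drops the message at 'out'. */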
void
kranal_check_fma_rx (kra_conn_t *conn)
{
        unsigned long flags;
        __u32         seq;
        kra_tx_t     *tx;
        kra_msg_t    *msg;
        void         *prefix;
        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t   *peer = conn->rac_peer;

        if (rrc == RAP_NOT_DONE)
                return;

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;
        seq = conn->rac_rx_seq++;
        msg = (kra_msg_t *)prefix;

        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from "LPX64"\n",
                               msg->ram_magic, peer->rap_nid);
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;

                default:
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from "LPX64"\n",
                       msg->ram_version, peer->rap_nid);
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer "LPX64" from "LPX64"\n",
                       msg->ram_srcnid, peer->rap_nid);
                goto out;
        }

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from "LPX64"\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,
                       peer->rap_nid);
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from "
                       LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
                goto out;
        }

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                LASSERT (rrc == RAP_SUCCESS);
        }

        if (conn->rac_close_recvd) {
                CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
                       msg->ram_type, conn->rac_peer->rap_nid);
                goto out;
        }

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                goto out;

        conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
                                                /* they'll NULL rac_rxmsg if they consume it */
        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                break;

        case RANAL_MSG_IMMEDIATE:
                lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
                break;

        case RANAL_MSG_PUT_REQ:
                lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie =
                        msg->ram_u.putreq.raprm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_PUT_NAK:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_GET_NAK:
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;
        }

 out:
        if (conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
}

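/* Flush a connection that has reached RANAL_CONN_CLOSED: everything still
 * queued on the fmaq or replyq completes with -ECONNABORTED.  The rdmaq must
 * already have drained, since kranal_process_fmaq() only sends CLOSE once all
 * RDMAs have completed. */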
void
kranal_complete_closed_conn (kra_conn_t *conn)
{
        kra_tx_t   *tx;

        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);

        while (!list_empty(&conn->rac_fmaq)) {
                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        LASSERT (list_empty(&conn->rac_rdmaq));

        while (!list_empty(&conn->rac_replyq)) {
                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }
}

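/* Per-device scheduler thread: fields device callbacks (rad_ready), polls the
 * RDMA and FMA completion queues, and services scheduled connections until
 * shutdown.  dev->rad_scheduler suggests a single instance runs per device. */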
int
kranal_scheduler (void *arg)
{
        kra_device_t   *dev = (kra_device_t *)arg;
        wait_queue_t    wait;
        char            name[16];
        kra_conn_t     *conn;
        unsigned long   flags;
        int             busy_loops = 0;

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        kportal_daemonize(name);
        kportal_blockallsigs();

        dev->rad_scheduler = current;
        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                if (!list_empty(&dev->rad_connq)) {
                        /* Connection needs attention */
                        conn = list_entry(dev->rad_connq.next,
                                          kra_conn_t, rac_schedlist);
                        list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                        continue;
                }

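                /* Nothing to do: sleep.  The waitqueue add and state change
                 * happen while rad_lock is still held, and wakers appear to
                 * take the same lock before wake_up(), so no wakeup can slip
                 * between the checks above and schedule(). */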
                add_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);

                spin_unlock_irqrestore(&dev->rad_lock, flags);

                busy_loops = 0;
                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&dev->rad_waitq, &wait);

                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}


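/* Method table plugging this NAL into the portals library. */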
lib_nal_t kranal_lib = {
        libnal_data:        &kranal_data,      /* NAL private data */
        libnal_send:         kranal_send,
        libnal_send_pages:   kranal_send_pages,
        libnal_recv:         kranal_recv,
        libnal_recv_pages:   kranal_recv_pages,
        libnal_dist:         kranal_dist
};