/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "ranal.h"

int
kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kranal_get_peer (nid) == NULL,
         * and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

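/* Review note: this appears to be the RapidArray device-event callback.  It
 * just flags the matching device ready and wakes its scheduler thread; all
 * real work happens in thread context, so this is safe to call from
 * interrupt context (hence the irqsave locking). */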
void
kranal_device_callback(RAP_INT32 devid)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

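/* Review note: tx descriptors come from two pools.  The normal pool may be
 * waited on; the "nblk" reserve pool is only dipped into when the caller
 * passes may_block == 0 because it cannot sleep.  Illustrative call, as in
 * kranal_new_tx_msg() below:
 *
 *      kra_tx_t *tx = kranal_get_idle_tx(!in_interrupt());
 *      if (tx == NULL)                 (reserve pool exhausted)
 *              return PTL_NO_SPACE;
 */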
kra_tx_t *
kranal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kra_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                                CERROR("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));
        }

        if (tx != NULL) {
                list_del(&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be
                 * needed, but we've got a lock right now... */
                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
        /* ram_connstamp gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int may_block, int type)
{
        kra_tx_t *tx = kranal_get_idle_tx(may_block);

        if (tx == NULL)
                return NULL;

        kranal_init_msg(&tx->tx_msg, type);
        return tx;
}

int
kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys->Length  = PAGE_SIZE;
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM:"
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                        CERROR("payload too big (%d)\n",
                               (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = kranal_page2phys(kiov->kiov_page);
                phys->Length  = PAGE_SIZE;
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}
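
/* Illustrative example (review note): a kiov of three full pages carrying
 * 10000 bytes at offset 0 produces three PAGE_SIZE tx_phys entries and
 * tx_phys_npages == 3.  Only the first fragment may start mid-page and only
 * the last may fall short of a page; anything else fails with -EINVAL. */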

static inline int
kranal_setup_buffer (kra_tx_t *tx, int niov,
                     struct iovec *iov, ptl_kiov_t *kiov,
                     int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

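/* Review note: mapping registers the tx buffer with the RapidArray device
 * (physical page lists via RapkRegisterPhys, virtual ranges via
 * RapkRegisterMemory).  The LASSERT below restricts this to the device's
 * scheduler thread, presumably so all rad_handle use is serialised there. */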
void
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                break;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                break;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                break;
        }
}

void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}

void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
        } else {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
                wake_up(&kranal_data.kra_idle_tx_waitq);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

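/* Review note: classic read-then-write locking below.  The global lock is
 * taken shared for the common case (peer and conn already exist); only when
 * a connection must be initiated is it dropped and retaken exclusive, and
 * the peer/conn lookups are repeated since state may have changed between
 * the two lock acquisitions. */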
void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        unsigned long    now;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */

        read_lock(g_lock);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                read_unlock(g_lock);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                kranal_post_fma(conn, tx);
                read_unlock(g_lock);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock_irqsave(g_lock, flags);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));

                now = CURRENT_TIME;
                if (now < peer->rap_reconnect_time) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);

        rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
                                &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received != nob) {
                CWARN("Expected %d immediate bytes but got %d\n",
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

ptl_err_t
kranal_do_send (lib_nal_t    *nal,
                void         *private,
                lib_msg_t    *libmsg,
                ptl_hdr_t    *hdr,
                int           type,
                ptl_nid_t     nid,
                ptl_pid_t     pid,
                unsigned int  niov,
                struct iovec *iov,
                ptl_kiov_t   *kiov,
                size_t        offset,
                size_t        nob)
{
        kra_conn_t *conn;
        kra_tx_t   *tx;
        int         rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
               " pid %d\n", nob, niov, nid, pid);

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(type) {
        default:
                LBUG();

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the conn that received the GET_REQ */
                conn = private;
                LASSERT (conn->rac_rxmsg != NULL);

                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
                        if (nob > RANAL_FMA_MAX_DATA) {
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                       nob, nid);
                                return PTL_FAIL;
                        }
                        break;                  /* RDMA not expected */
                }

                /* Incoming message consistent with immediate reply? */
                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, conn->rac_rxmsg->ram_type);
                        return PTL_FAIL;
                }

                tx = kranal_get_idle_tx(0);
                if (tx == NULL)
                        return PTL_FAIL;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                tx->tx_libmsg[0] = libmsg;

                kranal_map_buffer(tx);
                kranal_rdma(tx, RANAL_MSG_GET_DONE,
                            &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
                            conn->rac_rxmsg->ram_u.get.ragm_cookie);
                return PTL_OK;
        }

        case PTL_MSG_GET:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
                        kranal_tx_done(tx, -ENOMEM);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;

        case PTL_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case PTL_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;
        }

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                 type == PTL_MSG_REPLY ||
                                 in_interrupt()),
                               RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return PTL_NO_SPACE;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return PTL_FAIL;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_libmsg[0] = libmsg;
        kranal_launch_tx(tx, nid);
        return PTL_OK;
}

ptl_err_t
kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, iov, NULL,
                              offset, len);
}

ptl_err_t
kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, NULL, kiov,
                              offset, len);
}

ptl_err_t
kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                size_t offset, size_t mlen, size_t rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(rxmsg->ram_type) {
        default:
                LBUG();
                return PTL_FAIL;

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return PTL_FAIL;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return PTL_FAIL;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                return PTL_OK;

        case RANAL_MSG_GET_REQ:
                /* If the GET matched, we've already handled it in
                 * kranal_do_send which is called to send the REPLY.  We're
                 * only called here to complete the GET receive (if we needed
                 * it which we don't, but I digress...) */
                LASSERT (libmsg == NULL);
                lib_finalize(nal, NULL, libmsg, PTL_OK);
                return PTL_OK;

        case RANAL_MSG_PUT_REQ:
                if (libmsg == NULL) {           /* PUT didn't match... */
                        lib_finalize(nal, NULL, libmsg, PTL_OK);
                        return PTL_OK;
                }

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                kranal_map_buffer(tx);

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }
}

ptl_err_t
kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t mlen, size_t rlen)
{
        return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
                              offset, mlen, rlen);
}

ptl_err_t
kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t mlen, size_t rlen)
{
        return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
                              offset, mlen, rlen);
}

int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}

int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a
                 * keepalive */
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("Nothing received from "LPX64" within %lu seconds\n",
                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to "LPX64", cqid %d timed out\n",
                       conn->rac_peer->rap_nid, conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

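/* Review note: connection daemon.  kranal_launch_tx() queues peers on
 * kra_connd_peers and wakes kra_connd_waitq; this thread pops each peer and
 * runs the (potentially blocking) handshake in kranal_connect() with the
 * spinlock dropped. */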
int
kranal_connd (void *arg)
{
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        continue;
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

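/* Review note: the reaper walks the conn hash table incrementally via
 * kranal_reaper_check(), pacing itself from the smallest timeout/keepalive
 * seen so far; kranal_update_reaper_timeout() above feeds it new minima. */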
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout <= 0) {

                        /* I wake up every 'p' seconds to check for
                         * timeouts on some more peers.  I try to check
                         * every connection 'n' times within the global
                         * minimum of all keepalive and timeout intervals,
                         * to ensure I attend to every connection within
                         * (n+1)/n times its timeout intervals. */
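
                        /* Illustrative numbers (review note): with p == 1,
                         * n == 3, 256 hash entries and min_timeout == 60s,
                         * chunk below is 256*3*1/60 == 12 entries per tick,
                         * so the whole table is covered in ~22 one-second
                         * ticks, i.e. roughly every min_timeout/n seconds. */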

                        const int     p = 1;
                        const int     n = 3;
                        unsigned long min_timeout;
                        int           chunk;

                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                                /* new min timeout set: restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;

                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                        current_min_timeout = kranal_data.kra_new_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        }
                        min_timeout = current_min_timeout;

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
                                               flags);

                        LASSERT (min_timeout > 0);

                        /* Compute how many table entries to check now so I
                         * get round the whole table fast enough (NB I do
                         * this at fixed intervals of 'p' seconds) */
                        chunk = conn_entries;
                        if (min_timeout > n * p)
                                chunk = (chunk * n * p) / min_timeout;
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kranal_reaper_check(conn_index,
                                                    &next_min_timeout);
                                conn_index = (conn_index + 1) % conn_entries;
                        }

                        next_check_time += p * HZ;

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        if (((conn_index - chunk <= base_index &&
                              base_index < conn_index) ||
                             (conn_index - conn_entries - chunk <= base_index &&
                              base_index < conn_index - conn_entries))) {

                                /* Scanned all conns: set current_min_timeout... */
                                if (current_min_timeout != next_min_timeout) {
                                        current_min_timeout = next_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                /* ...and restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;
                        }
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                schedule_timeout(timeout);

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
        }

        kranal_thread_fini();
        return 0;
}

void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE)
                        return;

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CWARN("RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

                LASSERT (desc->AppPtr == (void *)tx);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                         tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}

void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t         *conn;
        RAP_RETURN          rrc;
        __u32               cqid;
        __u32               event_type;
        struct list_head   *conns;
        struct list_head   *tmp;
        int                 i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE)
                        return;

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL)
                                CWARN("FMA CQID lookup %d failed\n", cqid);
                        else
                                kranal_schedule_conn(conn);

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        list_for_each (tmp, conns) {
                                conn = list_entry(tmp, kra_conn_t,
                                                  rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}

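/* Review note: returns 0 on success, or -EAGAIN when the FMA is out of
 * credits; the caller (kranal_process_fmaq) then requeues the tx at the
 * head of the fmaq and relies on a rad_fma_cq event to reschedule the conn
 * once credits reappear. */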
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_device->rad_handle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                return -EAGAIN;
        }
}

void
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;

        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
         *       However I will be rescheduled by a rad_fma_cq event when I
         *       eventually get some.
         * NB 2. Sampling rac_state here races with setting it elsewhere in
         *       kranal_close_conn_locked.  But it doesn't matter if I try to
         *       send a "real" message just as I start closing because I'll
         *       get scheduled to send the close anyway. */

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (time_after_eq(jiffies,
                                          conn->rac_last_tx +
                                          conn->rac_keepalive * HZ)) {
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

1396         spin_lock_irqsave(&conn->rac_lock, flags);
1397
1398         if (list_empty(&conn->rac_fmaq)) {
1399
1400                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1401
1402                 if (time_after_eq(jiffies, 
1403                                   conn->rac_last_tx + conn->rac_keepalive)) {
1404                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1405                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1406                 }
1407                 return;
1408         }
1409         
1410         tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1411         list_del(&tx->tx_list);
1412         more_to_do = !list_empty(&conn->rac_fmaq);
1413
1414         spin_unlock_irqrestore(&conn->rac_lock, flags);
1415
1416         expect_reply = 0;
1417         switch (tx->tx_msg.ram_type) {
1418         default:
1419                 LBUG();
1420                 
1421         case RANAL_MSG_IMMEDIATE:
1422         case RANAL_MSG_PUT_NAK:
1423         case RANAL_MSG_PUT_DONE:
1424         case RANAL_MSG_GET_NAK:
1425         case RANAL_MSG_GET_DONE:
1426                 rc = kranal_sendmsg(conn, &tx->tx_msg,
1427                                     tx->tx_buffer, tx->tx_nob);
1428                 expect_reply = 0;
1429                 break;
1430                 
1431         case RANAL_MSG_PUT_REQ:
1432                 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1433                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1434                 kranal_map_buffer(tx);
1435                 expect_reply = 1;
1436                 break;
1437
1438         case RANAL_MSG_PUT_ACK:
1439                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1440                 expect_reply = 1;
1441                 break;
1442
1443         case RANAL_MSG_GET_REQ:
1444                 kranal_map_buffer(tx);
1445                 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1446                 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1447                 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = 
1448                         (__u64)((unsigned long)tx->tx_buffer);
1449                 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1450                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1451                 expect_reply = 1;
1452                 break;
1453         }
1454
1455         if (rc == -EAGAIN) {
1456                 /* I need credits to send this.  Replace tx at the head of the
1457                  * fmaq and I'll get rescheduled when credits appear */
1458                 spin_lock_irqsave(&conn->rac_lock, flags);
1459                 list_add(&tx->tx_list, &conn->rac_fmaq);
1460                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1461                 return;
1462         }
1463
1464         LASSERT (rc == 0);
1465         
1466         if (!expect_reply) {
1467                 kranal_tx_done(tx, 0);
1468         } else {
1469                 spin_lock_irqsave(&conn->rac_lock, flags);
1470                 list_add_tail(&tx->tx_list, &conn->rac_replyq);
1471                 tx->tx_qtime = jiffies;
1472                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1473         }
1474
1475         if (more_to_do)
1476                 kranal_schedule_conn(conn);
1477 }
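/* Review sketch (not driver code): how the expect_reply settings above map
 * onto the RDMA handshakes, as I read this file.  The PUT exchange:
 *
 *      initiator                           target
 *      PUT_REQ (src cookie)  ----------->  lib_parse() posts the sink
 *                            <-----------  PUT_ACK (rdma desc + cookies)
 *      RDMA write of payload ----------->
 *      PUT_DONE (FENCE bit)  ----------->  completes the sink buffer
 *
 * GET is symmetric, with GET_REQ carrying the requester's sink descriptor
 * and the peer doing the RDMA write.  That is why PUT_REQ, PUT_ACK and
 * GET_REQ park on rac_replyq awaiting a matching reply, while IMMEDIATE
 * and the *_NAK/*_DONE terminals complete as soon as they are sent. */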
1478
1479 static inline void
1480 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1481 {
1482         __swab64s(&d->rard_key.Key);
1483         __swab16s(&d->rard_key.Cookie);
1484         __swab16s(&d->rard_key.MdHandle);
1485         __swab32s(&d->rard_key.Flags);
1486         __swab64s(&d->rard_addr.AddressBits);
1487         __swab32s(&d->rard_nob);
1488 }
1489
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        struct list_head *ttmp;
        kra_tx_t         *tx;
        unsigned long     flags;

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from "LPX64"\n",
                              tx->tx_msg.ram_type, type,
                              conn->rac_peer->rap_nid);
                        return NULL;
                }

                /* Matched: unlink the tx from the replyq before the
                 * caller completes (and frees) it */
                list_del(&tx->tx_list);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return tx;
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
        return NULL;
}
1514
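/* Review note: the receive path below validates each FMA message in a
 * fixed order, as I read it: magic (byte-swapping the header if the peer
 * is opposite-endian), protocol version, source NID, connection stamp,
 * then the per-connection sequence number; RANAL_MSG_FENCE types also
 * wait for RDMA completion so a *_DONE can't overtake its own data.
 * Anything that fails is dropped and the conn is left scheduled. */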
1515 void
1516 kranal_check_fma_rx (kra_conn_t *conn)
1517 {
1518         unsigned long flags;
1519         __u32         seq;
1520         kra_tx_t     *tx;
1521         kra_msg_t    *msg;
1522         void         *prefix;
1523         RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1524         kra_peer_t   *peer = conn->rac_peer;
1525
1526         if (rrc == RAP_NOT_DONE)
1527                 return;
1528         
1529         LASSERT (rrc == RAP_SUCCESS);
1530         conn->rac_last_rx = jiffies;
1531         seq = conn->rac_rx_seq++;
1532         msg = (kra_msg_t *)prefix;
1533
1534         if (msg->ram_magic != RANAL_MSG_MAGIC) {
1535                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1536                         CERROR("Unexpected magic %08x from "LPX64"\n",
1537                                msg->ram_magic, peer->rap_nid);
1538                         goto out;
1539                 }
1540
1541                 __swab32s(&msg->ram_magic);
1542                 __swab16s(&msg->ram_version);
1543                 __swab16s(&msg->ram_type);
1544                 __swab64s(&msg->ram_srcnid);
1545                 __swab64s(&msg->ram_connstamp);
1546                 __swab32s(&msg->ram_seq);
1547
1548                 /* NB message type checked below; NOT here... */
1549                 switch (msg->ram_type) {
1550                 case RANAL_MSG_PUT_ACK:
1551                         kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1552                         break;
1553
1554                 case RANAL_MSG_GET_REQ:
1555                         kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
1556                         break;
1557                         
1558                 default:
1559                         break;
1560                 }
1561         }
1562
1563         if (msg->ram_version != RANAL_MSG_VERSION) {
1564                 CERROR("Unexpected protocol version %d from "LPX64"\n",
1565                        msg->ram_version, peer->rap_nid);
1566                 goto out;
1567         }
1568
1569         if (msg->ram_srcnid != peer->rap_nid) {
1570                 CERROR("Unexpected peer "LPX64" from "LPX64"\n",
1571                        msg->ram_srcnid, peer->rap_nid);
1572                 goto out;
1573         }
1574         
1575         if (msg->ram_connstamp != conn->rac_peer_connstamp) {
1576                 CERROR("Unexpected connstamp "LPX64"("LPX64
1577                        " expected) from "LPX64"\n",
1578                        msg->ram_connstamp, conn->rac_peer_connstamp,
1579                        peer->rap_nid);
1580                 goto out;
1581         }
1582         
1583         if (msg->ram_seq != seq) {
1584                 CERROR("Unexpected sequence number %d(%d expected) from "
1585                        LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
1586                 goto out;
1587         }
1588
1589         if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1590                 /* This message signals RDMA completion... */
1591                 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1592                 LASSERT (rrc == RAP_SUCCESS);
1593         }
1594
1595         if (conn->rac_close_recvd) {
1596                 CERROR("Unexpected message %d after CLOSE from "LPX64"\n", 
1597                        msg->ram_type, conn->rac_peer->rap_nid);
1598                 goto out;
1599         }
1600
1601         if (msg->ram_type == RANAL_MSG_CLOSE) {
1602                 conn->rac_close_recvd = 1;
1603                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1604
1605                 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
1606                         kranal_close_conn_locked(conn, 0);
1607                 else if (conn->rac_close_sent)
1608                         kranal_terminate_conn_locked(conn);
1609
1610                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1611                 goto out;
1612         }
1613
1614         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
1615                 goto out;
1616         
1617         conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
1618                                                 /* they'll NULL rac_rxmsg if they consume it */
1619         switch (msg->ram_type) {
1620         case RANAL_MSG_NOOP:
1621                 /* Nothing to do; just a keepalive */
1622                 break;
1623                 
1624         case RANAL_MSG_IMMEDIATE:
1625                 lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
1626                 break;
1627                 
1628         case RANAL_MSG_PUT_REQ:
1629                 lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
1630
1631                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
1632                         break;
1633
1634                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
1635                 if (tx == NULL)
1636                         break;
1637                 
1638                 tx->tx_msg.ram_u.completion.racm_cookie = 
1639                         msg->ram_u.putreq.raprm_cookie;
1640                 kranal_post_fma(conn, tx);
1641                 break;
1642
1643         case RANAL_MSG_PUT_NAK:
1644                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1645                                         msg->ram_u.completion.racm_cookie);
1646                 if (tx == NULL)
1647                         break;
1648                 
1649                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1650                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1651                 kranal_tx_done(tx, -ENOENT);    /* no match */
1652                 break;
1653                 
1654         case RANAL_MSG_PUT_ACK:
1655                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1656                                         msg->ram_u.putack.rapam_src_cookie);
1657                 if (tx == NULL)
1658                         break;
1659
1660                 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1661                             &msg->ram_u.putack.rapam_desc, 
1662                             msg->ram_u.putack.rapam_desc.rard_nob,
1663                             msg->ram_u.putack.rapam_dst_cookie);
1664                 break;
1665
1666         case RANAL_MSG_PUT_DONE:
1667                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1668                                         msg->ram_u.completion.racm_cookie);
1669                 if (tx == NULL)
1670                         break;
1671
1672                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1673                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1674                 kranal_tx_done(tx, 0);
1675                 break;
1676
1677         case RANAL_MSG_GET_REQ:
1678                 lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
1679                 
1680                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
1681                         break;
1682
1683                 tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
1684                 if (tx == NULL)
1685                         break;
1686
1687                 tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
1688                 kranal_post_fma(conn, tx);
1689                 break;
1690                 
1691         case RANAL_MSG_GET_NAK:
1692                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1693                                         msg->ram_u.completion.racm_cookie);
1694                 if (tx == NULL)
1695                         break;
1696                 
1697                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1698                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1699                 kranal_tx_done(tx, -ENOENT);    /* no match */
1700                 break;
1701                 
1702         case RANAL_MSG_GET_DONE:
1703                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1704                                         msg->ram_u.completion.racm_cookie);
1705                 if (tx == NULL)
1706                         break;
1707                 
1708                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1709                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1710                 kranal_tx_done(tx, 0);
1711                 break;
1712         }
1713
1714  out:
1715         if (conn->rac_rxmsg != NULL)
1716                 kranal_consume_rxmsg(conn, NULL, 0);
1717
1718         /* check again later */
1719         kranal_schedule_conn(conn);
1720 }
1721
1722 void
1723 kranal_complete_closed_conn (kra_conn_t *conn) 
1724 {
1725         kra_tx_t   *tx;
1726
1727         LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1728
1729         while (!list_empty(&conn->rac_fmaq)) {
1730                 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1731                 
1732                 list_del(&tx->tx_list);
1733                 kranal_tx_done(tx, -ECONNABORTED);
1734         }
1735         
1736         LASSERT (list_empty(&conn->rac_rdmaq));
1737
1738         while (!list_empty(&conn->rac_replyq)) {
1739                 tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1740                 
1741                 list_del(&tx->tx_list);
1742                 kranal_tx_done(tx, -ECONNABORTED);
1743         }
1744 }
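/* Review note: the LASSERT that rac_rdmaq is empty presumably holds
 * because a conn only reaches RANAL_CONN_CLOSED after its outstanding
 * RDMA completions have been reaped; the fmaq and replyq, by contrast,
 * can still hold txs that never made it onto the wire, which is why they
 * are failed with -ECONNABORTED above. */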
1745
1746 int
1747 kranal_scheduler (void *arg)
1748 {
1749         kra_device_t   *dev = (kra_device_t *)arg;
1750         wait_queue_t    wait;
1751         char            name[16];
1752         kra_conn_t     *conn;
1753         unsigned long   flags;
1754         int             busy_loops = 0;
1755
1756         snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
1757         kportal_daemonize(name);
1758         kportal_blockallsigs();
1759
1760         dev->rad_scheduler = current;
1761         init_waitqueue_entry(&wait, current);
1762
1763         spin_lock_irqsave(&dev->rad_lock, flags);
1764
1765         while (!kranal_data.kra_shutdown) {
1766                 /* Safe: kra_shutdown only set when quiescent */
1767                 
1768                 if (busy_loops++ >= RANAL_RESCHED) {
1769                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1770
1771                         our_cond_resched();
1772                         busy_loops = 0;
1773
1774                         spin_lock_irqsave(&dev->rad_lock, flags);
1775                 }
1776
1777                 if (dev->rad_ready) {
1778                         /* Device callback fired since I last checked it */
1779                         dev->rad_ready = 0;
1780                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1781
1782                         kranal_check_rdma_cq(dev);
1783                         kranal_check_fma_cq(dev);
1784
1785                         spin_lock_irqsave(&dev->rad_lock, flags);
1786                 }
1787                 
1788                 if (!list_empty(&dev->rad_connq)) {
1789                         /* Connection needs attention */
1790                         conn = list_entry(dev->rad_connq.next,
1791                                           kra_conn_t, rac_schedlist);
1792                         list_del_init(&conn->rac_schedlist);
1793                         LASSERT (conn->rac_scheduled);
1794                         conn->rac_scheduled = 0;
1795                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1796
1797                         kranal_check_fma_rx(conn);
1798                         kranal_process_fmaq(conn);
1799
1800                         if (conn->rac_state == RANAL_CONN_CLOSED)
1801                                 kranal_complete_closed_conn(conn);
1802
1803                         kranal_conn_decref(conn);
1804                         
1805                         spin_lock_irqsave(&dev->rad_lock, flags);
1806                         continue;
1807                 }
1808
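                /* Review note: the sleep below uses the race-free idiom:
                 * we join rad_waitq and set TASK_INTERRUPTIBLE while still
                 * holding rad_lock, and the rad_waitq wakeups fire under
                 * the same lock, so a wake_up() landing between the unlock
                 * and schedule() just leaves us runnable rather than being
                 * lost. */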
1809                 add_wait_queue(&dev->rad_waitq, &wait);
1810                 set_current_state(TASK_INTERRUPTIBLE);
1811
1812                 spin_unlock_irqrestore(&dev->rad_lock, flags);
1813
1814                 busy_loops = 0;
1815                 schedule();
1816
1817                 set_current_state(TASK_RUNNING);
1818                 remove_wait_queue(&dev->rad_waitq, &wait);
1819
1820                 spin_lock_irqsave(&dev->rad_lock, flags);
1821         }
1822
1823         spin_unlock_irqrestore(&dev->rad_lock, flags);
1824
1825         dev->rad_scheduler = NULL;
1826         kranal_thread_fini();
1827         return 0;
1828 }
1829
1830
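/* The lib-NAL method table: the portals library dispatches into this
 * NAL's send/receive paths through these entry points. */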
1831 lib_nal_t kranal_lib = {
1832         libnal_data:        &kranal_data,      /* NAL private data */
1833         libnal_send:         kranal_send,
1834         libnal_send_pages:   kranal_send_pages,
1835         libnal_recv:         kranal_recv,
1836         libnal_recv_pages:   kranal_recv_pages,
1837         libnal_dist:         kranal_dist
1838 };