lnet/klnds/ralnd/ralnd_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "ranal.h"

int
kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kranal_get_peer (nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

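/* Completion callback registered with the RapidArray driver.  It is
 * presumably free to fire in interrupt context (hence the irqsave locking
 * below); it just marks the device ready and wakes its scheduler thread. */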
void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}

void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t    *dev = conn->rac_device;
        unsigned long    flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

kra_tx_t *
kranal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kra_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                                CERROR("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));
        }

        if (tx != NULL) {
                list_del(&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be
                 * needed, but we've got a lock right now... */
                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        return tx;
}
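
/* Usage sketch: a caller that may not sleep (e.g. completing a receive)
 * passes may_block == 0 and must cope with the reserve pool running dry:
 *
 *      kra_tx_t *tx = kranal_get_idle_tx(0);
 *
 *      if (tx == NULL)
 *              return PTL_NO_SPACE;
 */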

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
        /* ram_connstamp gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int may_block, int type)
{
        kra_tx_t *tx = kranal_get_idle_tx(may_block);

        if (tx == NULL)
                return NULL;

        kranal_init_msg(&tx->tx_msg, type);
        return tx;
}

int
kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "nkiov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM:"
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = kranal_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}

static inline int
kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
                          struct iovec *iov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}

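/* NB mapping is effectively idempotent: buffers that are already mapped
 * (or need no mapping at all) fall straight through the switch below. */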
void
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t     *conn = tx->tx_conn;
        kra_device_t   *dev = conn->rac_device;
        RAP_RETURN      rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                break;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                break;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                break;
        }
}

void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t   *dev;
        RAP_RETURN      rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}

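/* End of a tx's life: unmap its buffer, finalise the (up to two) lib
 * messages attached to it and return it to the idle pool it came from,
 * waking anyone blocked in kranal_get_idle_tx(). */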
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        ptl_err_t        ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
        } else {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
                wake_up(&kranal_data.kra_idle_tx_waitq);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long    flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        unsigned long    now;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */

        read_lock(g_lock);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                read_unlock(g_lock);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                kranal_post_fma(conn, tx);
                read_unlock(g_lock);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock_irqsave(g_lock, flags);
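
        /* NB peer/conn state may have changed while no lock was held, so
         * both lookups must be repeated under the write lock before
         * deciding whether to queue or connect */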

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));

                now = CURRENT_SECONDS;
                if (now < peer->rap_reconnect_time) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}

void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}
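
/* When the RDMA posted above completes, kranal_check_rdma_cq() below moves
 * the tx from the rdmaq onto the connection's fmaq, so the scheduler sends
 * the completion message (GET_DONE/PUT_DONE) prepared here. */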

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received != nob) {
                CWARN("Expected %d immediate bytes but got %d\n",
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}

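/* Common send path.  In brief: a REPLY to a GET_REQ turns into the RDMA
 * that delivers the GET data; large GETs and PUTs go rendezvous via
 * GET_REQ/PUT_REQ messages; everything small enough falls out of the
 * switch and is sent as an IMMEDIATE message. */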
ptl_err_t
kranal_do_send (lib_nal_t    *nal,
                void         *private,
                lib_msg_t    *libmsg,
                ptl_hdr_t    *hdr,
                int           type,
                ptl_nid_t     nid,
                ptl_pid_t     pid,
                unsigned int  niov,
                struct iovec *iov,
                ptl_kiov_t   *kiov,
                int           offset,
                int           nob)
{
        kra_conn_t *conn;
        kra_tx_t   *tx;
        int         rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
               nob, niov, nid, pid);

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(type) {
        default:
                LBUG();

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the conn that received the GET_REQ */
                conn = private;
                LASSERT (conn->rac_rxmsg != NULL);

                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
                        if (nob > RANAL_FMA_MAX_DATA) {
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                       nob, nid);
                                return PTL_FAIL;
                        }
                        break;                  /* RDMA not expected */
                }

                /* Incoming message consistent with immediate reply? */
                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, conn->rac_rxmsg->ram_type);
                        return PTL_FAIL;
                }

                tx = kranal_get_idle_tx(0);
                if (tx == NULL)
                        return PTL_FAIL;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                tx->tx_libmsg[0] = libmsg;

                kranal_map_buffer(tx);
                kranal_rdma(tx, RANAL_MSG_GET_DONE,
                            &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
                            conn->rac_rxmsg->ram_u.get.ragm_cookie);
                return PTL_OK;
        }

        case PTL_MSG_GET:
                LASSERT (niov == 0);
                LASSERT (nob == 0);
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside libmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
                    libmsg->md->length <= RANAL_FMA_MAX_DATA &&
                    libmsg->md->length <= kranal_tunables.kra_max_immediate)
                        break;

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.iov,
                                                      0, libmsg->md->length);
                else
                        rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.kiov,
                                                      0, libmsg->md->length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;

        case PTL_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case PTL_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_MAX_IMMEDIATE && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;
        }

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_MAX_IMMEDIATE);

        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                 type == PTL_MSG_REPLY ||
                                 in_interrupt()),
                               RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return PTL_NO_SPACE;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return PTL_FAIL;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_libmsg[0] = libmsg;
        kranal_launch_tx(tx, nid);
        return PTL_OK;
}

ptl_err_t
kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, iov, NULL,
                              offset, len);
}

ptl_err_t
kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, NULL, kiov,
                              offset, len);
}

ptl_err_t
kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                int offset, int mlen, int rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(rxmsg->ram_type) {
        default:
                LBUG();
                return PTL_FAIL;

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return PTL_FAIL;
                } else {
                        LASSERT (niov > 0);
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return PTL_FAIL;
                        }
                        buffer = ((char *)iov->iov_base) + offset;
                }
                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                return PTL_OK;

        case RANAL_MSG_GET_REQ:
                /* If the GET matched, we've already handled it in
                 * kranal_do_send which is called to send the REPLY.  We're
                 * only called here to complete the GET receive (if we needed
                 * it which we don't, but I digress...) */
                LASSERT (libmsg == NULL);
                lib_finalize(nal, NULL, libmsg, PTL_OK);
                return PTL_OK;

        case RANAL_MSG_PUT_REQ:
                if (libmsg == NULL) {           /* PUT didn't match... */
                        lib_finalize(nal, NULL, libmsg, PTL_OK);
                        return PTL_OK;
                }

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                kranal_map_buffer(tx);

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }
}

ptl_err_t
kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t mlen, size_t rlen)
{
        return kranal_do_recv(nal, private, msg, niov, iov, NULL,
                              offset, mlen, rlen);
}

ptl_err_t
kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t mlen, size_t rlen)
{
        return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
                              offset, mlen, rlen);
}

int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}

int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("Nothing received from "LPX64" within %lu seconds\n",
                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to "LPX64", cqid %d timed out\n",
                       conn->rac_peer->rap_nid, conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

int
kranal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                         kra_acceptsock_t, ras_list);
                        list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        CDEBUG(D_WARNING,"About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_WARNING,"Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule ();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long   flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}

int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout <= 0) {

                        /* I wake up every 'p' seconds to check for
                         * timeouts on some more peers.  I try to check
                         * every connection 'n' times within the global
                         * minimum of all keepalive and timeout intervals,
                         * to ensure I attend to every connection within
                         * (n+1)/n times its timeout intervals. */

                        const int     p = 1;
                        const int     n = 3;
                        unsigned long min_timeout;
                        int           chunk;

                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                                /* new min timeout set: restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;

                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                        current_min_timeout = kranal_data.kra_new_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        }
                        min_timeout = current_min_timeout;

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
                                               flags);

                        LASSERT (min_timeout > 0);

                        /* Compute how many table entries to check now so I
                         * get round the whole table fast enough (NB I do
                         * this at fixed intervals of 'p' seconds) */
                        chunk = conn_entries;
                        if (min_timeout > n * p)
                                chunk = (chunk * n * p) / min_timeout;
                        if (chunk == 0)
                                chunk = 1;
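
                        /* Worked example (illustrative numbers only): with
                         * conn_entries = 256, p = 1, n = 3 and a 60s
                         * min_timeout, chunk = (256 * 3 * 1) / 60 = 12, so
                         * the whole table is swept in ~22 one-second
                         * wakeups, i.e. every conn is checked roughly 3
                         * times per minimum timeout interval, as the
                         * comment above intends. */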

                        for (i = 0; i < chunk; i++) {
                                kranal_reaper_check(conn_index,
                                                    &next_min_timeout);
                                conn_index = (conn_index + 1) % conn_entries;
                        }

                        next_check_time += p * HZ;

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

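                        /* Has this chunk's sweep reached or passed
                         * base_index (with or without wrapping the table)?
                         * If so, every conn has been visited since the
                         * current scan began. */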
                        if (((conn_index - chunk <= base_index &&
                              base_index < conn_index) ||
                             (conn_index - conn_entries - chunk <= base_index &&
                              base_index < conn_index - conn_entries))) {

                                /* Scanned all conns: set current_min_timeout... */
                                if (current_min_timeout != next_min_timeout) {
                                        current_min_timeout = next_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);
                                }

                                /* ...and restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                if (base_index < 0)
                                        base_index = conn_entries - 1;
                        }
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                schedule_timeout(timeout);

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
        }

        kranal_thread_fini();
        return 0;
}

void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE)
                        return;

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CWARN("RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}

void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t         *conn;
        RAP_RETURN          rrc;
        __u32               cqid;
        __u32               event_type;
        struct list_head   *conns;
        struct list_head   *tmp;
        int                 i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE)
                        return;

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL)
                                CWARN("FMA CQID lookup %d failed\n", cqid);
                        else
                                kranal_schedule_conn(conn);

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        list_for_each (tmp, conns) {
                                conn = list_entry(tmp, kra_conn_t,
                                                  rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}

int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_device->rad_handle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                return -EAGAIN;
        }
}
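
/* NB a -EAGAIN from kranal_sendmsg() means the connection is out of FMA
 * resources right now; kranal_process_fmaq() below responds by putting the
 * tx back at the head of the fmaq to retry when a completion event arrives. */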
1380
1381 void
1382 kranal_process_fmaq (kra_conn_t *conn) 
1383 {
1384         unsigned long flags;
1385         int           more_to_do;
1386         kra_tx_t     *tx;
1387         int           rc;
1388         int           expect_reply;
1389
1390         /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1391          *       However I will be rescheduled some by an FMA completion event
1392          *       when I eventually get some.
1393          * NB 2. Sampling rac_state here, races with setting it elsewhere
1394          *       kranal_close_conn_locked.  But it doesn't matter if I try to
1395          *       send a "real" message just as I start closing because I'll get
1396          *       scheduled to send the close anyway. */
1397
1398         if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
1399                 if (!list_empty(&conn->rac_rdmaq)) {
1400                         /* RDMAs in progress */
1401                         LASSERT (!conn->rac_close_sent);
1402                         
1403                         if (time_after_eq(jiffies, 
1404                                           conn->rac_last_tx + 
1405                                           conn->rac_keepalive)) {
1406                                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1407                                 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1408                         }
1409                         return;
1410                 }
1411                 
1412                 if (conn->rac_close_sent)
1413                         return;
1414
1415                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1416                 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1417                 if (rc != 0)
1418                         return;
1419
1420                 conn->rac_close_sent = 1;
1421                 if (!conn->rac_close_recvd)
1422                         return;
1423                         
1424                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1425
1426                 if (conn->rac_state == RANAL_CONN_CLOSING)
1427                         kranal_terminate_conn_locked(conn);
1428
1429                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1430                 return;
1431         }
1432
1433         spin_lock_irqsave(&conn->rac_lock, flags);
1434
1435         if (list_empty(&conn->rac_fmaq)) {
1436
1437                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1438
1439                 if (time_after_eq(jiffies, 
1440                                   conn->rac_last_tx + conn->rac_keepalive)) {
1441                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1442                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1443                 }
1444                 return;
1445         }
1446         
1447         tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1448         list_del(&tx->tx_list);
1449         more_to_do = !list_empty(&conn->rac_fmaq);
1450
1451         spin_unlock_irqrestore(&conn->rac_lock, flags);
1452
1453         expect_reply = 0;
1454         switch (tx->tx_msg.ram_type) {
1455         default:
1456                 LBUG();
1457                 
1458         case RANAL_MSG_IMMEDIATE:
1459         case RANAL_MSG_PUT_NAK:
1460         case RANAL_MSG_PUT_DONE:
1461         case RANAL_MSG_GET_NAK:
1462         case RANAL_MSG_GET_DONE:
1463                 rc = kranal_sendmsg(conn, &tx->tx_msg,
1464                                     tx->tx_buffer, tx->tx_nob);
1465                 expect_reply = 0;
1466                 break;
1467                 
1468         case RANAL_MSG_PUT_REQ:
1469                 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1470                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1471                 kranal_map_buffer(tx);
1472                 expect_reply = 1;
1473                 break;
1474
1475         case RANAL_MSG_PUT_ACK:
1476                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1477                 expect_reply = 1;
1478                 break;
1479
1480         case RANAL_MSG_GET_REQ:
1481                 kranal_map_buffer(tx);
1482                 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1483                 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1484                 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = 
1485                         (__u64)((unsigned long)tx->tx_buffer);
1486                 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1487                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1488                 expect_reply = 1;
1489                 break;
1490         }
1491
1492         if (rc == -EAGAIN) {
1493                 /* I need credits to send this.  Put tx back at the head
1494                  * of the fmaq; I'll get rescheduled when credits appear */
1495                 spin_lock_irqsave(&conn->rac_lock, flags);
1496                 list_add(&tx->tx_list, &conn->rac_fmaq);
1497                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1498                 return;
1499         }
1500
1501         LASSERT (rc == 0);
1502         
1503         if (!expect_reply) {
1504                 kranal_tx_done(tx, 0);
1505         } else {
1506                 spin_lock_irqsave(&conn->rac_lock, flags);
1507                 list_add_tail(&tx->tx_list, &conn->rac_replyq);
1508                 tx->tx_qtime = jiffies;
1509                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1510         }
1511
1512         if (more_to_do)
1513                 kranal_schedule_conn(conn);
1514 }
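
/* Sends that expect a reply are parked on rac_replyq above with tx_qtime
 * stamped.  A hypothetical watchdog (no such helper exists in this file)
 * could use that stamp to detect a peer that never replies; a minimal
 * sketch, assuming a caller-supplied timeout in jiffies: */
#if 0
static int
kranal_replyq_stalled (kra_conn_t *conn, unsigned long timeout)
{
        kra_tx_t *tx;

        /* caller must hold conn->rac_lock; the oldest tx is at the head */
        if (list_empty(&conn->rac_replyq))
                return 0;

        tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
        return time_after_eq(jiffies, tx->tx_qtime + timeout);
}
#endif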
1515
1516 static inline void
1517 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1518 {
1519         __swab64s(&d->rard_key.Key);
1520         __swab16s(&d->rard_key.Cookie);
1521         __swab16s(&d->rard_key.MdHandle);
1522         __swab32s(&d->rard_key.Flags);
1523         __swab64s(&d->rard_addr.AddressBits);
1524         __swab32s(&d->rard_nob);
1525 }
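
/* The __swabNNs() helpers byte-reverse a field in place: senders always
 * transmit in their native byte order, and only a receiver whose order
 * differs performs the fix-up (see the magic test in kranal_check_fma_rx()
 * below).  A minimal sketch of the round-trip property, with a made-up
 * value: */
#if 0
        __u32 x = 0x12345678;

        __swab32s(&x);                  /* x is now 0x78563412 */
        __swab32s(&x);                  /* back to 0x12345678 */
#endif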
1526
1527 kra_tx_t *
1528 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1529 {
1530         struct list_head *ttmp;
1531         kra_tx_t         *tx;
1532         
1533         list_for_each(ttmp, &conn->rac_replyq) {
1534                 tx = list_entry(ttmp, kra_tx_t, tx_list);
1535                 
1536                 if (tx->tx_cookie != cookie)
1537                         continue;
1538                 
1539                 if (tx->tx_msg.ram_type != type) {
1540                         CWARN("Unexpected type %x (%x expected) "
1541                               "matched reply from "LPX64"\n",
1542                               tx->tx_msg.ram_type, type,
1543                               conn->rac_peer->rap_nid);
1544                         return NULL;
1545                 }

                /* matched: unlink from the reply queue; the caller now owns
                 * this tx and must complete it with kranal_tx_done() */
                list_del(&tx->tx_list);
                return tx;
1546         }
1547         
1548         CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
1549         return NULL;
1550 }
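
/* On a match kranal_match_reply() unlinks the tx from rac_replyq, so
 * ownership passes to the caller, who must complete it with
 * kranal_tx_done().  A minimal caller fragment (mirroring the completion
 * cases in kranal_check_fma_rx() below; 'cookie' is assumed in scope): */
#if 0
        tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ, cookie);
        if (tx != NULL)
                kranal_tx_done(tx, 0);  /* tx is no longer on rac_replyq */
#endif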
1551
1552 void
1553 kranal_check_fma_rx (kra_conn_t *conn)
1554 {
1555         unsigned long flags;
1556         __u32         seq;
1557         kra_tx_t     *tx;
1558         kra_msg_t    *msg;
1559         void         *prefix;
1560         RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1561         kra_peer_t   *peer = conn->rac_peer;
1562
1563         if (rrc == RAP_NOT_DONE)
1564                 return;
1565         
1566         LASSERT (rrc == RAP_SUCCESS);
1567         conn->rac_last_rx = jiffies;
1568         seq = conn->rac_rx_seq++;
1569         msg = (kra_msg_t *)prefix;
1570
1571         if (msg->ram_magic != RANAL_MSG_MAGIC) {
1572                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1573                         CERROR("Unexpected magic %08x from "LPX64"\n",
1574                                msg->ram_magic, peer->rap_nid);
1575                         goto out;
1576                 }
1577
1578                 __swab32s(&msg->ram_magic);
1579                 __swab16s(&msg->ram_version);
1580                 __swab16s(&msg->ram_type);
1581                 __swab64s(&msg->ram_srcnid);
1582                 __swab64s(&msg->ram_connstamp);
1583                 __swab32s(&msg->ram_seq);
1584
1585                 /* NB message type checked below; NOT here... */
1586                 switch (msg->ram_type) {
1587                 case RANAL_MSG_PUT_ACK:
1588                         kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1589                         break;
1590
1591                 case RANAL_MSG_GET_REQ:
1592                         kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
1593                         break;
1594                         
1595                 default:
1596                         break;
1597                 }
1598         }
1599
1600         if (msg->ram_version != RANAL_MSG_VERSION) {
1601                 CERROR("Unexpected protocol version %d from "LPX64"\n",
1602                        msg->ram_version, peer->rap_nid);
1603                 goto out;
1604         }
1605
1606         if (msg->ram_srcnid != peer->rap_nid) {
1607                 CERROR("Unexpected peer "LPX64" from "LPX64"\n",
1608                        msg->ram_srcnid, peer->rap_nid);
1609                 goto out;
1610         }
1611         
1612         if (msg->ram_connstamp != conn->rac_peer_connstamp) {
1613                 CERROR("Unexpected connstamp "LPX64"("LPX64
1614                        " expected) from "LPX64"\n",
1615                        msg->ram_connstamp, conn->rac_peer_connstamp,
1616                        peer->rap_nid);
1617                 goto out;
1618         }
1619         
1620         if (msg->ram_seq != seq) {
1621                 CERROR("Unexpected sequence number %d(%d expected) from "
1622                        LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
1623                 goto out;
1624         }
1625
1626         if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1627                 /* This message signals RDMA completion... */
1628                 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1629                 LASSERT (rrc == RAP_SUCCESS);
1630         }
1631
1632         if (conn->rac_close_recvd) {
1633                 CERROR("Unexpected message %d after CLOSE from "LPX64"\n", 
1634                        msg->ram_type, conn->rac_peer->rap_nid);
1635                 goto out;
1636         }
1637
1638         if (msg->ram_type == RANAL_MSG_CLOSE) {
1639                 conn->rac_close_recvd = 1;
1640                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1641
1642                 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
1643                         kranal_close_conn_locked(conn, 0);
1644                 else if (conn->rac_close_sent)
1645                         kranal_terminate_conn_locked(conn);
1646
1647                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1648                 goto out;
1649         }
1650
1651         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
1652                 goto out;
1653         
1654         conn->rac_rxmsg = msg;                  /* stash message for portals callbacks */
1655                                                 /* they'll NULL rac_rxmsg if they consume it */
1656         switch (msg->ram_type) {
1657         case RANAL_MSG_NOOP:
1658                 /* Nothing to do; just a keepalive */
1659                 break;
1660                 
1661         case RANAL_MSG_IMMEDIATE:
1662                 lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
1663                 break;
1664                 
1665         case RANAL_MSG_PUT_REQ:
1666                 lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
1667
1668                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
1669                         break;
1670
1671                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
1672                 if (tx == NULL)
1673                         break;
1674                 
1675                 tx->tx_msg.ram_u.completion.racm_cookie = 
1676                         msg->ram_u.putreq.raprm_cookie;
1677                 kranal_post_fma(conn, tx);
1678                 break;
1679
1680         case RANAL_MSG_PUT_NAK:
1681                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1682                                         msg->ram_u.completion.racm_cookie);
1683                 if (tx == NULL)
1684                         break;
1685                 
1686                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1687                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1688                 kranal_tx_done(tx, -ENOENT);    /* no match */
1689                 break;
1690                 
1691         case RANAL_MSG_PUT_ACK:
1692                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1693                                         msg->ram_u.putack.rapam_src_cookie);
1694                 if (tx == NULL)
1695                         break;
1696
1697                 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1698                             &msg->ram_u.putack.rapam_desc, 
1699                             msg->ram_u.putack.rapam_desc.rard_nob,
1700                             msg->ram_u.putack.rapam_dst_cookie);
1701                 break;
1702
1703         case RANAL_MSG_PUT_DONE:
1704                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1705                                         msg->ram_u.completion.racm_cookie);
1706                 if (tx == NULL)
1707                         break;
1708
1709                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1710                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1711                 kranal_tx_done(tx, 0);
1712                 break;
1713
1714         case RANAL_MSG_GET_REQ:
1715                 lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
1716                 
1717                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
1718                         break;
1719
1720                 tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
1721                 if (tx == NULL)
1722                         break;
1723
1724                 tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
1725                 kranal_post_fma(conn, tx);
1726                 break;
1727                 
1728         case RANAL_MSG_GET_NAK:
1729                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1730                                         msg->ram_u.completion.racm_cookie);
1731                 if (tx == NULL)
1732                         break;
1733                 
1734                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1735                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1736                 kranal_tx_done(tx, -ENOENT);    /* no match */
1737                 break;
1738                 
1739         case RANAL_MSG_GET_DONE:
1740                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1741                                         msg->ram_u.completion.racm_cookie);
1742                 if (tx == NULL)
1743                         break;
1744                 
1745                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1746                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1747                 kranal_tx_done(tx, 0);
1748                 break;
1749         }
1750
1751  out:
1752         if (conn->rac_rxmsg != NULL)
1753                 kranal_consume_rxmsg(conn, NULL, 0);
1754
1755         /* check again later */
1756         kranal_schedule_conn(conn);
1757 }
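
/* The CLOSE handshake above is symmetric: whichever side receives CLOSE
 * while ESTABLISHED starts closing, and the conn is only terminated once
 * CLOSE has travelled both ways.  A sketch of the completion test, as a
 * hypothetical predicate (not in this file) over flags this file already
 * maintains: */
#if 0
static int
kranal_close_handshake_done (kra_conn_t *conn)
{
        /* caller must hold kra_global_lock */
        return (conn->rac_state == RANAL_CONN_CLOSING &&
                conn->rac_close_sent &&
                conn->rac_close_recvd);
}
#endif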
1758
1759 void
1760 kranal_complete_closed_conn (kra_conn_t *conn) 
1761 {
1762         kra_tx_t   *tx;
1763
1764         LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1765
1766         while (!list_empty(&conn->rac_fmaq)) {
1767                 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1768                 
1769                 list_del(&tx->tx_list);
1770                 kranal_tx_done(tx, -ECONNABORTED);
1771         }
1772         
1773         LASSERT (list_empty(&conn->rac_rdmaq));
1774
1775         while (!list_empty(&conn->rac_replyq)) {
1776                 tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1777                 
1778                 list_del(&tx->tx_list);
1779                 kranal_tx_done(tx, -ECONNABORTED);
1780         }
1781 }
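
/* Both loops above are the same "drain and abort" idiom.  A hypothetical
 * helper (not in this file) that factors it out; usage would be
 * kranal_drain_txq(&conn->rac_fmaq, -ECONNABORTED): */
#if 0
static void
kranal_drain_txq (struct list_head *q, int error)
{
        kra_tx_t *tx;

        while (!list_empty(q)) {
                tx = list_entry(q->next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, error);
        }
}
#endif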
1782
1783 int
1784 kranal_scheduler (void *arg)
1785 {
1786         kra_device_t   *dev = (kra_device_t *)arg;
1787         wait_queue_t    wait;
1788         char            name[16];
1789         kra_conn_t     *conn;
1790         unsigned long   flags;
1791         int             busy_loops = 0;
1792
1793         snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
1794         kportal_daemonize(name);
1795         kportal_blockallsigs();
1796
1797         dev->rad_scheduler = current;
1798         init_waitqueue_entry(&wait, current);
1799
1800         spin_lock_irqsave(&dev->rad_lock, flags);
1801
1802         while (!kranal_data.kra_shutdown) {
1803                 /* Safe: kra_shutdown only set when quiescent */
1804                 
1805                 if (busy_loops++ >= RANAL_RESCHED) {
1806                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1807
1808                         our_cond_resched();
1809                         busy_loops = 0;
1810
1811                         spin_lock_irqsave(&dev->rad_lock, flags);
1812                 }
1813
1814                 if (dev->rad_ready) {
1815                         /* Device callback fired since I last checked it */
1816                         dev->rad_ready = 0;
1817                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1818
1819                         kranal_check_rdma_cq(dev);
1820                         kranal_check_fma_cq(dev);
1821
1822                         spin_lock_irqsave(&dev->rad_lock, flags);
1823                 }
1824                 
1825                 if (!list_empty(&dev->rad_connq)) {
1826                         /* Connection needs attention */
1827                         conn = list_entry(dev->rad_connq.next,
1828                                           kra_conn_t, rac_schedlist);
1829                         list_del_init(&conn->rac_schedlist);
1830                         LASSERT (conn->rac_scheduled);
1831                         conn->rac_scheduled = 0;
1832                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1833
1834                         kranal_check_fma_rx(conn);
1835                         kranal_process_fmaq(conn);
1836
1837                         if (conn->rac_state == RANAL_CONN_CLOSED)
1838                                 kranal_complete_closed_conn(conn);
1839
1840                         kranal_conn_decref(conn);
1841                         
1842                         spin_lock_irqsave(&dev->rad_lock, flags);
1843                         continue;
1844                 }
1845
1846                 add_wait_queue(&dev->rad_waitq, &wait);
1847                 set_current_state(TASK_INTERRUPTIBLE);
1848
1849                 spin_unlock_irqrestore(&dev->rad_lock, flags);
1850
1851                 busy_loops = 0;
1852                 schedule();
1853
1854                 set_current_state(TASK_RUNNING);
1855                 remove_wait_queue(&dev->rad_waitq, &wait);
1856
1857                 spin_lock_irqsave(&dev->rad_lock, flags);
1858         }
1859
1860         spin_unlock_irqrestore(&dev->rad_lock, flags);
1861
1862         dev->rad_scheduler = NULL;
1863         kranal_thread_fini();
1864         return 0;
1865 }
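
/* The tail of the scheduler loop above is the classic race-free sleep
 * idiom: register on the waitqueue and set TASK_INTERRUPTIBLE *before*
 * releasing rad_lock, so a wake_up() from kranal_schedule_conn() or
 * kranal_device_callback() arriving between the unlock and schedule()
 * is not lost.  The bare skeleton, with generic names: */
#if 0
        add_wait_queue(&waitq, &wait);
        set_current_state(TASK_INTERRUPTIBLE);  /* before dropping the lock */

        spin_unlock_irqrestore(&lock, flags);
        schedule();                             /* returns after wake_up() */

        set_current_state(TASK_RUNNING);
        remove_wait_queue(&waitq, &wait);
#endif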
1866
1867
1868 lib_nal_t kranal_lib = {
1869         libnal_data:        &kranal_data,      /* NAL private data */
1870         libnal_send:         kranal_send,
1871         libnal_send_pages:   kranal_send_pages,
1872         libnal_recv:         kranal_recv,
1873         libnal_recv_pages:   kranal_recv_pages,
1874         libnal_dist:         kranal_dist
1875 };